summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFerris Tseng <ferris@navapbc.com>2019-12-25 15:59:58 -0500
committerFerris Tseng <ferris@navapbc.com>2019-12-25 15:59:58 -0500
commitad1936065efea19616852d099eb8bd2f1eba386a (patch)
treecc7c1a01388fc348b05b23f9446e2f107bacb979
parent7923508f7c7cd154290379f95e4966d393e4334d (diff)
use "impl Stream" instead of the async stream response type
-rw-r--r--ipfs-api/src/client.rs331
-rw-r--r--ipfs-api/src/lib.rs10
2 files changed, 196 insertions, 145 deletions
diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs
index 4a9871a..1072a33 100644
--- a/ipfs-api/src/client.rs
+++ b/ipfs-api/src/client.rs
@@ -10,12 +10,12 @@ use crate::{
read::{JsonLineDecoder, LineDecoder, StreamReader},
request::{self, ApiRequest},
response::{self, Error},
- AsyncStreamResponse, Client, Request, Response,
+ Client, Request, Response,
};
#[cfg(feature = "actix")]
use actix_multipart::client::multipart;
use bytes::Bytes;
-use futures::{future, FutureExt, Stream, TryFutureExt, TryStreamExt};
+use futures::{future, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
use http::{
uri::{InvalidUri, Uri},
StatusCode,
@@ -246,76 +246,6 @@ impl IpfsClient {
}
}
- /// Generic method for making a request that expects back a streaming
- /// response.
- ///
- fn request_stream<Req, Res, F>(
- &self,
- req: Req,
- form: Option<multipart::Form<'static>>,
- process: F,
- ) -> AsyncStreamResponse<Res>
- where
- Req: ApiRequest + Serialize,
- F: 'static + Send + Fn(Response) -> AsyncStreamResponse<Res>,
- {
- let request = future::ready(self.build_base_request(req, form));
-
- let response = {
- #[cfg(feature = "hyper")]
- {
- let client = self.client.clone();
-
- request
- .and_then(move |req| client.request(req).err_into())
- .map_ok(move |res| {
- match res.status() {
- StatusCode::OK => process(res),
- // If the server responded with an error status code, the body
- // still needs to be read so an error can be built. This block will
- // read the entire body stream, then immediately return an error.
- //
- _ => Box::new(
- body::to_bytes(res.into_body())
- .boxed()
- .map(|maybe_body| match maybe_body {
- Ok(body) => Err(Self::process_error_from_body(body)),
- Err(e) => Err(e.into()),
- })
- .into_stream(),
- ),
- }
- })
- .try_flatten_stream()
- }
- #[cfg(feature = "actix")]
- {
- request
- .and_then(|req| req.send().err_into())
- .map_ok(move |mut res| {
- match res.status() {
- StatusCode::OK => process(res),
- // If the server responded with an error status code, the body
- // still needs to be read so an error can be built. This block will
- // read the entire body stream, then immediately return an error.
- //
- _ => Box::new(
- res.body()
- .map(|maybe_body| match maybe_body {
- Ok(body) => Err(Self::process_error_from_body(body)),
- Err(e) => Err(e.into()),
- })
- .into_stream(),
- ),
- }
- })
- .try_flatten_stream()
- }
- };
-
- Box::new(response)
- }
-
/// Generic method for making a request to the Ipfs server, and getting
/// a deserializable response.
///
@@ -370,41 +300,92 @@ impl IpfsClient {
_ => Err(Self::process_error_from_body(chunk)),
}
}
+}
- /// Generic method for making a request to the Ipfs server, and getting
- /// back a raw stream of bytes.
+impl IpfsClient {
+ /// Generic method for making a request that expects back a streaming
+ /// response.
///
- fn request_stream_bytes<Req>(
+ fn request_stream<Res, F, OutStream>(
&self,
- req: Req,
- form: Option<multipart::Form<'static>>,
- ) -> AsyncStreamResponse<Bytes>
+ req: Request,
+ process: F,
+ ) -> impl Stream<Item = Result<Res, Error>>
where
- Req: ApiRequest + Serialize,
+ OutStream: Stream<Item = Result<Res, Error>>,
+ F: 'static + Fn(Response) -> OutStream,
{
#[cfg(feature = "hyper")]
{
- self.request_stream(req, form, |res| Box::new(res.into_body().err_into()))
+ self.client
+ .request(req)
+ .err_into()
+ .map_ok(move |res| {
+ match res.status() {
+ StatusCode::OK => process(res).right_stream(),
+ // If the server responded with an error status code, the body
+ // still needs to be read so an error can be built. This block will
+ // read the entire body stream, then immediately return an error.
+ //
+ _ => body::to_bytes(res.into_body())
+ .boxed()
+ .map(|maybe_body| match maybe_body {
+ Ok(body) => Err(Self::process_error_from_body(body)),
+ Err(e) => Err(e.into()),
+ })
+ .into_stream()
+ .left_stream(),
+ }
+ })
+ .try_flatten_stream()
+ }
+ #[cfg(feature = "actix")]
+ {
+ req.send()
+ .err_into()
+ .map_ok(move |mut res| {
+ match res.status() {
+ StatusCode::OK => process(res).right_stream(),
+ // If the server responded with an error status code, the body
+ // still needs to be read so an error can be built. This block will
+ // read the entire body stream, then immediately return an error.
+ //
+ _ => res
+ .body()
+ .map(|maybe_body| match maybe_body {
+ Ok(body) => Err(Self::process_error_from_body(body)),
+ Err(e) => Err(e.into()),
+ })
+ .into_stream()
+ .left_stream(),
+ }
+ })
+ .try_flatten_stream()
+ }
+ }
+
+ /// Generic method for making a request to the Ipfs server, and getting
+ /// back a raw stream of bytes.
+ ///
+ fn request_stream_bytes(&self, req: Request) -> impl Stream<Item = Result<Bytes, Error>> {
+ #[cfg(feature = "hyper")]
+ {
+ self.request_stream(req, |res| res.into_body().err_into())
}
#[cfg(feature = "actix")]
{
- self.request_stream(req, form, |res| Box::new(res.err_into()))
+ self.request_stream(req, |res| res.err_into())
}
}
/// Generic method to return a streaming response of deserialized json
/// objects delineated by new line separators.
///
- fn request_stream_json<Req, Res>(
- &self,
- req: Req,
- form: Option<multipart::Form<'static>>,
- ) -> AsyncStreamResponse<Res>
+ fn request_stream_json<Res>(&self, req: Request) -> impl Stream<Item = Result<Res, Error>>
where
- Req: ApiRequest + Serialize,
for<'de> Res: 'static + Deserialize<'de> + Send,
{
- self.request_stream(req, form, |res| {
+ self.request_stream(req, |res| {
let parse_stream_error = if let Some(trailer) = res.headers().get(TRAILER) {
// Response has the Trailer header set. The StreamError trailer
// is used to indicate that there was an error while streaming
@@ -420,20 +401,39 @@ impl IpfsClient {
// There was an unrecognized trailer value. If that is the case,
// create a stream that immediately errors.
//
- return Box::new(future::err(err).into_stream());
+ return future::err(err).into_stream().left_stream();
}
} else {
false
};
- Box::new(IpfsClient::process_stream_response(
- res,
- JsonLineDecoder::new(parse_stream_error),
- ))
+ IpfsClient::process_stream_response(res, JsonLineDecoder::new(parse_stream_error))
+ .right_stream()
})
}
}
+// Implements a call to the IPFS that returns a streaming body response.
+// Implementing this in a macro is necessary because the Rust compiler
+// can't reason about the lifetime of the request instance properly. It
+// thinks that the request needs to live as long as the returned stream,
+// but in reality, the request instance is only used to build the Hyper
+// or Actix request.
+//
+macro_rules! impl_stream_api_response {
+ (($self:ident, $req:expr, $form:expr) => $call:ident) => {
+ impl_stream_api_response! {
+ ($self, $req, $form) |req| => { $self.$call(req) }
+ }
+ };
+ (($self:ident, $req:expr, $form:expr) |$var:ident| => $impl:block) => {
+ match $self.build_base_request($req, $form) {
+ Ok($var) => $impl.right_stream(),
+ Err(e) => return future::err(e).into_stream().left_stream(),
+ }
+ };
+}
+
impl IpfsClient {
/// Add file to Ipfs.
///
@@ -524,9 +524,9 @@ impl IpfsClient {
}
}
- self.request_stream_json(request::Add, Some(form))
- .try_collect()
- .await
+ let req = self.build_base_request(request::Add, Some(form))?;
+
+ self.request_stream_json(req).try_collect().await
}
/// Returns the current ledger for a peer.
@@ -639,7 +639,9 @@ impl IpfsClient {
///
#[inline]
pub fn block_get(&self, hash: &str) -> impl Stream<Item = Result<Bytes, Error>> {
- self.request_stream_bytes(request::BlockGet { hash }, None)
+ impl_stream_api_response! {
+ (self, request::BlockGet { hash }, None) => request_stream_bytes
+ }
}
/// Store input as an IPFS block.
@@ -766,8 +768,10 @@ impl IpfsClient {
/// ```
///
#[inline]
- pub fn cat(&self, path: &str) -> AsyncStreamResponse<Bytes> {
- self.request_stream_bytes(request::Cat { path }, None)
+ pub fn cat(&self, path: &str) -> impl Stream<Item = Result<Bytes, Error>> {
+ impl_stream_api_response! {
+ (self, request::Cat { path }, None) => request_stream_bytes
+ }
}
/// List available commands that the server accepts.
@@ -882,8 +886,13 @@ impl IpfsClient {
/// ```
///
#[inline]
- pub fn dht_findpeer(&self, peer: &str) -> AsyncStreamResponse<response::DhtFindPeerResponse> {
- self.request_stream_json(request::DhtFindPeer { peer }, None)
+ pub fn dht_findpeer(
+ &self,
+ peer: &str,
+ ) -> impl Stream<Item = Result<response::DhtFindPeerResponse, Error>> {
+ impl_stream_api_response! {
+ (self, request::DhtFindPeer { peer }, None) => request_stream_json
+ }
}
/// Find peers in the DHT that can provide a specific value given a key.
@@ -898,8 +907,13 @@ impl IpfsClient {
/// ```
///
#[inline]
- pub fn dht_findprovs(&self, key: &str) -> AsyncStreamResponse<response::DhtFindProvsResponse> {
- self.request_stream_json(request::DhtFindProvs { key }, None)
+ pub fn dht_findprovs(
+ &self,
+ key: &str,
+ ) -> impl Stream<Item = Result<response::DhtFindProvsResponse, Error>> {
+ impl_stream_api_response! {
+ (self, request::DhtFindProvs { key }, None) => request_stream_json
+ }
}
/// Query the DHT for a given key.
@@ -914,8 +928,13 @@ impl IpfsClient {
/// ```
///
#[inline]
- pub fn dht_get(&self, key: &str) -> AsyncStreamResponse<response::DhtGetResponse> {
- self.request_stream_json(request::DhtGet { key }, None)
+ pub fn dht_get(
+ &self,
+ key: &str,
+ ) -> impl Stream<Item = Result<response::DhtGetResponse, Error>> {
+ impl_stream_api_response! {
+ (self, request::DhtGet { key }, None) => request_stream_json
+ }
}
/// Announce to the network that you are providing a given value.
@@ -930,8 +949,13 @@ impl IpfsClient {
/// ```
///
#[inline]
- pub fn dht_provide(&self, key: &str) -> AsyncStreamResponse<response::DhtProvideResponse> {
- self.request_stream_json(request::DhtProvide { key }, None)
+ pub fn dht_provide(
+ &self,
+ key: &str,
+ ) -> impl Stream<Item = Result<response::DhtProvideResponse, Error>> {
+ impl_stream_api_response! {
+ (self, request::DhtProvide { key }, None) => request_stream_json
+ }
}
/// Write a key/value pair to the DHT.
@@ -945,8 +969,14 @@ impl IpfsClient {
/// ```
///
#[inline]
- pub fn dht_put(&self, key: &str, value: &str) -> AsyncStreamResponse<response::DhtPutResponse> {
- self.request_stream_json(request::DhtPut { key, value }, None)
+ pub fn dht_put(
+ &self,
+ key: &str,
+ value: &str,
+ ) -> impl Stream<Item = Result<response::DhtPutResponse, Error>> {
+ impl_stream_api_response! {
+ (self, request::DhtPut { key, value }, None) => request_stream_json
+ }
}
/// Find the closest peer given the peer ID by querying the DHT.
@@ -961,8 +991,13 @@ impl IpfsClient {
/// ```
///
#[inline]
- pub fn dht_query(&self, peer: &str) -> AsyncStreamResponse<response::DhtQueryResponse> {
- self.request_stream_json(request::DhtQuery { peer }, None)
+ pub fn dht_query(
+ &self,
+ peer: &str,
+ ) -> impl Stream<Item = Result<response::DhtQueryResponse, Error>> {
+ impl_stream_api_response! {
+ (self, request::DhtQuery { peer }, None) => request_stream_json
+ }
}
/// Clear inactive requests from the log.
@@ -1144,8 +1179,10 @@ impl IpfsClient {
/// ```
///
#[inline]
- pub fn files_read(&self, path: &str) -> AsyncStreamResponse<Bytes> {
- self.request_stream_bytes(request::FilesRead { path }, None)
+ pub fn files_read(&self, path: &str) -> impl Stream<Item = Result<Bytes, Error>> {
+ impl_stream_api_response! {
+ (self, request::FilesRead { path }, None) => request_stream_bytes
+ }
}
/// Remove a file in MFS.
@@ -1229,8 +1266,12 @@ impl IpfsClient {
/// ```
///
#[inline]
- pub fn filestore_dups(&self) -> AsyncStreamResponse<response::FilestoreDupsResponse> {
- self.request_stream_json(request::FilestoreDups, None)
+ pub fn filestore_dups(
+ &self,
+ ) -> impl Stream<Item = Result<response::FilestoreDupsResponse, Error>> {
+ impl_stream_api_response! {
+ (self, request::FilestoreDups, None) => request_stream_json
+ }
}
/// List objects in filestore.
@@ -1248,8 +1289,10 @@ impl IpfsClient {
pub fn filestore_ls(
&self,
cid: Option<&str>,
- ) -> AsyncStreamResponse<response::FilestoreLsResponse> {
- self.request_stream_json(request::FilestoreLs { cid }, None)
+ ) -> impl Stream<Item = Result<response::FilestoreLsResponse, Error>> {
+ impl_stream_api_response! {
+ (self, request::FilestoreLs { cid }, None) => request_stream_json
+ }
}
/// Verify objects in filestore.
@@ -1265,8 +1308,10 @@ impl IpfsClient {
pub fn filestore_verify(
&self,
cid: Option<&str>,
- ) -> AsyncStreamResponse<response::FilestoreVerifyResponse> {
- self.request_stream_json(request::FilestoreVerify { cid }, None)
+ ) -> impl Stream<Item = Result<response::FilestoreVerifyResponse, Error>> {
+ impl_stream_api_response! {
+ (self, request::FilestoreVerify{ cid }, None) => request_stream_json
+ }
}
/// Download Ipfs object.
@@ -1279,8 +1324,10 @@ impl IpfsClient {
/// ```
///
#[inline]
- pub fn get(&self, path: &str) -> AsyncStreamResponse<Bytes> {
- self.request_stream_bytes(request::Get { path }, None)
+ pub fn get(&self, path: &str) -> impl Stream<Item = Result<Bytes, Error>> {
+ impl_stream_api_response! {
+ (self, request::Get { path }, None) => request_stream_bytes
+ }
}
/// Returns information about a peer.
@@ -1415,10 +1462,14 @@ impl IpfsClient {
/// let res = client.log_tail();
/// ```
///
- pub fn log_tail(&self) -> AsyncStreamResponse<String> {
- self.request_stream(request::LogTail, None, |res| {
- Box::new(IpfsClient::process_stream_response(res, LineDecoder))
- })
+ pub fn log_tail(&self) -> impl Stream<Item = Result<String, Error>> {
+ impl_stream_api_response! {
+ (self, request::LogTail, None) |req| => {
+ self.request_stream(req, |res| {
+ IpfsClient::process_stream_response(res, LineDecoder)
+ })
+ }
+ }
}
/// List the contents of an Ipfs multihash.
@@ -1514,8 +1565,10 @@ impl IpfsClient {
/// ```
///
#[inline]
- pub fn object_data(&self, key: &str) -> AsyncStreamResponse<Bytes> {
- self.request_stream_bytes(request::ObjectData { key }, None)
+ pub fn object_data(&self, key: &str) -> impl Stream<Item = Result<Bytes, Error>> {
+ impl_stream_api_response! {
+ (self, request::ObjectData { key }, None) => request_stream_bytes
+ }
}
/// Returns the diff of two Ipfs objects.
@@ -1722,8 +1775,10 @@ impl IpfsClient {
&self,
peer: &str,
count: Option<i32>,
- ) -> AsyncStreamResponse<response::PingResponse> {
- self.request_stream_json(request::Ping { peer, count }, None)
+ ) -> impl Stream<Item = Result<response::PingResponse, Error>> {
+ impl_stream_api_response! {
+ (self, request::Ping { peer, count }, None) => request_stream_json
+ }
}
/// List subscribed pubsub topics.
@@ -1792,8 +1847,10 @@ impl IpfsClient {
&self,
topic: &str,
discover: bool,
- ) -> AsyncStreamResponse<response::PubsubSubResponse> {
- self.request_stream_json(request::PubsubSub { topic, discover }, None)
+ ) -> impl Stream<Item = Result<response::PubsubSubResponse, Error>> {
+ impl_stream_api_response! {
+ (self, request::PubsubSub { topic, discover }, None) => request_stream_json
+ }
}
/// Gets a list of local references.
@@ -1806,8 +1863,10 @@ impl IpfsClient {
/// ```
///
#[inline]
- pub fn refs_local(&self) -> AsyncStreamResponse<response::RefsLocalResponse> {
- self.request_stream_json(request::RefsLocal, None)
+ pub fn refs_local(&self) -> impl Stream<Item = Result<response::RefsLocalResponse, Error>> {
+ impl_stream_api_response! {
+ (self, request::RefsLocal, None) => request_stream_json
+ }
}
// TODO /repo/fsck
@@ -1951,8 +2010,10 @@ impl IpfsClient {
/// ```
///
#[inline]
- pub fn tar_cat(&self, path: &str) -> AsyncStreamResponse<Bytes> {
- self.request_stream_bytes(request::TarCat { path }, None)
+ pub fn tar_cat(&self, path: &str) -> impl Stream<Item = Result<Bytes, Error>> {
+ impl_stream_api_response! {
+ (self, request::TarCat { path }, None) => request_stream_bytes
+ }
}
/// Returns information about the Ipfs server version.
diff --git a/ipfs-api/src/lib.rs b/ipfs-api/src/lib.rs
index bd1f8d0..44e06b5 100644
--- a/ipfs-api/src/lib.rs
+++ b/ipfs-api/src/lib.rs
@@ -165,20 +165,10 @@ pub mod response;
#[cfg(feature = "actix")]
use actix_http::{encoding, Payload, PayloadStream};
-use futures::Stream;
#[cfg(feature = "hyper")]
use hyper::{self, client::HttpConnector};
#[cfg(feature = "hyper")]
use hyper_tls::HttpsConnector;
-use response::Error;
-
-/// A future that returns a stream of responses.
-///
-#[cfg(feature = "actix")]
-pub(crate) type AsyncStreamResponse<T> = Box<dyn Stream<Item = Result<T, Error>> + Unpin + 'static>;
-#[cfg(feature = "hyper")]
-pub(crate) type AsyncStreamResponse<T> =
- Box<dyn Stream<Item = Result<T, Error>> + Unpin + Send + 'static>;
#[cfg(feature = "actix")]
pub(crate) type Request = awc::ClientRequest;