From ad1936065efea19616852d099eb8bd2f1eba386a Mon Sep 17 00:00:00 2001 From: Ferris Tseng Date: Wed, 25 Dec 2019 15:59:58 -0500 Subject: use "impl Stream" instead of the async stream response type --- ipfs-api/src/client.rs | 331 +++++++++++++++++++++++++++++-------------------- ipfs-api/src/lib.rs | 10 -- 2 files changed, 196 insertions(+), 145 deletions(-) diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs index 4a9871a..1072a33 100644 --- a/ipfs-api/src/client.rs +++ b/ipfs-api/src/client.rs @@ -10,12 +10,12 @@ use crate::{ read::{JsonLineDecoder, LineDecoder, StreamReader}, request::{self, ApiRequest}, response::{self, Error}, - AsyncStreamResponse, Client, Request, Response, + Client, Request, Response, }; #[cfg(feature = "actix")] use actix_multipart::client::multipart; use bytes::Bytes; -use futures::{future, FutureExt, Stream, TryFutureExt, TryStreamExt}; +use futures::{future, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; use http::{ uri::{InvalidUri, Uri}, StatusCode, @@ -246,76 +246,6 @@ impl IpfsClient { } } - /// Generic method for making a request that expects back a streaming - /// response. - /// - fn request_stream( - &self, - req: Req, - form: Option>, - process: F, - ) -> AsyncStreamResponse - where - Req: ApiRequest + Serialize, - F: 'static + Send + Fn(Response) -> AsyncStreamResponse, - { - let request = future::ready(self.build_base_request(req, form)); - - let response = { - #[cfg(feature = "hyper")] - { - let client = self.client.clone(); - - request - .and_then(move |req| client.request(req).err_into()) - .map_ok(move |res| { - match res.status() { - StatusCode::OK => process(res), - // If the server responded with an error status code, the body - // still needs to be read so an error can be built. This block will - // read the entire body stream, then immediately return an error. - // - _ => Box::new( - body::to_bytes(res.into_body()) - .boxed() - .map(|maybe_body| match maybe_body { - Ok(body) => Err(Self::process_error_from_body(body)), - Err(e) => Err(e.into()), - }) - .into_stream(), - ), - } - }) - .try_flatten_stream() - } - #[cfg(feature = "actix")] - { - request - .and_then(|req| req.send().err_into()) - .map_ok(move |mut res| { - match res.status() { - StatusCode::OK => process(res), - // If the server responded with an error status code, the body - // still needs to be read so an error can be built. This block will - // read the entire body stream, then immediately return an error. - // - _ => Box::new( - res.body() - .map(|maybe_body| match maybe_body { - Ok(body) => Err(Self::process_error_from_body(body)), - Err(e) => Err(e.into()), - }) - .into_stream(), - ), - } - }) - .try_flatten_stream() - } - }; - - Box::new(response) - } - /// Generic method for making a request to the Ipfs server, and getting /// a deserializable response. /// @@ -370,41 +300,92 @@ impl IpfsClient { _ => Err(Self::process_error_from_body(chunk)), } } +} - /// Generic method for making a request to the Ipfs server, and getting - /// back a raw stream of bytes. +impl IpfsClient { + /// Generic method for making a request that expects back a streaming + /// response. /// - fn request_stream_bytes( + fn request_stream( &self, - req: Req, - form: Option>, - ) -> AsyncStreamResponse + req: Request, + process: F, + ) -> impl Stream> where - Req: ApiRequest + Serialize, + OutStream: Stream>, + F: 'static + Fn(Response) -> OutStream, { #[cfg(feature = "hyper")] { - self.request_stream(req, form, |res| Box::new(res.into_body().err_into())) + self.client + .request(req) + .err_into() + .map_ok(move |res| { + match res.status() { + StatusCode::OK => process(res).right_stream(), + // If the server responded with an error status code, the body + // still needs to be read so an error can be built. This block will + // read the entire body stream, then immediately return an error. + // + _ => body::to_bytes(res.into_body()) + .boxed() + .map(|maybe_body| match maybe_body { + Ok(body) => Err(Self::process_error_from_body(body)), + Err(e) => Err(e.into()), + }) + .into_stream() + .left_stream(), + } + }) + .try_flatten_stream() + } + #[cfg(feature = "actix")] + { + req.send() + .err_into() + .map_ok(move |mut res| { + match res.status() { + StatusCode::OK => process(res).right_stream(), + // If the server responded with an error status code, the body + // still needs to be read so an error can be built. This block will + // read the entire body stream, then immediately return an error. + // + _ => res + .body() + .map(|maybe_body| match maybe_body { + Ok(body) => Err(Self::process_error_from_body(body)), + Err(e) => Err(e.into()), + }) + .into_stream() + .left_stream(), + } + }) + .try_flatten_stream() + } + } + + /// Generic method for making a request to the Ipfs server, and getting + /// back a raw stream of bytes. + /// + fn request_stream_bytes(&self, req: Request) -> impl Stream> { + #[cfg(feature = "hyper")] + { + self.request_stream(req, |res| res.into_body().err_into()) } #[cfg(feature = "actix")] { - self.request_stream(req, form, |res| Box::new(res.err_into())) + self.request_stream(req, |res| res.err_into()) } } /// Generic method to return a streaming response of deserialized json /// objects delineated by new line separators. /// - fn request_stream_json( - &self, - req: Req, - form: Option>, - ) -> AsyncStreamResponse + fn request_stream_json(&self, req: Request) -> impl Stream> where - Req: ApiRequest + Serialize, for<'de> Res: 'static + Deserialize<'de> + Send, { - self.request_stream(req, form, |res| { + self.request_stream(req, |res| { let parse_stream_error = if let Some(trailer) = res.headers().get(TRAILER) { // Response has the Trailer header set. The StreamError trailer // is used to indicate that there was an error while streaming @@ -420,20 +401,39 @@ impl IpfsClient { // There was an unrecognized trailer value. If that is the case, // create a stream that immediately errors. // - return Box::new(future::err(err).into_stream()); + return future::err(err).into_stream().left_stream(); } } else { false }; - Box::new(IpfsClient::process_stream_response( - res, - JsonLineDecoder::new(parse_stream_error), - )) + IpfsClient::process_stream_response(res, JsonLineDecoder::new(parse_stream_error)) + .right_stream() }) } } +// Implements a call to the IPFS that returns a streaming body response. +// Implementing this in a macro is necessary because the Rust compiler +// can't reason about the lifetime of the request instance properly. It +// thinks that the request needs to live as long as the returned stream, +// but in reality, the request instance is only used to build the Hyper +// or Actix request. +// +macro_rules! impl_stream_api_response { + (($self:ident, $req:expr, $form:expr) => $call:ident) => { + impl_stream_api_response! { + ($self, $req, $form) |req| => { $self.$call(req) } + } + }; + (($self:ident, $req:expr, $form:expr) |$var:ident| => $impl:block) => { + match $self.build_base_request($req, $form) { + Ok($var) => $impl.right_stream(), + Err(e) => return future::err(e).into_stream().left_stream(), + } + }; +} + impl IpfsClient { /// Add file to Ipfs. /// @@ -524,9 +524,9 @@ impl IpfsClient { } } - self.request_stream_json(request::Add, Some(form)) - .try_collect() - .await + let req = self.build_base_request(request::Add, Some(form))?; + + self.request_stream_json(req).try_collect().await } /// Returns the current ledger for a peer. @@ -639,7 +639,9 @@ impl IpfsClient { /// #[inline] pub fn block_get(&self, hash: &str) -> impl Stream> { - self.request_stream_bytes(request::BlockGet { hash }, None) + impl_stream_api_response! { + (self, request::BlockGet { hash }, None) => request_stream_bytes + } } /// Store input as an IPFS block. @@ -766,8 +768,10 @@ impl IpfsClient { /// ``` /// #[inline] - pub fn cat(&self, path: &str) -> AsyncStreamResponse { - self.request_stream_bytes(request::Cat { path }, None) + pub fn cat(&self, path: &str) -> impl Stream> { + impl_stream_api_response! { + (self, request::Cat { path }, None) => request_stream_bytes + } } /// List available commands that the server accepts. @@ -882,8 +886,13 @@ impl IpfsClient { /// ``` /// #[inline] - pub fn dht_findpeer(&self, peer: &str) -> AsyncStreamResponse { - self.request_stream_json(request::DhtFindPeer { peer }, None) + pub fn dht_findpeer( + &self, + peer: &str, + ) -> impl Stream> { + impl_stream_api_response! { + (self, request::DhtFindPeer { peer }, None) => request_stream_json + } } /// Find peers in the DHT that can provide a specific value given a key. @@ -898,8 +907,13 @@ impl IpfsClient { /// ``` /// #[inline] - pub fn dht_findprovs(&self, key: &str) -> AsyncStreamResponse { - self.request_stream_json(request::DhtFindProvs { key }, None) + pub fn dht_findprovs( + &self, + key: &str, + ) -> impl Stream> { + impl_stream_api_response! { + (self, request::DhtFindProvs { key }, None) => request_stream_json + } } /// Query the DHT for a given key. @@ -914,8 +928,13 @@ impl IpfsClient { /// ``` /// #[inline] - pub fn dht_get(&self, key: &str) -> AsyncStreamResponse { - self.request_stream_json(request::DhtGet { key }, None) + pub fn dht_get( + &self, + key: &str, + ) -> impl Stream> { + impl_stream_api_response! { + (self, request::DhtGet { key }, None) => request_stream_json + } } /// Announce to the network that you are providing a given value. @@ -930,8 +949,13 @@ impl IpfsClient { /// ``` /// #[inline] - pub fn dht_provide(&self, key: &str) -> AsyncStreamResponse { - self.request_stream_json(request::DhtProvide { key }, None) + pub fn dht_provide( + &self, + key: &str, + ) -> impl Stream> { + impl_stream_api_response! { + (self, request::DhtProvide { key }, None) => request_stream_json + } } /// Write a key/value pair to the DHT. @@ -945,8 +969,14 @@ impl IpfsClient { /// ``` /// #[inline] - pub fn dht_put(&self, key: &str, value: &str) -> AsyncStreamResponse { - self.request_stream_json(request::DhtPut { key, value }, None) + pub fn dht_put( + &self, + key: &str, + value: &str, + ) -> impl Stream> { + impl_stream_api_response! { + (self, request::DhtPut { key, value }, None) => request_stream_json + } } /// Find the closest peer given the peer ID by querying the DHT. @@ -961,8 +991,13 @@ impl IpfsClient { /// ``` /// #[inline] - pub fn dht_query(&self, peer: &str) -> AsyncStreamResponse { - self.request_stream_json(request::DhtQuery { peer }, None) + pub fn dht_query( + &self, + peer: &str, + ) -> impl Stream> { + impl_stream_api_response! { + (self, request::DhtQuery { peer }, None) => request_stream_json + } } /// Clear inactive requests from the log. @@ -1144,8 +1179,10 @@ impl IpfsClient { /// ``` /// #[inline] - pub fn files_read(&self, path: &str) -> AsyncStreamResponse { - self.request_stream_bytes(request::FilesRead { path }, None) + pub fn files_read(&self, path: &str) -> impl Stream> { + impl_stream_api_response! { + (self, request::FilesRead { path }, None) => request_stream_bytes + } } /// Remove a file in MFS. @@ -1229,8 +1266,12 @@ impl IpfsClient { /// ``` /// #[inline] - pub fn filestore_dups(&self) -> AsyncStreamResponse { - self.request_stream_json(request::FilestoreDups, None) + pub fn filestore_dups( + &self, + ) -> impl Stream> { + impl_stream_api_response! { + (self, request::FilestoreDups, None) => request_stream_json + } } /// List objects in filestore. @@ -1248,8 +1289,10 @@ impl IpfsClient { pub fn filestore_ls( &self, cid: Option<&str>, - ) -> AsyncStreamResponse { - self.request_stream_json(request::FilestoreLs { cid }, None) + ) -> impl Stream> { + impl_stream_api_response! { + (self, request::FilestoreLs { cid }, None) => request_stream_json + } } /// Verify objects in filestore. @@ -1265,8 +1308,10 @@ impl IpfsClient { pub fn filestore_verify( &self, cid: Option<&str>, - ) -> AsyncStreamResponse { - self.request_stream_json(request::FilestoreVerify { cid }, None) + ) -> impl Stream> { + impl_stream_api_response! { + (self, request::FilestoreVerify{ cid }, None) => request_stream_json + } } /// Download Ipfs object. @@ -1279,8 +1324,10 @@ impl IpfsClient { /// ``` /// #[inline] - pub fn get(&self, path: &str) -> AsyncStreamResponse { - self.request_stream_bytes(request::Get { path }, None) + pub fn get(&self, path: &str) -> impl Stream> { + impl_stream_api_response! { + (self, request::Get { path }, None) => request_stream_bytes + } } /// Returns information about a peer. @@ -1415,10 +1462,14 @@ impl IpfsClient { /// let res = client.log_tail(); /// ``` /// - pub fn log_tail(&self) -> AsyncStreamResponse { - self.request_stream(request::LogTail, None, |res| { - Box::new(IpfsClient::process_stream_response(res, LineDecoder)) - }) + pub fn log_tail(&self) -> impl Stream> { + impl_stream_api_response! { + (self, request::LogTail, None) |req| => { + self.request_stream(req, |res| { + IpfsClient::process_stream_response(res, LineDecoder) + }) + } + } } /// List the contents of an Ipfs multihash. @@ -1514,8 +1565,10 @@ impl IpfsClient { /// ``` /// #[inline] - pub fn object_data(&self, key: &str) -> AsyncStreamResponse { - self.request_stream_bytes(request::ObjectData { key }, None) + pub fn object_data(&self, key: &str) -> impl Stream> { + impl_stream_api_response! { + (self, request::ObjectData { key }, None) => request_stream_bytes + } } /// Returns the diff of two Ipfs objects. @@ -1722,8 +1775,10 @@ impl IpfsClient { &self, peer: &str, count: Option, - ) -> AsyncStreamResponse { - self.request_stream_json(request::Ping { peer, count }, None) + ) -> impl Stream> { + impl_stream_api_response! { + (self, request::Ping { peer, count }, None) => request_stream_json + } } /// List subscribed pubsub topics. @@ -1792,8 +1847,10 @@ impl IpfsClient { &self, topic: &str, discover: bool, - ) -> AsyncStreamResponse { - self.request_stream_json(request::PubsubSub { topic, discover }, None) + ) -> impl Stream> { + impl_stream_api_response! { + (self, request::PubsubSub { topic, discover }, None) => request_stream_json + } } /// Gets a list of local references. @@ -1806,8 +1863,10 @@ impl IpfsClient { /// ``` /// #[inline] - pub fn refs_local(&self) -> AsyncStreamResponse { - self.request_stream_json(request::RefsLocal, None) + pub fn refs_local(&self) -> impl Stream> { + impl_stream_api_response! { + (self, request::RefsLocal, None) => request_stream_json + } } // TODO /repo/fsck @@ -1951,8 +2010,10 @@ impl IpfsClient { /// ``` /// #[inline] - pub fn tar_cat(&self, path: &str) -> AsyncStreamResponse { - self.request_stream_bytes(request::TarCat { path }, None) + pub fn tar_cat(&self, path: &str) -> impl Stream> { + impl_stream_api_response! { + (self, request::TarCat { path }, None) => request_stream_bytes + } } /// Returns information about the Ipfs server version. diff --git a/ipfs-api/src/lib.rs b/ipfs-api/src/lib.rs index bd1f8d0..44e06b5 100644 --- a/ipfs-api/src/lib.rs +++ b/ipfs-api/src/lib.rs @@ -165,20 +165,10 @@ pub mod response; #[cfg(feature = "actix")] use actix_http::{encoding, Payload, PayloadStream}; -use futures::Stream; #[cfg(feature = "hyper")] use hyper::{self, client::HttpConnector}; #[cfg(feature = "hyper")] use hyper_tls::HttpsConnector; -use response::Error; - -/// A future that returns a stream of responses. -/// -#[cfg(feature = "actix")] -pub(crate) type AsyncStreamResponse = Box> + Unpin + 'static>; -#[cfg(feature = "hyper")] -pub(crate) type AsyncStreamResponse = - Box> + Unpin + Send + 'static>; #[cfg(feature = "actix")] pub(crate) type Request = awc::ClientRequest; -- cgit v1.2.3