From 7923508f7c7cd154290379f95e4966d393e4334d Mon Sep 17 00:00:00 2001 From: Ferris Tseng Date: Wed, 25 Dec 2019 14:12:08 -0500 Subject: refactoring for more async --- ipfs-api/examples/pubsub.rs | 4 ++- ipfs-api/src/client.rs | 76 ++++++++++++++++++++------------------------- 2 files changed, 37 insertions(+), 43 deletions(-) diff --git a/ipfs-api/examples/pubsub.rs b/ipfs-api/examples/pubsub.rs index 2d24278..465b86b 100644 --- a/ipfs-api/examples/pubsub.rs +++ b/ipfs-api/examples/pubsub.rs @@ -41,7 +41,9 @@ async fn main() { eprintln!(); eprintln!("publishing message..."); - publish_client.pubsub_pub(TOPIC, "Hello World!").boxed_local() + publish_client + .pubsub_pub(TOPIC, "Hello World!") + .boxed_local() }) .boxed_local() .fuse(); diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs index fda5e95..4a9871a 100644 --- a/ipfs-api/src/client.rs +++ b/ipfs-api/src/client.rs @@ -15,7 +15,7 @@ use crate::{ #[cfg(feature = "actix")] use actix_multipart::client::multipart; use bytes::Bytes; -use futures::{future, Future, FutureExt, Stream, TryFutureExt, TryStreamExt}; +use futures::{future, FutureExt, Stream, TryFutureExt, TryStreamExt}; use http::{ uri::{InvalidUri, Uri}, StatusCode, @@ -218,39 +218,31 @@ impl IpfsClient { /// Generates a request, and returns the unprocessed response future. /// - fn request_raw( + async fn request_raw( &self, req: Req, form: Option>, - ) -> impl Future> + ) -> Result<(StatusCode, Bytes), Error> where Req: ApiRequest + Serialize, { - let request = future::ready(self.build_base_request(req, form)); + let req = self.build_base_request(req, form)?; #[cfg(feature = "hyper")] { - let client = self.client.clone(); - - request - .and_then(move |req| client.request(req).err_into()) - .and_then(|res| { - let status = res.status(); + let res = self.client.request(req).await?; + let status = res.status(); + let body = body::to_bytes(res.into_body()).await?; - body::to_bytes(res.into_body()) - .map_ok(move |body| (status, body)) - .err_into() - }) + Ok((status, body)) } #[cfg(feature = "actix")] { - request - .and_then(|req| req.send().err_into()) - .and_then(|mut res| { - let status = res.status(); + let mut res = req.send().await?; + let status = res.status(); + let body = res.body().await?; - res.body().map_ok(move |body| (status, body)).err_into() - }) + Ok((status, body)) } } @@ -327,56 +319,56 @@ impl IpfsClient { /// Generic method for making a request to the Ipfs server, and getting /// a deserializable response. /// - fn request( + async fn request( &self, req: Req, form: Option>, - ) -> impl Future> + ) -> Result where Req: ApiRequest + Serialize, - for<'de> Res: 'static + Deserialize<'de> + Send, + for<'de> Res: 'static + Deserialize<'de>, { - self.request_raw(req, form).map(|res| { - res.and_then(|(status, chunk)| IpfsClient::process_json_response(status, chunk)) - }) + let (status, chunk) = self.request_raw(req, form).await?; + + IpfsClient::process_json_response(status, chunk) } /// Generic method for making a request to the Ipfs server, and getting /// back a response with no body. /// - fn request_empty( + async fn request_empty( &self, req: Req, form: Option>, - ) -> impl Future> + ) -> Result<(), Error> where Req: ApiRequest + Serialize, { - self.request_raw(req, form).map(|res| { - res.and_then(|(status, chunk)| match status { - StatusCode::OK => Ok(()), - _ => Err(Self::process_error_from_body(chunk)), - }) - }) + let (status, chunk) = self.request_raw(req, form).await?; + + match status { + StatusCode::OK => Ok(()), + _ => Err(Self::process_error_from_body(chunk)), + } } /// Generic method for making a request to the Ipfs server, and getting /// back a raw String response. /// - fn request_string( + async fn request_string( &self, req: Req, form: Option>, - ) -> impl Future> + ) -> Result where Req: ApiRequest + Serialize, { - self.request_raw(req, form).map(|res| { - res.and_then(|(status, chunk)| match status { - StatusCode::OK => String::from_utf8(chunk.to_vec()).map_err(From::from), - _ => Err(Self::process_error_from_body(chunk)), - }) - }) + let (status, chunk) = self.request_raw(req, form).await?; + + match status { + StatusCode::OK => String::from_utf8(chunk.to_vec()).map_err(From::from), + _ => Err(Self::process_error_from_body(chunk)), + } } /// Generic method for making a request to the Ipfs server, and getting -- cgit v1.2.3