diff options
-rw-r--r-- | ipfs-api/examples/pubsub.rs | 4 | ||||
-rw-r--r-- | ipfs-api/src/client.rs | 76 |
2 files changed, 37 insertions, 43 deletions
diff --git a/ipfs-api/examples/pubsub.rs b/ipfs-api/examples/pubsub.rs index 2d24278..465b86b 100644 --- a/ipfs-api/examples/pubsub.rs +++ b/ipfs-api/examples/pubsub.rs @@ -41,7 +41,9 @@ async fn main() { eprintln!(); eprintln!("publishing message..."); - publish_client.pubsub_pub(TOPIC, "Hello World!").boxed_local() + publish_client + .pubsub_pub(TOPIC, "Hello World!") + .boxed_local() }) .boxed_local() .fuse(); diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs index fda5e95..4a9871a 100644 --- a/ipfs-api/src/client.rs +++ b/ipfs-api/src/client.rs @@ -15,7 +15,7 @@ use crate::{ #[cfg(feature = "actix")] use actix_multipart::client::multipart; use bytes::Bytes; -use futures::{future, Future, FutureExt, Stream, TryFutureExt, TryStreamExt}; +use futures::{future, FutureExt, Stream, TryFutureExt, TryStreamExt}; use http::{ uri::{InvalidUri, Uri}, StatusCode, @@ -218,39 +218,31 @@ impl IpfsClient { /// Generates a request, and returns the unprocessed response future. /// - fn request_raw<Req>( + async fn request_raw<Req>( &self, req: Req, form: Option<multipart::Form<'static>>, - ) -> impl Future<Output = Result<(StatusCode, Bytes), Error>> + ) -> Result<(StatusCode, Bytes), Error> where Req: ApiRequest + Serialize, { - let request = future::ready(self.build_base_request(req, form)); + let req = self.build_base_request(req, form)?; #[cfg(feature = "hyper")] { - let client = self.client.clone(); - - request - .and_then(move |req| client.request(req).err_into()) - .and_then(|res| { - let status = res.status(); + let res = self.client.request(req).await?; + let status = res.status(); + let body = body::to_bytes(res.into_body()).await?; - body::to_bytes(res.into_body()) - .map_ok(move |body| (status, body)) - .err_into() - }) + Ok((status, body)) } #[cfg(feature = "actix")] { - request - .and_then(|req| req.send().err_into()) - .and_then(|mut res| { - let status = res.status(); + let mut res = req.send().await?; + let status = res.status(); + let body = res.body().await?; - res.body().map_ok(move |body| (status, body)).err_into() - }) + Ok((status, body)) } } @@ -327,56 +319,56 @@ impl IpfsClient { /// Generic method for making a request to the Ipfs server, and getting /// a deserializable response. /// - fn request<Req, Res>( + async fn request<Req, Res>( &self, req: Req, form: Option<multipart::Form<'static>>, - ) -> impl Future<Output = Result<Res, Error>> + ) -> Result<Res, Error> where Req: ApiRequest + Serialize, - for<'de> Res: 'static + Deserialize<'de> + Send, + for<'de> Res: 'static + Deserialize<'de>, { - self.request_raw(req, form).map(|res| { - res.and_then(|(status, chunk)| IpfsClient::process_json_response(status, chunk)) - }) + let (status, chunk) = self.request_raw(req, form).await?; + + IpfsClient::process_json_response(status, chunk) } /// Generic method for making a request to the Ipfs server, and getting /// back a response with no body. /// - fn request_empty<Req>( + async fn request_empty<Req>( &self, req: Req, form: Option<multipart::Form<'static>>, - ) -> impl Future<Output = Result<(), Error>> + ) -> Result<(), Error> where Req: ApiRequest + Serialize, { - self.request_raw(req, form).map(|res| { - res.and_then(|(status, chunk)| match status { - StatusCode::OK => Ok(()), - _ => Err(Self::process_error_from_body(chunk)), - }) - }) + let (status, chunk) = self.request_raw(req, form).await?; + + match status { + StatusCode::OK => Ok(()), + _ => Err(Self::process_error_from_body(chunk)), + } } /// Generic method for making a request to the Ipfs server, and getting /// back a raw String response. /// - fn request_string<Req>( + async fn request_string<Req>( &self, req: Req, form: Option<multipart::Form<'static>>, - ) -> impl Future<Output = Result<String, Error>> + ) -> Result<String, Error> where Req: ApiRequest + Serialize, { - self.request_raw(req, form).map(|res| { - res.and_then(|(status, chunk)| match status { - StatusCode::OK => String::from_utf8(chunk.to_vec()).map_err(From::from), - _ => Err(Self::process_error_from_body(chunk)), - }) - }) + let (status, chunk) = self.request_raw(req, form).await?; + + match status { + StatusCode::OK => String::from_utf8(chunk.to_vec()).map_err(From::from), + _ => Err(Self::process_error_from_body(chunk)), + } } /// Generic method for making a request to the Ipfs server, and getting |