summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFerris Tseng <ferris@navapbc.com>2019-12-25 14:12:08 -0500
committerFerris Tseng <ferris@navapbc.com>2019-12-25 14:12:08 -0500
commit7923508f7c7cd154290379f95e4966d393e4334d (patch)
tree32af9e717482c05226fbdf2e3d2e1e17f105c482
parent9007b1765cf821e8b813cde38989bf7164fa84ee (diff)
refactoring for more async
-rw-r--r--ipfs-api/examples/pubsub.rs4
-rw-r--r--ipfs-api/src/client.rs76
2 files changed, 37 insertions, 43 deletions
diff --git a/ipfs-api/examples/pubsub.rs b/ipfs-api/examples/pubsub.rs
index 2d24278..465b86b 100644
--- a/ipfs-api/examples/pubsub.rs
+++ b/ipfs-api/examples/pubsub.rs
@@ -41,7 +41,9 @@ async fn main() {
eprintln!();
eprintln!("publishing message...");
- publish_client.pubsub_pub(TOPIC, "Hello World!").boxed_local()
+ publish_client
+ .pubsub_pub(TOPIC, "Hello World!")
+ .boxed_local()
})
.boxed_local()
.fuse();
diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs
index fda5e95..4a9871a 100644
--- a/ipfs-api/src/client.rs
+++ b/ipfs-api/src/client.rs
@@ -15,7 +15,7 @@ use crate::{
#[cfg(feature = "actix")]
use actix_multipart::client::multipart;
use bytes::Bytes;
-use futures::{future, Future, FutureExt, Stream, TryFutureExt, TryStreamExt};
+use futures::{future, FutureExt, Stream, TryFutureExt, TryStreamExt};
use http::{
uri::{InvalidUri, Uri},
StatusCode,
@@ -218,39 +218,31 @@ impl IpfsClient {
/// Generates a request, and returns the unprocessed response future.
///
- fn request_raw<Req>(
+ async fn request_raw<Req>(
&self,
req: Req,
form: Option<multipart::Form<'static>>,
- ) -> impl Future<Output = Result<(StatusCode, Bytes), Error>>
+ ) -> Result<(StatusCode, Bytes), Error>
where
Req: ApiRequest + Serialize,
{
- let request = future::ready(self.build_base_request(req, form));
+ let req = self.build_base_request(req, form)?;
#[cfg(feature = "hyper")]
{
- let client = self.client.clone();
-
- request
- .and_then(move |req| client.request(req).err_into())
- .and_then(|res| {
- let status = res.status();
+ let res = self.client.request(req).await?;
+ let status = res.status();
+ let body = body::to_bytes(res.into_body()).await?;
- body::to_bytes(res.into_body())
- .map_ok(move |body| (status, body))
- .err_into()
- })
+ Ok((status, body))
}
#[cfg(feature = "actix")]
{
- request
- .and_then(|req| req.send().err_into())
- .and_then(|mut res| {
- let status = res.status();
+ let mut res = req.send().await?;
+ let status = res.status();
+ let body = res.body().await?;
- res.body().map_ok(move |body| (status, body)).err_into()
- })
+ Ok((status, body))
}
}
@@ -327,56 +319,56 @@ impl IpfsClient {
/// Generic method for making a request to the Ipfs server, and getting
/// a deserializable response.
///
- fn request<Req, Res>(
+ async fn request<Req, Res>(
&self,
req: Req,
form: Option<multipart::Form<'static>>,
- ) -> impl Future<Output = Result<Res, Error>>
+ ) -> Result<Res, Error>
where
Req: ApiRequest + Serialize,
- for<'de> Res: 'static + Deserialize<'de> + Send,
+ for<'de> Res: 'static + Deserialize<'de>,
{
- self.request_raw(req, form).map(|res| {
- res.and_then(|(status, chunk)| IpfsClient::process_json_response(status, chunk))
- })
+ let (status, chunk) = self.request_raw(req, form).await?;
+
+ IpfsClient::process_json_response(status, chunk)
}
/// Generic method for making a request to the Ipfs server, and getting
/// back a response with no body.
///
- fn request_empty<Req>(
+ async fn request_empty<Req>(
&self,
req: Req,
form: Option<multipart::Form<'static>>,
- ) -> impl Future<Output = Result<(), Error>>
+ ) -> Result<(), Error>
where
Req: ApiRequest + Serialize,
{
- self.request_raw(req, form).map(|res| {
- res.and_then(|(status, chunk)| match status {
- StatusCode::OK => Ok(()),
- _ => Err(Self::process_error_from_body(chunk)),
- })
- })
+ let (status, chunk) = self.request_raw(req, form).await?;
+
+ match status {
+ StatusCode::OK => Ok(()),
+ _ => Err(Self::process_error_from_body(chunk)),
+ }
}
/// Generic method for making a request to the Ipfs server, and getting
/// back a raw String response.
///
- fn request_string<Req>(
+ async fn request_string<Req>(
&self,
req: Req,
form: Option<multipart::Form<'static>>,
- ) -> impl Future<Output = Result<String, Error>>
+ ) -> Result<String, Error>
where
Req: ApiRequest + Serialize,
{
- self.request_raw(req, form).map(|res| {
- res.and_then(|(status, chunk)| match status {
- StatusCode::OK => String::from_utf8(chunk.to_vec()).map_err(From::from),
- _ => Err(Self::process_error_from_body(chunk)),
- })
- })
+ let (status, chunk) = self.request_raw(req, form).await?;
+
+ match status {
+ StatusCode::OK => String::from_utf8(chunk.to_vec()).map_err(From::from),
+ _ => Err(Self::process_error_from_body(chunk)),
+ }
}
/// Generic method for making a request to the Ipfs server, and getting