From 3065848300cc6fd19693ab86344d99c06a55a6fa Mon Sep 17 00:00:00 2001 From: Ferris Tseng Date: Sun, 22 Dec 2019 23:15:27 -0500 Subject: get all the core methods working! --- ipfs-api/src/client.rs | 191 +++++++++++++++++++++++++------------------------ 1 file changed, 96 insertions(+), 95 deletions(-) diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs index 0e4c0c2..7ddc205 100644 --- a/ipfs-api/src/client.rs +++ b/ipfs-api/src/client.rs @@ -17,7 +17,7 @@ use actix_http::{encoding, Payload, PayloadStream}; use actix_multipart::client::multipart; use bytes::Bytes; use futures::{ - future, stream, Future, FutureExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStreamExt, + future, Future, FutureExt, Stream, TryFutureExt, TryStreamExt, }; use http::{ uri::{InvalidUri, Uri}, @@ -121,6 +121,7 @@ impl IpfsClient { } /// Creates a new `IpfsClient` for any given URI. + /// #[inline] pub fn new_from_uri(uri: &str) -> Result { let base_path = IpfsClient::build_base_path(uri)?; @@ -287,62 +288,66 @@ impl IpfsClient { req: Req, form: Option>, process: F, - ) -> impl Stream> + ) -> AsyncStreamResponse where Req: ApiRequest + Serialize, - F: 'static + Fn(Response) -> AsyncStreamResponse, + F: 'static + Send + Fn(Response) -> AsyncStreamResponse, { let request = future::ready(self.build_base_request(req, form)); - #[cfg(feature = "hyper")] - { - let client = self.client.clone(); + let response = { + #[cfg(feature = "hyper")] + { + let client = self.client.clone(); + + request + .and_then(move |req| client.request(req).err_into()) + .map_ok(move |res| { + match res.status() { + StatusCode::OK => process(res), + // If the server responded with an error status code, the body + // still needs to be read so an error can be built. This block will + // read the entire body stream, then immediately return an error. + // + _ => Box::new( + body::to_bytes(res.into_body()) + .boxed() + .map(|maybe_body| match maybe_body { + Ok(body) => Err(Self::process_error_from_body(body)), + Err(e) => Err(e.into()), + }) + .into_stream(), + ), + } + }) + .try_flatten_stream() + } + #[cfg(feature = "actix")] + { + request + .and_then(|req| req.send().err_into()) + .map_ok(move |mut res| { + match res.status() { + StatusCode::OK => process(res), + // If the server responded with an error status code, the body + // still needs to be read so an error can be built. This block will + // read the entire body stream, then immediately return an error. + // + _ => Box::new( + res.body() + .map(|maybe_body| match maybe_body { + Ok(body) => Err(Self::process_error_from_body(body)), + Err(e) => Err(e.into()), + }) + .into_stream(), + ), + } + }) + .try_flatten_stream() + } + }; - request - .and_then(move |req| client.request(req).err_into()) - .map_ok(move |res| { - match res.status() { - StatusCode::OK => process(res), - // If the server responded with an error status code, the body - // still needs to be read so an error can be built. This block will - // read the entire body stream, then immediately return an error. - // - _ => Box::new( - body::to_bytes(res.into_body()) - .boxed() - .map(|maybe_body| match maybe_body { - Ok(body) => Err(Self::process_error_from_body(body)), - Err(e) => Err(e.into()), - }) - .into_stream(), - ), - } - }) - .try_flatten_stream() - } - #[cfg(feature = "actix")] - { - request - .and_then(|req| req.send().err_into()) - .map_ok(move |mut res| { - match res.status() { - StatusCode::OK => process(res), - // If the server responded with an error status code, the body - // still needs to be read so an error can be built. This block will - // read the entire body stream, then immediately return an error. - // - _ => Box::new( - res.body() - .map(|maybe_body| match maybe_body { - Ok(body) => Err(Self::process_error_from_body(body)), - Err(e) => Err(e.into()), - }) - .into_stream(), - ), - } - }) - .try_flatten_stream() - } + Box::new(response) } /// Generic method for making a request to the Ipfs server, and getting @@ -407,7 +412,7 @@ impl IpfsClient { &self, req: Req, form: Option>, - ) -> impl Stream> + ) -> AsyncStreamResponse where Req: ApiRequest + Serialize, { @@ -421,50 +426,46 @@ impl IpfsClient { } } - /* - /// Generic method to return a streaming response of deserialized json - /// objects delineated by new line separators. - /// - fn request_stream_json( - &self, - req: &Req, - form: Option>, - ) -> AsyncStreamResponse - where - Req: ApiRequest + Serialize, - for<'de> Res: 'static + Deserialize<'de> + Send, - { - self.request_stream(req, form, |res| { - let parse_stream_error = if let Some(trailer) = res.headers().get(TRAILER) { - // Response has the Trailer header set. The StreamError trailer - // is used to indicate that there was an error while streaming - // data with Ipfs. - // - if trailer == "X-Stream-Error" { - true - } else { - let err = Error::UnrecognizedTrailerHeader( - String::from_utf8_lossy(trailer.as_ref()).into(), - ); - - // There was an unrecognized trailer value. If that is the case, - // create a stream that immediately errors. - // - return Box::new(stream::once(Err(err))); - } - } else { - false - }; - - Box::new(IpfsClient::process_stream_response( - res, - JsonLineDecoder::new(parse_stream_error), - )) - }) - } - } + /// Generic method to return a streaming response of deserialized json + /// objects delineated by new line separators. + /// + fn request_stream_json( + &self, + req: Req, + form: Option>, + ) -> AsyncStreamResponse + where + Req: ApiRequest + Serialize, + for<'de> Res: 'static + Deserialize<'de> + Send, + { + self.request_stream(req, form, |res| { + let parse_stream_error = if let Some(trailer) = res.headers().get(TRAILER) { + // Response has the Trailer header set. The StreamError trailer + // is used to indicate that there was an error while streaming + // data with Ipfs. + // + if trailer == "X-Stream-Error" { + true + } else { + let err = Error::UnrecognizedTrailerHeader( + String::from_utf8_lossy(trailer.as_ref()).into(), + ); + + // There was an unrecognized trailer value. If that is the case, + // create a stream that immediately errors. + // + return Box::new(future::err(err).into_stream()); + } + } else { + false + }; - */ + Box::new(IpfsClient::process_stream_response( + res, + JsonLineDecoder::new(parse_stream_error), + )) + }) + } } impl IpfsClient { -- cgit v1.2.3