From e522d1612af5116f8143eff046eb5fa60b86d938 Mon Sep 17 00:00:00 2001 From: Ferris Tseng Date: Sun, 22 Dec 2019 22:10:07 -0500 Subject: fleshing out almost all of the core functionality --- ipfs-api/src/client.rs | 365 ++++++++++++++++++++++++++----------------------- ipfs-api/src/lib.rs | 8 -- 2 files changed, 192 insertions(+), 181 deletions(-) diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs index 1a02fc7..65aba7e 100644 --- a/ipfs-api/src/client.rs +++ b/ipfs-api/src/client.rs @@ -43,19 +43,12 @@ use std::{ }; use tokio_util::codec::{Decoder, FramedRead}; -/// A response returned by the HTTP client. -/// -#[cfg(feature = "actix")] -type AsyncResponse = Box> + 'static>; -#[cfg(feature = "hyper")] -type AsyncResponse = Box> + Send + 'static>; - /// A future that returns a stream of responses. /// #[cfg(feature = "actix")] -type AsyncStreamResponse = Box> + 'static>; +type AsyncStreamResponse = Box> + Unpin + 'static>; #[cfg(feature = "hyper")] -type AsyncStreamResponse = Box> + Send + 'static>; +type AsyncStreamResponse = Box> + Unpin + Send + 'static>; #[cfg(feature = "actix")] type Request = awc::ClientRequest; @@ -165,39 +158,42 @@ impl IpfsClient { Req::PATH, ::serde_urlencoded::to_string(req)? ); + #[cfg(feature = "hyper")] - let req = url.parse::().map_err(From::from).and_then(move |url| { - let builder = http::Request::builder(); - let builder = builder.method(Req::METHOD.clone()).uri(url); + { + url.parse::().map_err(From::from).and_then(move |url| { + let builder = http::Request::builder(); + let builder = builder.method(Req::METHOD.clone()).uri(url); - let req = if let Some(form) = form { - form.set_body_convert::(builder) - } else { - builder.body(hyper::Body::empty()) - }; + let req = if let Some(form) = form { + form.set_body_convert::(builder) + } else { + builder.body(hyper::Body::empty()) + }; - req.map_err(From::from) - }); + req.map_err(From::from) + }) + } #[cfg(feature = "actix")] - let req = if let Some(form) = form { - Ok(self - .client - .request(Req::METHOD.clone(), url) - .content_type(form.content_type())) - } else { - Ok(self.client.request(Req::METHOD.clone(), url)) - }; - - req + { + if let Some(form) = form { + Ok(self + .client + .request(Req::METHOD.clone(), url) + .content_type(form.content_type())) + } else { + Ok(self.client.request(Req::METHOD.clone(), url)) + } + } } /// Builds an Api error from a response body. /// #[inline] - fn build_error_from_body(chunk: Bytes) -> Error { - match serde_json::from_slice(&chunk) { + fn process_error_from_body(body: Bytes) -> Error { + match serde_json::from_slice(&body) { Ok(e) => Error::Api(e), - Err(_) => match String::from_utf8(chunk.to_vec()) { + Err(_) => match String::from_utf8(body.to_vec()) { Ok(s) => Error::Uncategorized(s), Err(e) => e.into(), }, @@ -207,13 +203,13 @@ impl IpfsClient { /// Processes a response that expects a json encoded body, returning an /// error or a deserialized json response. /// - fn process_json_response(status: StatusCode, chunk: Bytes) -> Result + fn process_json_response(status: StatusCode, body: Bytes) -> Result where for<'de> Res: 'static + Deserialize<'de>, { match status { - StatusCode::OK => serde_json::from_slice(&chunk).map_err(From::from), - _ => Err(Self::build_error_from_body(chunk)), + StatusCode::OK => serde_json::from_slice(&body).map_err(From::from), + _ => Err(Self::process_error_from_body(body)), } } @@ -229,12 +225,13 @@ impl IpfsClient { Res: 'static, { #[cfg(feature = "hyper")] - let stream = FramedRead::new(StreamReader::new(res.into_body()), decoder); - + { + FramedRead::new(StreamReader::new(res.into_body()), decoder) + } #[cfg(feature = "actix")] - let stream = FramedRead::new(StreamReader::new(res), decoder); - - stream + { + FramedRead::new(StreamReader::new(res), decoder) + } } /// Generates a request, and returns the unprocessed response future. @@ -243,101 +240,112 @@ impl IpfsClient { &self, req: &Req, form: Option>, - ) -> impl Future> + 'static + ) -> impl Future> where Req: ApiRequest + Serialize, { - let client = self.client.clone(); - - future::ready(self.build_base_request(req, form)) - .and_then(move |req| { - #[cfg(feature = "hyper")] - let res = client - .request(req) - .and_then(|res| { - let status = res.status(); - - body::to_bytes(res.into_body()).map_ok(move |body| (status, body)) - }) - .err_into(); - - #[cfg(feature = "actix")] - let res = req - .timeout(std::time::Duration::from_secs(90)) - .send() - .err_into() - .and_then(|mut res| { - let status = res.status(); + let request = future::ready(self.build_base_request(req, form)); - res.body().map_ok(move |body| (status, body)).err_into() - }); + #[cfg(feature = "hyper")] + { + let client = self.client.clone(); - res - }) - .err_into() + request + .and_then(move |req| client.request(req).err_into()) + .and_then(|res| { + let status = res.status(); + + body::to_bytes(res.into_body()) + .map_ok(move |body| (status, body)) + .err_into() + }) + } + #[cfg(feature = "actix")] + { + request + .and_then(|req| { + req.timeout(std::time::Duration::from_secs(90)) + .send() + .err_into() + }) + .and_then(|mut res| { + let status = res.status(); + + res.body().map_ok(move |body| (status, body)).err_into() + }) + } } - /* - /// Generic method for making a request that expects back a streaming - /// response. - /// - fn request_stream( - &self, - req: &Req, - form: Option>, - process: F, - ) -> AsyncStreamResponse - where - Req: ApiRequest + Serialize, - Res: 'static + Send, - F: 'static + Fn(Response) -> AsyncStreamResponse + Send, + /// Generic method for making a request that expects back a streaming + /// response. + /// + fn request_stream( + &self, + req: &Req, + form: Option>, + process: F, + ) -> impl Stream> + '_ + where + Req: 'static + ApiRequest + Serialize, + Res: 'static + Send, + F: 'static + Fn(Response) -> AsyncStreamResponse, + { + let request = future::ready(self.build_base_request(req, form)); + + #[cfg(feature = "hyper")] { - #[cfg(feature = "hyper")] - match self.build_base_request(req, form) { - Ok(req) => { - let res = self - .client - .request(req) - .from_err() - .map(move |res| { - let stream: Box + Send + 'static> = - match res.status() { - StatusCode::OK => process(res), - // If the server responded with an error status code, the body - // still needs to be read so an error can be built. This block will - // read the entire body stream, then immediately return an error. - // - _ => Box::new( - res.into_body() - .concat2() - .from_err() - .and_then(|chunk| { - Err(Self::build_error_from_body(chunk.into_bytes())) - }) - .into_stream(), - ), - }; - - stream - }) - .flatten_stream(); - Box::new(res) - } - Err(e) => Box::new(stream::once(Err(e))), - } - #[cfg(feature = "actix")] - match self.build_base_request(req, form) { - Ok(req) => { - let res = req - .timeout(std::time::Duration::from_secs(90)) + let client = self.client.clone(); + + request + .and_then(move |req| self.client.request(req).err_into()) + .map_ok(move |res| { + match res.status() { + StatusCode::OK => process(res), + // If the server responded with an error status code, the body + // still needs to be read so an error can be built. This block will + // read the entire body stream, then immediately return an error. + // + _ => Box::new( + body::to_bytes(res.into_body()) + .boxed() + .map(|maybe_body| match maybe_body { + Ok(body) => Err(Self::process_error_from_body(body)), + Err(e) => Err(e.into()), + }) + .into_stream(), + ), + } + }) + .try_flatten_stream() + } + #[cfg(feature = "actix")] + { + request + .and_then(|req| { + req.timeout(std::time::Duration::from_secs(90)) .send() - .from_err(); - Box::new(res.map(process).flatten_stream()) - } - Err(e) => Box::new(stream::once(Err(e))), - } + .err_into() + }) + .map_ok(move |mut res| { + match res.status() { + StatusCode::OK => process(res), + // If the server responded with an error status code, the body + // still needs to be read so an error can be built. This block will + // read the entire body stream, then immediately return an error. + // + _ => Box::new( + res.body() + .map(|maybe_body| match maybe_body { + Ok(body) => Err(Self::process_error_from_body(body)), + Err(e) => Err(e.into()), + }) + .into_stream(), + ), + } + }) + .try_flatten_stream() } - */ + } /// Generic method for making a request to the Ipfs server, and getting /// a deserializable response. @@ -346,9 +354,9 @@ impl IpfsClient { &self, req: &Req, form: Option>, - ) -> impl Future> + 'static + ) -> impl Future> where - Req: ApiRequest + Serialize + 'static, + Req: ApiRequest + Serialize, for<'de> Res: 'static + Deserialize<'de> + Send, { self.request_raw(req, form).map(|res| { @@ -363,60 +371,59 @@ impl IpfsClient { &self, req: &Req, form: Option>, - ) -> impl Future> + 'static + ) -> impl Future> where - Req: ApiRequest + Serialize + 'static, + Req: ApiRequest + Serialize, { self.request_raw(req, form).map(|res| { res.and_then(|(status, chunk)| match status { StatusCode::OK => Ok(()), - _ => Err(Self::build_error_from_body(chunk)), + _ => Err(Self::process_error_from_body(chunk)), }) }) } - /* - /// Generic method for making a request to the Ipfs server, and getting - /// back a raw String response. - /// - fn request_string( - &self, - req: &Req, - form: Option>, - ) -> AsyncResponse - where - Req: ApiRequest + Serialize, - { - let res = self - .request_raw(req, form) - .and_then(|(status, chunk)| match status { - StatusCode::OK => String::from_utf8(chunk.to_vec()).map_err(From::from), - _ => Err(Self::build_error_from_body(chunk)), - }); - - Box::new(res) - } + /// Generic method for making a request to the Ipfs server, and getting + /// back a raw String response. + /// + fn request_string( + &self, + req: &Req, + form: Option>, + ) -> impl Future> + where + Req: ApiRequest + Serialize, + { + self.request_raw(req, form).map(|res| { + res.and_then(|(status, chunk)| match status { + StatusCode::OK => String::from_utf8(chunk.to_vec()).map_err(From::from), + _ => Err(Self::process_error_from_body(chunk)), + }) + }) + } - /// Generic method for making a request to the Ipfs server, and getting - /// back a raw stream of bytes. - /// - fn request_stream_bytes( - &self, - req: &Req, - form: Option>, - ) -> AsyncStreamResponse - where - Req: ApiRequest + Serialize, - { - #[cfg(feature = "hyper")] - let res = self.request_stream(req, form, |res| { - Box::new(res.into_body().from_err().map(|c| c.into_bytes())) - }); - #[cfg(feature = "actix")] - let res = self.request_stream(req, form, |res| Box::new(res.from_err())); - res - } + /// Generic method for making a request to the Ipfs server, and getting + /// back a raw stream of bytes. + /// + fn request_stream_bytes( + &self, + req: &Req, + form: Option>, + ) -> impl Stream> + '_ + where + Req: 'static + ApiRequest + Serialize, + { + #[cfg(feature = "hyper")] + { + self.request_stream(req, form, |res| Box::new(res.into_body().err_into())) + } + #[cfg(feature = "actix")] + { + self.request_stream(req, form, |res| Box::new(res.err_into())) + } + } + /* /// Generic method to return a streaming response of deserialized json /// objects delineated by new line separators. /// @@ -461,6 +468,7 @@ impl IpfsClient { */ } + impl IpfsClient { /// Add file to Ipfs. /// @@ -570,7 +578,9 @@ impl IpfsClient { .map(|mut responses: Vec| responses.pop().unwrap()), ) } + */ + /* /// Returns the current ledger for a peer. /// /// # Examples @@ -587,9 +597,13 @@ impl IpfsClient { /// ``` /// #[inline] - pub fn bitswap_ledger(&self, peer: &str) -> AsyncResponse { + pub fn bitswap_ledger( + &self, + peer: &str, + ) -> impl Future> { self.request(&request::BitswapLedger { peer }, None) } + */ /// Triggers a reprovide. /// @@ -607,7 +621,9 @@ impl IpfsClient { /// ``` /// #[inline] - pub fn bitswap_reprovide(&self) -> AsyncResponse { + pub fn bitswap_reprovide( + &self, + ) -> impl Future> { self.request_empty(&request::BitswapReprovide, None) } @@ -627,10 +643,13 @@ impl IpfsClient { /// ``` /// #[inline] - pub fn bitswap_stat(&self) -> AsyncResponse { + pub fn bitswap_stat( + &self, + ) -> impl Future> { self.request(&request::BitswapStat, None) } + /* /// Remove a given block from your wantlist. /// /// # Examples diff --git a/ipfs-api/src/lib.rs b/ipfs-api/src/lib.rs index f7b7947..03f43c3 100644 --- a/ipfs-api/src/lib.rs +++ b/ipfs-api/src/lib.rs @@ -167,23 +167,15 @@ //! ``` //! -#[cfg(feature = "actix")] -extern crate actix_http; #[cfg(feature = "actix")] extern crate actix_multipart_rfc7578 as actix_multipart; #[cfg(feature = "actix")] -extern crate awc; -#[cfg(feature = "actix")] #[macro_use] extern crate derive_more; -#[cfg(feature = "hyper")] -extern crate hyper; #[cfg(feature = "hyper")] extern crate hyper_multipart_rfc7578 as hyper_multipart; #[cfg(feature = "hyper")] -extern crate hyper_tls; -#[cfg(feature = "hyper")] #[macro_use] extern crate failure; -- cgit v1.2.3