From 43a543840d23d9ee1e9c4baaa6f8066e3ba69eaf Mon Sep 17 00:00:00 2001 From: Ferris Tseng Date: Sun, 22 Dec 2019 02:22:52 -0500 Subject: beginning to upgrade client, but blergh --- ipfs-api/src/client.rs | 408 +++++++++++++++++++++++++------------------------ 1 file changed, 208 insertions(+), 200 deletions(-) diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs index 5d44c7c..1a02fc7 100644 --- a/ipfs-api/src/client.rs +++ b/ipfs-api/src/client.rs @@ -5,26 +5,31 @@ // http://opensource.org/licenses/MIT>, at your option. This file may not be // copied, modified, or distributed except according to those terms. // -use crate::header::TRAILER; -#[cfg(feature = "hyper")] -use crate::hyper_multipart::client::multipart; -use crate::read::{JsonLineDecoder, LineDecoder, StreamReader}; -use crate::request::{self, ApiRequest}; -use crate::response::{self, Error}; +use crate::{ + header::TRAILER, + read::{JsonLineDecoder, LineDecoder, StreamReader}, + request::{self, ApiRequest}, + response::{self, Error}, +}; #[cfg(feature = "actix")] use actix_http::{encoding, Payload, PayloadStream}; #[cfg(feature = "actix")] use actix_multipart::client::multipart; use bytes::Bytes; use futures::{ - future, - stream::{self, Stream}, - Future, IntoFuture, + future, stream, Future, FutureExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStreamExt, +}; +use http::{ + uri::{InvalidUri, Uri}, + StatusCode, }; -use http::uri::{InvalidUri, Uri}; -use http::StatusCode; #[cfg(feature = "hyper")] -use hyper::client::{self, Builder, HttpConnector}; +use hyper::{ + body, + client::{self, Builder, HttpConnector}, +}; +#[cfg(feature = "hyper")] +use hyper_multipart::client::multipart; #[cfg(feature = "hyper")] use hyper_tls::HttpsConnector; use multiaddr::{AddrComponent, ToMultiaddr}; @@ -36,21 +41,21 @@ use std::{ net::{IpAddr, SocketAddr}, path::{Path, PathBuf}, }; -use tokio_codec::{Decoder, FramedRead}; +use tokio_util::codec::{Decoder, FramedRead}; /// A response returned by the HTTP client. /// #[cfg(feature = "actix")] -type AsyncResponse = Box + 'static>; +type AsyncResponse = Box> + 'static>; #[cfg(feature = "hyper")] -type AsyncResponse = Box + Send + 'static>; +type AsyncResponse = Box> + Send + 'static>; /// A future that returns a stream of responses. /// #[cfg(feature = "actix")] -type AsyncStreamResponse = Box + 'static>; +type AsyncStreamResponse = Box> + 'static>; #[cfg(feature = "hyper")] -type AsyncStreamResponse = Box + Send + 'static>; +type AsyncStreamResponse = Box> + Send + 'static>; #[cfg(feature = "actix")] type Request = awc::ClientRequest; @@ -130,10 +135,9 @@ impl IpfsClient { Ok(IpfsClient { base: base_path, #[cfg(feature = "hyper")] - client: { - let connector = HttpsConnector::new(4).unwrap(); - Builder::default().keep_alive(false).build(connector) - }, + client: Builder::default() + .keep_alive(false) + .build(HttpsConnector::new()), #[cfg(feature = "actix")] client: Client::default(), }) @@ -163,11 +167,11 @@ impl IpfsClient { ); #[cfg(feature = "hyper")] let req = url.parse::().map_err(From::from).and_then(move |url| { - let mut builder = http::Request::builder(); - let mut builder = builder.method(Req::METHOD.clone()).uri(url); + let builder = http::Request::builder(); + let builder = builder.method(Req::METHOD.clone()).uri(url); let req = if let Some(form) = form { - form.set_body_convert::(&mut builder) + form.set_body_convert::(builder) } else { builder.body(hyper::Body::empty()) }; @@ -216,21 +220,21 @@ impl IpfsClient { /// Processes a response that returns a stream of json deserializable /// results. /// - fn process_stream_response(res: Response, decoder: D) -> AsyncStreamResponse + fn process_stream_response( + res: Response, + decoder: D, + ) -> impl Stream> where D: 'static + Decoder + Send, Res: 'static, { #[cfg(feature = "hyper")] - let stream = FramedRead::new( - StreamReader::new(res.into_body().map(|c| c.into_bytes()).from_err()), - decoder, - ); + let stream = FramedRead::new(StreamReader::new(res.into_body()), decoder); #[cfg(feature = "actix")] - let stream = FramedRead::new(StreamReader::new(res.from_err()), decoder); + let stream = FramedRead::new(StreamReader::new(res), decoder); - Box::new(stream) + stream } /// Generates a request, and returns the unprocessed response future. @@ -239,98 +243,101 @@ impl IpfsClient { &self, req: &Req, form: Option>, - ) -> AsyncResponse<(StatusCode, Bytes)> + ) -> impl Future> + 'static where Req: ApiRequest + Serialize, { - match self.build_base_request(req, form) { - Ok(req) => { + let client = self.client.clone(); + + future::ready(self.build_base_request(req, form)) + .and_then(move |req| { #[cfg(feature = "hyper")] - let res = self - .client + let res = client .request(req) .and_then(|res| { let status = res.status(); - res.into_body() - .concat2() - .map(move |chunk| (status, chunk.into_bytes())) + body::to_bytes(res.into_body()).map_ok(move |body| (status, body)) }) - .from_err(); + .err_into(); + #[cfg(feature = "actix")] let res = req .timeout(std::time::Duration::from_secs(90)) .send() - .from_err() - .and_then(|mut x| { - let status = x.status(); - x.body().map(move |body| (status, body)).from_err() + .err_into() + .and_then(|mut res| { + let status = res.status(); + + res.body().map_ok(move |body| (status, body)).err_into() }); - Box::new(res) - } - Err(e) => Box::new(Err(e).into_future()), - } - } - /// Generic method for making a request that expects back a streaming - /// response. - /// - fn request_stream( - &self, - req: &Req, - form: Option>, - process: F, - ) -> AsyncStreamResponse - where - Req: ApiRequest + Serialize, - Res: 'static + Send, - F: 'static + Fn(Response) -> AsyncStreamResponse + Send, - { - #[cfg(feature = "hyper")] - match self.build_base_request(req, form) { - Ok(req) => { - let res = self - .client - .request(req) - .from_err() - .map(move |res| { - let stream: Box + Send + 'static> = - match res.status() { - StatusCode::OK => process(res), - // If the server responded with an error status code, the body - // still needs to be read so an error can be built. This block will - // read the entire body stream, then immediately return an error. - // - _ => Box::new( - res.into_body() - .concat2() - .from_err() - .and_then(|chunk| { - Err(Self::build_error_from_body(chunk.into_bytes())) - }) - .into_stream(), - ), - }; - - stream - }) - .flatten_stream(); - Box::new(res) + res + }) + .err_into() + } + + /* + /// Generic method for making a request that expects back a streaming + /// response. + /// + fn request_stream( + &self, + req: &Req, + form: Option>, + process: F, + ) -> AsyncStreamResponse + where + Req: ApiRequest + Serialize, + Res: 'static + Send, + F: 'static + Fn(Response) -> AsyncStreamResponse + Send, + { + #[cfg(feature = "hyper")] + match self.build_base_request(req, form) { + Ok(req) => { + let res = self + .client + .request(req) + .from_err() + .map(move |res| { + let stream: Box + Send + 'static> = + match res.status() { + StatusCode::OK => process(res), + // If the server responded with an error status code, the body + // still needs to be read so an error can be built. This block will + // read the entire body stream, then immediately return an error. + // + _ => Box::new( + res.into_body() + .concat2() + .from_err() + .and_then(|chunk| { + Err(Self::build_error_from_body(chunk.into_bytes())) + }) + .into_stream(), + ), + }; + + stream + }) + .flatten_stream(); + Box::new(res) + } + Err(e) => Box::new(stream::once(Err(e))), } - Err(e) => Box::new(stream::once(Err(e))), - } - #[cfg(feature = "actix")] - match self.build_base_request(req, form) { - Ok(req) => { - let res = req - .timeout(std::time::Duration::from_secs(90)) - .send() - .from_err(); - Box::new(res.map(process).flatten_stream()) + #[cfg(feature = "actix")] + match self.build_base_request(req, form) { + Ok(req) => { + let res = req + .timeout(std::time::Duration::from_secs(90)) + .send() + .from_err(); + Box::new(res.map(process).flatten_stream()) + } + Err(e) => Box::new(stream::once(Err(e))), } - Err(e) => Box::new(stream::once(Err(e))), } - } + */ /// Generic method for making a request to the Ipfs server, and getting /// a deserializable response. @@ -339,16 +346,14 @@ impl IpfsClient { &self, req: &Req, form: Option>, - ) -> AsyncResponse + ) -> impl Future> + 'static where - Req: ApiRequest + Serialize, + Req: ApiRequest + Serialize + 'static, for<'de> Res: 'static + Deserialize<'de> + Send, { - let res = self - .request_raw(req, form) - .and_then(|(status, chunk)| IpfsClient::process_json_response(status, chunk)); - - Box::new(res) + self.request_raw(req, form).map(|res| { + res.and_then(|(status, chunk)| IpfsClient::process_json_response(status, chunk)) + }) } /// Generic method for making a request to the Ipfs server, and getting @@ -358,103 +363,104 @@ impl IpfsClient { &self, req: &Req, form: Option>, - ) -> AsyncResponse<()> + ) -> impl Future> + 'static where - Req: ApiRequest + Serialize, + Req: ApiRequest + Serialize + 'static, { - let res = self - .request_raw(req, form) - .and_then(|(status, chunk)| match status { + self.request_raw(req, form).map(|res| { + res.and_then(|(status, chunk)| match status { StatusCode::OK => Ok(()), _ => Err(Self::build_error_from_body(chunk)), - }); - - Box::new(res) + }) + }) } - /// Generic method for making a request to the Ipfs server, and getting - /// back a raw String response. - /// - fn request_string( - &self, - req: &Req, - form: Option>, - ) -> AsyncResponse - where - Req: ApiRequest + Serialize, - { - let res = self - .request_raw(req, form) - .and_then(|(status, chunk)| match status { - StatusCode::OK => String::from_utf8(chunk.to_vec()).map_err(From::from), - _ => Err(Self::build_error_from_body(chunk)), - }); + /* + /// Generic method for making a request to the Ipfs server, and getting + /// back a raw String response. + /// + fn request_string( + &self, + req: &Req, + form: Option>, + ) -> AsyncResponse + where + Req: ApiRequest + Serialize, + { + let res = self + .request_raw(req, form) + .and_then(|(status, chunk)| match status { + StatusCode::OK => String::from_utf8(chunk.to_vec()).map_err(From::from), + _ => Err(Self::build_error_from_body(chunk)), + }); - Box::new(res) - } + Box::new(res) + } - /// Generic method for making a request to the Ipfs server, and getting - /// back a raw stream of bytes. - /// - fn request_stream_bytes( - &self, - req: &Req, - form: Option>, - ) -> AsyncStreamResponse - where - Req: ApiRequest + Serialize, - { - #[cfg(feature = "hyper")] - let res = self.request_stream(req, form, |res| { - Box::new(res.into_body().from_err().map(|c| c.into_bytes())) - }); - #[cfg(feature = "actix")] - let res = self.request_stream(req, form, |res| Box::new(res.from_err())); - res - } + /// Generic method for making a request to the Ipfs server, and getting + /// back a raw stream of bytes. + /// + fn request_stream_bytes( + &self, + req: &Req, + form: Option>, + ) -> AsyncStreamResponse + where + Req: ApiRequest + Serialize, + { + #[cfg(feature = "hyper")] + let res = self.request_stream(req, form, |res| { + Box::new(res.into_body().from_err().map(|c| c.into_bytes())) + }); + #[cfg(feature = "actix")] + let res = self.request_stream(req, form, |res| Box::new(res.from_err())); + res + } - /// Generic method to return a streaming response of deserialized json - /// objects delineated by new line separators. - /// - fn request_stream_json( - &self, - req: &Req, - form: Option>, - ) -> AsyncStreamResponse - where - Req: ApiRequest + Serialize, - for<'de> Res: 'static + Deserialize<'de> + Send, - { - self.request_stream(req, form, |res| { - let parse_stream_error = if let Some(trailer) = res.headers().get(TRAILER) { - // Response has the Trailer header set. The StreamError trailer - // is used to indicate that there was an error while streaming - // data with Ipfs. - // - if trailer == "X-Stream-Error" { - true - } else { - let err = Error::UnrecognizedTrailerHeader( - String::from_utf8_lossy(trailer.as_ref()).into(), - ); - - // There was an unrecognized trailer value. If that is the case, - // create a stream that immediately errors. - // - return Box::new(stream::once(Err(err))); - } - } else { - false - }; + /// Generic method to return a streaming response of deserialized json + /// objects delineated by new line separators. + /// + fn request_stream_json( + &self, + req: &Req, + form: Option>, + ) -> AsyncStreamResponse + where + Req: ApiRequest + Serialize, + for<'de> Res: 'static + Deserialize<'de> + Send, + { + self.request_stream(req, form, |res| { + let parse_stream_error = if let Some(trailer) = res.headers().get(TRAILER) { + // Response has the Trailer header set. The StreamError trailer + // is used to indicate that there was an error while streaming + // data with Ipfs. + // + if trailer == "X-Stream-Error" { + true + } else { + let err = Error::UnrecognizedTrailerHeader( + String::from_utf8_lossy(trailer.as_ref()).into(), + ); + + // There was an unrecognized trailer value. If that is the case, + // create a stream that immediately errors. + // + return Box::new(stream::once(Err(err))); + } + } else { + false + }; + + Box::new(IpfsClient::process_stream_response( + res, + JsonLineDecoder::new(parse_stream_error), + )) + }) + } + } - Box::new(IpfsClient::process_stream_response( - res, - JsonLineDecoder::new(parse_stream_error), - )) - }) - } + */ } - impl IpfsClient { /// Add file to Ipfs. /// @@ -474,9 +480,9 @@ impl IpfsClient { /// ``` /// #[inline] - pub fn add(&self, data: R) -> AsyncResponse + pub fn add(&self, data: R) -> impl Future> where - R: 'static + Read + Send, + R: 'static + Read + Send + Sync, { let mut form = multipart::Form::default(); @@ -485,6 +491,7 @@ impl IpfsClient { self.request(&request::Add, Some(form)) } + /* /// Add a path to Ipfs. Can be a file or directory. /// A hard limit of 128 open file descriptors is set such /// that any small additional files are stored in-memory. @@ -2278,4 +2285,5 @@ impl IpfsClient { pub fn version(&self) -> AsyncResponse { self.request(&request::Version, None) } + */ } -- cgit v1.2.3