diff options
author | Ferris Tseng <ferristseng@fastmail.fm> | 2019-12-22 02:22:52 -0500 |
---|---|---|
committer | Ferris Tseng <ferristseng@fastmail.fm> | 2019-12-22 02:22:52 -0500 |
commit | 43a543840d23d9ee1e9c4baaa6f8066e3ba69eaf (patch) | |
tree | 2985f4d9d3fc9d05d24b75048f5e94fa4138343e | |
parent | 27c25f96ac923bb4d6e02890eae08752f1c8228e (diff) |
beginning to upgrade client, but blergh
-rw-r--r-- | ipfs-api/src/client.rs | 408 |
1 files changed, 208 insertions, 200 deletions
diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs index 5d44c7c..1a02fc7 100644 --- a/ipfs-api/src/client.rs +++ b/ipfs-api/src/client.rs @@ -5,26 +5,31 @@ // http://opensource.org/licenses/MIT>, at your option. This file may not be // copied, modified, or distributed except according to those terms. // -use crate::header::TRAILER; -#[cfg(feature = "hyper")] -use crate::hyper_multipart::client::multipart; -use crate::read::{JsonLineDecoder, LineDecoder, StreamReader}; -use crate::request::{self, ApiRequest}; -use crate::response::{self, Error}; +use crate::{ + header::TRAILER, + read::{JsonLineDecoder, LineDecoder, StreamReader}, + request::{self, ApiRequest}, + response::{self, Error}, +}; #[cfg(feature = "actix")] use actix_http::{encoding, Payload, PayloadStream}; #[cfg(feature = "actix")] use actix_multipart::client::multipart; use bytes::Bytes; use futures::{ - future, - stream::{self, Stream}, - Future, IntoFuture, + future, stream, Future, FutureExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStreamExt, +}; +use http::{ + uri::{InvalidUri, Uri}, + StatusCode, }; -use http::uri::{InvalidUri, Uri}; -use http::StatusCode; #[cfg(feature = "hyper")] -use hyper::client::{self, Builder, HttpConnector}; +use hyper::{ + body, + client::{self, Builder, HttpConnector}, +}; +#[cfg(feature = "hyper")] +use hyper_multipart::client::multipart; #[cfg(feature = "hyper")] use hyper_tls::HttpsConnector; use multiaddr::{AddrComponent, ToMultiaddr}; @@ -36,21 +41,21 @@ use std::{ net::{IpAddr, SocketAddr}, path::{Path, PathBuf}, }; -use tokio_codec::{Decoder, FramedRead}; +use tokio_util::codec::{Decoder, FramedRead}; /// A response returned by the HTTP client. /// #[cfg(feature = "actix")] -type AsyncResponse<T> = Box<dyn Future<Item = T, Error = Error> + 'static>; +type AsyncResponse<T> = Box<dyn Future<Output = Result<T, Error>> + 'static>; #[cfg(feature = "hyper")] -type AsyncResponse<T> = Box<dyn Future<Item = T, Error = Error> + Send + 'static>; +type AsyncResponse<T> = Box<dyn Future<Output = Result<T, Error>> + Send + 'static>; /// A future that returns a stream of responses. /// #[cfg(feature = "actix")] -type AsyncStreamResponse<T> = Box<dyn Stream<Item = T, Error = Error> + 'static>; +type AsyncStreamResponse<T> = Box<dyn Stream<Item = Result<T, Error>> + 'static>; #[cfg(feature = "hyper")] -type AsyncStreamResponse<T> = Box<dyn Stream<Item = T, Error = Error> + Send + 'static>; +type AsyncStreamResponse<T> = Box<dyn Stream<Item = Result<T, Error>> + Send + 'static>; #[cfg(feature = "actix")] type Request = awc::ClientRequest; @@ -130,10 +135,9 @@ impl IpfsClient { Ok(IpfsClient { base: base_path, #[cfg(feature = "hyper")] - client: { - let connector = HttpsConnector::new(4).unwrap(); - Builder::default().keep_alive(false).build(connector) - }, + client: Builder::default() + .keep_alive(false) + .build(HttpsConnector::new()), #[cfg(feature = "actix")] client: Client::default(), }) @@ -163,11 +167,11 @@ impl IpfsClient { ); #[cfg(feature = "hyper")] let req = url.parse::<Uri>().map_err(From::from).and_then(move |url| { - let mut builder = http::Request::builder(); - let mut builder = builder.method(Req::METHOD.clone()).uri(url); + let builder = http::Request::builder(); + let builder = builder.method(Req::METHOD.clone()).uri(url); let req = if let Some(form) = form { - form.set_body_convert::<hyper::Body, multipart::Body>(&mut builder) + form.set_body_convert::<hyper::Body, multipart::Body>(builder) } else { builder.body(hyper::Body::empty()) }; @@ -216,21 +220,21 @@ impl IpfsClient { /// Processes a response that returns a stream of json deserializable /// results. /// - fn process_stream_response<D, Res>(res: Response, decoder: D) -> AsyncStreamResponse<Res> + fn process_stream_response<D, Res>( + res: Response, + decoder: D, + ) -> impl Stream<Item = Result<Res, Error>> where D: 'static + Decoder<Item = Res, Error = Error> + Send, Res: 'static, { #[cfg(feature = "hyper")] - let stream = FramedRead::new( - StreamReader::new(res.into_body().map(|c| c.into_bytes()).from_err()), - decoder, - ); + let stream = FramedRead::new(StreamReader::new(res.into_body()), decoder); #[cfg(feature = "actix")] - let stream = FramedRead::new(StreamReader::new(res.from_err()), decoder); + let stream = FramedRead::new(StreamReader::new(res), decoder); - Box::new(stream) + stream } /// Generates a request, and returns the unprocessed response future. @@ -239,98 +243,101 @@ impl IpfsClient { &self, req: &Req, form: Option<multipart::Form<'static>>, - ) -> AsyncResponse<(StatusCode, Bytes)> + ) -> impl Future<Output = Result<(StatusCode, Bytes), Error>> + 'static where Req: ApiRequest + Serialize, { - match self.build_base_request(req, form) { - Ok(req) => { + let client = self.client.clone(); + + future::ready(self.build_base_request(req, form)) + .and_then(move |req| { #[cfg(feature = "hyper")] - let res = self - .client + let res = client .request(req) .and_then(|res| { let status = res.status(); - res.into_body() - .concat2() - .map(move |chunk| (status, chunk.into_bytes())) + body::to_bytes(res.into_body()).map_ok(move |body| (status, body)) }) - .from_err(); + .err_into(); + #[cfg(feature = "actix")] let res = req .timeout(std::time::Duration::from_secs(90)) .send() - .from_err() - .and_then(|mut x| { - let status = x.status(); - x.body().map(move |body| (status, body)).from_err() + .err_into() + .and_then(|mut res| { + let status = res.status(); + + res.body().map_ok(move |body| (status, body)).err_into() }); - Box::new(res) - } - Err(e) => Box::new(Err(e).into_future()), - } - } - /// Generic method for making a request that expects back a streaming - /// response. - /// - fn request_stream<Req, Res, F>( - &self, - req: &Req, - form: Option<multipart::Form<'static>>, - process: F, - ) -> AsyncStreamResponse<Res> - where - Req: ApiRequest + Serialize, - Res: 'static + Send, - F: 'static + Fn(Response) -> AsyncStreamResponse<Res> + Send, - { - #[cfg(feature = "hyper")] - match self.build_base_request(req, form) { - Ok(req) => { - let res = self - .client - .request(req) - .from_err() - .map(move |res| { - let stream: Box<dyn Stream<Item = Res, Error = _> + Send + 'static> = - match res.status() { - StatusCode::OK => process(res), - // If the server responded with an error status code, the body - // still needs to be read so an error can be built. This block will - // read the entire body stream, then immediately return an error. - // - _ => Box::new( - res.into_body() - .concat2() - .from_err() - .and_then(|chunk| { - Err(Self::build_error_from_body(chunk.into_bytes())) - }) - .into_stream(), - ), - }; - - stream - }) - .flatten_stream(); - Box::new(res) + res + }) + .err_into() + } + + /* + /// Generic method for making a request that expects back a streaming + /// response. + /// + fn request_stream<Req, Res, F>( + &self, + req: &Req, + form: Option<multipart::Form<'static>>, + process: F, + ) -> AsyncStreamResponse<Res> + where + Req: ApiRequest + Serialize, + Res: 'static + Send, + F: 'static + Fn(Response) -> AsyncStreamResponse<Res> + Send, + { + #[cfg(feature = "hyper")] + match self.build_base_request(req, form) { + Ok(req) => { + let res = self + .client + .request(req) + .from_err() + .map(move |res| { + let stream: Box<dyn Stream<Item = Res, Error = _> + Send + 'static> = + match res.status() { + StatusCode::OK => process(res), + // If the server responded with an error status code, the body + // still needs to be read so an error can be built. This block will + // read the entire body stream, then immediately return an error. + // + _ => Box::new( + res.into_body() + .concat2() + .from_err() + .and_then(|chunk| { + Err(Self::build_error_from_body(chunk.into_bytes())) + }) + .into_stream(), + ), + }; + + stream + }) + .flatten_stream(); + Box::new(res) + } + Err(e) => Box::new(stream::once(Err(e))), } - Err(e) => Box::new(stream::once(Err(e))), - } - #[cfg(feature = "actix")] - match self.build_base_request(req, form) { - Ok(req) => { - let res = req - .timeout(std::time::Duration::from_secs(90)) - .send() - .from_err(); - Box::new(res.map(process).flatten_stream()) + #[cfg(feature = "actix")] + match self.build_base_request(req, form) { + Ok(req) => { + let res = req + .timeout(std::time::Duration::from_secs(90)) + .send() + .from_err(); + Box::new(res.map(process).flatten_stream()) + } + Err(e) => Box::new(stream::once(Err(e))), } - Err(e) => Box::new(stream::once(Err(e))), } - } + */ /// Generic method for making a request to the Ipfs server, and getting /// a deserializable response. @@ -339,16 +346,14 @@ impl IpfsClient { &self, req: &Req, form: Option<multipart::Form<'static>>, - ) -> AsyncResponse<Res> + ) -> impl Future<Output = Result<Res, Error>> + 'static where - Req: ApiRequest + Serialize, + Req: ApiRequest + Serialize + 'static, for<'de> Res: 'static + Deserialize<'de> + Send, { - let res = self - .request_raw(req, form) - .and_then(|(status, chunk)| IpfsClient::process_json_response(status, chunk)); - - Box::new(res) + self.request_raw(req, form).map(|res| { + res.and_then(|(status, chunk)| IpfsClient::process_json_response(status, chunk)) + }) } /// Generic method for making a request to the Ipfs server, and getting @@ -358,103 +363,104 @@ impl IpfsClient { &self, req: &Req, form: Option<multipart::Form<'static>>, - ) -> AsyncResponse<()> + ) -> impl Future<Output = Result<(), Error>> + 'static where - Req: ApiRequest + Serialize, + Req: ApiRequest + Serialize + 'static, { - let res = self - .request_raw(req, form) - .and_then(|(status, chunk)| match status { + self.request_raw(req, form).map(|res| { + res.and_then(|(status, chunk)| match status { StatusCode::OK => Ok(()), _ => Err(Self::build_error_from_body(chunk)), - }); - - Box::new(res) + }) + }) } - /// Generic method for making a request to the Ipfs server, and getting - /// back a raw String response. - /// - fn request_string<Req>( - &self, - req: &Req, - form: Option<multipart::Form<'static>>, - ) -> AsyncResponse<String> - where - Req: ApiRequest + Serialize, - { - let res = self - .request_raw(req, form) - .and_then(|(status, chunk)| match status { - StatusCode::OK => String::from_utf8(chunk.to_vec()).map_err(From::from), - _ => Err(Self::build_error_from_body(chunk)), - }); + /* + /// Generic method for making a request to the Ipfs server, and getting + /// back a raw String response. + /// + fn request_string<Req>( + &self, + req: &Req, + form: Option<multipart::Form<'static>>, + ) -> AsyncResponse<String> + where + Req: ApiRequest + Serialize, + { + let res = self + .request_raw(req, form) + .and_then(|(status, chunk)| match status { + StatusCode::OK => String::from_utf8(chunk.to_vec()).map_err(From::from), + _ => Err(Self::build_error_from_body(chunk)), + }); - Box::new(res) - } + Box::new(res) + } - /// Generic method for making a request to the Ipfs server, and getting - /// back a raw stream of bytes. - /// - fn request_stream_bytes<Req>( - &self, - req: &Req, - form: Option<multipart::Form<'static>>, - ) -> AsyncStreamResponse<Bytes> - where - Req: ApiRequest + Serialize, - { - #[cfg(feature = "hyper")] - let res = self.request_stream(req, form, |res| { - Box::new(res.into_body().from_err().map(|c| c.into_bytes())) - }); - #[cfg(feature = "actix")] - let res = self.request_stream(req, form, |res| Box::new(res.from_err())); - res - } + /// Generic method for making a request to the Ipfs server, and getting + /// back a raw stream of bytes. + /// + fn request_stream_bytes<Req>( + &self, + req: &Req, + form: Option<multipart::Form<'static>>, + ) -> AsyncStreamResponse<Bytes> + where + Req: ApiRequest + Serialize, + { + #[cfg(feature = "hyper")] + let res = self.request_stream(req, form, |res| { + Box::new(res.into_body().from_err().map(|c| c.into_bytes())) + }); + #[cfg(feature = "actix")] + let res = self.request_stream(req, form, |res| Box::new(res.from_err())); + res + } - /// Generic method to return a streaming response of deserialized json - /// objects delineated by new line separators. - /// - fn request_stream_json<Req, Res>( - &self, - req: &Req, - form: Option<multipart::Form<'static>>, - ) -> AsyncStreamResponse<Res> - where - Req: ApiRequest + Serialize, - for<'de> Res: 'static + Deserialize<'de> + Send, - { - self.request_stream(req, form, |res| { - let parse_stream_error = if let Some(trailer) = res.headers().get(TRAILER) { - // Response has the Trailer header set. The StreamError trailer - // is used to indicate that there was an error while streaming - // data with Ipfs. - // - if trailer == "X-Stream-Error" { - true - } else { - let err = Error::UnrecognizedTrailerHeader( - String::from_utf8_lossy(trailer.as_ref()).into(), - ); - - // There was an unrecognized trailer value. If that is the case, - // create a stream that immediately errors. - // - return Box::new(stream::once(Err(err))); - } - } else { - false - }; + /// Generic method to return a streaming response of deserialized json + /// objects delineated by new line separators. + /// + fn request_stream_json<Req, Res>( + &self, + req: &Req, + form: Option<multipart::Form<'static>>, + ) -> AsyncStreamResponse<Res> + where + Req: ApiRequest + Serialize, + for<'de> Res: 'static + Deserialize<'de> + Send, + { + self.request_stream(req, form, |res| { + let parse_stream_error = if let Some(trailer) = res.headers().get(TRAILER) { + // Response has the Trailer header set. The StreamError trailer + // is used to indicate that there was an error while streaming + // data with Ipfs. + // + if trailer == "X-Stream-Error" { + true + } else { + let err = Error::UnrecognizedTrailerHeader( + String::from_utf8_lossy(trailer.as_ref()).into(), + ); + + // There was an unrecognized trailer value. If that is the case, + // create a stream that immediately errors. + // + return Box::new(stream::once(Err(err))); + } + } else { + false + }; + + Box::new(IpfsClient::process_stream_response( + res, + JsonLineDecoder::new(parse_stream_error), + )) + }) + } + } - Box::new(IpfsClient::process_stream_response( - res, - JsonLineDecoder::new(parse_stream_error), - )) - }) - } + */ } - impl IpfsClient { /// Add file to Ipfs. /// @@ -474,9 +480,9 @@ impl IpfsClient { /// ``` /// #[inline] - pub fn add<R>(&self, data: R) -> AsyncResponse<response::AddResponse> + pub fn add<R>(&self, data: R) -> impl Future<Output = Result<response::AddResponse, Error>> where - R: 'static + Read + Send, + R: 'static + Read + Send + Sync, { let mut form = multipart::Form::default(); @@ -485,6 +491,7 @@ impl IpfsClient { self.request(&request::Add, Some(form)) } + /* /// Add a path to Ipfs. Can be a file or directory. /// A hard limit of 128 open file descriptors is set such /// that any small additional files are stored in-memory. @@ -2278,4 +2285,5 @@ impl IpfsClient { pub fn version(&self) -> AsyncResponse<response::VersionResponse> { self.request(&request::Version, None) } + */ } |