diff options
Diffstat (limited to 'ipfs-api/src/client.rs')
-rw-r--r-- | ipfs-api/src/client.rs | 98 |
1 files changed, 50 insertions, 48 deletions
diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs index da68919..30a4461 100644 --- a/ipfs-api/src/client.rs +++ b/ipfs-api/src/client.rs @@ -6,20 +6,18 @@ // copied, modified, or distributed except according to those terms. // -use futures::future::{Future, IntoFuture}; -use futures::stream::{self, Stream}; -use header::Trailer; +use futures::{Future, IntoFuture, stream::{self, Stream}}; +use header::TRAILER; use read::{JsonLineDecoder, LineDecoder, StreamReader}; use request::{self, ApiRequest}; use response::{self, Error, ErrorKind}; -use hyper::{self, Chunk, Request, Response, StatusCode, Uri}; -use hyper::client::{Client, Config, HttpConnector}; +use http::uri::InvalidUri; +use hyper::{self, Chunk, Request, Response, StatusCode, Uri, client::{Client, HttpConnector}}; use hyper_multipart::client::multipart; use serde::{Deserialize, Serialize}; use serde_json; use std::io::Read; -use tokio_core::reactor::Handle; -use tokio_io::codec::{Decoder, FramedRead}; +use tokio_codec::{Decoder, FramedRead}; /// A response returned by the HTTP client. /// @@ -33,38 +31,33 @@ type AsyncStreamResponse<T> = Box<Stream<Item = T, Error = Error>>; /// pub struct IpfsClient { base: Uri, - client: Client<HttpConnector, multipart::Body>, + client: Client<HttpConnector, hyper::Body>, +} + +impl Default for IpfsClient { + /// Creates an `IpfsClient` connected to `localhost:5001`. + /// + fn default() -> IpfsClient { + IpfsClient::new("localhost", 5001).unwrap() + } } impl IpfsClient { /// Creates a new `IpfsClient`. /// #[inline] - pub fn new( - handle: &Handle, - host: &str, - port: u16, - ) -> Result<IpfsClient, hyper::error::UriError> { + pub fn new(host: &str, port: u16) -> Result<IpfsClient, InvalidUri> { let base_path = IpfsClient::build_base_path(host, port)?; Ok(IpfsClient { base: base_path, - client: Config::default() - .body::<multipart::Body>() - .keep_alive(true) - .build(handle), + client: Client::builder().keep_alive(true).build_http(), }) } - /// Creates an `IpfsClient` connected to `localhost:5001`. - /// - pub fn default(handle: &Handle) -> IpfsClient { - IpfsClient::new(handle, "localhost", 5001).unwrap() - } - /// Builds the base url path for the Ipfs api. /// - fn build_base_path(host: &str, port: u16) -> Result<Uri, hyper::error::UriError> { + fn build_base_path(host: &str, port: u16) -> Result<Uri, InvalidUri> { format!("http://{}:{}/api/v0", host, port).parse() } @@ -74,7 +67,7 @@ impl IpfsClient { &self, req: &Req, form: Option<multipart::Form>, - ) -> Result<Request<multipart::Body>, Error> + ) -> Result<Request<hyper::Body>, Error> where Req: ApiRequest + Serialize, { @@ -85,17 +78,18 @@ impl IpfsClient { ::serde_urlencoded::to_string(req)? ); - url.parse::<Uri>() - .map(move |url| { - let mut req = Request::new(Req::METHOD.clone(), url); + url.parse::<Uri>().map_err(From::from).and_then(move |url| { + let mut builder = Request::builder(); + let mut builder = builder.method(Req::METHOD.clone()).uri(url); - if let Some(form) = form { - form.set_body(&mut req); - } + let req = if let Some(form) = form { + form.set_body(&mut builder) + } else { + builder.body(hyper::Body::empty()) + }; - req - }) - .map_err(From::from) + req.map_err(From::from) + }) } /// Builds an Api error from a response body. @@ -119,7 +113,7 @@ impl IpfsClient { for<'de> Res: 'static + Deserialize<'de>, { match status { - StatusCode::Ok => serde_json::from_slice(&chunk).map_err(From::from), + StatusCode::OK => serde_json::from_slice(&chunk).map_err(From::from), _ => Err(Self::build_error_from_body(chunk)), } } @@ -128,14 +122,14 @@ impl IpfsClient { /// results. /// fn process_stream_response<D, Res>( - res: Response, + res: Response<hyper::Body>, decoder: D, ) -> Box<Stream<Item = Res, Error = Error>> where D: 'static + Decoder<Item = Res, Error = Error>, Res: 'static, { - let stream = FramedRead::new(StreamReader::new(res.body().from_err()), decoder); + let stream = FramedRead::new(StreamReader::new(res.into_body().from_err()), decoder); Box::new(stream) } @@ -157,7 +151,7 @@ impl IpfsClient { .and_then(|res| { let status = res.status(); - res.body().concat2().map(move |chunk| (status, chunk)) + res.into_body().concat2().map(move |chunk| (status, chunk)) }) .from_err(); @@ -179,7 +173,7 @@ impl IpfsClient { where Req: ApiRequest + Serialize, Res: 'static, - F: 'static + Fn(hyper::Response) -> AsyncStreamResponse<Res>, + F: 'static + Fn(hyper::Response<hyper::Body>) -> AsyncStreamResponse<Res>, { match self.build_base_request(req, form) { Ok(req) => { @@ -188,14 +182,13 @@ impl IpfsClient { .from_err() .map(move |res| { let stream: Box<Stream<Item = Res, Error = _>> = match res.status() { - StatusCode::Ok => process(res), - + StatusCode::OK => process(res), // If the server responded with an error status code, the body // still needs to be read so an error can be built. This block will // read the entire body stream, then immediately return an error. // _ => Box::new( - res.body() + res.into_body() .concat2() .from_err() .and_then(|chunk| Err(Self::build_error_from_body(chunk))) @@ -236,7 +229,7 @@ impl IpfsClient { { let res = self.request_raw(req, form) .and_then(|(status, chunk)| match status { - StatusCode::Ok => Ok(()), + StatusCode::OK => Ok(()), _ => Err(Self::build_error_from_body(chunk)), }); @@ -252,7 +245,7 @@ impl IpfsClient { { let res = self.request_raw(req, form) .and_then(|(status, chunk)| match status { - StatusCode::Ok => String::from_utf8(chunk.to_vec()).map_err(From::from), + StatusCode::OK => String::from_utf8(chunk.to_vec()).map_err(From::from), _ => Err(Self::build_error_from_body(chunk)), }); @@ -270,7 +263,7 @@ impl IpfsClient { where Req: ApiRequest + Serialize, { - self.request_stream(req, form, |res| Box::new(res.body().from_err())) + self.request_stream(req, form, |res| Box::new(res.into_body().from_err())) } /// Generic method to return a streaming response of deserialized json @@ -286,13 +279,22 @@ impl IpfsClient { for<'de> Res: 'static + Deserialize<'de>, { self.request_stream(req, form, |res| { - let parse_stream_error = if let Some(trailer) = res.headers().get() { + let parse_stream_error = if let Some(trailer) = res.headers().get(TRAILER) { // Response has the Trailer header set. The StreamError trailer // is used to indicate that there was an error while streaming // data with Ipfs. // - match trailer { - &Trailer::StreamError => true, + if trailer == "X-Stream-Error" { + true + } else { + let err = ErrorKind::UnrecognizedTrailerHeader( + String::from_utf8_lossy(trailer.as_ref()).into(), + ); + + // There was an unrecognized trailer value. If that is the case, + // create a stream that immediately errors. + // + return Box::new(stream::once(Err(err.into()))); } } else { false |