summaryrefslogtreecommitdiffstats
path: root/ipfs-api/src/client.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ipfs-api/src/client.rs')
-rw-r--r--ipfs-api/src/client.rs98
1 files changed, 50 insertions, 48 deletions
diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs
index da68919..30a4461 100644
--- a/ipfs-api/src/client.rs
+++ b/ipfs-api/src/client.rs
@@ -6,20 +6,18 @@
// copied, modified, or distributed except according to those terms.
//
-use futures::future::{Future, IntoFuture};
-use futures::stream::{self, Stream};
-use header::Trailer;
+use futures::{Future, IntoFuture, stream::{self, Stream}};
+use header::TRAILER;
use read::{JsonLineDecoder, LineDecoder, StreamReader};
use request::{self, ApiRequest};
use response::{self, Error, ErrorKind};
-use hyper::{self, Chunk, Request, Response, StatusCode, Uri};
-use hyper::client::{Client, Config, HttpConnector};
+use http::uri::InvalidUri;
+use hyper::{self, Chunk, Request, Response, StatusCode, Uri, client::{Client, HttpConnector}};
use hyper_multipart::client::multipart;
use serde::{Deserialize, Serialize};
use serde_json;
use std::io::Read;
-use tokio_core::reactor::Handle;
-use tokio_io::codec::{Decoder, FramedRead};
+use tokio_codec::{Decoder, FramedRead};
/// A response returned by the HTTP client.
///
@@ -33,38 +31,33 @@ type AsyncStreamResponse<T> = Box<Stream<Item = T, Error = Error>>;
///
pub struct IpfsClient {
base: Uri,
- client: Client<HttpConnector, multipart::Body>,
+ client: Client<HttpConnector, hyper::Body>,
+}
+
+impl Default for IpfsClient {
+ /// Creates an `IpfsClient` connected to `localhost:5001`.
+ ///
+ fn default() -> IpfsClient {
+ IpfsClient::new("localhost", 5001).unwrap()
+ }
}
impl IpfsClient {
/// Creates a new `IpfsClient`.
///
#[inline]
- pub fn new(
- handle: &Handle,
- host: &str,
- port: u16,
- ) -> Result<IpfsClient, hyper::error::UriError> {
+ pub fn new(host: &str, port: u16) -> Result<IpfsClient, InvalidUri> {
let base_path = IpfsClient::build_base_path(host, port)?;
Ok(IpfsClient {
base: base_path,
- client: Config::default()
- .body::<multipart::Body>()
- .keep_alive(true)
- .build(handle),
+ client: Client::builder().keep_alive(true).build_http(),
})
}
- /// Creates an `IpfsClient` connected to `localhost:5001`.
- ///
- pub fn default(handle: &Handle) -> IpfsClient {
- IpfsClient::new(handle, "localhost", 5001).unwrap()
- }
-
/// Builds the base url path for the Ipfs api.
///
- fn build_base_path(host: &str, port: u16) -> Result<Uri, hyper::error::UriError> {
+ fn build_base_path(host: &str, port: u16) -> Result<Uri, InvalidUri> {
format!("http://{}:{}/api/v0", host, port).parse()
}
@@ -74,7 +67,7 @@ impl IpfsClient {
&self,
req: &Req,
form: Option<multipart::Form>,
- ) -> Result<Request<multipart::Body>, Error>
+ ) -> Result<Request<hyper::Body>, Error>
where
Req: ApiRequest + Serialize,
{
@@ -85,17 +78,18 @@ impl IpfsClient {
::serde_urlencoded::to_string(req)?
);
- url.parse::<Uri>()
- .map(move |url| {
- let mut req = Request::new(Req::METHOD.clone(), url);
+ url.parse::<Uri>().map_err(From::from).and_then(move |url| {
+ let mut builder = Request::builder();
+ let mut builder = builder.method(Req::METHOD.clone()).uri(url);
- if let Some(form) = form {
- form.set_body(&mut req);
- }
+ let req = if let Some(form) = form {
+ form.set_body(&mut builder)
+ } else {
+ builder.body(hyper::Body::empty())
+ };
- req
- })
- .map_err(From::from)
+ req.map_err(From::from)
+ })
}
/// Builds an Api error from a response body.
@@ -119,7 +113,7 @@ impl IpfsClient {
for<'de> Res: 'static + Deserialize<'de>,
{
match status {
- StatusCode::Ok => serde_json::from_slice(&chunk).map_err(From::from),
+ StatusCode::OK => serde_json::from_slice(&chunk).map_err(From::from),
_ => Err(Self::build_error_from_body(chunk)),
}
}
@@ -128,14 +122,14 @@ impl IpfsClient {
/// results.
///
fn process_stream_response<D, Res>(
- res: Response,
+ res: Response<hyper::Body>,
decoder: D,
) -> Box<Stream<Item = Res, Error = Error>>
where
D: 'static + Decoder<Item = Res, Error = Error>,
Res: 'static,
{
- let stream = FramedRead::new(StreamReader::new(res.body().from_err()), decoder);
+ let stream = FramedRead::new(StreamReader::new(res.into_body().from_err()), decoder);
Box::new(stream)
}
@@ -157,7 +151,7 @@ impl IpfsClient {
.and_then(|res| {
let status = res.status();
- res.body().concat2().map(move |chunk| (status, chunk))
+ res.into_body().concat2().map(move |chunk| (status, chunk))
})
.from_err();
@@ -179,7 +173,7 @@ impl IpfsClient {
where
Req: ApiRequest + Serialize,
Res: 'static,
- F: 'static + Fn(hyper::Response) -> AsyncStreamResponse<Res>,
+ F: 'static + Fn(hyper::Response<hyper::Body>) -> AsyncStreamResponse<Res>,
{
match self.build_base_request(req, form) {
Ok(req) => {
@@ -188,14 +182,13 @@ impl IpfsClient {
.from_err()
.map(move |res| {
let stream: Box<Stream<Item = Res, Error = _>> = match res.status() {
- StatusCode::Ok => process(res),
-
+ StatusCode::OK => process(res),
// If the server responded with an error status code, the body
// still needs to be read so an error can be built. This block will
// read the entire body stream, then immediately return an error.
//
_ => Box::new(
- res.body()
+ res.into_body()
.concat2()
.from_err()
.and_then(|chunk| Err(Self::build_error_from_body(chunk)))
@@ -236,7 +229,7 @@ impl IpfsClient {
{
let res = self.request_raw(req, form)
.and_then(|(status, chunk)| match status {
- StatusCode::Ok => Ok(()),
+ StatusCode::OK => Ok(()),
_ => Err(Self::build_error_from_body(chunk)),
});
@@ -252,7 +245,7 @@ impl IpfsClient {
{
let res = self.request_raw(req, form)
.and_then(|(status, chunk)| match status {
- StatusCode::Ok => String::from_utf8(chunk.to_vec()).map_err(From::from),
+ StatusCode::OK => String::from_utf8(chunk.to_vec()).map_err(From::from),
_ => Err(Self::build_error_from_body(chunk)),
});
@@ -270,7 +263,7 @@ impl IpfsClient {
where
Req: ApiRequest + Serialize,
{
- self.request_stream(req, form, |res| Box::new(res.body().from_err()))
+ self.request_stream(req, form, |res| Box::new(res.into_body().from_err()))
}
/// Generic method to return a streaming response of deserialized json
@@ -286,13 +279,22 @@ impl IpfsClient {
for<'de> Res: 'static + Deserialize<'de>,
{
self.request_stream(req, form, |res| {
- let parse_stream_error = if let Some(trailer) = res.headers().get() {
+ let parse_stream_error = if let Some(trailer) = res.headers().get(TRAILER) {
// Response has the Trailer header set. The StreamError trailer
// is used to indicate that there was an error while streaming
// data with Ipfs.
//
- match trailer {
- &Trailer::StreamError => true,
+ if trailer == "X-Stream-Error" {
+ true
+ } else {
+ let err = ErrorKind::UnrecognizedTrailerHeader(
+ String::from_utf8_lossy(trailer.as_ref()).into(),
+ );
+
+ // There was an unrecognized trailer value. If that is the case,
+ // create a stream that immediately errors.
+ //
+ return Box::new(stream::once(Err(err.into())));
}
} else {
false