summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFerris Tseng <ferristseng@fastmail.fm>2018-06-27 00:22:50 -0400
committerFerris Tseng <ferristseng@fastmail.fm>2018-06-27 00:22:50 -0400
commit148b989784b47a4034ff82be1760e73a1429dbeb (patch)
treebc9acc23d403c4c5307ad1cf6b4765a94dacdd9c
parent90d3c0a8bb552c249bffb67d20b06869a24bd0a8 (diff)
get ipfs-api compiling
-rw-r--r--ipfs-api/Cargo.toml3
-rw-r--r--ipfs-api/src/client.rs98
-rw-r--r--ipfs-api/src/header.rs74
-rw-r--r--ipfs-api/src/lib.rs4
-rw-r--r--ipfs-api/src/read.rs26
-rw-r--r--ipfs-api/src/request/add.rs2
-rw-r--r--ipfs-api/src/request/block.rs2
-rw-r--r--ipfs-api/src/request/config.rs2
-rw-r--r--ipfs-api/src/request/dag.rs2
-rw-r--r--ipfs-api/src/request/files.rs2
-rw-r--r--ipfs-api/src/request/mod.rs2
-rw-r--r--ipfs-api/src/request/tar.rs2
-rw-r--r--ipfs-api/src/response/error.rs13
13 files changed, 85 insertions, 147 deletions
diff --git a/ipfs-api/Cargo.toml b/ipfs-api/Cargo.toml
index ea84310..e869e93 100644
--- a/ipfs-api/Cargo.toml
+++ b/ipfs-api/Cargo.toml
@@ -6,7 +6,7 @@ documentation = "https://docs.rs/ipfs-api"
repository = "https://github.com/ferristseng/rust-ipfs-api"
keywords = ["ipfs"]
categories = ["filesystem", "web-programming"]
-version = "0.4.0-alpha.3"
+version = "0.5.0-alpha1"
readme = "../README.md"
license = "MIT OR Apache-2.0"
@@ -17,6 +17,7 @@ travis-ci = { repository = "ferristseng/rust-ipfs-api" }
bytes = "0.4"
error-chain = "0.12"
futures = "0.1"
+http = "0.1"
hyper = "0.12"
hyper-multipart-rfc7578 = "0.2.0-alpha1"
serde = "1.0"
diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs
index da68919..30a4461 100644
--- a/ipfs-api/src/client.rs
+++ b/ipfs-api/src/client.rs
@@ -6,20 +6,18 @@
// copied, modified, or distributed except according to those terms.
//
-use futures::future::{Future, IntoFuture};
-use futures::stream::{self, Stream};
-use header::Trailer;
+use futures::{Future, IntoFuture, stream::{self, Stream}};
+use header::TRAILER;
use read::{JsonLineDecoder, LineDecoder, StreamReader};
use request::{self, ApiRequest};
use response::{self, Error, ErrorKind};
-use hyper::{self, Chunk, Request, Response, StatusCode, Uri};
-use hyper::client::{Client, Config, HttpConnector};
+use http::uri::InvalidUri;
+use hyper::{self, Chunk, Request, Response, StatusCode, Uri, client::{Client, HttpConnector}};
use hyper_multipart::client::multipart;
use serde::{Deserialize, Serialize};
use serde_json;
use std::io::Read;
-use tokio_core::reactor::Handle;
-use tokio_io::codec::{Decoder, FramedRead};
+use tokio_codec::{Decoder, FramedRead};
/// A response returned by the HTTP client.
///
@@ -33,38 +31,33 @@ type AsyncStreamResponse<T> = Box<Stream<Item = T, Error = Error>>;
///
pub struct IpfsClient {
base: Uri,
- client: Client<HttpConnector, multipart::Body>,
+ client: Client<HttpConnector, hyper::Body>,
+}
+
+impl Default for IpfsClient {
+ /// Creates an `IpfsClient` connected to `localhost:5001`.
+ ///
+ fn default() -> IpfsClient {
+ IpfsClient::new("localhost", 5001).unwrap()
+ }
}
impl IpfsClient {
/// Creates a new `IpfsClient`.
///
#[inline]
- pub fn new(
- handle: &Handle,
- host: &str,
- port: u16,
- ) -> Result<IpfsClient, hyper::error::UriError> {
+ pub fn new(host: &str, port: u16) -> Result<IpfsClient, InvalidUri> {
let base_path = IpfsClient::build_base_path(host, port)?;
Ok(IpfsClient {
base: base_path,
- client: Config::default()
- .body::<multipart::Body>()
- .keep_alive(true)
- .build(handle),
+ client: Client::builder().keep_alive(true).build_http(),
})
}
- /// Creates an `IpfsClient` connected to `localhost:5001`.
- ///
- pub fn default(handle: &Handle) -> IpfsClient {
- IpfsClient::new(handle, "localhost", 5001).unwrap()
- }
-
/// Builds the base url path for the Ipfs api.
///
- fn build_base_path(host: &str, port: u16) -> Result<Uri, hyper::error::UriError> {
+ fn build_base_path(host: &str, port: u16) -> Result<Uri, InvalidUri> {
format!("http://{}:{}/api/v0", host, port).parse()
}
@@ -74,7 +67,7 @@ impl IpfsClient {
&self,
req: &Req,
form: Option<multipart::Form>,
- ) -> Result<Request<multipart::Body>, Error>
+ ) -> Result<Request<hyper::Body>, Error>
where
Req: ApiRequest + Serialize,
{
@@ -85,17 +78,18 @@ impl IpfsClient {
::serde_urlencoded::to_string(req)?
);
- url.parse::<Uri>()
- .map(move |url| {
- let mut req = Request::new(Req::METHOD.clone(), url);
+ url.parse::<Uri>().map_err(From::from).and_then(move |url| {
+ let mut builder = Request::builder();
+ let mut builder = builder.method(Req::METHOD.clone()).uri(url);
- if let Some(form) = form {
- form.set_body(&mut req);
- }
+ let req = if let Some(form) = form {
+ form.set_body(&mut builder)
+ } else {
+ builder.body(hyper::Body::empty())
+ };
- req
- })
- .map_err(From::from)
+ req.map_err(From::from)
+ })
}
/// Builds an Api error from a response body.
@@ -119,7 +113,7 @@ impl IpfsClient {
for<'de> Res: 'static + Deserialize<'de>,
{
match status {
- StatusCode::Ok => serde_json::from_slice(&chunk).map_err(From::from),
+ StatusCode::OK => serde_json::from_slice(&chunk).map_err(From::from),
_ => Err(Self::build_error_from_body(chunk)),
}
}
@@ -128,14 +122,14 @@ impl IpfsClient {
/// results.
///
fn process_stream_response<D, Res>(
- res: Response,
+ res: Response<hyper::Body>,
decoder: D,
) -> Box<Stream<Item = Res, Error = Error>>
where
D: 'static + Decoder<Item = Res, Error = Error>,
Res: 'static,
{
- let stream = FramedRead::new(StreamReader::new(res.body().from_err()), decoder);
+ let stream = FramedRead::new(StreamReader::new(res.into_body().from_err()), decoder);
Box::new(stream)
}
@@ -157,7 +151,7 @@ impl IpfsClient {
.and_then(|res| {
let status = res.status();
- res.body().concat2().map(move |chunk| (status, chunk))
+ res.into_body().concat2().map(move |chunk| (status, chunk))
})
.from_err();
@@ -179,7 +173,7 @@ impl IpfsClient {
where
Req: ApiRequest + Serialize,
Res: 'static,
- F: 'static + Fn(hyper::Response) -> AsyncStreamResponse<Res>,
+ F: 'static + Fn(hyper::Response<hyper::Body>) -> AsyncStreamResponse<Res>,
{
match self.build_base_request(req, form) {
Ok(req) => {
@@ -188,14 +182,13 @@ impl IpfsClient {
.from_err()
.map(move |res| {
let stream: Box<Stream<Item = Res, Error = _>> = match res.status() {
- StatusCode::Ok => process(res),
-
+ StatusCode::OK => process(res),
// If the server responded with an error status code, the body
// still needs to be read so an error can be built. This block will
// read the entire body stream, then immediately return an error.
//
_ => Box::new(
- res.body()
+ res.into_body()
.concat2()
.from_err()
.and_then(|chunk| Err(Self::build_error_from_body(chunk)))
@@ -236,7 +229,7 @@ impl IpfsClient {
{
let res = self.request_raw(req, form)
.and_then(|(status, chunk)| match status {
- StatusCode::Ok => Ok(()),
+ StatusCode::OK => Ok(()),
_ => Err(Self::build_error_from_body(chunk)),
});
@@ -252,7 +245,7 @@ impl IpfsClient {
{
let res = self.request_raw(req, form)
.and_then(|(status, chunk)| match status {
- StatusCode::Ok => String::from_utf8(chunk.to_vec()).map_err(From::from),
+ StatusCode::OK => String::from_utf8(chunk.to_vec()).map_err(From::from),
_ => Err(Self::build_error_from_body(chunk)),
});
@@ -270,7 +263,7 @@ impl IpfsClient {
where
Req: ApiRequest + Serialize,
{
- self.request_stream(req, form, |res| Box::new(res.body().from_err()))
+ self.request_stream(req, form, |res| Box::new(res.into_body().from_err()))
}
/// Generic method to return a streaming response of deserialized json
@@ -286,13 +279,22 @@ impl IpfsClient {
for<'de> Res: 'static + Deserialize<'de>,
{
self.request_stream(req, form, |res| {
- let parse_stream_error = if let Some(trailer) = res.headers().get() {
+ let parse_stream_error = if let Some(trailer) = res.headers().get(TRAILER) {
// Response has the Trailer header set. The StreamError trailer
// is used to indicate that there was an error while streaming
// data with Ipfs.
//
- match trailer {
- &Trailer::StreamError => true,
+ if trailer == "X-Stream-Error" {
+ true
+ } else {
+ let err = ErrorKind::UnrecognizedTrailerHeader(
+ String::from_utf8_lossy(trailer.as_ref()).into(),
+ );
+
+ // There was an unrecognized trailer value. If that is the case,
+ // create a stream that immediately errors.
+ //
+ return Box::new(stream::once(Err(err.into())));
}
} else {
false
diff --git a/ipfs-api/src/header.rs b/ipfs-api/src/header.rs
index aec3c0a..9943bdb 100644
--- a/ipfs-api/src/header.rs
+++ b/ipfs-api/src/header.rs
@@ -6,76 +6,6 @@
// copied, modified, or distributed except according to those terms.
//
-use hyper;
-use hyper::header::{self, Header, Raw};
-use std::fmt;
+pub use hyper::header::TRAILER;
-/// Header that is returned for streaming calls.
-///
-/// A `Trailer` header indicates that after a streaming call, there will
-/// be some additional information in the response.
-///
-#[derive(Debug, Clone, Copy)]
-pub enum Trailer {
- /// This trailer indicates that an error header will be returned in
- /// the stream if there is an error while streaming.
- ///
- StreamError,
-}
-
-impl Header for Trailer {
- fn header_name() -> &'static str {
- "Trailer"
- }
-
- fn parse_header(raw: &Raw) -> hyper::Result<Trailer> {
- if let Some(bytes) = raw.one() {
- let value = String::from_utf8_lossy(bytes);
-
- match value.as_ref() {
- "X-Stream-Error" => Ok(Trailer::StreamError),
- _ => Err(hyper::Error::Header),
- }
- } else {
- Err(hyper::Error::Header)
- }
- }
-
- fn fmt_header(&self, f: &mut header::Formatter) -> fmt::Result {
- let value = match *self {
- Trailer::StreamError => "X-Stream-Error",
- };
-
- f.fmt_line(&value)
- }
-}
-
-/// This header is included while streaming if an error occured
-/// while streaming the data.
-///
-#[derive(Debug, Clone)]
-pub struct XStreamError {
- pub error: String,
-}
-
-impl Header for XStreamError {
- fn header_name() -> &'static str {
- "X-Stream-Error"
- }
-
- fn parse_header(raw: &Raw) -> hyper::Result<XStreamError> {
- if let Some(bytes) = raw.one() {
- let value = String::from_utf8_lossy(bytes);
-
- Ok(XStreamError {
- error: value.into_owned(),
- })
- } else {
- Err(hyper::Error::Header)
- }
- }
-
- fn fmt_header(&self, f: &mut header::Formatter) -> fmt::Result {
- f.fmt_line(&self.error)
- }
-}
+pub const X_STREAM_ERROR: &str = "x-stream-error";
diff --git a/ipfs-api/src/lib.rs b/ipfs-api/src/lib.rs
index 1e584c3..0ef8fe0 100644
--- a/ipfs-api/src/lib.rs
+++ b/ipfs-api/src/lib.rs
@@ -6,6 +6,8 @@
// copied, modified, or distributed except according to those terms.
//
+#![recursion_limit = "128"]
+
//! Rust library for connecting to the IPFS HTTP API using tokio.
//!
//! ## Usage
@@ -77,6 +79,7 @@ extern crate bytes;
#[macro_use]
extern crate error_chain;
extern crate futures;
+extern crate http;
extern crate hyper;
extern crate hyper_multipart_rfc7578 as hyper_multipart;
extern crate serde;
@@ -85,6 +88,7 @@ extern crate serde_derive;
extern crate serde_json;
extern crate serde_urlencoded;
extern crate tokio;
+extern crate tokio_codec;
extern crate tokio_io;
pub use client::IpfsClient;
diff --git a/ipfs-api/src/read.rs b/ipfs-api/src/read.rs
index 5a37b08..c0e3dd7 100644
--- a/ipfs-api/src/read.rs
+++ b/ipfs-api/src/read.rs
@@ -8,17 +8,14 @@
use bytes::BytesMut;
use futures::{Async, Stream};
-use header::XStreamError;
+use header::X_STREAM_ERROR;
use hyper::Chunk;
-use hyper::header::{Header, Raw};
use response::{Error, ErrorKind};
use serde::Deserialize;
use serde_json;
-use std::cmp;
-use std::io::{self, Read};
-use std::marker::PhantomData;
+use std::{cmp, io::{self, Read}, marker::PhantomData};
+use tokio_codec::Decoder;
use tokio_io::AsyncRead;
-use tokio_io::codec::Decoder;
/// A decoder for a response where each line is a full json object.
///
@@ -69,17 +66,12 @@ where
Err(e) => {
if self.parse_stream_error {
match slice.iter().position(|&x| x == b':') {
- Some(colon)
- if &slice[..colon] == XStreamError::header_name().as_bytes() =>
- {
- let raw = Raw::from(&slice[colon + 2..]);
-
- match XStreamError::parse_header(&raw) {
- Ok(stream_error) => {
- Err(ErrorKind::StreamError(stream_error.error).into())
- }
- Err(_) => Err(e.into()),
- }
+ Some(colon) if &slice[..colon] == X_STREAM_ERROR.as_bytes() => {
+ let e = ErrorKind::StreamError(
+ String::from_utf8_lossy(&slice[colon + 2..]).into(),
+ );
+
+ Err(e.into())
}
_ => Err(e.into()),
}
diff --git a/ipfs-api/src/request/add.rs b/ipfs-api/src/request/add.rs
index 08ac3b7..be7960d 100644
--- a/ipfs-api/src/request/add.rs
+++ b/ipfs-api/src/request/add.rs
@@ -16,5 +16,5 @@ impl_skip_serialize!(Add);
impl ApiRequest for Add {
const PATH: &'static str = "/add";
- const METHOD: &'static Method = &Method::Post;
+ const METHOD: &'static Method = &Method::POST;
}
diff --git a/ipfs-api/src/request/block.rs b/ipfs-api/src/request/block.rs
index a021f61..24f5d39 100644
--- a/ipfs-api/src/request/block.rs
+++ b/ipfs-api/src/request/block.rs
@@ -26,7 +26,7 @@ impl_skip_serialize!(BlockPut);
impl ApiRequest for BlockPut {
const PATH: &'static str = "/block/put";
- const METHOD: &'static Method = &Method::Post;
+ const METHOD: &'static Method = &Method::POST;
}
#[derive(Serialize)]
diff --git a/ipfs-api/src/request/config.rs b/ipfs-api/src/request/config.rs
index ee6d1c8..e586278 100644
--- a/ipfs-api/src/request/config.rs
+++ b/ipfs-api/src/request/config.rs
@@ -24,7 +24,7 @@ impl_skip_serialize!(ConfigReplace);
impl ApiRequest for ConfigReplace {
const PATH: &'static str = "/config/replace";
- const METHOD: &'static Method = &Method::Post;
+ const METHOD: &'static Method = &Method::POST;
}
pub struct ConfigShow;
diff --git a/ipfs-api/src/request/dag.rs b/ipfs-api/src/request/dag.rs
index c1a6d0c..78705f9 100644
--- a/ipfs-api/src/request/dag.rs
+++ b/ipfs-api/src/request/dag.rs
@@ -26,5 +26,5 @@ impl_skip_serialize!(DagPut);
impl ApiRequest for DagPut {
const PATH: &'static str = "/dag/put";
- const METHOD: &'static Method = &Method::Post;
+ const METHOD: &'static Method = &Method::POST;
}
diff --git a/ipfs-api/src/request/files.rs b/ipfs-api/src/request/files.rs
index 08aa500..179e1a4 100644
--- a/ipfs-api/src/request/files.rs
+++ b/ipfs-api/src/request/files.rs
@@ -112,5 +112,5 @@ pub struct FilesWrite<'a> {
impl<'a> ApiRequest for FilesWrite<'a> {
const PATH: &'static str = "/files/write";
- const METHOD: &'static Method = &Method::Post;
+ const METHOD: &'static Method = &Method::POST;
}
diff --git a/ipfs-api/src/request/mod.rs b/ipfs-api/src/request/mod.rs
index dd86757..d8c1660 100644
--- a/ipfs-api/src/request/mod.rs
+++ b/ipfs-api/src/request/mod.rs
@@ -109,5 +109,5 @@ pub trait ApiRequest {
/// Method used to make the request.
///
- const METHOD: &'static ::hyper::Method = &::hyper::Method::Get;
+ const METHOD: &'static ::hyper::Method = &::hyper::Method::GET;
}
diff --git a/ipfs-api/src/request/tar.rs b/ipfs-api/src/request/tar.rs
index 7d68a5e..26772b9 100644
--- a/ipfs-api/src/request/tar.rs
+++ b/ipfs-api/src/request/tar.rs
@@ -16,7 +16,7 @@ impl_skip_serialize!(TarAdd);
impl ApiRequest for TarAdd {
const PATH: &'static str = "/tar/add";
- const METHOD: &'static Method = &Method::Post;
+ const METHOD: &'static Method = &Method::POST;
}
#[derive(Serialize)]
diff --git a/ipfs-api/src/response/error.rs b/ipfs-api/src/response/error.rs
index 7075def..4cdd1c1 100644
--- a/ipfs-api/src/response/error.rs
+++ b/ipfs-api/src/response/error.rs
@@ -6,6 +6,7 @@
// copied, modified, or distributed except according to those terms.
//
+use http;
use hyper;
use serde_json;
use serde_urlencoded;
@@ -20,10 +21,11 @@ pub struct ApiError {
error_chain! {
foreign_links {
- Http(hyper::error::Error);
+ Client(hyper::Error);
+ Http(http::Error);
Parse(serde_json::Error);
ParseUtf8(FromUtf8Error);
- Url(hyper::error::UriError);
+ Url(http::uri::InvalidUri);
Io(::std::io::Error);
EncodeUrl(serde_urlencoded::ser::Error);
}
@@ -43,6 +45,13 @@ error_chain! {
display("api returned an error while streaming: '{}'", err)
}
+ /// API returned a trailer header with unrecognized value.
+ ///
+ UnrecognizedTrailerHeader(value: String) {
+ description("api returned a trailer header with an unknown value"),
+ display("api returned a trailer header with value: '{}'", value)
+ }
+
Uncategorized(err: String) {
description("api returned an unknown error"),
display("api returned '{}'", err)