From 148b989784b47a4034ff82be1760e73a1429dbeb Mon Sep 17 00:00:00 2001 From: Ferris Tseng Date: Wed, 27 Jun 2018 00:22:50 -0400 Subject: get ipfs-api compiling --- ipfs-api/Cargo.toml | 3 +- ipfs-api/src/client.rs | 98 +++++++++++++++++++++--------------------- ipfs-api/src/header.rs | 74 +------------------------------ ipfs-api/src/lib.rs | 4 ++ ipfs-api/src/read.rs | 26 ++++------- ipfs-api/src/request/add.rs | 2 +- ipfs-api/src/request/block.rs | 2 +- ipfs-api/src/request/config.rs | 2 +- ipfs-api/src/request/dag.rs | 2 +- ipfs-api/src/request/files.rs | 2 +- ipfs-api/src/request/mod.rs | 2 +- ipfs-api/src/request/tar.rs | 2 +- ipfs-api/src/response/error.rs | 13 +++++- 13 files changed, 85 insertions(+), 147 deletions(-) diff --git a/ipfs-api/Cargo.toml b/ipfs-api/Cargo.toml index ea84310..e869e93 100644 --- a/ipfs-api/Cargo.toml +++ b/ipfs-api/Cargo.toml @@ -6,7 +6,7 @@ documentation = "https://docs.rs/ipfs-api" repository = "https://github.com/ferristseng/rust-ipfs-api" keywords = ["ipfs"] categories = ["filesystem", "web-programming"] -version = "0.4.0-alpha.3" +version = "0.5.0-alpha1" readme = "../README.md" license = "MIT OR Apache-2.0" @@ -17,6 +17,7 @@ travis-ci = { repository = "ferristseng/rust-ipfs-api" } bytes = "0.4" error-chain = "0.12" futures = "0.1" +http = "0.1" hyper = "0.12" hyper-multipart-rfc7578 = "0.2.0-alpha1" serde = "1.0" diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs index da68919..30a4461 100644 --- a/ipfs-api/src/client.rs +++ b/ipfs-api/src/client.rs @@ -6,20 +6,18 @@ // copied, modified, or distributed except according to those terms. // -use futures::future::{Future, IntoFuture}; -use futures::stream::{self, Stream}; -use header::Trailer; +use futures::{Future, IntoFuture, stream::{self, Stream}}; +use header::TRAILER; use read::{JsonLineDecoder, LineDecoder, StreamReader}; use request::{self, ApiRequest}; use response::{self, Error, ErrorKind}; -use hyper::{self, Chunk, Request, Response, StatusCode, Uri}; -use hyper::client::{Client, Config, HttpConnector}; +use http::uri::InvalidUri; +use hyper::{self, Chunk, Request, Response, StatusCode, Uri, client::{Client, HttpConnector}}; use hyper_multipart::client::multipart; use serde::{Deserialize, Serialize}; use serde_json; use std::io::Read; -use tokio_core::reactor::Handle; -use tokio_io::codec::{Decoder, FramedRead}; +use tokio_codec::{Decoder, FramedRead}; /// A response returned by the HTTP client. /// @@ -33,38 +31,33 @@ type AsyncStreamResponse = Box>; /// pub struct IpfsClient { base: Uri, - client: Client, + client: Client, +} + +impl Default for IpfsClient { + /// Creates an `IpfsClient` connected to `localhost:5001`. + /// + fn default() -> IpfsClient { + IpfsClient::new("localhost", 5001).unwrap() + } } impl IpfsClient { /// Creates a new `IpfsClient`. /// #[inline] - pub fn new( - handle: &Handle, - host: &str, - port: u16, - ) -> Result { + pub fn new(host: &str, port: u16) -> Result { let base_path = IpfsClient::build_base_path(host, port)?; Ok(IpfsClient { base: base_path, - client: Config::default() - .body::() - .keep_alive(true) - .build(handle), + client: Client::builder().keep_alive(true).build_http(), }) } - /// Creates an `IpfsClient` connected to `localhost:5001`. - /// - pub fn default(handle: &Handle) -> IpfsClient { - IpfsClient::new(handle, "localhost", 5001).unwrap() - } - /// Builds the base url path for the Ipfs api. /// - fn build_base_path(host: &str, port: u16) -> Result { + fn build_base_path(host: &str, port: u16) -> Result { format!("http://{}:{}/api/v0", host, port).parse() } @@ -74,7 +67,7 @@ impl IpfsClient { &self, req: &Req, form: Option, - ) -> Result, Error> + ) -> Result, Error> where Req: ApiRequest + Serialize, { @@ -85,17 +78,18 @@ impl IpfsClient { ::serde_urlencoded::to_string(req)? ); - url.parse::() - .map(move |url| { - let mut req = Request::new(Req::METHOD.clone(), url); + url.parse::().map_err(From::from).and_then(move |url| { + let mut builder = Request::builder(); + let mut builder = builder.method(Req::METHOD.clone()).uri(url); - if let Some(form) = form { - form.set_body(&mut req); - } + let req = if let Some(form) = form { + form.set_body(&mut builder) + } else { + builder.body(hyper::Body::empty()) + }; - req - }) - .map_err(From::from) + req.map_err(From::from) + }) } /// Builds an Api error from a response body. @@ -119,7 +113,7 @@ impl IpfsClient { for<'de> Res: 'static + Deserialize<'de>, { match status { - StatusCode::Ok => serde_json::from_slice(&chunk).map_err(From::from), + StatusCode::OK => serde_json::from_slice(&chunk).map_err(From::from), _ => Err(Self::build_error_from_body(chunk)), } } @@ -128,14 +122,14 @@ impl IpfsClient { /// results. /// fn process_stream_response( - res: Response, + res: Response, decoder: D, ) -> Box> where D: 'static + Decoder, Res: 'static, { - let stream = FramedRead::new(StreamReader::new(res.body().from_err()), decoder); + let stream = FramedRead::new(StreamReader::new(res.into_body().from_err()), decoder); Box::new(stream) } @@ -157,7 +151,7 @@ impl IpfsClient { .and_then(|res| { let status = res.status(); - res.body().concat2().map(move |chunk| (status, chunk)) + res.into_body().concat2().map(move |chunk| (status, chunk)) }) .from_err(); @@ -179,7 +173,7 @@ impl IpfsClient { where Req: ApiRequest + Serialize, Res: 'static, - F: 'static + Fn(hyper::Response) -> AsyncStreamResponse, + F: 'static + Fn(hyper::Response) -> AsyncStreamResponse, { match self.build_base_request(req, form) { Ok(req) => { @@ -188,14 +182,13 @@ impl IpfsClient { .from_err() .map(move |res| { let stream: Box> = match res.status() { - StatusCode::Ok => process(res), - + StatusCode::OK => process(res), // If the server responded with an error status code, the body // still needs to be read so an error can be built. This block will // read the entire body stream, then immediately return an error. // _ => Box::new( - res.body() + res.into_body() .concat2() .from_err() .and_then(|chunk| Err(Self::build_error_from_body(chunk))) @@ -236,7 +229,7 @@ impl IpfsClient { { let res = self.request_raw(req, form) .and_then(|(status, chunk)| match status { - StatusCode::Ok => Ok(()), + StatusCode::OK => Ok(()), _ => Err(Self::build_error_from_body(chunk)), }); @@ -252,7 +245,7 @@ impl IpfsClient { { let res = self.request_raw(req, form) .and_then(|(status, chunk)| match status { - StatusCode::Ok => String::from_utf8(chunk.to_vec()).map_err(From::from), + StatusCode::OK => String::from_utf8(chunk.to_vec()).map_err(From::from), _ => Err(Self::build_error_from_body(chunk)), }); @@ -270,7 +263,7 @@ impl IpfsClient { where Req: ApiRequest + Serialize, { - self.request_stream(req, form, |res| Box::new(res.body().from_err())) + self.request_stream(req, form, |res| Box::new(res.into_body().from_err())) } /// Generic method to return a streaming response of deserialized json @@ -286,13 +279,22 @@ impl IpfsClient { for<'de> Res: 'static + Deserialize<'de>, { self.request_stream(req, form, |res| { - let parse_stream_error = if let Some(trailer) = res.headers().get() { + let parse_stream_error = if let Some(trailer) = res.headers().get(TRAILER) { // Response has the Trailer header set. The StreamError trailer // is used to indicate that there was an error while streaming // data with Ipfs. // - match trailer { - &Trailer::StreamError => true, + if trailer == "X-Stream-Error" { + true + } else { + let err = ErrorKind::UnrecognizedTrailerHeader( + String::from_utf8_lossy(trailer.as_ref()).into(), + ); + + // There was an unrecognized trailer value. If that is the case, + // create a stream that immediately errors. + // + return Box::new(stream::once(Err(err.into()))); } } else { false diff --git a/ipfs-api/src/header.rs b/ipfs-api/src/header.rs index aec3c0a..9943bdb 100644 --- a/ipfs-api/src/header.rs +++ b/ipfs-api/src/header.rs @@ -6,76 +6,6 @@ // copied, modified, or distributed except according to those terms. // -use hyper; -use hyper::header::{self, Header, Raw}; -use std::fmt; +pub use hyper::header::TRAILER; -/// Header that is returned for streaming calls. -/// -/// A `Trailer` header indicates that after a streaming call, there will -/// be some additional information in the response. -/// -#[derive(Debug, Clone, Copy)] -pub enum Trailer { - /// This trailer indicates that an error header will be returned in - /// the stream if there is an error while streaming. - /// - StreamError, -} - -impl Header for Trailer { - fn header_name() -> &'static str { - "Trailer" - } - - fn parse_header(raw: &Raw) -> hyper::Result { - if let Some(bytes) = raw.one() { - let value = String::from_utf8_lossy(bytes); - - match value.as_ref() { - "X-Stream-Error" => Ok(Trailer::StreamError), - _ => Err(hyper::Error::Header), - } - } else { - Err(hyper::Error::Header) - } - } - - fn fmt_header(&self, f: &mut header::Formatter) -> fmt::Result { - let value = match *self { - Trailer::StreamError => "X-Stream-Error", - }; - - f.fmt_line(&value) - } -} - -/// This header is included while streaming if an error occured -/// while streaming the data. -/// -#[derive(Debug, Clone)] -pub struct XStreamError { - pub error: String, -} - -impl Header for XStreamError { - fn header_name() -> &'static str { - "X-Stream-Error" - } - - fn parse_header(raw: &Raw) -> hyper::Result { - if let Some(bytes) = raw.one() { - let value = String::from_utf8_lossy(bytes); - - Ok(XStreamError { - error: value.into_owned(), - }) - } else { - Err(hyper::Error::Header) - } - } - - fn fmt_header(&self, f: &mut header::Formatter) -> fmt::Result { - f.fmt_line(&self.error) - } -} +pub const X_STREAM_ERROR: &str = "x-stream-error"; diff --git a/ipfs-api/src/lib.rs b/ipfs-api/src/lib.rs index 1e584c3..0ef8fe0 100644 --- a/ipfs-api/src/lib.rs +++ b/ipfs-api/src/lib.rs @@ -6,6 +6,8 @@ // copied, modified, or distributed except according to those terms. // +#![recursion_limit = "128"] + //! Rust library for connecting to the IPFS HTTP API using tokio. //! //! ## Usage @@ -77,6 +79,7 @@ extern crate bytes; #[macro_use] extern crate error_chain; extern crate futures; +extern crate http; extern crate hyper; extern crate hyper_multipart_rfc7578 as hyper_multipart; extern crate serde; @@ -85,6 +88,7 @@ extern crate serde_derive; extern crate serde_json; extern crate serde_urlencoded; extern crate tokio; +extern crate tokio_codec; extern crate tokio_io; pub use client::IpfsClient; diff --git a/ipfs-api/src/read.rs b/ipfs-api/src/read.rs index 5a37b08..c0e3dd7 100644 --- a/ipfs-api/src/read.rs +++ b/ipfs-api/src/read.rs @@ -8,17 +8,14 @@ use bytes::BytesMut; use futures::{Async, Stream}; -use header::XStreamError; +use header::X_STREAM_ERROR; use hyper::Chunk; -use hyper::header::{Header, Raw}; use response::{Error, ErrorKind}; use serde::Deserialize; use serde_json; -use std::cmp; -use std::io::{self, Read}; -use std::marker::PhantomData; +use std::{cmp, io::{self, Read}, marker::PhantomData}; +use tokio_codec::Decoder; use tokio_io::AsyncRead; -use tokio_io::codec::Decoder; /// A decoder for a response where each line is a full json object. /// @@ -69,17 +66,12 @@ where Err(e) => { if self.parse_stream_error { match slice.iter().position(|&x| x == b':') { - Some(colon) - if &slice[..colon] == XStreamError::header_name().as_bytes() => - { - let raw = Raw::from(&slice[colon + 2..]); - - match XStreamError::parse_header(&raw) { - Ok(stream_error) => { - Err(ErrorKind::StreamError(stream_error.error).into()) - } - Err(_) => Err(e.into()), - } + Some(colon) if &slice[..colon] == X_STREAM_ERROR.as_bytes() => { + let e = ErrorKind::StreamError( + String::from_utf8_lossy(&slice[colon + 2..]).into(), + ); + + Err(e.into()) } _ => Err(e.into()), } diff --git a/ipfs-api/src/request/add.rs b/ipfs-api/src/request/add.rs index 08ac3b7..be7960d 100644 --- a/ipfs-api/src/request/add.rs +++ b/ipfs-api/src/request/add.rs @@ -16,5 +16,5 @@ impl_skip_serialize!(Add); impl ApiRequest for Add { const PATH: &'static str = "/add"; - const METHOD: &'static Method = &Method::Post; + const METHOD: &'static Method = &Method::POST; } diff --git a/ipfs-api/src/request/block.rs b/ipfs-api/src/request/block.rs index a021f61..24f5d39 100644 --- a/ipfs-api/src/request/block.rs +++ b/ipfs-api/src/request/block.rs @@ -26,7 +26,7 @@ impl_skip_serialize!(BlockPut); impl ApiRequest for BlockPut { const PATH: &'static str = "/block/put"; - const METHOD: &'static Method = &Method::Post; + const METHOD: &'static Method = &Method::POST; } #[derive(Serialize)] diff --git a/ipfs-api/src/request/config.rs b/ipfs-api/src/request/config.rs index ee6d1c8..e586278 100644 --- a/ipfs-api/src/request/config.rs +++ b/ipfs-api/src/request/config.rs @@ -24,7 +24,7 @@ impl_skip_serialize!(ConfigReplace); impl ApiRequest for ConfigReplace { const PATH: &'static str = "/config/replace"; - const METHOD: &'static Method = &Method::Post; + const METHOD: &'static Method = &Method::POST; } pub struct ConfigShow; diff --git a/ipfs-api/src/request/dag.rs b/ipfs-api/src/request/dag.rs index c1a6d0c..78705f9 100644 --- a/ipfs-api/src/request/dag.rs +++ b/ipfs-api/src/request/dag.rs @@ -26,5 +26,5 @@ impl_skip_serialize!(DagPut); impl ApiRequest for DagPut { const PATH: &'static str = "/dag/put"; - const METHOD: &'static Method = &Method::Post; + const METHOD: &'static Method = &Method::POST; } diff --git a/ipfs-api/src/request/files.rs b/ipfs-api/src/request/files.rs index 08aa500..179e1a4 100644 --- a/ipfs-api/src/request/files.rs +++ b/ipfs-api/src/request/files.rs @@ -112,5 +112,5 @@ pub struct FilesWrite<'a> { impl<'a> ApiRequest for FilesWrite<'a> { const PATH: &'static str = "/files/write"; - const METHOD: &'static Method = &Method::Post; + const METHOD: &'static Method = &Method::POST; } diff --git a/ipfs-api/src/request/mod.rs b/ipfs-api/src/request/mod.rs index dd86757..d8c1660 100644 --- a/ipfs-api/src/request/mod.rs +++ b/ipfs-api/src/request/mod.rs @@ -109,5 +109,5 @@ pub trait ApiRequest { /// Method used to make the request. /// - const METHOD: &'static ::hyper::Method = &::hyper::Method::Get; + const METHOD: &'static ::hyper::Method = &::hyper::Method::GET; } diff --git a/ipfs-api/src/request/tar.rs b/ipfs-api/src/request/tar.rs index 7d68a5e..26772b9 100644 --- a/ipfs-api/src/request/tar.rs +++ b/ipfs-api/src/request/tar.rs @@ -16,7 +16,7 @@ impl_skip_serialize!(TarAdd); impl ApiRequest for TarAdd { const PATH: &'static str = "/tar/add"; - const METHOD: &'static Method = &Method::Post; + const METHOD: &'static Method = &Method::POST; } #[derive(Serialize)] diff --git a/ipfs-api/src/response/error.rs b/ipfs-api/src/response/error.rs index 7075def..4cdd1c1 100644 --- a/ipfs-api/src/response/error.rs +++ b/ipfs-api/src/response/error.rs @@ -6,6 +6,7 @@ // copied, modified, or distributed except according to those terms. // +use http; use hyper; use serde_json; use serde_urlencoded; @@ -20,10 +21,11 @@ pub struct ApiError { error_chain! { foreign_links { - Http(hyper::error::Error); + Client(hyper::Error); + Http(http::Error); Parse(serde_json::Error); ParseUtf8(FromUtf8Error); - Url(hyper::error::UriError); + Url(http::uri::InvalidUri); Io(::std::io::Error); EncodeUrl(serde_urlencoded::ser::Error); } @@ -43,6 +45,13 @@ error_chain! { display("api returned an error while streaming: '{}'", err) } + /// API returned a trailer header with unrecognized value. + /// + UnrecognizedTrailerHeader(value: String) { + description("api returned a trailer header with an unknown value"), + display("api returned a trailer header with value: '{}'", value) + } + Uncategorized(err: String) { description("api returned an unknown error"), display("api returned '{}'", err) -- cgit v1.2.3