From 4bdebbf4d1d1edb839eba860b013e3fdb870f66e Mon Sep 17 00:00:00 2001 From: Sameer Puri Date: Sat, 9 Feb 2019 14:34:00 -0600 Subject: Add actix feature for using actix-web --- .travis.yml | 7 ++ ipfs-api/Cargo.toml | 10 +- ipfs-api/examples/add_file.rs | 11 ++ ipfs-api/examples/add_tar.rs | 13 ++- ipfs-api/examples/bootstrap_default.rs | 19 ++- ipfs-api/examples/dns.rs | 11 ++ ipfs-api/examples/get_commands.rs | 11 ++ ipfs-api/examples/get_stats.rs | 23 +++- ipfs-api/examples/get_swarm.rs | 21 +++- ipfs-api/examples/get_version.rs | 15 ++- ipfs-api/examples/mfs.rs | 19 ++- ipfs-api/examples/ping_peer.rs | 11 ++ ipfs-api/examples/pubsub.rs | 34 ++++-- ipfs-api/examples/replace_config.rs | 11 ++ ipfs-api/examples/resolve_name.rs | 21 +++- ipfs-api/src/client.rs | 168 +++++++++++++++++++++------ ipfs-api/src/header.rs | 2 +- ipfs-api/src/lib.rs | 206 ++++++++++++++++++++++----------- ipfs-api/src/read.rs | 11 +- ipfs-api/src/request/add.rs | 2 +- ipfs-api/src/request/block.rs | 2 +- ipfs-api/src/request/config.rs | 2 +- ipfs-api/src/request/dag.rs | 2 +- ipfs-api/src/request/files.rs | 2 +- ipfs-api/src/request/mod.rs | 2 +- ipfs-api/src/request/tar.rs | 2 +- ipfs-api/src/response/error.rs | 41 ++++++- 27 files changed, 524 insertions(+), 155 deletions(-) diff --git a/.travis.yml b/.travis.yml index 814e54f..168d15b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,3 +5,10 @@ rust: - nightly cache: cargo: true +script: + - cargo build --verbose + - cargo test --verbose + - cd ipfs-api + - cargo build --verbose --features actix --no-default-features + - cargo test --verbose --features actix --no-default-features + - cd .. \ No newline at end of file diff --git a/ipfs-api/Cargo.toml b/ipfs-api/Cargo.toml index f3fb604..39cad56 100644 --- a/ipfs-api/Cargo.toml +++ b/ipfs-api/Cargo.toml @@ -13,13 +13,19 @@ license = "MIT OR Apache-2.0" [badges] travis-ci = { repository = "ferristseng/rust-ipfs-api" } +[features] +default = ["hyper", "hyper-multipart-rfc7578"] +actix = ["actix-web", "actix-multipart-rfc7578"] + [dependencies] +actix-multipart-rfc7578 = { version = "0.1", optional = true } +actix-web = { version = "0.7", optional = true } bytes = "0.4" failure = "0.1.2" futures = "0.1" http = "0.1" -hyper = "0.12" -hyper-multipart-rfc7578 = "0.3.0" +hyper = { version = "0.12", optional = true } +hyper-multipart-rfc7578 = { version = "0.3", optional = true } serde = "1.0" serde_derive = "1.0" serde_json = "1.0" diff --git a/ipfs-api/examples/add_file.rs b/ipfs-api/examples/add_file.rs index d771966..3d9be49 100644 --- a/ipfs-api/examples/add_file.rs +++ b/ipfs-api/examples/add_file.rs @@ -6,7 +6,10 @@ // copied, modified, or distributed except according to those terms. // +#[cfg(feature = "actix")] +extern crate actix_web; extern crate futures; +#[cfg(feature = "hyper")] extern crate hyper; extern crate ipfs_api; @@ -27,5 +30,13 @@ fn main() { .map(|add| println!("added file: {:?}", add)) .map_err(|e| eprintln!("{}", e)); + #[cfg(feature = "hyper")] hyper::rt::run(req); + #[cfg(feature = "actix")] + actix_web::actix::run(|| { + req.then(|_| { + actix_web::actix::System::current().stop(); + Ok(()) + }) + }); } diff --git a/ipfs-api/examples/add_tar.rs b/ipfs-api/examples/add_tar.rs index 1880712..5d75e28 100644 --- a/ipfs-api/examples/add_tar.rs +++ b/ipfs-api/examples/add_tar.rs @@ -6,7 +6,10 @@ // copied, modified, or distributed except according to those terms. // +#[cfg(feature = "actix")] +extern crate actix_web; extern crate futures; +#[cfg(feature = "hyper")] extern crate hyper; extern crate ipfs_api; extern crate tar; @@ -52,5 +55,13 @@ fn main() { }) .map_err(|e| eprintln!("{}", e)); - hyper::rt::run(req) + #[cfg(feature = "hyper")] + hyper::rt::run(req); + #[cfg(feature = "actix")] + actix_web::actix::run(|| { + req.then(|_| { + actix_web::actix::System::current().stop(); + Ok(()) + }) + }); } diff --git a/ipfs-api/examples/bootstrap_default.rs b/ipfs-api/examples/bootstrap_default.rs index 6ba9575..bc36952 100644 --- a/ipfs-api/examples/bootstrap_default.rs +++ b/ipfs-api/examples/bootstrap_default.rs @@ -6,7 +6,10 @@ // copied, modified, or distributed except according to those terms. // +#[cfg(feature = "actix")] +extern crate actix_web; extern crate futures; +#[cfg(feature = "hyper")] extern crate hyper; extern crate ipfs_api; @@ -42,8 +45,7 @@ fn main() { } }); - hyper::rt::run( - bootstrap + let fut = bootstrap .and_then(|_| { println!(); println!("dropping all bootstrap peers..."); @@ -56,6 +58,15 @@ fn main() { add }) - .map_err(|e| eprintln!("{}", e)), - ); + .map_err(|e| eprintln!("{}", e)); + + #[cfg(feature = "hyper")] + hyper::rt::run(fut); + #[cfg(feature = "actix")] + actix_web::actix::run(|| { + fut.then(|_| { + actix_web::actix::System::current().stop(); + Ok(()) + }) + }); } diff --git a/ipfs-api/examples/dns.rs b/ipfs-api/examples/dns.rs index 78b4086..6b3823d 100644 --- a/ipfs-api/examples/dns.rs +++ b/ipfs-api/examples/dns.rs @@ -6,7 +6,10 @@ // copied, modified, or distributed except according to those terms. // +#[cfg(feature = "actix")] +extern crate actix_web; extern crate futures; +#[cfg(feature = "hyper")] extern crate hyper; extern crate ipfs_api; @@ -38,5 +41,13 @@ fn main() { }) .map_err(|e| eprintln!("{}", e)); + #[cfg(feature = "hyper")] hyper::rt::run(req); + #[cfg(feature = "actix")] + actix_web::actix::run(|| { + req.then(|_| { + actix_web::actix::System::current().stop(); + Ok(()) + }) + }); } diff --git a/ipfs-api/examples/get_commands.rs b/ipfs-api/examples/get_commands.rs index b12ffa1..f99167d 100644 --- a/ipfs-api/examples/get_commands.rs +++ b/ipfs-api/examples/get_commands.rs @@ -6,7 +6,10 @@ // copied, modified, or distributed except according to those terms. // +#[cfg(feature = "actix")] +extern crate actix_web; extern crate futures; +#[cfg(feature = "hyper")] extern crate hyper; extern crate ipfs_api; @@ -46,5 +49,13 @@ fn main() { .map(|commands| print_recursive(0, &commands)) .map_err(|e| eprintln!("{}", e)); + #[cfg(feature = "hyper")] hyper::rt::run(req); + #[cfg(feature = "actix")] + actix_web::actix::run(|| { + req.then(|_| { + actix_web::actix::System::current().stop(); + Ok(()) + }) + }); } diff --git a/ipfs-api/examples/get_stats.rs b/ipfs-api/examples/get_stats.rs index 41f2a5c..9107aa3 100644 --- a/ipfs-api/examples/get_stats.rs +++ b/ipfs-api/examples/get_stats.rs @@ -6,7 +6,10 @@ // copied, modified, or distributed except according to those terms. // +#[cfg(feature = "actix")] +extern crate actix_web; extern crate futures; +#[cfg(feature = "hyper")] extern crate hyper; extern crate ipfs_api; @@ -54,10 +57,18 @@ fn main() { println!(" version : {}", repo_stats.version); }); - hyper::rt::run( - bitswap_stats - .and_then(|_| bw_stats) - .and_then(|_| repo_stats) - .map_err(|e| eprintln!("{}", e)), - ); + let fut = bitswap_stats + .and_then(|_| bw_stats) + .and_then(|_| repo_stats) + .map_err(|e| eprintln!("{}", e)); + + #[cfg(feature = "hyper")] + hyper::rt::run(fut); + #[cfg(feature = "actix")] + actix_web::actix::run(|| { + fut.then(|_| { + actix_web::actix::System::current().stop(); + Ok(()) + }) + }); } diff --git a/ipfs-api/examples/get_swarm.rs b/ipfs-api/examples/get_swarm.rs index 2b9fd73..2b2c238 100644 --- a/ipfs-api/examples/get_swarm.rs +++ b/ipfs-api/examples/get_swarm.rs @@ -6,7 +6,10 @@ // copied, modified, or distributed except according to those terms. // +#[cfg(feature = "actix")] +extern crate actix_web; extern crate futures; +#[cfg(feature = "hyper")] extern crate hyper; extern crate ipfs_api; @@ -43,9 +46,17 @@ fn main() { } }); - hyper::rt::run( - local - .and_then(|_| connected) - .map_err(|e| eprintln!("{}", e)), - ); + let fut = local + .and_then(|_| connected) + .map_err(|e| eprintln!("{}", e)); + + #[cfg(feature = "hyper")] + hyper::rt::run(fut); + #[cfg(feature = "actix")] + actix_web::actix::run(|| { + fut.then(|_| { + actix_web::actix::System::current().stop(); + Ok(()) + }) + }); } diff --git a/ipfs-api/examples/get_version.rs b/ipfs-api/examples/get_version.rs index 41a4ee5..6fb9c7e 100644 --- a/ipfs-api/examples/get_version.rs +++ b/ipfs-api/examples/get_version.rs @@ -6,7 +6,10 @@ // copied, modified, or distributed except according to those terms. // +#[cfg(feature = "actix")] +extern crate actix_web; extern crate futures; +#[cfg(feature = "hyper")] extern crate hyper; extern crate ipfs_api; @@ -23,5 +26,15 @@ fn main() { .version() .map(|version| println!("version: {:?}", version.version)); - hyper::rt::run(req.map_err(|e| eprintln!("{}", e))); + let fut = req.map_err(|e| eprintln!("{}", e)); + + #[cfg(feature = "hyper")] + hyper::rt::run(fut); + #[cfg(feature = "actix")] + actix_web::actix::run(|| { + fut.then(|_| { + actix_web::actix::System::current().stop(); + Ok(()) + }) + }); } diff --git a/ipfs-api/examples/mfs.rs b/ipfs-api/examples/mfs.rs index 8bea192..97d1824 100644 --- a/ipfs-api/examples/mfs.rs +++ b/ipfs-api/examples/mfs.rs @@ -6,7 +6,10 @@ // copied, modified, or distributed except according to those terms. // +#[cfg(feature = "actix")] +extern crate actix_web; extern crate futures; +#[cfg(feature = "hyper")] extern crate hyper; extern crate ipfs_api; @@ -46,7 +49,7 @@ fn main() { let file_rm = client.files_rm("/test", true); - hyper::rt::run( + let fut = mkdir .and_then(|_| { println!("making dirs /test/does/not/exist/yet..."); @@ -78,6 +81,16 @@ fn main() { file_rm }) .map(|_| println!("done!")) - .map_err(|e| eprintln!("{}", e)), - ) + .map_err(|e| eprintln!("{}", e)) + ; + + #[cfg(feature = "hyper")] + hyper::rt::run(fut); + #[cfg(feature = "actix")] + actix_web::actix::run(|| { + fut.then(|_| { + actix_web::actix::System::current().stop(); + Ok(()) + }) + }); } diff --git a/ipfs-api/examples/ping_peer.rs b/ipfs-api/examples/ping_peer.rs index 1d32210..c60e895 100644 --- a/ipfs-api/examples/ping_peer.rs +++ b/ipfs-api/examples/ping_peer.rs @@ -6,7 +6,10 @@ // copied, modified, or distributed except according to those terms. // +#[cfg(feature = "actix")] +extern crate actix_web; extern crate futures; +#[cfg(feature = "hyper")] extern crate hyper; extern crate ipfs_api; @@ -59,5 +62,13 @@ fn main() { }) .map_err(|e| eprintln!("{}", e)); + #[cfg(feature = "hyper")] hyper::rt::run(req); + #[cfg(feature = "actix")] + actix_web::actix::run(|| { + req.then(|_| { + actix_web::actix::System::current().stop(); + Ok(()) + }) + }); } diff --git a/ipfs-api/examples/pubsub.rs b/ipfs-api/examples/pubsub.rs index 10ee0c5..39fb25a 100644 --- a/ipfs-api/examples/pubsub.rs +++ b/ipfs-api/examples/pubsub.rs @@ -6,7 +6,10 @@ // copied, modified, or distributed except according to those terms. // +#[cfg(feature = "actix")] +extern crate actix_web; extern crate futures; +#[cfg(feature = "hyper")] extern crate hyper; extern crate ipfs_api; extern crate tokio_timer; @@ -49,7 +52,11 @@ fn main() { println!(); println!("starting task to publish messages to ({})...", TOPIC); + + #[cfg(feature = "hyper")] hyper::rt::run(publish); + #[cfg(feature = "actix")] + actix_web::actix::spawn(publish); }); // This block will execute a future that suscribes to a topic, @@ -61,15 +68,24 @@ fn main() { println!(); println!("waiting for messages on ({})...", TOPIC); - hyper::rt::run( - req.take(5) - .for_each(|msg| { - println!(); - println!("received ({:?})", msg); + let fut = req + .take(5) + .for_each(|msg| { + println!(); + println!("received ({:?})", msg); + + Ok(()) + }) + .map_err(|e| eprintln!("{}", e)); - Ok(()) - }) - .map_err(|e| eprintln!("{}", e)), - ) + #[cfg(feature = "hyper")] + hyper::rt::run(fut); + #[cfg(feature = "actix")] + actix_web::actix::run(|| { + fut.then(|_| { + actix_web::actix::System::current().stop(); + Ok(()) + }) + }); } } diff --git a/ipfs-api/examples/replace_config.rs b/ipfs-api/examples/replace_config.rs index f354386..9c1ec61 100644 --- a/ipfs-api/examples/replace_config.rs +++ b/ipfs-api/examples/replace_config.rs @@ -6,7 +6,10 @@ // copied, modified, or distributed except according to those terms. // +#[cfg(feature = "actix")] +extern crate actix_web; extern crate futures; +#[cfg(feature = "hyper")] extern crate hyper; extern crate ipfs_api; @@ -27,5 +30,13 @@ fn main() { .map(|_| println!("replaced file")) .map_err(|e| println!("{}", e)); + #[cfg(feature = "hyper")] hyper::rt::run(req); + #[cfg(feature = "actix")] + actix_web::actix::run(|| { + req.then(|_| { + actix_web::actix::System::current().stop(); + Ok(()) + }) + }); } diff --git a/ipfs-api/examples/resolve_name.rs b/ipfs-api/examples/resolve_name.rs index 4bcb648..6d400dc 100644 --- a/ipfs-api/examples/resolve_name.rs +++ b/ipfs-api/examples/resolve_name.rs @@ -6,7 +6,10 @@ // copied, modified, or distributed except according to those terms. // +#[cfg(feature = "actix")] +extern crate actix_web; extern crate futures; +#[cfg(feature = "hyper")] extern crate hyper; extern crate ipfs_api; @@ -40,9 +43,17 @@ fn main() { }) }); - hyper::rt::run( - name_resolve - .and_then(|_| name_publish) - .map_err(|e| eprintln!("{}", e)), - ); + let fut = name_resolve + .and_then(|_| name_publish) + .map_err(|e| eprintln!("{}", e)); + + #[cfg(feature = "hyper")] + hyper::rt::run(fut); + #[cfg(feature = "actix")] + actix_web::actix::run(|| { + fut.then(|_| { + actix_web::actix::System::current().stop(); + Ok(()) + }) + }); } diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs index 0c3277b..5e1e9f5 100644 --- a/ipfs-api/src/client.rs +++ b/ipfs-api/src/client.rs @@ -5,19 +5,22 @@ // http://opensource.org/licenses/MIT>, at your option. This file may not be // copied, modified, or distributed except according to those terms. // - +#[cfg(feature = "actix")] +use actix_multipart::client::multipart; +#[cfg(feature = "actix")] +use actix_web::HttpMessage; +use bytes::Bytes; use futures::{ future, stream::{self, Stream}, Future, IntoFuture, }; use header::TRAILER; -use http::uri::InvalidUri; -use hyper::{ - self, - client::{Client, HttpConnector}, - Chunk, Request, Response, StatusCode, Uri, -}; +use http::uri::{InvalidUri, Uri}; +use http::StatusCode; +#[cfg(feature = "hyper")] +use hyper::client::{Client, HttpConnector}; +#[cfg(feature = "hyper")] use hyper_multipart::client::multipart; use read::{JsonLineDecoder, LineDecoder, StreamReader}; use request::{self, ApiRequest}; @@ -31,17 +34,34 @@ use tokio_codec::{Decoder, FramedRead}; /// A response returned by the HTTP client. /// +#[cfg(feature = "actix")] +type AsyncResponse = Box + 'static>; +#[cfg(feature = "hyper")] type AsyncResponse = Box + Send + 'static>; /// A future that returns a stream of responses. /// +#[cfg(feature = "actix")] +type AsyncStreamResponse = Box + 'static>; +#[cfg(feature = "hyper")] type AsyncStreamResponse = Box + Send + 'static>; +#[cfg(feature = "actix")] +type Request = actix_web::client::ClientRequest; +#[cfg(feature = "hyper")] +type Request = http::Request; + +#[cfg(feature = "actix")] +type Response = actix_web::client::ClientResponse; +#[cfg(feature = "hyper")] +type Response = http::Response; + /// Asynchronous Ipfs client. /// #[derive(Clone)] pub struct IpfsClient { base: Uri, + #[cfg(feature = "hyper")] client: Client, } @@ -101,6 +121,7 @@ impl IpfsClient { Ok(IpfsClient { base: base_path, + #[cfg(feature = "hyper")] client: Client::builder().keep_alive(false).build_http(), }) } @@ -117,7 +138,7 @@ impl IpfsClient { &self, req: &Req, form: Option>, - ) -> Result, Error> + ) -> Result where Req: ApiRequest + Serialize, { @@ -127,9 +148,9 @@ impl IpfsClient { Req::PATH, ::serde_urlencoded::to_string(req)? ); - - url.parse::().map_err(From::from).and_then(move |url| { - let mut builder = Request::builder(); + #[cfg(feature = "hyper")] + let req = url.parse::().map_err(From::from).and_then(move |url| { + let mut builder = http::Request::builder(); let mut builder = builder.method(Req::METHOD.clone()).uri(url); let req = if let Some(form) = form { @@ -139,13 +160,29 @@ impl IpfsClient { }; req.map_err(From::from) - }) + }); + #[cfg(feature = "actix")] + let req = if let Some(form) = form { + Request::build() + .method(Req::METHOD.clone()) + .uri(url) + .content_type(form.content_type()) + .streaming(multipart::Body::from(form)) + .map_err(From::from) + } else { + Request::build() + .method(Req::METHOD.clone()) + .uri(url) + .finish() + .map_err(From::from) + }; + req } /// Builds an Api error from a response body. /// #[inline] - fn build_error_from_body(chunk: Chunk) -> Error { + fn build_error_from_body(chunk: Bytes) -> Error { match serde_json::from_slice(&chunk) { Ok(e) => Error::Api(e), Err(_) => match String::from_utf8(chunk.to_vec()) { @@ -158,7 +195,7 @@ impl IpfsClient { /// Processes a response that expects a json encoded body, returning an /// error or a deserialized json response. /// - fn process_json_response(status: StatusCode, chunk: Chunk) -> Result + fn process_json_response(status: StatusCode, chunk: Bytes) -> Result where for<'de> Res: 'static + Deserialize<'de>, { @@ -171,15 +208,19 @@ impl IpfsClient { /// Processes a response that returns a stream of json deserializable /// results. /// - fn process_stream_response( - res: Response, - decoder: D, - ) -> AsyncStreamResponse + fn process_stream_response(res: Response, decoder: D) -> AsyncStreamResponse where D: 'static + Decoder + Send, Res: 'static, { - let stream = FramedRead::new(StreamReader::new(res.into_body().from_err()), decoder); + #[cfg(feature = "hyper")] + let stream = FramedRead::new( + StreamReader::new(res.into_body().map(|c| c.into_bytes()).from_err()), + decoder, + ); + + #[cfg(feature = "actix")] + let stream = FramedRead::new(StreamReader::new(res.payload().from_err()), decoder); Box::new(stream) } @@ -190,22 +231,33 @@ impl IpfsClient { &self, req: &Req, form: Option>, - ) -> AsyncResponse<(StatusCode, Chunk)> + ) -> AsyncResponse<(StatusCode, Bytes)> where Req: ApiRequest + Serialize, { match self.build_base_request(req, form) { Ok(req) => { + #[cfg(feature = "hyper")] let res = self .client .request(req) .and_then(|res| { let status = res.status(); - res.into_body().concat2().map(move |chunk| (status, chunk)) + res.into_body() + .concat2() + .map(move |chunk| (status, chunk.into_bytes())) }) .from_err(); - + #[cfg(feature = "actix")] + let res = req + .send() + .timeout(std::time::Duration::from_secs(90)) + .from_err() + .and_then(|x| { + let status = x.status(); + x.body().map(move |body| (status, body)).from_err() + }); Box::new(res) } Err(e) => Box::new(Err(e).into_future()), @@ -224,8 +276,9 @@ impl IpfsClient { where Req: ApiRequest + Serialize, Res: 'static + Send, - F: 'static + Fn(hyper::Response) -> AsyncStreamResponse + Send, + F: 'static + Fn(Response) -> AsyncStreamResponse + Send, { + #[cfg(feature = "hyper")] match self.build_base_request(req, form) { Ok(req) => { let res = self @@ -244,7 +297,9 @@ impl IpfsClient { res.into_body() .concat2() .from_err() - .and_then(|chunk| Err(Self::build_error_from_body(chunk))) + .and_then(|chunk| { + Err(Self::build_error_from_body(chunk.into_bytes())) + }) .into_stream(), ), }; @@ -252,17 +307,31 @@ impl IpfsClient { stream }) .flatten_stream(); - Box::new(res) } Err(e) => Box::new(stream::once(Err(e))), } + #[cfg(feature = "actix")] + match self.build_base_request(req, form) { + Ok(req) => { + let res = req + .send() + .timeout(std::time::Duration::from_secs(90)) + .from_err(); + Box::new(res.map(process).flatten_stream()) + } + Err(e) => Box::new(stream::once(Err(e))), + } } /// Generic method for making a request to the Ipfs server, and getting /// a deserializable response. /// - fn request(&self, req: &Req, form: Option>) -> AsyncResponse + fn request( + &self, + req: &Req, + form: Option>, + ) -> AsyncResponse where Req: ApiRequest + Serialize, for<'de> Res: 'static + Deserialize<'de> + Send, @@ -277,7 +346,11 @@ impl IpfsClient { /// Generic method for making a request to the Ipfs server, and getting /// back a response with no body. /// - fn request_empty(&self, req: &Req, form: Option>) -> AsyncResponse<()> + fn request_empty( + &self, + req: &Req, + form: Option>, + ) -> AsyncResponse<()> where Req: ApiRequest + Serialize, { @@ -294,7 +367,11 @@ impl IpfsClient { /// Generic method for making a request to the Ipfs server, and getting /// back a raw String response. /// - fn request_string(&self, req: &Req, form: Option>) -> AsyncResponse + fn request_string( + &self, + req: &Req, + form: Option>, + ) -> AsyncResponse where Req: ApiRequest + Serialize, { @@ -315,11 +392,17 @@ impl IpfsClient { &self, req: &Req, form: Option>, - ) -> AsyncStreamResponse + ) -> AsyncStreamResponse where Req: ApiRequest + Serialize, { - self.request_stream(req, form, |res| Box::new(res.into_body().from_err())) + #[cfg(feature = "hyper")] + let res = self.request_stream(req, form, |res| { + Box::new(res.into_body().from_err().map(|c| c.into_bytes())) + }); + #[cfg(feature = "actix")] + let res = self.request_stream(req, form, |res| Box::new(res.payload().from_err())); + res } /// Generic method to return a streaming response of deserialized json @@ -595,7 +678,7 @@ impl IpfsClient { /// ``` /// #[inline] - pub fn block_get(&self, hash: &str) -> AsyncStreamResponse { + pub fn block_get(&self, hash: &str) -> AsyncStreamResponse { self.request_stream_bytes(&request::BlockGet { hash }, None) } @@ -747,7 +830,7 @@ impl IpfsClient { /// ``` /// #[inline] - pub fn cat(&self, path: &str) -> AsyncStreamResponse { + pub fn cat(&self, path: &str) -> AsyncStreamResponse { self.request_stream_bytes(&request::Cat { path }, None) } @@ -1204,7 +1287,7 @@ impl IpfsClient { /// ``` /// #[inline] - pub fn files_read(&self, path: &str) -> AsyncStreamResponse { + pub fn files_read(&self, path: &str) -> AsyncStreamResponse { self.request_stream_bytes(&request::FilesRead { path }, None) } @@ -1363,7 +1446,7 @@ impl IpfsClient { /// ``` /// #[inline] - pub fn get(&self, path: &str) -> AsyncStreamResponse { + pub fn get(&self, path: &str) -> AsyncStreamResponse { self.request_stream_bytes(&request::Get { path }, None) } @@ -1528,6 +1611,7 @@ impl IpfsClient { /// ``` /// pub fn log_tail(&self) -> AsyncStreamResponse { + #[cfg(feature = "hyper")] let res = self .build_base_request(&request::LogTail, None) .map(|req| self.client.request(req).from_err()) @@ -1535,7 +1619,17 @@ impl IpfsClient { .flatten() .map(|res| IpfsClient::process_stream_response(res, LineDecoder)) .flatten_stream(); - + #[cfg(feature = "actix")] + let res = self + .build_base_request(&request::LogTail, None) + .into_future() + .and_then(|req| { + req.send() + .timeout(std::time::Duration::from_secs(90)) + .from_err() + }) + .map(|res| IpfsClient::process_stream_response(res, LineDecoder)) + .flatten_stream(); Box::new(res) } @@ -1644,7 +1738,7 @@ impl IpfsClient { /// ``` /// #[inline] - pub fn object_data(&self, key: &str) -> AsyncStreamResponse { + pub fn object_data(&self, key: &str) -> AsyncStreamResponse { self.request_stream_bytes(&request::ObjectData { key }, None) } @@ -2155,7 +2249,7 @@ impl IpfsClient { /// ``` /// #[inline] - pub fn tar_cat(&self, path: &str) -> AsyncStreamResponse { + pub fn tar_cat(&self, path: &str) -> AsyncStreamResponse { self.request_stream_bytes(&request::TarCat { path }, None) } diff --git a/ipfs-api/src/header.rs b/ipfs-api/src/header.rs index 9943bdb..009d178 100644 --- a/ipfs-api/src/header.rs +++ b/ipfs-api/src/header.rs @@ -6,6 +6,6 @@ // copied, modified, or distributed except according to those terms. // -pub use hyper::header::TRAILER; +pub use http::header::TRAILER; pub const X_STREAM_ERROR: &str = "x-stream-error"; diff --git a/ipfs-api/src/lib.rs b/ipfs-api/src/lib.rs index 503e482..64ea2f4 100644 --- a/ipfs-api/src/lib.rs +++ b/ipfs-api/src/lib.rs @@ -17,78 +17,152 @@ //! ipfs-api = "0.5.0-alpha2" //! ``` //! -//! ## Examples -//! -//! Write a file to IPFS: -//! -//! ```no_run -//! # extern crate hyper; -//! # extern crate ipfs_api; -//! # -//! use hyper::rt::Future; -//! use ipfs_api::IpfsClient; -//! use std::io::Cursor; -//! -//! # fn main() { -//! let client = IpfsClient::default(); -//! let data = Cursor::new("Hello World!"); -//! -//! let req = client -//! .add(data) -//! .map(|res| { -//! println!("{}", res.hash); -//! }) -//! .map_err(|e| eprintln!("{}", e)); -//! -//! hyper::rt::run(req); -//! # } -//! ``` -//! -//! Read a file from IPFS: -//! -//! ```no_run -//! # extern crate futures; -//! # extern crate hyper; -//! # extern crate ipfs_api; -//! # -//! use futures::{Future, Stream}; -//! use ipfs_api::IpfsClient; -//! use std::io::{self, Write}; -//! -//! # fn main() { -//! let client = IpfsClient::default(); -//! -//! let req = client -//! .get("/test/file.json") -//! .concat2() -//! .map(|res| { -//! let out = io::stdout(); -//! let mut out = out.lock(); -//! -//! out.write_all(&res).unwrap(); -//! }) -//! .map_err(|e| eprintln!("{}", e)); -//! -//! hyper::rt::run(req); -//! # } -//! ``` -//! -//! There are also a bunch of examples included in the project, which -//! I used for testing -//! -//! You can run any of the examples with cargo: -//! -//! ```sh -//! $ cargo run -p ipfs-api --example add_file -//! ``` + +#[cfg(feature = "actix")] +extern crate actix_multipart_rfc7578 as actix_multipart; + +/// ## Examples +/// +/// Write a file to IPFS: +/// +/// ```no_run +/// # extern crate actix_web; +/// # extern crate futures; +/// # extern crate ipfs_api; +/// # +/// use futures::future::Future; +/// use ipfs_api::IpfsClient; +/// use std::io::Cursor; +/// +/// # fn main() { +/// let client = IpfsClient::default(); +/// let data = Cursor::new("Hello World!"); +/// +/// let req = client +/// .add(data) +/// .map(|res| { +/// println!("{}", res.hash); +/// }) +/// .map_err(|e| eprintln!("{}", e)); +/// +/// tokio::runtime::current_thread::run(req); +/// # } +/// ``` +/// +/// Read a file from IPFS: +/// +/// ```no_run +/// # extern crate futures; +/// # extern crate actix_web; +/// # extern crate ipfs_api; +/// # +/// use futures::{Future, Stream}; +/// use ipfs_api::IpfsClient; +/// use std::io::{self, Write}; +/// +/// # fn main() { +/// let client = IpfsClient::default(); +/// +/// let req = client +/// .get("/test/file.json") +/// .concat2() +/// .map(|res| { +/// let out = io::stdout(); +/// let mut out = out.lock(); +/// +/// out.write_all(&res).unwrap(); +/// }) +/// .map_err(|e| eprintln!("{}", e)); +/// +/// tokio::runtime::current_thread::run(req); +/// # } +/// ``` +/// +/// There are also a bunch of examples included in the project, which +/// I used for testing +/// +/// You can run any of the examples with cargo: +/// +/// ```sh +/// $ cargo run -p ipfs-api --example add_file +/// ``` +#[cfg(feature = "actix")] +extern crate actix_web; + +/// ## Examples +/// +/// Write a file to IPFS: +/// +/// ```no_run +/// # extern crate hyper; +/// # extern crate ipfs_api; +/// # +/// use hyper::rt::Future; +/// use ipfs_api::IpfsClient; +/// use std::io::Cursor; +/// +/// # fn main() { +/// let client = IpfsClient::default(); +/// let data = Cursor::new("Hello World!"); +/// +/// let req = client +/// .add(data) +/// .map(|res| { +/// println!("{}", res.hash); +/// }) +/// .map_err(|e| eprintln!("{}", e)); +/// +/// hyper::rt::run(req); +/// # } +/// ``` +/// +/// Read a file from IPFS: +/// +/// ```no_run +/// # extern crate futures; +/// # extern crate hyper; +/// # extern crate ipfs_api; +/// # +/// use futures::{Future, Stream}; +/// use ipfs_api::IpfsClient; +/// use std::io::{self, Write}; +/// +/// # fn main() { +/// let client = IpfsClient::default(); +/// +/// let req = client +/// .get("/test/file.json") +/// .concat2() +/// .map(|res| { +/// let out = io::stdout(); +/// let mut out = out.lock(); +/// +/// out.write_all(&res).unwrap(); +/// }) +/// .map_err(|e| eprintln!("{}", e)); +/// +/// hyper::rt::run(req); +/// # } +/// ``` +/// +/// There are also a bunch of examples included in the project, which +/// I used for testing +/// +/// You can run any of the examples with cargo: +/// +/// ```sh +/// $ cargo run -p ipfs-api --example add_file +/// ``` +#[cfg(feature = "hyper")] +extern crate hyper; +#[cfg(feature = "hyper")] +extern crate hyper_multipart_rfc7578 as hyper_multipart; extern crate bytes; #[macro_use] extern crate failure; extern crate futures; extern crate http; -extern crate hyper; -extern crate hyper_multipart_rfc7578 as hyper_multipart; extern crate serde; #[macro_use] extern crate serde_derive; diff --git a/ipfs-api/src/read.rs b/ipfs-api/src/read.rs index b12c8b2..d9b2f98 100644 --- a/ipfs-api/src/read.rs +++ b/ipfs-api/src/read.rs @@ -6,10 +6,9 @@ // copied, modified, or distributed except according to those terms. // -use bytes::BytesMut; +use bytes::{Bytes, BytesMut}; use futures::{Async, Stream}; use header::X_STREAM_ERROR; -use hyper::Chunk; use response::Error; use serde::Deserialize; use serde_json; @@ -122,7 +121,7 @@ impl Decoder for LineDecoder { enum ReadState { /// A chunk is ready to be read from. /// - Ready(Chunk, usize), + Ready(Bytes, usize), /// The next chunk isn't ready yet. /// @@ -138,7 +137,7 @@ pub struct StreamReader { impl StreamReader where - S: Stream, + S: Stream, { #[inline] pub fn new(stream: S) -> StreamReader { @@ -151,7 +150,7 @@ where impl Read for StreamReader where - S: Stream, + S: Stream, { fn read(&mut self, buf: &mut [u8]) -> io::Result { loop { @@ -203,4 +202,4 @@ where } } -impl AsyncRead for StreamReader where S: Stream {} +impl AsyncRead for StreamReader where S: Stream {} diff --git a/ipfs-api/src/request/add.rs b/ipfs-api/src/request/add.rs index be7960d..123ba4a 100644 --- a/ipfs-api/src/request/add.rs +++ b/ipfs-api/src/request/add.rs @@ -6,7 +6,7 @@ // copied, modified, or distributed except according to those terms. // -use hyper::Method; +use http::Method; use request::ApiRequest; pub struct Add; diff --git a/ipfs-api/src/request/block.rs b/ipfs-api/src/request/block.rs index 24f5d39..a6dd99c 100644 --- a/ipfs-api/src/request/block.rs +++ b/ipfs-api/src/request/block.rs @@ -6,7 +6,7 @@ // copied, modified, or distributed except according to those terms. // -use hyper::Method; +use http::Method; use request::ApiRequest; #[derive(Serialize)] diff --git a/ipfs-api/src/request/config.rs b/ipfs-api/src/request/config.rs index e586278..e3753ea 100644 --- a/ipfs-api/src/request/config.rs +++ b/ipfs-api/src/request/config.rs @@ -6,7 +6,7 @@ // copied, modified, or distributed except according to those terms. // -use hyper::Method; +use http::Method; use request::ApiRequest; pub struct ConfigEdit; diff --git a/ipfs-api/src/request/dag.rs b/ipfs-api/src/request/dag.rs index 78705f9..2d49c3a 100644 --- a/ipfs-api/src/request/dag.rs +++ b/ipfs-api/src/request/dag.rs @@ -6,7 +6,7 @@ // copied, modified, or distributed except according to those terms. // -use hyper::Method; +use http::Method; use request::ApiRequest; #[derive(Serialize)] diff --git a/ipfs-api/src/request/files.rs b/ipfs-api/src/request/files.rs index 179e1a4..394d7b3 100644 --- a/ipfs-api/src/request/files.rs +++ b/ipfs-api/src/request/files.rs @@ -6,7 +6,7 @@ // copied, modified, or distributed except according to those terms. // -use hyper::Method; +use http::Method; use request::ApiRequest; #[derive(Serialize)] diff --git a/ipfs-api/src/request/mod.rs b/ipfs-api/src/request/mod.rs index 947a912..52df37c 100644 --- a/ipfs-api/src/request/mod.rs +++ b/ipfs-api/src/request/mod.rs @@ -109,5 +109,5 @@ pub trait ApiRequest { /// Method used to make the request. /// - const METHOD: &'static ::hyper::Method = &::hyper::Method::GET; + const METHOD: &'static ::http::Method = &::http::Method::GET; } diff --git a/ipfs-api/src/request/tar.rs b/ipfs-api/src/request/tar.rs index 26772b9..05c1770 100644 --- a/ipfs-api/src/request/tar.rs +++ b/ipfs-api/src/request/tar.rs @@ -6,7 +6,7 @@ // copied, modified, or distributed except according to those terms. // -use hyper::Method; +use http::Method; use request::ApiRequest; pub struct TarAdd; diff --git a/ipfs-api/src/response/error.rs b/ipfs-api/src/response/error.rs index 94f7fa9..ca69de6 100644 --- a/ipfs-api/src/response/error.rs +++ b/ipfs-api/src/response/error.rs @@ -5,8 +5,10 @@ // http://opensource.org/licenses/MIT>, at your option. This file may not be // copied, modified, or distributed except according to those terms. // - +#[cfg(feature = "actix")] +use actix_web; use http; +#[cfg(feature = "hyper")] use hyper; use serde_json; use serde_urlencoded; @@ -23,10 +25,23 @@ pub struct ApiError { #[derive(Fail, Debug)] pub enum Error { - // Foreign errors. + /// Foreign errors. + #[cfg(feature = "hyper")] #[fail(display = "hyper client error '{}'", _0)] Client(hyper::Error), + #[cfg(feature = "actix")] + #[fail(display = "actix client error '{}'", _0)] + Client(actix_web::error::Error), + + #[cfg(feature = "actix")] + #[fail(display = "actix client payload error '{}'", _0)] + ClientPayload(actix_web::error::PayloadError), + + #[cfg(feature = "actix")] + #[fail(display = "actix client send request error '{}'", _0)] + ClientSend(actix_web::client::SendRequestError), + #[fail(display = "http error '{}'", _0)] Http(http::Error), @@ -61,12 +76,34 @@ pub enum Error { Uncategorized(String), } +#[cfg(feature = "hyper")] impl From for Error { fn from(err: hyper::Error) -> Error { Error::Client(err) } } +#[cfg(feature = "actix")] +impl From for Error { + fn from(err: actix_web::error::Error) -> Error { + Error::Client(err) + } +} + +#[cfg(feature = "actix")] +impl From for Error { + fn from(err: actix_web::client::SendRequestError) -> Error { + Error::ClientSend(err) + } +} + +#[cfg(feature = "actix")] +impl From for Error { + fn from(err: actix_web::error::PayloadError) -> Error { + Error::ClientPayload(err) + } +} + impl From for Error { fn from(err: http::Error) -> Error { Error::Http(err) -- cgit v1.2.3