diff options
27 files changed, 524 insertions, 155 deletions
diff --git a/.travis.yml b/.travis.yml index 814e54f..168d15b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,3 +5,10 @@ rust: - nightly cache: cargo: true +script: + - cargo build --verbose + - cargo test --verbose + - cd ipfs-api + - cargo build --verbose --features actix --no-default-features + - cargo test --verbose --features actix --no-default-features + - cd ..
\ No newline at end of file diff --git a/ipfs-api/Cargo.toml b/ipfs-api/Cargo.toml index f3fb604..39cad56 100644 --- a/ipfs-api/Cargo.toml +++ b/ipfs-api/Cargo.toml @@ -13,13 +13,19 @@ license = "MIT OR Apache-2.0" [badges] travis-ci = { repository = "ferristseng/rust-ipfs-api" } +[features] +default = ["hyper", "hyper-multipart-rfc7578"] +actix = ["actix-web", "actix-multipart-rfc7578"] + [dependencies] +actix-multipart-rfc7578 = { version = "0.1", optional = true } +actix-web = { version = "0.7", optional = true } bytes = "0.4" failure = "0.1.2" futures = "0.1" http = "0.1" -hyper = "0.12" -hyper-multipart-rfc7578 = "0.3.0" +hyper = { version = "0.12", optional = true } +hyper-multipart-rfc7578 = { version = "0.3", optional = true } serde = "1.0" serde_derive = "1.0" serde_json = "1.0" diff --git a/ipfs-api/examples/add_file.rs b/ipfs-api/examples/add_file.rs index d771966..3d9be49 100644 --- a/ipfs-api/examples/add_file.rs +++ b/ipfs-api/examples/add_file.rs @@ -6,7 +6,10 @@ // copied, modified, or distributed except according to those terms. // +#[cfg(feature = "actix")] +extern crate actix_web; extern crate futures; +#[cfg(feature = "hyper")] extern crate hyper; extern crate ipfs_api; @@ -27,5 +30,13 @@ fn main() { .map(|add| println!("added file: {:?}", add)) .map_err(|e| eprintln!("{}", e)); + #[cfg(feature = "hyper")] hyper::rt::run(req); + #[cfg(feature = "actix")] + actix_web::actix::run(|| { + req.then(|_| { + actix_web::actix::System::current().stop(); + Ok(()) + }) + }); } diff --git a/ipfs-api/examples/add_tar.rs b/ipfs-api/examples/add_tar.rs index 1880712..5d75e28 100644 --- a/ipfs-api/examples/add_tar.rs +++ b/ipfs-api/examples/add_tar.rs @@ -6,7 +6,10 @@ // copied, modified, or distributed except according to those terms. // +#[cfg(feature = "actix")] +extern crate actix_web; extern crate futures; +#[cfg(feature = "hyper")] extern crate hyper; extern crate ipfs_api; extern crate tar; @@ -52,5 +55,13 @@ fn main() { }) .map_err(|e| eprintln!("{}", e)); - hyper::rt::run(req) + #[cfg(feature = "hyper")] + hyper::rt::run(req); + #[cfg(feature = "actix")] + actix_web::actix::run(|| { + req.then(|_| { + actix_web::actix::System::current().stop(); + Ok(()) + }) + }); } diff --git a/ipfs-api/examples/bootstrap_default.rs b/ipfs-api/examples/bootstrap_default.rs index 6ba9575..bc36952 100644 --- a/ipfs-api/examples/bootstrap_default.rs +++ b/ipfs-api/examples/bootstrap_default.rs @@ -6,7 +6,10 @@ // copied, modified, or distributed except according to those terms. // +#[cfg(feature = "actix")] +extern crate actix_web; extern crate futures; +#[cfg(feature = "hyper")] extern crate hyper; extern crate ipfs_api; @@ -42,8 +45,7 @@ fn main() { } }); - hyper::rt::run( - bootstrap + let fut = bootstrap .and_then(|_| { println!(); println!("dropping all bootstrap peers..."); @@ -56,6 +58,15 @@ fn main() { add }) - .map_err(|e| eprintln!("{}", e)), - ); + .map_err(|e| eprintln!("{}", e)); + + #[cfg(feature = "hyper")] + hyper::rt::run(fut); + #[cfg(feature = "actix")] + actix_web::actix::run(|| { + fut.then(|_| { + actix_web::actix::System::current().stop(); + Ok(()) + }) + }); } diff --git a/ipfs-api/examples/dns.rs b/ipfs-api/examples/dns.rs index 78b4086..6b3823d 100644 --- a/ipfs-api/examples/dns.rs +++ b/ipfs-api/examples/dns.rs @@ -6,7 +6,10 @@ // copied, modified, or distributed except according to those terms. // +#[cfg(feature = "actix")] +extern crate actix_web; extern crate futures; +#[cfg(feature = "hyper")] extern crate hyper; extern crate ipfs_api; @@ -38,5 +41,13 @@ fn main() { }) .map_err(|e| eprintln!("{}", e)); + #[cfg(feature = "hyper")] hyper::rt::run(req); + #[cfg(feature = "actix")] + actix_web::actix::run(|| { + req.then(|_| { + actix_web::actix::System::current().stop(); + Ok(()) + }) + }); } diff --git a/ipfs-api/examples/get_commands.rs b/ipfs-api/examples/get_commands.rs index b12ffa1..f99167d 100644 --- a/ipfs-api/examples/get_commands.rs +++ b/ipfs-api/examples/get_commands.rs @@ -6,7 +6,10 @@ // copied, modified, or distributed except according to those terms. // +#[cfg(feature = "actix")] +extern crate actix_web; extern crate futures; +#[cfg(feature = "hyper")] extern crate hyper; extern crate ipfs_api; @@ -46,5 +49,13 @@ fn main() { .map(|commands| print_recursive(0, &commands)) .map_err(|e| eprintln!("{}", e)); + #[cfg(feature = "hyper")] hyper::rt::run(req); + #[cfg(feature = "actix")] + actix_web::actix::run(|| { + req.then(|_| { + actix_web::actix::System::current().stop(); + Ok(()) + }) + }); } diff --git a/ipfs-api/examples/get_stats.rs b/ipfs-api/examples/get_stats.rs index 41f2a5c..9107aa3 100644 --- a/ipfs-api/examples/get_stats.rs +++ b/ipfs-api/examples/get_stats.rs @@ -6,7 +6,10 @@ // copied, modified, or distributed except according to those terms. // +#[cfg(feature = "actix")] +extern crate actix_web; extern crate futures; +#[cfg(feature = "hyper")] extern crate hyper; extern crate ipfs_api; @@ -54,10 +57,18 @@ fn main() { println!(" version : {}", repo_stats.version); }); - hyper::rt::run( - bitswap_stats - .and_then(|_| bw_stats) - .and_then(|_| repo_stats) - .map_err(|e| eprintln!("{}", e)), - ); + let fut = bitswap_stats + .and_then(|_| bw_stats) + .and_then(|_| repo_stats) + .map_err(|e| eprintln!("{}", e)); + + #[cfg(feature = "hyper")] + hyper::rt::run(fut); + #[cfg(feature = "actix")] + actix_web::actix::run(|| { + fut.then(|_| { + actix_web::actix::System::current().stop(); + Ok(()) + }) + }); } diff --git a/ipfs-api/examples/get_swarm.rs b/ipfs-api/examples/get_swarm.rs index 2b9fd73..2b2c238 100644 --- a/ipfs-api/examples/get_swarm.rs +++ b/ipfs-api/examples/get_swarm.rs @@ -6,7 +6,10 @@ // copied, modified, or distributed except according to those terms. // +#[cfg(feature = "actix")] +extern crate actix_web; extern crate futures; +#[cfg(feature = "hyper")] extern crate hyper; extern crate ipfs_api; @@ -43,9 +46,17 @@ fn main() { } }); - hyper::rt::run( - local - .and_then(|_| connected) - .map_err(|e| eprintln!("{}", e)), - ); + let fut = local + .and_then(|_| connected) + .map_err(|e| eprintln!("{}", e)); + + #[cfg(feature = "hyper")] + hyper::rt::run(fut); + #[cfg(feature = "actix")] + actix_web::actix::run(|| { + fut.then(|_| { + actix_web::actix::System::current().stop(); + Ok(()) + }) + }); } diff --git a/ipfs-api/examples/get_version.rs b/ipfs-api/examples/get_version.rs index 41a4ee5..6fb9c7e 100644 --- a/ipfs-api/examples/get_version.rs +++ b/ipfs-api/examples/get_version.rs @@ -6,7 +6,10 @@ // copied, modified, or distributed except according to those terms. // +#[cfg(feature = "actix")] +extern crate actix_web; extern crate futures; +#[cfg(feature = "hyper")] extern crate hyper; extern crate ipfs_api; @@ -23,5 +26,15 @@ fn main() { .version() .map(|version| println!("version: {:?}", version.version)); - hyper::rt::run(req.map_err(|e| eprintln!("{}", e))); + let fut = req.map_err(|e| eprintln!("{}", e)); + + #[cfg(feature = "hyper")] + hyper::rt::run(fut); + #[cfg(feature = "actix")] + actix_web::actix::run(|| { + fut.then(|_| { + actix_web::actix::System::current().stop(); + Ok(()) + }) + }); } diff --git a/ipfs-api/examples/mfs.rs b/ipfs-api/examples/mfs.rs index 8bea192..97d1824 100644 --- a/ipfs-api/examples/mfs.rs +++ b/ipfs-api/examples/mfs.rs @@ -6,7 +6,10 @@ // copied, modified, or distributed except according to those terms. // +#[cfg(feature = "actix")] +extern crate actix_web; extern crate futures; +#[cfg(feature = "hyper")] extern crate hyper; extern crate ipfs_api; @@ -46,7 +49,7 @@ fn main() { let file_rm = client.files_rm("/test", true); - hyper::rt::run( + let fut = mkdir .and_then(|_| { println!("making dirs /test/does/not/exist/yet..."); @@ -78,6 +81,16 @@ fn main() { file_rm }) .map(|_| println!("done!")) - .map_err(|e| eprintln!("{}", e)), - ) + .map_err(|e| eprintln!("{}", e)) + ; + + #[cfg(feature = "hyper")] + hyper::rt::run(fut); + #[cfg(feature = "actix")] + actix_web::actix::run(|| { + fut.then(|_| { + actix_web::actix::System::current().stop(); + Ok(()) + }) + }); } diff --git a/ipfs-api/examples/ping_peer.rs b/ipfs-api/examples/ping_peer.rs index 1d32210..c60e895 100644 --- a/ipfs-api/examples/ping_peer.rs +++ b/ipfs-api/examples/ping_peer.rs @@ -6,7 +6,10 @@ // copied, modified, or distributed except according to those terms. // +#[cfg(feature = "actix")] +extern crate actix_web; extern crate futures; +#[cfg(feature = "hyper")] extern crate hyper; extern crate ipfs_api; @@ -59,5 +62,13 @@ fn main() { }) .map_err(|e| eprintln!("{}", e)); + #[cfg(feature = "hyper")] hyper::rt::run(req); + #[cfg(feature = "actix")] + actix_web::actix::run(|| { + req.then(|_| { + actix_web::actix::System::current().stop(); + Ok(()) + }) + }); } diff --git a/ipfs-api/examples/pubsub.rs b/ipfs-api/examples/pubsub.rs index 10ee0c5..39fb25a 100644 --- a/ipfs-api/examples/pubsub.rs +++ b/ipfs-api/examples/pubsub.rs @@ -6,7 +6,10 @@ // copied, modified, or distributed except according to those terms. // +#[cfg(feature = "actix")] +extern crate actix_web; extern crate futures; +#[cfg(feature = "hyper")] extern crate hyper; extern crate ipfs_api; extern crate tokio_timer; @@ -49,7 +52,11 @@ fn main() { println!(); println!("starting task to publish messages to ({})...", TOPIC); + + #[cfg(feature = "hyper")] hyper::rt::run(publish); + #[cfg(feature = "actix")] + actix_web::actix::spawn(publish); }); // This block will execute a future that suscribes to a topic, @@ -61,15 +68,24 @@ fn main() { println!(); println!("waiting for messages on ({})...", TOPIC); - hyper::rt::run( - req.take(5) - .for_each(|msg| { - println!(); - println!("received ({:?})", msg); + let fut = req + .take(5) + .for_each(|msg| { + println!(); + println!("received ({:?})", msg); + + Ok(()) + }) + .map_err(|e| eprintln!("{}", e)); - Ok(()) - }) - .map_err(|e| eprintln!("{}", e)), - ) + #[cfg(feature = "hyper")] + hyper::rt::run(fut); + #[cfg(feature = "actix")] + actix_web::actix::run(|| { + fut.then(|_| { + actix_web::actix::System::current().stop(); + Ok(()) + }) + }); } } diff --git a/ipfs-api/examples/replace_config.rs b/ipfs-api/examples/replace_config.rs index f354386..9c1ec61 100644 --- a/ipfs-api/examples/replace_config.rs +++ b/ipfs-api/examples/replace_config.rs @@ -6,7 +6,10 @@ // copied, modified, or distributed except according to those terms. // +#[cfg(feature = "actix")] +extern crate actix_web; extern crate futures; +#[cfg(feature = "hyper")] extern crate hyper; extern crate ipfs_api; @@ -27,5 +30,13 @@ fn main() { .map(|_| println!("replaced file")) .map_err(|e| println!("{}", e)); + #[cfg(feature = "hyper")] hyper::rt::run(req); + #[cfg(feature = "actix")] + actix_web::actix::run(|| { + req.then(|_| { + actix_web::actix::System::current().stop(); + Ok(()) + }) + }); } diff --git a/ipfs-api/examples/resolve_name.rs b/ipfs-api/examples/resolve_name.rs index 4bcb648..6d400dc 100644 --- a/ipfs-api/examples/resolve_name.rs +++ b/ipfs-api/examples/resolve_name.rs @@ -6,7 +6,10 @@ // copied, modified, or distributed except according to those terms. // +#[cfg(feature = "actix")] +extern crate actix_web; extern crate futures; +#[cfg(feature = "hyper")] extern crate hyper; extern crate ipfs_api; @@ -40,9 +43,17 @@ fn main() { }) }); - hyper::rt::run( - name_resolve - .and_then(|_| name_publish) - .map_err(|e| eprintln!("{}", e)), - ); + let fut = name_resolve + .and_then(|_| name_publish) + .map_err(|e| eprintln!("{}", e)); + + #[cfg(feature = "hyper")] + hyper::rt::run(fut); + #[cfg(feature = "actix")] + actix_web::actix::run(|| { + fut.then(|_| { + actix_web::actix::System::current().stop(); + Ok(()) + }) + }); } diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs index 0c3277b..5e1e9f5 100644 --- a/ipfs-api/src/client.rs +++ b/ipfs-api/src/client.rs @@ -5,19 +5,22 @@ // http://opensource.org/licenses/MIT>, at your option. This file may not be // copied, modified, or distributed except according to those terms. // - +#[cfg(feature = "actix")] +use actix_multipart::client::multipart; +#[cfg(feature = "actix")] +use actix_web::HttpMessage; +use bytes::Bytes; use futures::{ future, stream::{self, Stream}, Future, IntoFuture, }; use header::TRAILER; -use http::uri::InvalidUri; -use hyper::{ - self, - client::{Client, HttpConnector}, - Chunk, Request, Response, StatusCode, Uri, -}; +use http::uri::{InvalidUri, Uri}; +use http::StatusCode; +#[cfg(feature = "hyper")] +use hyper::client::{Client, HttpConnector}; +#[cfg(feature = "hyper")] use hyper_multipart::client::multipart; use read::{JsonLineDecoder, LineDecoder, StreamReader}; use request::{self, ApiRequest}; @@ -31,17 +34,34 @@ use tokio_codec::{Decoder, FramedRead}; /// A response returned by the HTTP client. /// +#[cfg(feature = "actix")] +type AsyncResponse<T> = Box<Future<Item = T, Error = Error> + 'static>; +#[cfg(feature = "hyper")] type AsyncResponse<T> = Box<Future<Item = T, Error = Error> + Send + 'static>; /// A future that returns a stream of responses. /// +#[cfg(feature = "actix")] +type AsyncStreamResponse<T> = Box<Stream<Item = T, Error = Error> + 'static>; +#[cfg(feature = "hyper")] type AsyncStreamResponse<T> = Box<Stream<Item = T, Error = Error> + Send + 'static>; +#[cfg(feature = "actix")] +type Request = actix_web::client::ClientRequest; +#[cfg(feature = "hyper")] +type Request = http::Request<hyper::Body>; + +#[cfg(feature = "actix")] +type Response = actix_web::client::ClientResponse; +#[cfg(feature = "hyper")] +type Response = http::Response<hyper::Body>; + /// Asynchronous Ipfs client. /// #[derive(Clone)] pub struct IpfsClient { base: Uri, + #[cfg(feature = "hyper")] client: Client<HttpConnector, hyper::Body>, } @@ -101,6 +121,7 @@ impl IpfsClient { Ok(IpfsClient { base: base_path, + #[cfg(feature = "hyper")] client: Client::builder().keep_alive(false).build_http(), }) } @@ -117,7 +138,7 @@ impl IpfsClient { &self, req: &Req, form: Option<multipart::Form<'static>>, - ) -> Result<Request<hyper::Body>, Error> + ) -> Result<Request, Error> where Req: ApiRequest + Serialize, { @@ -127,9 +148,9 @@ impl IpfsClient { Req::PATH, ::serde_urlencoded::to_string(req)? ); - - url.parse::<Uri>().map_err(From::from).and_then(move |url| { - let mut builder = Request::builder(); + #[cfg(feature = "hyper")] + let req = url.parse::<Uri>().map_err(From::from).and_then(move |url| { + let mut builder = http::Request::builder(); let mut builder = builder.method(Req::METHOD.clone()).uri(url); let req = if let Some(form) = form { @@ -139,13 +160,29 @@ impl IpfsClient { }; req.map_err(From::from) - }) + }); + #[cfg(feature = "actix")] + let req = if let Some(form) = form { + Request::build() + .method(Req::METHOD.clone()) + .uri(url) + .content_type(form.content_type()) + .streaming(multipart::Body::from(form)) + .map_err(From::from) + } else { + Request::build() + .method(Req::METHOD.clone()) + .uri(url) + .finish() + .map_err(From::from) + }; + req } /// Builds an Api error from a response body. /// #[inline] - fn build_error_from_body(chunk: Chunk) -> Error { + fn build_error_from_body(chunk: Bytes) -> Error { match serde_json::from_slice(&chunk) { Ok(e) => Error::Api(e), Err(_) => match String::from_utf8(chunk.to_vec()) { @@ -158,7 +195,7 @@ impl IpfsClient { /// Processes a response that expects a json encoded body, returning an /// error or a deserialized json response. /// - fn process_json_response<Res>(status: StatusCode, chunk: Chunk) -> Result<Res, Error> + fn process_json_response<Res>(status: StatusCode, chunk: Bytes) -> Result<Res, Error> where for<'de> Res: 'static + Deserialize<'de>, { @@ -171,15 +208,19 @@ impl IpfsClient { /// Processes a response that returns a stream of json deserializable /// results. /// - fn process_stream_response<D, Res>( - res: Response<hyper::Body>, - decoder: D, - ) -> AsyncStreamResponse<Res> + fn process_stream_response<D, Res>(res: Response, decoder: D) -> AsyncStreamResponse<Res> where D: 'static + Decoder<Item = Res, Error = Error> + Send, Res: 'static, { - let stream = FramedRead::new(StreamReader::new(res.into_body().from_err()), decoder); + #[cfg(feature = "hyper")] + let stream = FramedRead::new( + StreamReader::new(res.into_body().map(|c| c.into_bytes()).from_err()), + decoder, + ); + + #[cfg(feature = "actix")] + let stream = FramedRead::new(StreamReader::new(res.payload().from_err()), decoder); Box::new(stream) } @@ -190,22 +231,33 @@ impl IpfsClient { &self, req: &Req, form: Option<multipart::Form<'static>>, - ) -> AsyncResponse<(StatusCode, Chunk)> + ) -> AsyncResponse<(StatusCode, Bytes)> where Req: ApiRequest + Serialize, { match self.build_base_request(req, form) { Ok(req) => { + #[cfg(feature = "hyper")] let res = self .client .request(req) .and_then(|res| { let status = res.status(); - res.into_body().concat2().map(move |chunk| (status, chunk)) + res.into_body() + .concat2() + .map(move |chunk| (status, chunk.into_bytes())) }) .from_err(); - + #[cfg(feature = "actix")] + let res = req + .send() + .timeout(std::time::Duration::from_secs(90)) + .from_err() + .and_then(|x| { + let status = x.status(); + x.body().map(move |body| (status, body)).from_err() + }); Box::new(res) } Err(e) => Box::new(Err(e).into_future()), @@ -224,8 +276,9 @@ impl IpfsClient { where Req: ApiRequest + Serialize, Res: 'static + Send, - F: 'static + Fn(hyper::Response<hyper::Body>) -> AsyncStreamResponse<Res> + Send, + F: 'static + Fn(Response) -> AsyncStreamResponse<Res> + Send, { + #[cfg(feature = "hyper")] match self.build_base_request(req, form) { Ok(req) => { let res = self @@ -244,7 +297,9 @@ impl IpfsClient { res.into_body() .concat2() .from_err() - .and_then(|chunk| Err(Self::build_error_from_body(chunk))) + .and_then(|chunk| { + Err(Self::build_error_from_body(chunk.into_bytes())) + }) .into_stream(), ), }; @@ -252,17 +307,31 @@ impl IpfsClient { stream }) .flatten_stream(); - Box::new(res) } Err(e) => Box::new(stream::once(Err(e))), } + #[cfg(feature = "actix")] + match self.build_base_request(req, form) { + Ok(req) => { + let res = req + .send() + .timeout(std::time::Duration::from_secs(90)) + .from_err(); + Box::new(res.map(process).flatten_stream()) + } + Err(e) => Box::new(stream::once(Err(e))), + } } /// Generic method for making a request to the Ipfs server, and getting /// a deserializable response. /// - fn request<Req, Res>(&self, req: &Req, form: Option<multipart::Form<'static>>) -> AsyncResponse<Res> + fn request<Req, Res>( + &self, + req: &Req, + form: Option<multipart::Form<'static>>, + ) -> AsyncResponse<Res> where Req: ApiRequest + Serialize, for<'de> Res: 'static + Deserialize<'de> + Send, @@ -277,7 +346,11 @@ impl IpfsClient { /// Generic method for making a request to the Ipfs server, and getting /// back a response with no body. /// - fn request_empty<Req>(&self, req: &Req, form: Option<multipart::Form<'static>>) -> AsyncResponse<()> + fn request_empty<Req>( + &self, + req: &Req, + form: Option<multipart::Form<'static>>, + ) -> AsyncResponse<()> where Req: ApiRequest + Serialize, { @@ -294,7 +367,11 @@ impl IpfsClient { /// Generic method for making a request to the Ipfs server, and getting /// back a raw String response. /// - fn request_string<Req>(&self, req: &Req, form: Option<multipart::Form<'static>>) -> AsyncResponse<String> + fn request_string<Req>( + &self, + req: &Req, + form: Option<multipart::Form<'static>>, + ) -> AsyncResponse<String> where Req: ApiRequest + Serialize, { @@ -315,11 +392,17 @@ impl IpfsClient { &self, req: &Req, form: Option<multipart::Form<'static>>, - ) -> AsyncStreamResponse<Chunk> + ) -> AsyncStreamResponse<Bytes> where Req: ApiRequest + Serialize, { - self.request_stream(req, form, |res| Box::new(res.into_body().from_err())) + #[cfg(feature = "hyper")] + let res = self.request_stream(req, form, |res| { + Box::new(res.into_body().from_err().map(|c| c.into_bytes())) + }); + #[cfg(feature = "actix")] + let res = self.request_stream(req, form, |res| Box::new(res.payload().from_err())); + res } /// Generic method to return a streaming response of deserialized json @@ -595,7 +678,7 @@ impl IpfsClient { /// ``` /// #[inline] - pub fn block_get(&self, hash: &str) -> AsyncStreamResponse<Chunk> { + pub fn block_get(&self, hash: &str) -> AsyncStreamResponse<Bytes> { |