From 4bdebbf4d1d1edb839eba860b013e3fdb870f66e Mon Sep 17 00:00:00 2001 From: Sameer Puri Date: Sat, 9 Feb 2019 14:34:00 -0600 Subject: Add actix feature for using actix-web --- ipfs-api/src/client.rs | 168 ++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 131 insertions(+), 37 deletions(-) (limited to 'ipfs-api/src/client.rs') diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs index 0c3277b..5e1e9f5 100644 --- a/ipfs-api/src/client.rs +++ b/ipfs-api/src/client.rs @@ -5,19 +5,22 @@ // http://opensource.org/licenses/MIT>, at your option. This file may not be // copied, modified, or distributed except according to those terms. // - +#[cfg(feature = "actix")] +use actix_multipart::client::multipart; +#[cfg(feature = "actix")] +use actix_web::HttpMessage; +use bytes::Bytes; use futures::{ future, stream::{self, Stream}, Future, IntoFuture, }; use header::TRAILER; -use http::uri::InvalidUri; -use hyper::{ - self, - client::{Client, HttpConnector}, - Chunk, Request, Response, StatusCode, Uri, -}; +use http::uri::{InvalidUri, Uri}; +use http::StatusCode; +#[cfg(feature = "hyper")] +use hyper::client::{Client, HttpConnector}; +#[cfg(feature = "hyper")] use hyper_multipart::client::multipart; use read::{JsonLineDecoder, LineDecoder, StreamReader}; use request::{self, ApiRequest}; @@ -31,17 +34,34 @@ use tokio_codec::{Decoder, FramedRead}; /// A response returned by the HTTP client. /// +#[cfg(feature = "actix")] +type AsyncResponse = Box + 'static>; +#[cfg(feature = "hyper")] type AsyncResponse = Box + Send + 'static>; /// A future that returns a stream of responses. /// +#[cfg(feature = "actix")] +type AsyncStreamResponse = Box + 'static>; +#[cfg(feature = "hyper")] type AsyncStreamResponse = Box + Send + 'static>; +#[cfg(feature = "actix")] +type Request = actix_web::client::ClientRequest; +#[cfg(feature = "hyper")] +type Request = http::Request; + +#[cfg(feature = "actix")] +type Response = actix_web::client::ClientResponse; +#[cfg(feature = "hyper")] +type Response = http::Response; + /// Asynchronous Ipfs client. /// #[derive(Clone)] pub struct IpfsClient { base: Uri, + #[cfg(feature = "hyper")] client: Client, } @@ -101,6 +121,7 @@ impl IpfsClient { Ok(IpfsClient { base: base_path, + #[cfg(feature = "hyper")] client: Client::builder().keep_alive(false).build_http(), }) } @@ -117,7 +138,7 @@ impl IpfsClient { &self, req: &Req, form: Option>, - ) -> Result, Error> + ) -> Result where Req: ApiRequest + Serialize, { @@ -127,9 +148,9 @@ impl IpfsClient { Req::PATH, ::serde_urlencoded::to_string(req)? ); - - url.parse::().map_err(From::from).and_then(move |url| { - let mut builder = Request::builder(); + #[cfg(feature = "hyper")] + let req = url.parse::().map_err(From::from).and_then(move |url| { + let mut builder = http::Request::builder(); let mut builder = builder.method(Req::METHOD.clone()).uri(url); let req = if let Some(form) = form { @@ -139,13 +160,29 @@ impl IpfsClient { }; req.map_err(From::from) - }) + }); + #[cfg(feature = "actix")] + let req = if let Some(form) = form { + Request::build() + .method(Req::METHOD.clone()) + .uri(url) + .content_type(form.content_type()) + .streaming(multipart::Body::from(form)) + .map_err(From::from) + } else { + Request::build() + .method(Req::METHOD.clone()) + .uri(url) + .finish() + .map_err(From::from) + }; + req } /// Builds an Api error from a response body. /// #[inline] - fn build_error_from_body(chunk: Chunk) -> Error { + fn build_error_from_body(chunk: Bytes) -> Error { match serde_json::from_slice(&chunk) { Ok(e) => Error::Api(e), Err(_) => match String::from_utf8(chunk.to_vec()) { @@ -158,7 +195,7 @@ impl IpfsClient { /// Processes a response that expects a json encoded body, returning an /// error or a deserialized json response. /// - fn process_json_response(status: StatusCode, chunk: Chunk) -> Result + fn process_json_response(status: StatusCode, chunk: Bytes) -> Result where for<'de> Res: 'static + Deserialize<'de>, { @@ -171,15 +208,19 @@ impl IpfsClient { /// Processes a response that returns a stream of json deserializable /// results. /// - fn process_stream_response( - res: Response, - decoder: D, - ) -> AsyncStreamResponse + fn process_stream_response(res: Response, decoder: D) -> AsyncStreamResponse where D: 'static + Decoder + Send, Res: 'static, { - let stream = FramedRead::new(StreamReader::new(res.into_body().from_err()), decoder); + #[cfg(feature = "hyper")] + let stream = FramedRead::new( + StreamReader::new(res.into_body().map(|c| c.into_bytes()).from_err()), + decoder, + ); + + #[cfg(feature = "actix")] + let stream = FramedRead::new(StreamReader::new(res.payload().from_err()), decoder); Box::new(stream) } @@ -190,22 +231,33 @@ impl IpfsClient { &self, req: &Req, form: Option>, - ) -> AsyncResponse<(StatusCode, Chunk)> + ) -> AsyncResponse<(StatusCode, Bytes)> where Req: ApiRequest + Serialize, { match self.build_base_request(req, form) { Ok(req) => { + #[cfg(feature = "hyper")] let res = self .client .request(req) .and_then(|res| { let status = res.status(); - res.into_body().concat2().map(move |chunk| (status, chunk)) + res.into_body() + .concat2() + .map(move |chunk| (status, chunk.into_bytes())) }) .from_err(); - + #[cfg(feature = "actix")] + let res = req + .send() + .timeout(std::time::Duration::from_secs(90)) + .from_err() + .and_then(|x| { + let status = x.status(); + x.body().map(move |body| (status, body)).from_err() + }); Box::new(res) } Err(e) => Box::new(Err(e).into_future()), @@ -224,8 +276,9 @@ impl IpfsClient { where Req: ApiRequest + Serialize, Res: 'static + Send, - F: 'static + Fn(hyper::Response) -> AsyncStreamResponse + Send, + F: 'static + Fn(Response) -> AsyncStreamResponse + Send, { + #[cfg(feature = "hyper")] match self.build_base_request(req, form) { Ok(req) => { let res = self @@ -244,7 +297,9 @@ impl IpfsClient { res.into_body() .concat2() .from_err() - .and_then(|chunk| Err(Self::build_error_from_body(chunk))) + .and_then(|chunk| { + Err(Self::build_error_from_body(chunk.into_bytes())) + }) .into_stream(), ), }; @@ -252,17 +307,31 @@ impl IpfsClient { stream }) .flatten_stream(); - Box::new(res) } Err(e) => Box::new(stream::once(Err(e))), } + #[cfg(feature = "actix")] + match self.build_base_request(req, form) { + Ok(req) => { + let res = req + .send() + .timeout(std::time::Duration::from_secs(90)) + .from_err(); + Box::new(res.map(process).flatten_stream()) + } + Err(e) => Box::new(stream::once(Err(e))), + } } /// Generic method for making a request to the Ipfs server, and getting /// a deserializable response. /// - fn request(&self, req: &Req, form: Option>) -> AsyncResponse + fn request( + &self, + req: &Req, + form: Option>, + ) -> AsyncResponse where Req: ApiRequest + Serialize, for<'de> Res: 'static + Deserialize<'de> + Send, @@ -277,7 +346,11 @@ impl IpfsClient { /// Generic method for making a request to the Ipfs server, and getting /// back a response with no body. /// - fn request_empty(&self, req: &Req, form: Option>) -> AsyncResponse<()> + fn request_empty( + &self, + req: &Req, + form: Option>, + ) -> AsyncResponse<()> where Req: ApiRequest + Serialize, { @@ -294,7 +367,11 @@ impl IpfsClient { /// Generic method for making a request to the Ipfs server, and getting /// back a raw String response. /// - fn request_string(&self, req: &Req, form: Option>) -> AsyncResponse + fn request_string( + &self, + req: &Req, + form: Option>, + ) -> AsyncResponse where Req: ApiRequest + Serialize, { @@ -315,11 +392,17 @@ impl IpfsClient { &self, req: &Req, form: Option>, - ) -> AsyncStreamResponse + ) -> AsyncStreamResponse where Req: ApiRequest + Serialize, { - self.request_stream(req, form, |res| Box::new(res.into_body().from_err())) + #[cfg(feature = "hyper")] + let res = self.request_stream(req, form, |res| { + Box::new(res.into_body().from_err().map(|c| c.into_bytes())) + }); + #[cfg(feature = "actix")] + let res = self.request_stream(req, form, |res| Box::new(res.payload().from_err())); + res } /// Generic method to return a streaming response of deserialized json @@ -595,7 +678,7 @@ impl IpfsClient { /// ``` /// #[inline] - pub fn block_get(&self, hash: &str) -> AsyncStreamResponse { + pub fn block_get(&self, hash: &str) -> AsyncStreamResponse { self.request_stream_bytes(&request::BlockGet { hash }, None) } @@ -747,7 +830,7 @@ impl IpfsClient { /// ``` /// #[inline] - pub fn cat(&self, path: &str) -> AsyncStreamResponse { + pub fn cat(&self, path: &str) -> AsyncStreamResponse { self.request_stream_bytes(&request::Cat { path }, None) } @@ -1204,7 +1287,7 @@ impl IpfsClient { /// ``` /// #[inline] - pub fn files_read(&self, path: &str) -> AsyncStreamResponse { + pub fn files_read(&self, path: &str) -> AsyncStreamResponse { self.request_stream_bytes(&request::FilesRead { path }, None) } @@ -1363,7 +1446,7 @@ impl IpfsClient { /// ``` /// #[inline] - pub fn get(&self, path: &str) -> AsyncStreamResponse { + pub fn get(&self, path: &str) -> AsyncStreamResponse { self.request_stream_bytes(&request::Get { path }, None) } @@ -1528,6 +1611,7 @@ impl IpfsClient { /// ``` /// pub fn log_tail(&self) -> AsyncStreamResponse { + #[cfg(feature = "hyper")] let res = self .build_base_request(&request::LogTail, None) .map(|req| self.client.request(req).from_err()) @@ -1535,7 +1619,17 @@ impl IpfsClient { .flatten() .map(|res| IpfsClient::process_stream_response(res, LineDecoder)) .flatten_stream(); - + #[cfg(feature = "actix")] + let res = self + .build_base_request(&request::LogTail, None) + .into_future() + .and_then(|req| { + req.send() + .timeout(std::time::Duration::from_secs(90)) + .from_err() + }) + .map(|res| IpfsClient::process_stream_response(res, LineDecoder)) + .flatten_stream(); Box::new(res) } @@ -1644,7 +1738,7 @@ impl IpfsClient { /// ``` /// #[inline] - pub fn object_data(&self, key: &str) -> AsyncStreamResponse { + pub fn object_data(&self, key: &str) -> AsyncStreamResponse { self.request_stream_bytes(&request::ObjectData { key }, None) } @@ -2155,7 +2249,7 @@ impl IpfsClient { /// ``` /// #[inline] - pub fn tar_cat(&self, path: &str) -> AsyncStreamResponse { + pub fn tar_cat(&self, path: &str) -> AsyncStreamResponse { self.request_stream_bytes(&request::TarCat { path }, None) } -- cgit v1.2.3