summaryrefslogtreecommitdiffstats
path: root/ipfs-api/src/client.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ipfs-api/src/client.rs')
-rw-r--r--ipfs-api/src/client.rs168
1 files changed, 131 insertions, 37 deletions
diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs
index 0c3277b..5e1e9f5 100644
--- a/ipfs-api/src/client.rs
+++ b/ipfs-api/src/client.rs
@@ -5,19 +5,22 @@
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.
//
-
+#[cfg(feature = "actix")]
+use actix_multipart::client::multipart;
+#[cfg(feature = "actix")]
+use actix_web::HttpMessage;
+use bytes::Bytes;
use futures::{
future,
stream::{self, Stream},
Future, IntoFuture,
};
use header::TRAILER;
-use http::uri::InvalidUri;
-use hyper::{
- self,
- client::{Client, HttpConnector},
- Chunk, Request, Response, StatusCode, Uri,
-};
+use http::uri::{InvalidUri, Uri};
+use http::StatusCode;
+#[cfg(feature = "hyper")]
+use hyper::client::{Client, HttpConnector};
+#[cfg(feature = "hyper")]
use hyper_multipart::client::multipart;
use read::{JsonLineDecoder, LineDecoder, StreamReader};
use request::{self, ApiRequest};
@@ -31,17 +34,34 @@ use tokio_codec::{Decoder, FramedRead};
/// A response returned by the HTTP client.
///
+#[cfg(feature = "actix")]
+type AsyncResponse<T> = Box<Future<Item = T, Error = Error> + 'static>;
+#[cfg(feature = "hyper")]
type AsyncResponse<T> = Box<Future<Item = T, Error = Error> + Send + 'static>;
/// A future that returns a stream of responses.
///
+#[cfg(feature = "actix")]
+type AsyncStreamResponse<T> = Box<Stream<Item = T, Error = Error> + 'static>;
+#[cfg(feature = "hyper")]
type AsyncStreamResponse<T> = Box<Stream<Item = T, Error = Error> + Send + 'static>;
+#[cfg(feature = "actix")]
+type Request = actix_web::client::ClientRequest;
+#[cfg(feature = "hyper")]
+type Request = http::Request<hyper::Body>;
+
+#[cfg(feature = "actix")]
+type Response = actix_web::client::ClientResponse;
+#[cfg(feature = "hyper")]
+type Response = http::Response<hyper::Body>;
+
/// Asynchronous Ipfs client.
///
#[derive(Clone)]
pub struct IpfsClient {
base: Uri,
+ #[cfg(feature = "hyper")]
client: Client<HttpConnector, hyper::Body>,
}
@@ -101,6 +121,7 @@ impl IpfsClient {
Ok(IpfsClient {
base: base_path,
+ #[cfg(feature = "hyper")]
client: Client::builder().keep_alive(false).build_http(),
})
}
@@ -117,7 +138,7 @@ impl IpfsClient {
&self,
req: &Req,
form: Option<multipart::Form<'static>>,
- ) -> Result<Request<hyper::Body>, Error>
+ ) -> Result<Request, Error>
where
Req: ApiRequest + Serialize,
{
@@ -127,9 +148,9 @@ impl IpfsClient {
Req::PATH,
::serde_urlencoded::to_string(req)?
);
-
- url.parse::<Uri>().map_err(From::from).and_then(move |url| {
- let mut builder = Request::builder();
+ #[cfg(feature = "hyper")]
+ let req = url.parse::<Uri>().map_err(From::from).and_then(move |url| {
+ let mut builder = http::Request::builder();
let mut builder = builder.method(Req::METHOD.clone()).uri(url);
let req = if let Some(form) = form {
@@ -139,13 +160,29 @@ impl IpfsClient {
};
req.map_err(From::from)
- })
+ });
+ #[cfg(feature = "actix")]
+ let req = if let Some(form) = form {
+ Request::build()
+ .method(Req::METHOD.clone())
+ .uri(url)
+ .content_type(form.content_type())
+ .streaming(multipart::Body::from(form))
+ .map_err(From::from)
+ } else {
+ Request::build()
+ .method(Req::METHOD.clone())
+ .uri(url)
+ .finish()
+ .map_err(From::from)
+ };
+ req
}
/// Builds an Api error from a response body.
///
#[inline]
- fn build_error_from_body(chunk: Chunk) -> Error {
+ fn build_error_from_body(chunk: Bytes) -> Error {
match serde_json::from_slice(&chunk) {
Ok(e) => Error::Api(e),
Err(_) => match String::from_utf8(chunk.to_vec()) {
@@ -158,7 +195,7 @@ impl IpfsClient {
/// Processes a response that expects a json encoded body, returning an
/// error or a deserialized json response.
///
- fn process_json_response<Res>(status: StatusCode, chunk: Chunk) -> Result<Res, Error>
+ fn process_json_response<Res>(status: StatusCode, chunk: Bytes) -> Result<Res, Error>
where
for<'de> Res: 'static + Deserialize<'de>,
{
@@ -171,15 +208,19 @@ impl IpfsClient {
/// Processes a response that returns a stream of json deserializable
/// results.
///
- fn process_stream_response<D, Res>(
- res: Response<hyper::Body>,
- decoder: D,
- ) -> AsyncStreamResponse<Res>
+ fn process_stream_response<D, Res>(res: Response, decoder: D) -> AsyncStreamResponse<Res>
where
D: 'static + Decoder<Item = Res, Error = Error> + Send,
Res: 'static,
{
- let stream = FramedRead::new(StreamReader::new(res.into_body().from_err()), decoder);
+ #[cfg(feature = "hyper")]
+ let stream = FramedRead::new(
+ StreamReader::new(res.into_body().map(|c| c.into_bytes()).from_err()),
+ decoder,
+ );
+
+ #[cfg(feature = "actix")]
+ let stream = FramedRead::new(StreamReader::new(res.payload().from_err()), decoder);
Box::new(stream)
}
@@ -190,22 +231,33 @@ impl IpfsClient {
&self,
req: &Req,
form: Option<multipart::Form<'static>>,
- ) -> AsyncResponse<(StatusCode, Chunk)>
+ ) -> AsyncResponse<(StatusCode, Bytes)>
where
Req: ApiRequest + Serialize,
{
match self.build_base_request(req, form) {
Ok(req) => {
+ #[cfg(feature = "hyper")]
let res = self
.client
.request(req)
.and_then(|res| {
let status = res.status();
- res.into_body().concat2().map(move |chunk| (status, chunk))
+ res.into_body()
+ .concat2()
+ .map(move |chunk| (status, chunk.into_bytes()))
})
.from_err();
-
+ #[cfg(feature = "actix")]
+ let res = req
+ .send()
+ .timeout(std::time::Duration::from_secs(90))
+ .from_err()
+ .and_then(|x| {
+ let status = x.status();
+ x.body().map(move |body| (status, body)).from_err()
+ });
Box::new(res)
}
Err(e) => Box::new(Err(e).into_future()),
@@ -224,8 +276,9 @@ impl IpfsClient {
where
Req: ApiRequest + Serialize,
Res: 'static + Send,
- F: 'static + Fn(hyper::Response<hyper::Body>) -> AsyncStreamResponse<Res> + Send,
+ F: 'static + Fn(Response) -> AsyncStreamResponse<Res> + Send,
{
+ #[cfg(feature = "hyper")]
match self.build_base_request(req, form) {
Ok(req) => {
let res = self
@@ -244,7 +297,9 @@ impl IpfsClient {
res.into_body()
.concat2()
.from_err()
- .and_then(|chunk| Err(Self::build_error_from_body(chunk)))
+ .and_then(|chunk| {
+ Err(Self::build_error_from_body(chunk.into_bytes()))
+ })
.into_stream(),
),
};
@@ -252,17 +307,31 @@ impl IpfsClient {
stream
})
.flatten_stream();
-
Box::new(res)
}
Err(e) => Box::new(stream::once(Err(e))),
}
+ #[cfg(feature = "actix")]
+ match self.build_base_request(req, form) {
+ Ok(req) => {
+ let res = req
+ .send()
+ .timeout(std::time::Duration::from_secs(90))
+ .from_err();
+ Box::new(res.map(process).flatten_stream())
+ }
+ Err(e) => Box::new(stream::once(Err(e))),
+ }
}
/// Generic method for making a request to the Ipfs server, and getting
/// a deserializable response.
///
- fn request<Req, Res>(&self, req: &Req, form: Option<multipart::Form<'static>>) -> AsyncResponse<Res>
+ fn request<Req, Res>(
+ &self,
+ req: &Req,
+ form: Option<multipart::Form<'static>>,
+ ) -> AsyncResponse<Res>
where
Req: ApiRequest + Serialize,
for<'de> Res: 'static + Deserialize<'de> + Send,
@@ -277,7 +346,11 @@ impl IpfsClient {
/// Generic method for making a request to the Ipfs server, and getting
/// back a response with no body.
///
- fn request_empty<Req>(&self, req: &Req, form: Option<multipart::Form<'static>>) -> AsyncResponse<()>
+ fn request_empty<Req>(
+ &self,
+ req: &Req,
+ form: Option<multipart::Form<'static>>,
+ ) -> AsyncResponse<()>
where
Req: ApiRequest + Serialize,
{
@@ -294,7 +367,11 @@ impl IpfsClient {
/// Generic method for making a request to the Ipfs server, and getting
/// back a raw String response.
///
- fn request_string<Req>(&self, req: &Req, form: Option<multipart::Form<'static>>) -> AsyncResponse<String>
+ fn request_string<Req>(
+ &self,
+ req: &Req,
+ form: Option<multipart::Form<'static>>,
+ ) -> AsyncResponse<String>
where
Req: ApiRequest + Serialize,
{
@@ -315,11 +392,17 @@ impl IpfsClient {
&self,
req: &Req,
form: Option<multipart::Form<'static>>,
- ) -> AsyncStreamResponse<Chunk>
+ ) -> AsyncStreamResponse<Bytes>
where
Req: ApiRequest + Serialize,
{
- self.request_stream(req, form, |res| Box::new(res.into_body().from_err()))
+ #[cfg(feature = "hyper")]
+ let res = self.request_stream(req, form, |res| {
+ Box::new(res.into_body().from_err().map(|c| c.into_bytes()))
+ });
+ #[cfg(feature = "actix")]
+ let res = self.request_stream(req, form, |res| Box::new(res.payload().from_err()));
+ res
}
/// Generic method to return a streaming response of deserialized json
@@ -595,7 +678,7 @@ impl IpfsClient {
/// ```
///
#[inline]
- pub fn block_get(&self, hash: &str) -> AsyncStreamResponse<Chunk> {
+ pub fn block_get(&self, hash: &str) -> AsyncStreamResponse<Bytes> {
self.request_stream_bytes(&request::BlockGet { hash }, None)
}
@@ -747,7 +830,7 @@ impl IpfsClient {
/// ```
///
#[inline]
- pub fn cat(&self, path: &str) -> AsyncStreamResponse<Chunk> {
+ pub fn cat(&self, path: &str) -> AsyncStreamResponse<Bytes> {
self.request_stream_bytes(&request::Cat { path }, None)
}
@@ -1204,7 +1287,7 @@ impl IpfsClient {
/// ```
///
#[inline]
- pub fn files_read(&self, path: &str) -> AsyncStreamResponse<Chunk> {
+ pub fn files_read(&self, path: &str) -> AsyncStreamResponse<Bytes> {
self.request_stream_bytes(&request::FilesRead { path }, None)
}
@@ -1363,7 +1446,7 @@ impl IpfsClient {
/// ```
///
#[inline]
- pub fn get(&self, path: &str) -> AsyncStreamResponse<Chunk> {
+ pub fn get(&self, path: &str) -> AsyncStreamResponse<Bytes> {
self.request_stream_bytes(&request::Get { path }, None)
}
@@ -1528,6 +1611,7 @@ impl IpfsClient {
/// ```
///
pub fn log_tail(&self) -> AsyncStreamResponse<String> {
+ #[cfg(feature = "hyper")]
let res = self
.build_base_request(&request::LogTail, None)
.map(|req| self.client.request(req).from_err())
@@ -1535,7 +1619,17 @@ impl IpfsClient {
.flatten()
.map(|res| IpfsClient::process_stream_response(res, LineDecoder))
.flatten_stream();
-
+ #[cfg(feature = "actix")]
+ let res = self
+ .build_base_request(&request::LogTail, None)
+ .into_future()
+ .and_then(|req| {
+ req.send()
+ .timeout(std::time::Duration::from_secs(90))
+ .from_err()
+ })
+ .map(|res| IpfsClient::process_stream_response(res, LineDecoder))
+ .flatten_stream();
Box::new(res)
}
@@ -1644,7 +1738,7 @@ impl IpfsClient {
/// ```
///
#[inline]
- pub fn object_data(&self, key: &str) -> AsyncStreamResponse<Chunk> {
+ pub fn object_data(&self, key: &str) -> AsyncStreamResponse<Bytes> {
self.request_stream_bytes(&request::ObjectData { key }, None)
}
@@ -2155,7 +2249,7 @@ impl IpfsClient {
/// ```
///
#[inline]
- pub fn tar_cat(&self, path: &str) -> AsyncStreamResponse<Chunk> {
+ pub fn tar_cat(&self, path: &str) -> AsyncStreamResponse<Bytes> {
self.request_stream_bytes(&request::TarCat { path }, None)
}