summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSameer Puri <sameer@users.noreply.github.com>2019-02-09 14:34:00 -0600
committerSameer Puri <sameer@users.noreply.github.com>2019-02-13 18:29:04 -0600
commit4bdebbf4d1d1edb839eba860b013e3fdb870f66e (patch)
tree965a5a80b387c0eb719ca568a96a729d9d3f2ade
parentf125fdbde329095419630e02ebf33d2c73aeced7 (diff)
Add actix feature for using actix-web
-rw-r--r--.travis.yml7
-rw-r--r--ipfs-api/Cargo.toml10
-rw-r--r--ipfs-api/examples/add_file.rs11
-rw-r--r--ipfs-api/examples/add_tar.rs13
-rw-r--r--ipfs-api/examples/bootstrap_default.rs19
-rw-r--r--ipfs-api/examples/dns.rs11
-rw-r--r--ipfs-api/examples/get_commands.rs11
-rw-r--r--ipfs-api/examples/get_stats.rs23
-rw-r--r--ipfs-api/examples/get_swarm.rs21
-rw-r--r--ipfs-api/examples/get_version.rs15
-rw-r--r--ipfs-api/examples/mfs.rs19
-rw-r--r--ipfs-api/examples/ping_peer.rs11
-rw-r--r--ipfs-api/examples/pubsub.rs34
-rw-r--r--ipfs-api/examples/replace_config.rs11
-rw-r--r--ipfs-api/examples/resolve_name.rs21
-rw-r--r--ipfs-api/src/client.rs168
-rw-r--r--ipfs-api/src/header.rs2
-rw-r--r--ipfs-api/src/lib.rs206
-rw-r--r--ipfs-api/src/read.rs11
-rw-r--r--ipfs-api/src/request/add.rs2
-rw-r--r--ipfs-api/src/request/block.rs2
-rw-r--r--ipfs-api/src/request/config.rs2
-rw-r--r--ipfs-api/src/request/dag.rs2
-rw-r--r--ipfs-api/src/request/files.rs2
-rw-r--r--ipfs-api/src/request/mod.rs2
-rw-r--r--ipfs-api/src/request/tar.rs2
-rw-r--r--ipfs-api/src/response/error.rs41
27 files changed, 524 insertions, 155 deletions
diff --git a/.travis.yml b/.travis.yml
index 814e54f..168d15b 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -5,3 +5,10 @@ rust:
- nightly
cache:
cargo: true
+script:
+ - cargo build --verbose
+ - cargo test --verbose
+ - cd ipfs-api
+ - cargo build --verbose --features actix --no-default-features
+ - cargo test --verbose --features actix --no-default-features
+ - cd .. \ No newline at end of file
diff --git a/ipfs-api/Cargo.toml b/ipfs-api/Cargo.toml
index f3fb604..39cad56 100644
--- a/ipfs-api/Cargo.toml
+++ b/ipfs-api/Cargo.toml
@@ -13,13 +13,19 @@ license = "MIT OR Apache-2.0"
[badges]
travis-ci = { repository = "ferristseng/rust-ipfs-api" }
+[features]
+default = ["hyper", "hyper-multipart-rfc7578"]
+actix = ["actix-web", "actix-multipart-rfc7578"]
+
[dependencies]
+actix-multipart-rfc7578 = { version = "0.1", optional = true }
+actix-web = { version = "0.7", optional = true }
bytes = "0.4"
failure = "0.1.2"
futures = "0.1"
http = "0.1"
-hyper = "0.12"
-hyper-multipart-rfc7578 = "0.3.0"
+hyper = { version = "0.12", optional = true }
+hyper-multipart-rfc7578 = { version = "0.3", optional = true }
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
diff --git a/ipfs-api/examples/add_file.rs b/ipfs-api/examples/add_file.rs
index d771966..3d9be49 100644
--- a/ipfs-api/examples/add_file.rs
+++ b/ipfs-api/examples/add_file.rs
@@ -6,7 +6,10 @@
// copied, modified, or distributed except according to those terms.
//
+#[cfg(feature = "actix")]
+extern crate actix_web;
extern crate futures;
+#[cfg(feature = "hyper")]
extern crate hyper;
extern crate ipfs_api;
@@ -27,5 +30,13 @@ fn main() {
.map(|add| println!("added file: {:?}", add))
.map_err(|e| eprintln!("{}", e));
+ #[cfg(feature = "hyper")]
hyper::rt::run(req);
+ #[cfg(feature = "actix")]
+ actix_web::actix::run(|| {
+ req.then(|_| {
+ actix_web::actix::System::current().stop();
+ Ok(())
+ })
+ });
}
diff --git a/ipfs-api/examples/add_tar.rs b/ipfs-api/examples/add_tar.rs
index 1880712..5d75e28 100644
--- a/ipfs-api/examples/add_tar.rs
+++ b/ipfs-api/examples/add_tar.rs
@@ -6,7 +6,10 @@
// copied, modified, or distributed except according to those terms.
//
+#[cfg(feature = "actix")]
+extern crate actix_web;
extern crate futures;
+#[cfg(feature = "hyper")]
extern crate hyper;
extern crate ipfs_api;
extern crate tar;
@@ -52,5 +55,13 @@ fn main() {
})
.map_err(|e| eprintln!("{}", e));
- hyper::rt::run(req)
+ #[cfg(feature = "hyper")]
+ hyper::rt::run(req);
+ #[cfg(feature = "actix")]
+ actix_web::actix::run(|| {
+ req.then(|_| {
+ actix_web::actix::System::current().stop();
+ Ok(())
+ })
+ });
}
diff --git a/ipfs-api/examples/bootstrap_default.rs b/ipfs-api/examples/bootstrap_default.rs
index 6ba9575..bc36952 100644
--- a/ipfs-api/examples/bootstrap_default.rs
+++ b/ipfs-api/examples/bootstrap_default.rs
@@ -6,7 +6,10 @@
// copied, modified, or distributed except according to those terms.
//
+#[cfg(feature = "actix")]
+extern crate actix_web;
extern crate futures;
+#[cfg(feature = "hyper")]
extern crate hyper;
extern crate ipfs_api;
@@ -42,8 +45,7 @@ fn main() {
}
});
- hyper::rt::run(
- bootstrap
+ let fut = bootstrap
.and_then(|_| {
println!();
println!("dropping all bootstrap peers...");
@@ -56,6 +58,15 @@ fn main() {
add
})
- .map_err(|e| eprintln!("{}", e)),
- );
+ .map_err(|e| eprintln!("{}", e));
+
+ #[cfg(feature = "hyper")]
+ hyper::rt::run(fut);
+ #[cfg(feature = "actix")]
+ actix_web::actix::run(|| {
+ fut.then(|_| {
+ actix_web::actix::System::current().stop();
+ Ok(())
+ })
+ });
}
diff --git a/ipfs-api/examples/dns.rs b/ipfs-api/examples/dns.rs
index 78b4086..6b3823d 100644
--- a/ipfs-api/examples/dns.rs
+++ b/ipfs-api/examples/dns.rs
@@ -6,7 +6,10 @@
// copied, modified, or distributed except according to those terms.
//
+#[cfg(feature = "actix")]
+extern crate actix_web;
extern crate futures;
+#[cfg(feature = "hyper")]
extern crate hyper;
extern crate ipfs_api;
@@ -38,5 +41,13 @@ fn main() {
})
.map_err(|e| eprintln!("{}", e));
+ #[cfg(feature = "hyper")]
hyper::rt::run(req);
+ #[cfg(feature = "actix")]
+ actix_web::actix::run(|| {
+ req.then(|_| {
+ actix_web::actix::System::current().stop();
+ Ok(())
+ })
+ });
}
diff --git a/ipfs-api/examples/get_commands.rs b/ipfs-api/examples/get_commands.rs
index b12ffa1..f99167d 100644
--- a/ipfs-api/examples/get_commands.rs
+++ b/ipfs-api/examples/get_commands.rs
@@ -6,7 +6,10 @@
// copied, modified, or distributed except according to those terms.
//
+#[cfg(feature = "actix")]
+extern crate actix_web;
extern crate futures;
+#[cfg(feature = "hyper")]
extern crate hyper;
extern crate ipfs_api;
@@ -46,5 +49,13 @@ fn main() {
.map(|commands| print_recursive(0, &commands))
.map_err(|e| eprintln!("{}", e));
+ #[cfg(feature = "hyper")]
hyper::rt::run(req);
+ #[cfg(feature = "actix")]
+ actix_web::actix::run(|| {
+ req.then(|_| {
+ actix_web::actix::System::current().stop();
+ Ok(())
+ })
+ });
}
diff --git a/ipfs-api/examples/get_stats.rs b/ipfs-api/examples/get_stats.rs
index 41f2a5c..9107aa3 100644
--- a/ipfs-api/examples/get_stats.rs
+++ b/ipfs-api/examples/get_stats.rs
@@ -6,7 +6,10 @@
// copied, modified, or distributed except according to those terms.
//
+#[cfg(feature = "actix")]
+extern crate actix_web;
extern crate futures;
+#[cfg(feature = "hyper")]
extern crate hyper;
extern crate ipfs_api;
@@ -54,10 +57,18 @@ fn main() {
println!(" version : {}", repo_stats.version);
});
- hyper::rt::run(
- bitswap_stats
- .and_then(|_| bw_stats)
- .and_then(|_| repo_stats)
- .map_err(|e| eprintln!("{}", e)),
- );
+ let fut = bitswap_stats
+ .and_then(|_| bw_stats)
+ .and_then(|_| repo_stats)
+ .map_err(|e| eprintln!("{}", e));
+
+ #[cfg(feature = "hyper")]
+ hyper::rt::run(fut);
+ #[cfg(feature = "actix")]
+ actix_web::actix::run(|| {
+ fut.then(|_| {
+ actix_web::actix::System::current().stop();
+ Ok(())
+ })
+ });
}
diff --git a/ipfs-api/examples/get_swarm.rs b/ipfs-api/examples/get_swarm.rs
index 2b9fd73..2b2c238 100644
--- a/ipfs-api/examples/get_swarm.rs
+++ b/ipfs-api/examples/get_swarm.rs
@@ -6,7 +6,10 @@
// copied, modified, or distributed except according to those terms.
//
+#[cfg(feature = "actix")]
+extern crate actix_web;
extern crate futures;
+#[cfg(feature = "hyper")]
extern crate hyper;
extern crate ipfs_api;
@@ -43,9 +46,17 @@ fn main() {
}
});
- hyper::rt::run(
- local
- .and_then(|_| connected)
- .map_err(|e| eprintln!("{}", e)),
- );
+ let fut = local
+ .and_then(|_| connected)
+ .map_err(|e| eprintln!("{}", e));
+
+ #[cfg(feature = "hyper")]
+ hyper::rt::run(fut);
+ #[cfg(feature = "actix")]
+ actix_web::actix::run(|| {
+ fut.then(|_| {
+ actix_web::actix::System::current().stop();
+ Ok(())
+ })
+ });
}
diff --git a/ipfs-api/examples/get_version.rs b/ipfs-api/examples/get_version.rs
index 41a4ee5..6fb9c7e 100644
--- a/ipfs-api/examples/get_version.rs
+++ b/ipfs-api/examples/get_version.rs
@@ -6,7 +6,10 @@
// copied, modified, or distributed except according to those terms.
//
+#[cfg(feature = "actix")]
+extern crate actix_web;
extern crate futures;
+#[cfg(feature = "hyper")]
extern crate hyper;
extern crate ipfs_api;
@@ -23,5 +26,15 @@ fn main() {
.version()
.map(|version| println!("version: {:?}", version.version));
- hyper::rt::run(req.map_err(|e| eprintln!("{}", e)));
+ let fut = req.map_err(|e| eprintln!("{}", e));
+
+ #[cfg(feature = "hyper")]
+ hyper::rt::run(fut);
+ #[cfg(feature = "actix")]
+ actix_web::actix::run(|| {
+ fut.then(|_| {
+ actix_web::actix::System::current().stop();
+ Ok(())
+ })
+ });
}
diff --git a/ipfs-api/examples/mfs.rs b/ipfs-api/examples/mfs.rs
index 8bea192..97d1824 100644
--- a/ipfs-api/examples/mfs.rs
+++ b/ipfs-api/examples/mfs.rs
@@ -6,7 +6,10 @@
// copied, modified, or distributed except according to those terms.
//
+#[cfg(feature = "actix")]
+extern crate actix_web;
extern crate futures;
+#[cfg(feature = "hyper")]
extern crate hyper;
extern crate ipfs_api;
@@ -46,7 +49,7 @@ fn main() {
let file_rm = client.files_rm("/test", true);
- hyper::rt::run(
+ let fut =
mkdir
.and_then(|_| {
println!("making dirs /test/does/not/exist/yet...");
@@ -78,6 +81,16 @@ fn main() {
file_rm
})
.map(|_| println!("done!"))
- .map_err(|e| eprintln!("{}", e)),
- )
+ .map_err(|e| eprintln!("{}", e))
+ ;
+
+ #[cfg(feature = "hyper")]
+ hyper::rt::run(fut);
+ #[cfg(feature = "actix")]
+ actix_web::actix::run(|| {
+ fut.then(|_| {
+ actix_web::actix::System::current().stop();
+ Ok(())
+ })
+ });
}
diff --git a/ipfs-api/examples/ping_peer.rs b/ipfs-api/examples/ping_peer.rs
index 1d32210..c60e895 100644
--- a/ipfs-api/examples/ping_peer.rs
+++ b/ipfs-api/examples/ping_peer.rs
@@ -6,7 +6,10 @@
// copied, modified, or distributed except according to those terms.
//
+#[cfg(feature = "actix")]
+extern crate actix_web;
extern crate futures;
+#[cfg(feature = "hyper")]
extern crate hyper;
extern crate ipfs_api;
@@ -59,5 +62,13 @@ fn main() {
})
.map_err(|e| eprintln!("{}", e));
+ #[cfg(feature = "hyper")]
hyper::rt::run(req);
+ #[cfg(feature = "actix")]
+ actix_web::actix::run(|| {
+ req.then(|_| {
+ actix_web::actix::System::current().stop();
+ Ok(())
+ })
+ });
}
diff --git a/ipfs-api/examples/pubsub.rs b/ipfs-api/examples/pubsub.rs
index 10ee0c5..39fb25a 100644
--- a/ipfs-api/examples/pubsub.rs
+++ b/ipfs-api/examples/pubsub.rs
@@ -6,7 +6,10 @@
// copied, modified, or distributed except according to those terms.
//
+#[cfg(feature = "actix")]
+extern crate actix_web;
extern crate futures;
+#[cfg(feature = "hyper")]
extern crate hyper;
extern crate ipfs_api;
extern crate tokio_timer;
@@ -49,7 +52,11 @@ fn main() {
println!();
println!("starting task to publish messages to ({})...", TOPIC);
+
+ #[cfg(feature = "hyper")]
hyper::rt::run(publish);
+ #[cfg(feature = "actix")]
+ actix_web::actix::spawn(publish);
});
// This block will execute a future that suscribes to a topic,
@@ -61,15 +68,24 @@ fn main() {
println!();
println!("waiting for messages on ({})...", TOPIC);
- hyper::rt::run(
- req.take(5)
- .for_each(|msg| {
- println!();
- println!("received ({:?})", msg);
+ let fut = req
+ .take(5)
+ .for_each(|msg| {
+ println!();
+ println!("received ({:?})", msg);
+
+ Ok(())
+ })
+ .map_err(|e| eprintln!("{}", e));
- Ok(())
- })
- .map_err(|e| eprintln!("{}", e)),
- )
+ #[cfg(feature = "hyper")]
+ hyper::rt::run(fut);
+ #[cfg(feature = "actix")]
+ actix_web::actix::run(|| {
+ fut.then(|_| {
+ actix_web::actix::System::current().stop();
+ Ok(())
+ })
+ });
}
}
diff --git a/ipfs-api/examples/replace_config.rs b/ipfs-api/examples/replace_config.rs
index f354386..9c1ec61 100644
--- a/ipfs-api/examples/replace_config.rs
+++ b/ipfs-api/examples/replace_config.rs
@@ -6,7 +6,10 @@
// copied, modified, or distributed except according to those terms.
//
+#[cfg(feature = "actix")]
+extern crate actix_web;
extern crate futures;
+#[cfg(feature = "hyper")]
extern crate hyper;
extern crate ipfs_api;
@@ -27,5 +30,13 @@ fn main() {
.map(|_| println!("replaced file"))
.map_err(|e| println!("{}", e));
+ #[cfg(feature = "hyper")]
hyper::rt::run(req);
+ #[cfg(feature = "actix")]
+ actix_web::actix::run(|| {
+ req.then(|_| {
+ actix_web::actix::System::current().stop();
+ Ok(())
+ })
+ });
}
diff --git a/ipfs-api/examples/resolve_name.rs b/ipfs-api/examples/resolve_name.rs
index 4bcb648..6d400dc 100644
--- a/ipfs-api/examples/resolve_name.rs
+++ b/ipfs-api/examples/resolve_name.rs
@@ -6,7 +6,10 @@
// copied, modified, or distributed except according to those terms.
//
+#[cfg(feature = "actix")]
+extern crate actix_web;
extern crate futures;
+#[cfg(feature = "hyper")]
extern crate hyper;
extern crate ipfs_api;
@@ -40,9 +43,17 @@ fn main() {
})
});
- hyper::rt::run(
- name_resolve
- .and_then(|_| name_publish)
- .map_err(|e| eprintln!("{}", e)),
- );
+ let fut = name_resolve
+ .and_then(|_| name_publish)
+ .map_err(|e| eprintln!("{}", e));
+
+ #[cfg(feature = "hyper")]
+ hyper::rt::run(fut);
+ #[cfg(feature = "actix")]
+ actix_web::actix::run(|| {
+ fut.then(|_| {
+ actix_web::actix::System::current().stop();
+ Ok(())
+ })
+ });
}
diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs
index 0c3277b..5e1e9f5 100644
--- a/ipfs-api/src/client.rs
+++ b/ipfs-api/src/client.rs
@@ -5,19 +5,22 @@
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.
//
-
+#[cfg(feature = "actix")]
+use actix_multipart::client::multipart;
+#[cfg(feature = "actix")]
+use actix_web::HttpMessage;
+use bytes::Bytes;
use futures::{
future,
stream::{self, Stream},
Future, IntoFuture,
};
use header::TRAILER;
-use http::uri::InvalidUri;
-use hyper::{
- self,
- client::{Client, HttpConnector},
- Chunk, Request, Response, StatusCode, Uri,
-};
+use http::uri::{InvalidUri, Uri};
+use http::StatusCode;
+#[cfg(feature = "hyper")]
+use hyper::client::{Client, HttpConnector};
+#[cfg(feature = "hyper")]
use hyper_multipart::client::multipart;
use read::{JsonLineDecoder, LineDecoder, StreamReader};
use request::{self, ApiRequest};
@@ -31,17 +34,34 @@ use tokio_codec::{Decoder, FramedRead};
/// A response returned by the HTTP client.
///
+#[cfg(feature = "actix")]
+type AsyncResponse<T> = Box<Future<Item = T, Error = Error> + 'static>;
+#[cfg(feature = "hyper")]
type AsyncResponse<T> = Box<Future<Item = T, Error = Error> + Send + 'static>;
/// A future that returns a stream of responses.
///
+#[cfg(feature = "actix")]
+type AsyncStreamResponse<T> = Box<Stream<Item = T, Error = Error> + 'static>;
+#[cfg(feature = "hyper")]
type AsyncStreamResponse<T> = Box<Stream<Item = T, Error = Error> + Send + 'static>;
+#[cfg(feature = "actix")]
+type Request = actix_web::client::ClientRequest;
+#[cfg(feature = "hyper")]
+type Request = http::Request<hyper::Body>;
+
+#[cfg(feature = "actix")]
+type Response = actix_web::client::ClientResponse;
+#[cfg(feature = "hyper")]
+type Response = http::Response<hyper::Body>;
+
/// Asynchronous Ipfs client.
///
#[derive(Clone)]
pub struct IpfsClient {
base: Uri,
+ #[cfg(feature = "hyper")]
client: Client<HttpConnector, hyper::Body>,
}
@@ -101,6 +121,7 @@ impl IpfsClient {
Ok(IpfsClient {
base: base_path,
+ #[cfg(feature = "hyper")]
client: Client::builder().keep_alive(false).build_http(),
})
}
@@ -117,7 +138,7 @@ impl IpfsClient {
&self,
req: &Req,
form: Option<multipart::Form<'static>>,
- ) -> Result<Request<hyper::Body>, Error>
+ ) -> Result<Request, Error>
where
Req: ApiRequest + Serialize,
{
@@ -127,9 +148,9 @@ impl IpfsClient {
Req::PATH,
::serde_urlencoded::to_string(req)?
);
-
- url.parse::<Uri>().map_err(From::from).and_then(move |url| {
- let mut builder = Request::builder();
+ #[cfg(feature = "hyper")]
+ let req = url.parse::<Uri>().map_err(From::from).and_then(move |url| {
+ let mut builder = http::Request::builder();
let mut builder = builder.method(Req::METHOD.clone()).uri(url);
let req = if let Some(form) = form {
@@ -139,13 +160,29 @@ impl IpfsClient {
};
req.map_err(From::from)
- })
+ });
+ #[cfg(feature = "actix")]
+ let req = if let Some(form) = form {
+ Request::build()
+ .method(Req::METHOD.clone())
+ .uri(url)
+ .content_type(form.content_type())
+ .streaming(multipart::Body::from(form))
+ .map_err(From::from)
+ } else {
+ Request::build()
+ .method(Req::METHOD.clone())
+ .uri(url)
+ .finish()
+ .map_err(From::from)
+ };
+ req
}
/// Builds an Api error from a response body.
///
#[inline]
- fn build_error_from_body(chunk: Chunk) -> Error {
+ fn build_error_from_body(chunk: Bytes) -> Error {
match serde_json::from_slice(&chunk) {
Ok(e) => Error::Api(e),
Err(_) => match String::from_utf8(chunk.to_vec()) {
@@ -158,7 +195,7 @@ impl IpfsClient {
/// Processes a response that expects a json encoded body, returning an
/// error or a deserialized json response.
///
- fn process_json_response<Res>(status: StatusCode, chunk: Chunk) -> Result<Res, Error>
+ fn process_json_response<Res>(status: StatusCode, chunk: Bytes) -> Result<Res, Error>
where
for<'de> Res: 'static + Deserialize<'de>,
{
@@ -171,15 +208,19 @@ impl IpfsClient {
/// Processes a response that returns a stream of json deserializable
/// results.
///
- fn process_stream_response<D, Res>(
- res: Response<hyper::Body>,
- decoder: D,
- ) -> AsyncStreamResponse<Res>
+ fn process_stream_response<D, Res>(res: Response, decoder: D) -> AsyncStreamResponse<Res>
where
D: 'static + Decoder<Item = Res, Error = Error> + Send,
Res: 'static,
{
- let stream = FramedRead::new(StreamReader::new(res.into_body().from_err()), decoder);
+ #[cfg(feature = "hyper")]
+ let stream = FramedRead::new(
+ StreamReader::new(res.into_body().map(|c| c.into_bytes()).from_err()),
+ decoder,
+ );
+
+ #[cfg(feature = "actix")]
+ let stream = FramedRead::new(StreamReader::new(res.payload().from_err()), decoder);
Box::new(stream)
}
@@ -190,22 +231,33 @@ impl IpfsClient {
&self,
req: &Req,
form: Option<multipart::Form<'static>>,
- ) -> AsyncResponse<(StatusCode, Chunk)>
+ ) -> AsyncResponse<(StatusCode, Bytes)>
where
Req: ApiRequest + Serialize,
{
match self.build_base_request(req, form) {
Ok(req) => {
+ #[cfg(feature = "hyper")]
let res = self
.client
.request(req)
.and_then(|res| {
let status = res.status();
- res.into_body().concat2().map(move |chunk| (status, chunk))
+ res.into_body()
+ .concat2()
+ .map(move |chunk| (status, chunk.into_bytes()))
})
.from_err();
-
+ #[cfg(feature = "actix")]
+ let res = req
+ .send()
+ .timeout(std::time::Duration::from_secs(90))
+ .from_err()
+ .and_then(|x| {
+ let status = x.status();
+ x.body().map(move |body| (status, body)).from_err()
+ });
Box::new(res)
}
Err(e) => Box::new(Err(e).into_future()),
@@ -224,8 +276,9 @@ impl IpfsClient {
where
Req: ApiRequest + Serialize,
Res: 'static + Send,
- F: 'static + Fn(hyper::Response<hyper::Body>) -> AsyncStreamResponse<Res> + Send,
+ F: 'static + Fn(Response) -> AsyncStreamResponse<Res> + Send,
{
+ #[cfg(feature = "hyper")]
match self.build_base_request(req, form) {
Ok(req) => {
let res = self
@@ -244,7 +297,9 @@ impl IpfsClient {
res.into_body()
.concat2()
.from_err()
- .and_then(|chunk| Err(Self::build_error_from_body(chunk)))
+ .and_then(|chunk| {
+ Err(Self::build_error_from_body(chunk.into_bytes()))
+ })
.into_stream(),
),
};
@@ -252,17 +307,31 @@ impl IpfsClient {
stream
})
.flatten_stream();
-
Box::new(res)
}
Err(e) => Box::new(stream::once(Err(e))),
}
+ #[cfg(feature = "actix")]
+ match self.build_base_request(req, form) {
+ Ok(req) => {
+ let res = req
+ .send()
+ .timeout(std::time::Duration::from_secs(90))
+ .from_err();
+ Box::new(res.map(process).flatten_stream())
+ }
+ Err(e) => Box::new(stream::once(Err(e))),
+ }
}
/// Generic method for making a request to the Ipfs server, and getting
/// a deserializable response.
///
- fn request<Req, Res>(&self, req: &Req, form: Option<multipart::Form<'static>>) -> AsyncResponse<Res>
+ fn request<Req, Res>(
+ &self,
+ req: &Req,
+ form: Option<multipart::Form<'static>>,
+ ) -> AsyncResponse<Res>
where
Req: ApiRequest + Serialize,
for<'de> Res: 'static + Deserialize<'de> + Send,
@@ -277,7 +346,11 @@ impl IpfsClient {
/// Generic method for making a request to the Ipfs server, and getting
/// back a response with no body.
///
- fn request_empty<Req>(&self, req: &Req, form: Option<multipart::Form<'static>>) -> AsyncResponse<()>
+ fn request_empty<Req>(
+ &self,
+ req: &Req,
+ form: Option<multipart::Form<'static>>,
+ ) -> AsyncResponse<()>
where
Req: ApiRequest + Serialize,
{
@@ -294,7 +367,11 @@ impl IpfsClient {
/// Generic method for making a request to the Ipfs server, and getting
/// back a raw String response.
///
- fn request_string<Req>(&self, req: &Req, form: Option<multipart::Form<'static>>) -> AsyncResponse<String>
+ fn request_string<Req>(
+ &self,
+ req: &Req,
+ form: Option<multipart::Form<'static>>,
+ ) -> AsyncResponse<String>
where
Req: ApiRequest + Serialize,
{
@@ -315,11 +392,17 @@ impl IpfsClient {
&self,
req: &Req,
form: Option<multipart::Form<'static>>,
- ) -> AsyncStreamResponse<Chunk>
+ ) -> AsyncStreamResponse<Bytes>
where
Req: ApiRequest + Serialize,
{
- self.request_stream(req, form, |res| Box::new(res.into_body().from_err()))
+ #[cfg(feature = "hyper")]
+ let res = self.request_stream(req, form, |res| {
+ Box::new(res.into_body().from_err().map(|c| c.into_bytes()))
+ });
+ #[cfg(feature = "actix")]
+ let res = self.request_stream(req, form, |res| Box::new(res.payload().from_err()));
+ res
}
/// Generic method to return a streaming response of deserialized json
@@ -595,7 +678,7 @@ impl IpfsClient {
/// ```
///
#[inline]
- pub fn block_get(&self, hash: &str) -> AsyncStreamResponse<Chunk> {
+ pub fn block_get(&self, hash: &str) -> AsyncStreamResponse<Bytes> {
self.request_stream_bytes(&request::BlockGet { hash }, None)
}
@@ -747,7 +830,7 @@ impl IpfsClient {
/// ```
///
#[inline]
- pub fn cat(&self, path: &str) -> AsyncStreamResponse<Chunk> {