summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFerris Tseng <ferristseng@fastmail.fm>2017-11-26 15:17:44 -0500
committerFerris Tseng <ferristseng@fastmail.fm>2017-11-26 15:17:44 -0500
commit8e9a5d3f8f2e092ccf06dfd738575d40a1bb791b (patch)
tree6b1260564f46605bb651e22dd054dbf0fbbaf62d
parent43413b530a8e81cb5442cf05423def787cdf1811 (diff)
stream read responses
-rw-r--r--ipfs-api/src/client.rs67
-rw-r--r--ipfs-cli/src/command/block.rs8
-rw-r--r--ipfs-cli/src/command/cat.rs9
-rw-r--r--ipfs-cli/src/command/files.rs9
4 files changed, 66 insertions, 27 deletions
diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs
index 5e56dd4..d45e510 100644
--- a/ipfs-api/src/client.rs
+++ b/ipfs-api/src/client.rs
@@ -233,15 +233,15 @@ impl IpfsClient {
}
/// Generic method for making a request to the Ipfs server, and getting
- /// back raw bytes.
+ /// back a raw String response.
///
- fn request_bytes<Req>(&self, req: &Req, form: Option<multipart::Form>) -> AsyncResponse<Vec<u8>>
+ fn request_string<Req>(&self, req: &Req, form: Option<multipart::Form>) -> AsyncResponse<String>
where
Req: ApiRequest + Serialize,
{
let res = self.request_raw(req, form).and_then(
|(status, chunk)| match status {
- StatusCode::Ok => Ok(chunk.to_vec()),
+ StatusCode::Ok => String::from_utf8(chunk.to_vec()).map_err(From::from),
_ => Err(Self::build_error_from_body(chunk)),
},
);
@@ -250,18 +250,43 @@ impl IpfsClient {
}
/// Generic method for making a request to the Ipfs server, and getting
- /// back a raw String response.
+ /// back a raw stream of bytes.
///
- fn request_string<Req>(&self, req: &Req, form: Option<multipart::Form>) -> AsyncResponse<String>
+ fn request_stream_bytes<Req>(
+ &self,
+ req: &Req,
+ form: Option<multipart::Form>,
+ ) -> AsyncStreamResponse<Vec<u8>>
where
Req: ApiRequest + Serialize,
{
- let res = self.request_raw(req, form).and_then(
- |(status, chunk)| match status {
- StatusCode::Ok => String::from_utf8(chunk.to_vec()).map_err(From::from),
- _ => Err(Self::build_error_from_body(chunk)),
- },
- );
+ let res = self.build_base_request(req, form)
+ .map(|req| self.client.request(req).from_err())
+ .into_future()
+ .flatten()
+ .map(|res| {
+ let stream: Box<Stream<Item = Vec<u8>, Error = _>> = match res.status() {
+ // If the server responded OK, the data can be streamed back.
+ //
+ StatusCode::Ok => Box::new(res.body().map(|chunk| chunk.to_vec()).from_err()),
+
+ // If the server responded with an error status code, the body
+ // still needs to be read so an error can be built. This block will
+ // read the entire body stream, then immediately return an error.
+ //
+ _ => Box::new(
+ res.body()
+ .concat2()
+ .from_err()
+ .and_then(|chunk| Err(Self::build_error_from_body(chunk)))
+ .into_stream(),
+ ),
+
+ };
+
+ stream
+ })
+ .flatten_stream();
Box::new(res)
}
@@ -352,8 +377,8 @@ impl IpfsClient {
/// Gets a raw IPFS block.
///
#[inline]
- pub fn block_get(&self, hash: &str) -> AsyncResponse<response::BlockGetResponse> {
- self.request_bytes(&request::BlockGet { hash }, None)
+ pub fn block_get(&self, hash: &str) -> AsyncStreamResponse<response::BlockGetResponse> {
+ self.request_stream_bytes(&request::BlockGet { hash }, None)
}
/// Store input as an IPFS block.
@@ -408,8 +433,8 @@ impl IpfsClient {
/// Returns the contents of an Ipfs object.
///
#[inline]
- pub fn cat(&self, path: &str) -> AsyncResponse<response::CatResponse> {
- self.request_bytes(&request::Cat { path }, None)
+ pub fn cat(&self, path: &str) -> AsyncStreamResponse<response::CatResponse> {
+ self.request_stream_bytes(&request::Cat { path }, None)
}
/// List available commands that the server accepts.
@@ -599,8 +624,8 @@ impl IpfsClient {
/// Read a file in MFS.
///
#[inline]
- pub fn files_read(&self, path: &str) -> AsyncResponse<response::FilesReadResponse> {
- self.request_bytes(&request::FilesRead { path }, None)
+ pub fn files_read(&self, path: &str) -> AsyncStreamResponse<response::FilesReadResponse> {
+ self.request_stream_bytes(&request::FilesRead { path }, None)
}
/// Remove a file in MFS.
@@ -672,8 +697,8 @@ impl IpfsClient {
/// Download Ipfs object.
///
#[inline]
- pub fn get(&self, path: &str) -> AsyncResponse<response::GetResponse> {
- self.request_bytes(&request::Get { path }, None)
+ pub fn get(&self, path: &str) -> AsyncStreamResponse<response::GetResponse> {
+ self.request_stream_bytes(&request::Get { path }, None)
}
/// Returns information about a peer.
@@ -908,8 +933,8 @@ impl IpfsClient {
/// Export a tar file from Ipfs.
///
#[inline]
- pub fn tar_cat(&self, path: &str) -> AsyncResponse<response::TarCatResponse> {
- self.request_bytes(&request::TarCat { path }, None)
+ pub fn tar_cat(&self, path: &str) -> AsyncStreamResponse<response::TarCatResponse> {
+ self.request_stream_bytes(&request::TarCat { path }, None)
}
/// Returns information about the Ipfs server version.
diff --git a/ipfs-cli/src/command/block.rs b/ipfs-cli/src/command/block.rs
index 8cf004e..0a181ad 100644
--- a/ipfs-cli/src/command/block.rs
+++ b/ipfs-cli/src/command/block.rs
@@ -8,6 +8,7 @@
use clap::{App, ArgMatches};
use command::{verify_file, EXPECTED_API, EXPECTED_FILE};
+use futures::stream::Stream;
use ipfs_api::IpfsClient;
use std::fs::File;
use tokio_core::reactor::Core;
@@ -41,9 +42,12 @@ pub fn handle(core: &mut Core, client: &IpfsClient, args: &ArgMatches) {
match args.subcommand() {
("get", Some(args)) => {
let key = args.value_of("KEY").unwrap();
- let block = core.run(client.block_get(key)).expect(EXPECTED_API);
+ let req = client.block_get(key).for_each(|chunk| {
+ println!("{}", String::from_utf8_lossy(&chunk));
+ Ok(())
+ });
- println!("{}", String::from_utf8_lossy(&block));
+ core.run(req).expect(EXPECTED_API);
}
("put", Some(args)) => {
let path = args.value_of("INPUT").unwrap();
diff --git a/ipfs-cli/src/command/cat.rs b/ipfs-cli/src/command/cat.rs
index 55872b0..559bd24 100644
--- a/ipfs-cli/src/command/cat.rs
+++ b/ipfs-cli/src/command/cat.rs
@@ -8,6 +8,7 @@
use clap::{App, ArgMatches};
use command::EXPECTED_API;
+use futures::stream::Stream;
use ipfs_api::IpfsClient;
use tokio_core::reactor::Core;
@@ -23,7 +24,11 @@ pub fn signature<'a, 'b>() -> App<'a, 'b> {
pub fn handle(core: &mut Core, client: &IpfsClient, args: &ArgMatches) {
let path = args.value_of("PATH").unwrap();
- let data = core.run(client.cat(&path)).expect(EXPECTED_API);
+ let req = client.cat(&path).for_each(|chunk| {
+ println!("{}", String::from_utf8_lossy(&chunk));
- println!("{}", String::from_utf8_lossy(&data));
+ Ok(())
+ });
+
+ core.run(req).expect(EXPECTED_API);
}
diff --git a/ipfs-cli/src/command/files.rs b/ipfs-cli/src/command/files.rs
index 03bef88..7cdf3de 100644
--- a/ipfs-cli/src/command/files.rs
+++ b/ipfs-cli/src/command/files.rs
@@ -8,6 +8,7 @@
use clap::{App, ArgMatches};
use command::{verify_file, EXPECTED_API, EXPECTED_FILE};
+use futures::stream::Stream;
use ipfs_api::IpfsClient;
use std::fs::File;
use tokio_core::reactor::Core;
@@ -123,9 +124,13 @@ pub fn handle(core: &mut Core, client: &IpfsClient, args: &ArgMatches) {
}
("read", Some(args)) => {
let path = args.value_of("PATH").unwrap();
- let data = core.run(client.files_read(&path)).expect(EXPECTED_API);
+ let req = client.files_read(&path).for_each(|chunk| {
+ println!("{}", String::from_utf8_lossy(&chunk));
- println!("{}", String::from_utf8_lossy(&data));
+ Ok(())
+ });
+
+ core.run(req).expect(EXPECTED_API);
}
("rm", Some(args)) => {
let path = args.value_of("PATH").unwrap();