diff options
author | Ferris Tseng <ferristseng@fastmail.fm> | 2017-11-26 15:17:44 -0500 |
---|---|---|
committer | Ferris Tseng <ferristseng@fastmail.fm> | 2017-11-26 15:17:44 -0500 |
commit | 8e9a5d3f8f2e092ccf06dfd738575d40a1bb791b (patch) | |
tree | 6b1260564f46605bb651e22dd054dbf0fbbaf62d | |
parent | 43413b530a8e81cb5442cf05423def787cdf1811 (diff) |
stream read responses
-rw-r--r-- | ipfs-api/src/client.rs | 67 | ||||
-rw-r--r-- | ipfs-cli/src/command/block.rs | 8 | ||||
-rw-r--r-- | ipfs-cli/src/command/cat.rs | 9 | ||||
-rw-r--r-- | ipfs-cli/src/command/files.rs | 9 |
4 files changed, 66 insertions, 27 deletions
diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs index 5e56dd4..d45e510 100644 --- a/ipfs-api/src/client.rs +++ b/ipfs-api/src/client.rs @@ -233,15 +233,15 @@ impl IpfsClient { } /// Generic method for making a request to the Ipfs server, and getting - /// back raw bytes. + /// back a raw String response. /// - fn request_bytes<Req>(&self, req: &Req, form: Option<multipart::Form>) -> AsyncResponse<Vec<u8>> + fn request_string<Req>(&self, req: &Req, form: Option<multipart::Form>) -> AsyncResponse<String> where Req: ApiRequest + Serialize, { let res = self.request_raw(req, form).and_then( |(status, chunk)| match status { - StatusCode::Ok => Ok(chunk.to_vec()), + StatusCode::Ok => String::from_utf8(chunk.to_vec()).map_err(From::from), _ => Err(Self::build_error_from_body(chunk)), }, ); @@ -250,18 +250,43 @@ impl IpfsClient { } /// Generic method for making a request to the Ipfs server, and getting - /// back a raw String response. + /// back a raw stream of bytes. /// - fn request_string<Req>(&self, req: &Req, form: Option<multipart::Form>) -> AsyncResponse<String> + fn request_stream_bytes<Req>( + &self, + req: &Req, + form: Option<multipart::Form>, + ) -> AsyncStreamResponse<Vec<u8>> where Req: ApiRequest + Serialize, { - let res = self.request_raw(req, form).and_then( - |(status, chunk)| match status { - StatusCode::Ok => String::from_utf8(chunk.to_vec()).map_err(From::from), - _ => Err(Self::build_error_from_body(chunk)), - }, - ); + let res = self.build_base_request(req, form) + .map(|req| self.client.request(req).from_err()) + .into_future() + .flatten() + .map(|res| { + let stream: Box<Stream<Item = Vec<u8>, Error = _>> = match res.status() { + // If the server responded OK, the data can be streamed back. + // + StatusCode::Ok => Box::new(res.body().map(|chunk| chunk.to_vec()).from_err()), + + // If the server responded with an error status code, the body + // still needs to be read so an error can be built. This block will + // read the entire body stream, then immediately return an error. + // + _ => Box::new( + res.body() + .concat2() + .from_err() + .and_then(|chunk| Err(Self::build_error_from_body(chunk))) + .into_stream(), + ), + + }; + + stream + }) + .flatten_stream(); Box::new(res) } @@ -352,8 +377,8 @@ impl IpfsClient { /// Gets a raw IPFS block. /// #[inline] - pub fn block_get(&self, hash: &str) -> AsyncResponse<response::BlockGetResponse> { - self.request_bytes(&request::BlockGet { hash }, None) + pub fn block_get(&self, hash: &str) -> AsyncStreamResponse<response::BlockGetResponse> { + self.request_stream_bytes(&request::BlockGet { hash }, None) } /// Store input as an IPFS block. @@ -408,8 +433,8 @@ impl IpfsClient { /// Returns the contents of an Ipfs object. /// #[inline] - pub fn cat(&self, path: &str) -> AsyncResponse<response::CatResponse> { - self.request_bytes(&request::Cat { path }, None) + pub fn cat(&self, path: &str) -> AsyncStreamResponse<response::CatResponse> { + self.request_stream_bytes(&request::Cat { path }, None) } /// List available commands that the server accepts. @@ -599,8 +624,8 @@ impl IpfsClient { /// Read a file in MFS. /// #[inline] - pub fn files_read(&self, path: &str) -> AsyncResponse<response::FilesReadResponse> { - self.request_bytes(&request::FilesRead { path }, None) + pub fn files_read(&self, path: &str) -> AsyncStreamResponse<response::FilesReadResponse> { + self.request_stream_bytes(&request::FilesRead { path }, None) } /// Remove a file in MFS. @@ -672,8 +697,8 @@ impl IpfsClient { /// Download Ipfs object. /// #[inline] - pub fn get(&self, path: &str) -> AsyncResponse<response::GetResponse> { - self.request_bytes(&request::Get { path }, None) + pub fn get(&self, path: &str) -> AsyncStreamResponse<response::GetResponse> { + self.request_stream_bytes(&request::Get { path }, None) } /// Returns information about a peer. @@ -908,8 +933,8 @@ impl IpfsClient { /// Export a tar file from Ipfs. /// #[inline] - pub fn tar_cat(&self, path: &str) -> AsyncResponse<response::TarCatResponse> { - self.request_bytes(&request::TarCat { path }, None) + pub fn tar_cat(&self, path: &str) -> AsyncStreamResponse<response::TarCatResponse> { + self.request_stream_bytes(&request::TarCat { path }, None) } /// Returns information about the Ipfs server version. diff --git a/ipfs-cli/src/command/block.rs b/ipfs-cli/src/command/block.rs index 8cf004e..0a181ad 100644 --- a/ipfs-cli/src/command/block.rs +++ b/ipfs-cli/src/command/block.rs @@ -8,6 +8,7 @@ use clap::{App, ArgMatches}; use command::{verify_file, EXPECTED_API, EXPECTED_FILE}; +use futures::stream::Stream; use ipfs_api::IpfsClient; use std::fs::File; use tokio_core::reactor::Core; @@ -41,9 +42,12 @@ pub fn handle(core: &mut Core, client: &IpfsClient, args: &ArgMatches) { match args.subcommand() { ("get", Some(args)) => { let key = args.value_of("KEY").unwrap(); - let block = core.run(client.block_get(key)).expect(EXPECTED_API); + let req = client.block_get(key).for_each(|chunk| { + println!("{}", String::from_utf8_lossy(&chunk)); + Ok(()) + }); - println!("{}", String::from_utf8_lossy(&block)); + core.run(req).expect(EXPECTED_API); } ("put", Some(args)) => { let path = args.value_of("INPUT").unwrap(); diff --git a/ipfs-cli/src/command/cat.rs b/ipfs-cli/src/command/cat.rs index 55872b0..559bd24 100644 --- a/ipfs-cli/src/command/cat.rs +++ b/ipfs-cli/src/command/cat.rs @@ -8,6 +8,7 @@ use clap::{App, ArgMatches}; use command::EXPECTED_API; +use futures::stream::Stream; use ipfs_api::IpfsClient; use tokio_core::reactor::Core; @@ -23,7 +24,11 @@ pub fn signature<'a, 'b>() -> App<'a, 'b> { pub fn handle(core: &mut Core, client: &IpfsClient, args: &ArgMatches) { let path = args.value_of("PATH").unwrap(); - let data = core.run(client.cat(&path)).expect(EXPECTED_API); + let req = client.cat(&path).for_each(|chunk| { + println!("{}", String::from_utf8_lossy(&chunk)); - println!("{}", String::from_utf8_lossy(&data)); + Ok(()) + }); + + core.run(req).expect(EXPECTED_API); } diff --git a/ipfs-cli/src/command/files.rs b/ipfs-cli/src/command/files.rs index 03bef88..7cdf3de 100644 --- a/ipfs-cli/src/command/files.rs +++ b/ipfs-cli/src/command/files.rs @@ -8,6 +8,7 @@ use clap::{App, ArgMatches}; use command::{verify_file, EXPECTED_API, EXPECTED_FILE}; +use futures::stream::Stream; use ipfs_api::IpfsClient; use std::fs::File; use tokio_core::reactor::Core; @@ -123,9 +124,13 @@ pub fn handle(core: &mut Core, client: &IpfsClient, args: &ArgMatches) { } ("read", Some(args)) => { let path = args.value_of("PATH").unwrap(); - let data = core.run(client.files_read(&path)).expect(EXPECTED_API); + let req = client.files_read(&path).for_each(|chunk| { + println!("{}", String::from_utf8_lossy(&chunk)); - println!("{}", String::from_utf8_lossy(&data)); + Ok(()) + }); + + core.run(req).expect(EXPECTED_API); } ("rm", Some(args)) => { let path = args.value_of("PATH").unwrap(); |