summaryrefslogtreecommitdiffstats
path: root/ipfs-api/src/client.rs
diff options
context:
space:
mode:
authorFerris Tseng <ferristseng@fastmail.fm>2017-11-26 15:17:44 -0500
committerFerris Tseng <ferristseng@fastmail.fm>2017-11-26 15:17:44 -0500
commit8e9a5d3f8f2e092ccf06dfd738575d40a1bb791b (patch)
tree6b1260564f46605bb651e22dd054dbf0fbbaf62d /ipfs-api/src/client.rs
parent43413b530a8e81cb5442cf05423def787cdf1811 (diff)
stream read responses
Diffstat (limited to 'ipfs-api/src/client.rs')
-rw-r--r--ipfs-api/src/client.rs67
1 files changed, 46 insertions, 21 deletions
diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs
index 5e56dd4..d45e510 100644
--- a/ipfs-api/src/client.rs
+++ b/ipfs-api/src/client.rs
@@ -233,15 +233,15 @@ impl IpfsClient {
}
/// Generic method for making a request to the Ipfs server, and getting
- /// back raw bytes.
+ /// back a raw String response.
///
- fn request_bytes<Req>(&self, req: &Req, form: Option<multipart::Form>) -> AsyncResponse<Vec<u8>>
+ fn request_string<Req>(&self, req: &Req, form: Option<multipart::Form>) -> AsyncResponse<String>
where
Req: ApiRequest + Serialize,
{
let res = self.request_raw(req, form).and_then(
|(status, chunk)| match status {
- StatusCode::Ok => Ok(chunk.to_vec()),
+ StatusCode::Ok => String::from_utf8(chunk.to_vec()).map_err(From::from),
_ => Err(Self::build_error_from_body(chunk)),
},
);
@@ -250,18 +250,43 @@ impl IpfsClient {
}
/// Generic method for making a request to the Ipfs server, and getting
- /// back a raw String response.
+ /// back a raw stream of bytes.
///
- fn request_string<Req>(&self, req: &Req, form: Option<multipart::Form>) -> AsyncResponse<String>
+ fn request_stream_bytes<Req>(
+ &self,
+ req: &Req,
+ form: Option<multipart::Form>,
+ ) -> AsyncStreamResponse<Vec<u8>>
where
Req: ApiRequest + Serialize,
{
- let res = self.request_raw(req, form).and_then(
- |(status, chunk)| match status {
- StatusCode::Ok => String::from_utf8(chunk.to_vec()).map_err(From::from),
- _ => Err(Self::build_error_from_body(chunk)),
- },
- );
+ let res = self.build_base_request(req, form)
+ .map(|req| self.client.request(req).from_err())
+ .into_future()
+ .flatten()
+ .map(|res| {
+ let stream: Box<Stream<Item = Vec<u8>, Error = _>> = match res.status() {
+ // If the server responded OK, the data can be streamed back.
+ //
+ StatusCode::Ok => Box::new(res.body().map(|chunk| chunk.to_vec()).from_err()),
+
+ // If the server responded with an error status code, the body
+ // still needs to be read so an error can be built. This block will
+ // read the entire body stream, then immediately return an error.
+ //
+ _ => Box::new(
+ res.body()
+ .concat2()
+ .from_err()
+ .and_then(|chunk| Err(Self::build_error_from_body(chunk)))
+ .into_stream(),
+ ),
+
+ };
+
+ stream
+ })
+ .flatten_stream();
Box::new(res)
}
@@ -352,8 +377,8 @@ impl IpfsClient {
/// Gets a raw IPFS block.
///
#[inline]
- pub fn block_get(&self, hash: &str) -> AsyncResponse<response::BlockGetResponse> {
- self.request_bytes(&request::BlockGet { hash }, None)
+ pub fn block_get(&self, hash: &str) -> AsyncStreamResponse<response::BlockGetResponse> {
+ self.request_stream_bytes(&request::BlockGet { hash }, None)
}
/// Store input as an IPFS block.
@@ -408,8 +433,8 @@ impl IpfsClient {
/// Returns the contents of an Ipfs object.
///
#[inline]
- pub fn cat(&self, path: &str) -> AsyncResponse<response::CatResponse> {
- self.request_bytes(&request::Cat { path }, None)
+ pub fn cat(&self, path: &str) -> AsyncStreamResponse<response::CatResponse> {
+ self.request_stream_bytes(&request::Cat { path }, None)
}
/// List available commands that the server accepts.
@@ -599,8 +624,8 @@ impl IpfsClient {
/// Read a file in MFS.
///
#[inline]
- pub fn files_read(&self, path: &str) -> AsyncResponse<response::FilesReadResponse> {
- self.request_bytes(&request::FilesRead { path }, None)
+ pub fn files_read(&self, path: &str) -> AsyncStreamResponse<response::FilesReadResponse> {
+ self.request_stream_bytes(&request::FilesRead { path }, None)
}
/// Remove a file in MFS.
@@ -672,8 +697,8 @@ impl IpfsClient {
/// Download Ipfs object.
///
#[inline]
- pub fn get(&self, path: &str) -> AsyncResponse<response::GetResponse> {
- self.request_bytes(&request::Get { path }, None)
+ pub fn get(&self, path: &str) -> AsyncStreamResponse<response::GetResponse> {
+ self.request_stream_bytes(&request::Get { path }, None)
}
/// Returns information about a peer.
@@ -908,8 +933,8 @@ impl IpfsClient {
/// Export a tar file from Ipfs.
///
#[inline]
- pub fn tar_cat(&self, path: &str) -> AsyncResponse<response::TarCatResponse> {
- self.request_bytes(&request::TarCat { path }, None)
+ pub fn tar_cat(&self, path: &str) -> AsyncStreamResponse<response::TarCatResponse> {
+ self.request_stream_bytes(&request::TarCat { path }, None)
}
/// Returns information about the Ipfs server version.