From 5cb64f09b0b90f464d98f1e60a3b040ec231bd28 Mon Sep 17 00:00:00 2001 From: Ferris Tseng Date: Fri, 1 Dec 2017 00:20:44 -0500 Subject: reduce some duplicate code, and remove unnecessary functions --- ipfs-api/src/client.rs | 219 +++++++++++++++++++++---------------------------- 1 file changed, 95 insertions(+), 124 deletions(-) (limited to 'ipfs-api/src/client.rs') diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs index a31bead..e4fef28 100644 --- a/ipfs-api/src/client.rs +++ b/ipfs-api/src/client.rs @@ -6,8 +6,8 @@ // copied, modified, or distributed except according to those terms. // -use futures::Stream; use futures::future::{Future, IntoFuture}; +use futures::stream::{self, Stream}; use header::Trailer; use read::{JsonLineDecoder, LineDecoder, StreamReader}; use request::{self, ApiRequest}; @@ -145,58 +145,77 @@ impl IpfsClient { Box::new(stream) } - /// Sends a request and returns the raw response. - /// - /// Methods prefixed with `send_` work on a raw reqwest `RequestBuilder` - /// instance. - /// - fn send_request(&self, req: Request) -> AsyncResponse<(StatusCode, Chunk)> { - let res = self.client - .request(req) - .and_then(|res| { - let status = res.status(); - - res.body().concat2().map(move |chunk| (status, chunk)) - }) - .from_err(); - - Box::new(res) - } - - /// Sends a request and deserializes the response into Json. - /// - /// Methods prefixed with `send_` work on a raw reqwest `RequestBuilder` - /// instance. + /// Generates a request, and returns the unprocessed response future. /// - fn send_request_json(&self, req: Request) -> AsyncResponse + fn request_raw( + &self, + req: &Req, + form: Option, + ) -> AsyncResponse<(StatusCode, Chunk)> where - for<'de> Res: 'static + Deserialize<'de>, + Req: ApiRequest + Serialize, { - let res = self.send_request(req).into_future().and_then( - |(status, chunk)| { - IpfsClient::process_json_response(status, chunk) - }, - ); - - Box::new(res) + match self.build_base_request(req, form) { + Ok(req) => { + let res = self.client + .request(req) + .and_then(|res| { + let status = res.status(); + + res.body().concat2().map(move |chunk| (status, chunk)) + }) + .from_err(); + + Box::new(res) + } + Err(e) => Box::new(Err(e).into_future()), + } } - /// Generates a request, and returns the unprocessed response future. + /// Generic method for making a request that expects back a streaming + /// response. /// - fn request_raw( + fn request_stream( &self, req: &Req, form: Option, - ) -> AsyncResponse<(StatusCode, Chunk)> + process: F, + ) -> AsyncStreamResponse where Req: ApiRequest + Serialize, + Res: 'static, + F: 'static + Fn(hyper::Response) -> AsyncStreamResponse, { - let res = self.build_base_request(req, form) - .map(|req| self.send_request(req)) - .into_future() - .flatten(); + match self.build_base_request(req, form) { + Ok(req) => { + let res = self.client + .request(req) + .from_err() + .map(move |res| { + let stream: Box> = match res.status() { + StatusCode::Ok => process(res), + + // If the server responded with an error status code, the body + // still needs to be read so an error can be built. This block will + // read the entire body stream, then immediately return an error. + // + _ => Box::new( + res.body() + .concat2() + .from_err() + .and_then(|chunk| Err(Self::build_error_from_body(chunk))) + .into_stream(), + ), + }; - Box::new(res) + stream + }) + .flatten_stream(); + + Box::new(res) + } + Err(e) => Box::new(stream::once(Err(e))), + } } /// Generic method for making a request to the Ipfs server, and getting @@ -207,10 +226,9 @@ impl IpfsClient { Req: ApiRequest + Serialize, for<'de> Res: 'static + Deserialize<'de>, { - let res = self.build_base_request(req, form) - .map(|req| self.send_request_json(req)) - .into_future() - .flatten(); + let res = self.request_raw(req, form).and_then(|(status, chunk)| { + IpfsClient::process_json_response(status, chunk) + }); Box::new(res) } @@ -249,6 +267,7 @@ impl IpfsClient { Box::new(res) } + /// Generic method for making a request to the Ipfs server, and getting /// back a raw stream of bytes. /// @@ -260,41 +279,13 @@ impl IpfsClient { where Req: ApiRequest + Serialize, { - let res = self.build_base_request(req, form) - .map(|req| self.client.request(req).from_err()) - .into_future() - .flatten() - .map(|res| { - let stream: Box> = match res.status() { - // If the server responded OK, the data can be streamed back. - // - StatusCode::Ok => Box::new(res.body().map(|chunk| chunk).from_err()), - - // If the server responded with an error status code, the body - // still needs to be read so an error can be built. This block will - // read the entire body stream, then immediately return an error. - // - _ => Box::new( - res.body() - .concat2() - .from_err() - .and_then(|chunk| Err(Self::build_error_from_body(chunk))) - .into_stream(), - ), - - }; - - stream - }) - .flatten_stream(); - - Box::new(res) + self.request_stream(req, form, |res| Box::new(res.body().from_err())) } /// Generic method to return a streaming response of deserialized json /// objects delineated by new line separators. /// - fn request_stream( + fn request_stream_json( &self, req: &Req, form: Option, @@ -303,44 +294,24 @@ impl IpfsClient { Req: ApiRequest + Serialize, for<'de> Res: 'static + Deserialize<'de>, { - let res = self.build_base_request(req, form) - .map(|req| self.client.request(req).from_err()) - .into_future() - .flatten() - .map(|res| { - let stream: Box> = match res.status() { - StatusCode::Ok => { - let parse_stream_error = if let Some(trailer) = res.headers().get() { - // Response has the Trailer header set. The StreamError trailer - // is used to indicate that there was an error while streaming - // data with Ipfs. - // - match trailer { - &Trailer::StreamError => true, - } - } else { - false - }; - - Box::new(IpfsClient::process_stream_response( - res, - JsonLineDecoder::new(parse_stream_error), - )) - } - _ => Box::new( - res.body() - .concat2() - .from_err() - .and_then(|chunk| Err(Self::build_error_from_body(chunk))) - .into_stream(), - ), - }; - - stream - }) - .flatten_stream(); - - Box::new(res) + self.request_stream(req, form, |res| { + let parse_stream_error = if let Some(trailer) = res.headers().get() { + // Response has the Trailer header set. The StreamError trailer + // is used to indicate that there was an error while streaming + // data with Ipfs. + // + match trailer { + &Trailer::StreamError => true, + } + } else { + false + }; + + Box::new(IpfsClient::process_stream_response( + res, + JsonLineDecoder::new(parse_stream_error), + )) + }) } } @@ -844,7 +815,7 @@ impl IpfsClient { /// #[inline] pub fn dht_findpeer(&self, peer: &str) -> AsyncStreamResponse { - self.request_stream(&request::DhtFindPeer { peer }, None) + self.request_stream_json(&request::DhtFindPeer { peer }, None) } /// Find peers in the DHT that can provide a specific value given a key. @@ -869,7 +840,7 @@ impl IpfsClient { /// #[inline] pub fn dht_findprovs(&self, key: &str) -> AsyncStreamResponse { - self.request_stream(&request::DhtFindProvs { key }, None) + self.request_stream_json(&request::DhtFindProvs { key }, None) } /// Query the DHT for a given key. @@ -894,7 +865,7 @@ impl IpfsClient { /// #[inline] pub fn dht_get(&self, key: &str) -> AsyncStreamResponse { - self.request_stream(&request::DhtGet { key }, None) + self.request_stream_json(&request::DhtGet { key }, None) } /// Announce to the network that you are providing a given value. @@ -919,7 +890,7 @@ impl IpfsClient { /// #[inline] pub fn dht_provide(&self, key: &str) -> AsyncStreamResponse { - self.request_stream(&request::DhtProvide { key }, None) + self.request_stream_json(&request::DhtProvide { key }, None) } /// Write a key/value pair to the DHT. @@ -943,7 +914,7 @@ impl IpfsClient { /// #[inline] pub fn dht_put(&self, key: &str, value: &str) -> AsyncStreamResponse { - self.request_stream(&request::DhtPut { key, value }, None) + self.request_stream_json(&request::DhtPut { key, value }, None) } /// Find the closest peer given the peer ID by querying the DHT. @@ -968,7 +939,7 @@ impl IpfsClient { /// #[inline] pub fn dht_query(&self, peer: &str) -> AsyncStreamResponse { - self.request_stream(&request::DhtQuery { peer }, None) + self.request_stream_json(&request::DhtQuery { peer }, None) } /// Clear inactive requests from the log. @@ -1198,7 +1169,7 @@ impl IpfsClient { /// #[inline] pub fn filestore_dups(&self) -> AsyncStreamResponse { - self.request_stream(&request::FilestoreDups, None) + self.request_stream_json(&request::FilestoreDups, None) } /// List objects in filestore. @@ -1208,7 +1179,7 @@ impl IpfsClient { &self, cid: &Option<&str>, ) -> AsyncStreamResponse { - self.request_stream(&request::FilestoreLs { cid }, None) + self.request_stream_json(&request::FilestoreLs { cid }, None) } /// Verify objects in filestore. @@ -1218,7 +1189,7 @@ impl IpfsClient { &self, cid: &Option<&str>, ) -> AsyncStreamResponse { - self.request_stream(&request::FilestoreVerify { cid }, None) + self.request_stream_json(&request::FilestoreVerify { cid }, None) } /// Download Ipfs object. @@ -1356,7 +1327,7 @@ impl IpfsClient { peer: &str, count: &Option, ) -> AsyncStreamResponse { - self.request_stream(&request::Ping { peer, count }, None) + self.request_stream_json(&request::Ping { peer, count }, None) } /// List subscribed pubsub topics. @@ -1395,14 +1366,14 @@ impl IpfsClient { topic: &str, discover: &Option, ) -> AsyncStreamResponse { - self.request_stream(&request::PubsubSub { topic, discover }, None) + self.request_stream_json(&request::PubsubSub { topic, discover }, None) } /// Gets a list of local references. /// #[inline] pub fn refs_local(&self) -> AsyncStreamResponse { - self.request_stream(&request::RefsLocal, None) + self.request_stream_json(&request::RefsLocal, None) } /// Returns bitswap stats. -- cgit v1.2.3