diff options
author | Ferris Tseng <ferristseng@fastmail.fm> | 2017-12-01 00:20:44 -0500 |
---|---|---|
committer | Ferris Tseng <ferristseng@fastmail.fm> | 2017-12-01 00:20:44 -0500 |
commit | 5cb64f09b0b90f464d98f1e60a3b040ec231bd28 (patch) | |
tree | 2fdeec7517e6ee0606d09cf8c0d7a8e6136a6519 | |
parent | 9b0b950414fbde2170a00220b8aa2ea6fd8860d2 (diff) |
reduce some duplicate code, and remove unnecessary functions
-rw-r--r-- | ipfs-api/src/client.rs | 219 |
1 files changed, 95 insertions, 124 deletions
diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs index a31bead..e4fef28 100644 --- a/ipfs-api/src/client.rs +++ b/ipfs-api/src/client.rs @@ -6,8 +6,8 @@ // copied, modified, or distributed except according to those terms. // -use futures::Stream; use futures::future::{Future, IntoFuture}; +use futures::stream::{self, Stream}; use header::Trailer; use read::{JsonLineDecoder, LineDecoder, StreamReader}; use request::{self, ApiRequest}; @@ -145,58 +145,77 @@ impl IpfsClient { Box::new(stream) } - /// Sends a request and returns the raw response. - /// - /// Methods prefixed with `send_` work on a raw reqwest `RequestBuilder` - /// instance. - /// - fn send_request(&self, req: Request<multipart::Body>) -> AsyncResponse<(StatusCode, Chunk)> { - let res = self.client - .request(req) - .and_then(|res| { - let status = res.status(); - - res.body().concat2().map(move |chunk| (status, chunk)) - }) - .from_err(); - - Box::new(res) - } - - /// Sends a request and deserializes the response into Json. - /// - /// Methods prefixed with `send_` work on a raw reqwest `RequestBuilder` - /// instance. + /// Generates a request, and returns the unprocessed response future. /// - fn send_request_json<Res>(&self, req: Request<multipart::Body>) -> AsyncResponse<Res> + fn request_raw<Req>( + &self, + req: &Req, + form: Option<multipart::Form>, + ) -> AsyncResponse<(StatusCode, Chunk)> where - for<'de> Res: 'static + Deserialize<'de>, + Req: ApiRequest + Serialize, { - let res = self.send_request(req).into_future().and_then( - |(status, chunk)| { - IpfsClient::process_json_response(status, chunk) - }, - ); - - Box::new(res) + match self.build_base_request(req, form) { + Ok(req) => { + let res = self.client + .request(req) + .and_then(|res| { + let status = res.status(); + + res.body().concat2().map(move |chunk| (status, chunk)) + }) + .from_err(); + + Box::new(res) + } + Err(e) => Box::new(Err(e).into_future()), + } } - /// Generates a request, and returns the unprocessed response future. + /// Generic method for making a request that expects back a streaming + /// response. /// - fn request_raw<Req>( + fn request_stream<Req, Res, F>( &self, req: &Req, form: Option<multipart::Form>, - ) -> AsyncResponse<(StatusCode, Chunk)> + process: F, + ) -> AsyncStreamResponse<Res> where Req: ApiRequest + Serialize, + Res: 'static, + F: 'static + Fn(hyper::Response) -> AsyncStreamResponse<Res>, { - let res = self.build_base_request(req, form) - .map(|req| self.send_request(req)) - .into_future() - .flatten(); + match self.build_base_request(req, form) { + Ok(req) => { + let res = self.client + .request(req) + .from_err() + .map(move |res| { + let stream: Box<Stream<Item = Res, Error = _>> = match res.status() { + StatusCode::Ok => process(res), + + // If the server responded with an error status code, the body + // still needs to be read so an error can be built. This block will + // read the entire body stream, then immediately return an error. + // + _ => Box::new( + res.body() + .concat2() + .from_err() + .and_then(|chunk| Err(Self::build_error_from_body(chunk))) + .into_stream(), + ), + }; - Box::new(res) + stream + }) + .flatten_stream(); + + Box::new(res) + } + Err(e) => Box::new(stream::once(Err(e))), + } } /// Generic method for making a request to the Ipfs server, and getting @@ -207,10 +226,9 @@ impl IpfsClient { Req: ApiRequest + Serialize, for<'de> Res: 'static + Deserialize<'de>, { - let res = self.build_base_request(req, form) - .map(|req| self.send_request_json(req)) - .into_future() - .flatten(); + let res = self.request_raw(req, form).and_then(|(status, chunk)| { + IpfsClient::process_json_response(status, chunk) + }); Box::new(res) } @@ -249,6 +267,7 @@ impl IpfsClient { Box::new(res) } + /// Generic method for making a request to the Ipfs server, and getting /// back a raw stream of bytes. /// @@ -260,41 +279,13 @@ impl IpfsClient { where Req: ApiRequest + Serialize, { - let res = self.build_base_request(req, form) - .map(|req| self.client.request(req).from_err()) - .into_future() - .flatten() - .map(|res| { - let stream: Box<Stream<Item = Chunk, Error = _>> = match res.status() { - // If the server responded OK, the data can be streamed back. - // - StatusCode::Ok => Box::new(res.body().map(|chunk| chunk).from_err()), - - // If the server responded with an error status code, the body - // still needs to be read so an error can be built. This block will - // read the entire body stream, then immediately return an error. - // - _ => Box::new( - res.body() - .concat2() - .from_err() - .and_then(|chunk| Err(Self::build_error_from_body(chunk))) - .into_stream(), - ), - - }; - - stream - }) - .flatten_stream(); - - Box::new(res) + self.request_stream(req, form, |res| Box::new(res.body().from_err())) } /// Generic method to return a streaming response of deserialized json /// objects delineated by new line separators. /// - fn request_stream<Req, Res>( + fn request_stream_json<Req, Res>( &self, req: &Req, form: Option<multipart::Form>, @@ -303,44 +294,24 @@ impl IpfsClient { Req: ApiRequest + Serialize, for<'de> Res: 'static + Deserialize<'de>, { - let res = self.build_base_request(req, form) - .map(|req| self.client.request(req).from_err()) - .into_future() - .flatten() - .map(|res| { - let stream: Box<Stream<Item = Res, Error = _>> = match res.status() { - StatusCode::Ok => { - let parse_stream_error = if let Some(trailer) = res.headers().get() { - // Response has the Trailer header set. The StreamError trailer - // is used to indicate that there was an error while streaming - // data with Ipfs. - // - match trailer { - &Trailer::StreamError => true, - } - } else { - false - }; - - Box::new(IpfsClient::process_stream_response( - res, - JsonLineDecoder::new(parse_stream_error), - )) - } - _ => Box::new( - res.body() - .concat2() - .from_err() - .and_then(|chunk| Err(Self::build_error_from_body(chunk))) - .into_stream(), - ), - }; - - stream - }) - .flatten_stream(); - - Box::new(res) + self.request_stream(req, form, |res| { + let parse_stream_error = if let Some(trailer) = res.headers().get() { + // Response has the Trailer header set. The StreamError trailer + // is used to indicate that there was an error while streaming + // data with Ipfs. + // + match trailer { + &Trailer::StreamError => true, + } + } else { + false + }; + + Box::new(IpfsClient::process_stream_response( + res, + JsonLineDecoder::new(parse_stream_error), + )) + }) } } @@ -844,7 +815,7 @@ impl IpfsClient { /// #[inline] pub fn dht_findpeer(&self, peer: &str) -> AsyncStreamResponse<response::DhtFindPeerResponse> { - self.request_stream(&request::DhtFindPeer { peer }, None) + self.request_stream_json(&request::DhtFindPeer { peer }, None) } /// Find peers in the DHT that can provide a specific value given a key. @@ -869,7 +840,7 @@ impl IpfsClient { /// #[inline] pub fn dht_findprovs(&self, key: &str) -> AsyncStreamResponse<response::DhtFindProvsResponse> { - self.request_stream(&request::DhtFindProvs { key }, None) + self.request_stream_json(&request::DhtFindProvs { key }, None) } /// Query the DHT for a given key. @@ -894,7 +865,7 @@ impl IpfsClient { /// #[inline] pub fn dht_get(&self, key: &str) -> AsyncStreamResponse<response::DhtGetResponse> { - self.request_stream(&request::DhtGet { key }, None) + self.request_stream_json(&request::DhtGet { key }, None) } /// Announce to the network that you are providing a given value. @@ -919,7 +890,7 @@ impl IpfsClient { /// #[inline] pub fn dht_provide(&self, key: &str) -> AsyncStreamResponse<response::DhtProvideResponse> { - self.request_stream(&request::DhtProvide { key }, None) + self.request_stream_json(&request::DhtProvide { key }, None) } /// Write a key/value pair to the DHT. @@ -943,7 +914,7 @@ impl IpfsClient { /// #[inline] pub fn dht_put(&self, key: &str, value: &str) -> AsyncStreamResponse<response::DhtPutResponse> { - self.request_stream(&request::DhtPut { key, value }, None) + self.request_stream_json(&request::DhtPut { key, value }, None) } /// Find the closest peer given the peer ID by querying the DHT. @@ -968,7 +939,7 @@ impl IpfsClient { /// #[inline] pub fn dht_query(&self, peer: &str) -> AsyncStreamResponse<response::DhtQueryResponse> { - self.request_stream(&request::DhtQuery { peer }, None) + self.request_stream_json(&request::DhtQuery { peer }, None) } /// Clear inactive requests from the log. @@ -1198,7 +1169,7 @@ impl IpfsClient { /// #[inline] pub fn filestore_dups(&self) -> AsyncStreamResponse<response::FilestoreDupsResponse> { - self.request_stream(&request::FilestoreDups, None) + self.request_stream_json(&request::FilestoreDups, None) } /// List objects in filestore. @@ -1208,7 +1179,7 @@ impl IpfsClient { &self, cid: &Option<&str>, ) -> AsyncStreamResponse<response::FilestoreLsResponse> { - self.request_stream(&request::FilestoreLs { cid }, None) + self.request_stream_json(&request::FilestoreLs { cid }, None) } /// Verify objects in filestore. @@ -1218,7 +1189,7 @@ impl IpfsClient { &self, cid: &Option<&str>, ) -> AsyncStreamResponse<response::FilestoreVerifyResponse> { - self.request_stream(&request::FilestoreVerify { cid }, None) + self.request_stream_json(&request::FilestoreVerify { cid }, None) } /// Download Ipfs object. @@ -1356,7 +1327,7 @@ impl IpfsClient { peer: &str, count: &Option<usize>, ) -> AsyncStreamResponse<response::PingResponse> { - self.request_stream(&request::Ping { peer, count }, None) + self.request_stream_json(&request::Ping { peer, count }, None) } /// List subscribed pubsub topics. @@ -1395,14 +1366,14 @@ impl IpfsClient { topic: &str, discover: &Option<bool>, ) -> AsyncStreamResponse<response::PubsubSubResponse> { - self.request_stream(&request::PubsubSub { topic, discover }, None) + self.request_stream_json(&request::PubsubSub { topic, discover }, None) } /// Gets a list of local references. /// #[inline] pub fn refs_local(&self) -> AsyncStreamResponse<response::RefsLocalResponse> { - self.request_stream(&request::RefsLocal, None) + self.request_stream_json(&request::RefsLocal, None) } /// Returns bitswap stats. |