summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFerris Tseng <ferristseng@fastmail.fm>2017-12-01 00:20:44 -0500
committerFerris Tseng <ferristseng@fastmail.fm>2017-12-01 00:20:44 -0500
commit5cb64f09b0b90f464d98f1e60a3b040ec231bd28 (patch)
tree2fdeec7517e6ee0606d09cf8c0d7a8e6136a6519
parent9b0b950414fbde2170a00220b8aa2ea6fd8860d2 (diff)
reduce some duplicate code, and remove unnecessary functions
-rw-r--r--ipfs-api/src/client.rs219
1 files changed, 95 insertions, 124 deletions
diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs
index a31bead..e4fef28 100644
--- a/ipfs-api/src/client.rs
+++ b/ipfs-api/src/client.rs
@@ -6,8 +6,8 @@
// copied, modified, or distributed except according to those terms.
//
-use futures::Stream;
use futures::future::{Future, IntoFuture};
+use futures::stream::{self, Stream};
use header::Trailer;
use read::{JsonLineDecoder, LineDecoder, StreamReader};
use request::{self, ApiRequest};
@@ -145,58 +145,77 @@ impl IpfsClient {
Box::new(stream)
}
- /// Sends a request and returns the raw response.
- ///
- /// Methods prefixed with `send_` work on a raw reqwest `RequestBuilder`
- /// instance.
- ///
- fn send_request(&self, req: Request<multipart::Body>) -> AsyncResponse<(StatusCode, Chunk)> {
- let res = self.client
- .request(req)
- .and_then(|res| {
- let status = res.status();
-
- res.body().concat2().map(move |chunk| (status, chunk))
- })
- .from_err();
-
- Box::new(res)
- }
-
- /// Sends a request and deserializes the response into Json.
- ///
- /// Methods prefixed with `send_` work on a raw reqwest `RequestBuilder`
- /// instance.
+ /// Generates a request, and returns the unprocessed response future.
///
- fn send_request_json<Res>(&self, req: Request<multipart::Body>) -> AsyncResponse<Res>
+ fn request_raw<Req>(
+ &self,
+ req: &Req,
+ form: Option<multipart::Form>,
+ ) -> AsyncResponse<(StatusCode, Chunk)>
where
- for<'de> Res: 'static + Deserialize<'de>,
+ Req: ApiRequest + Serialize,
{
- let res = self.send_request(req).into_future().and_then(
- |(status, chunk)| {
- IpfsClient::process_json_response(status, chunk)
- },
- );
-
- Box::new(res)
+ match self.build_base_request(req, form) {
+ Ok(req) => {
+ let res = self.client
+ .request(req)
+ .and_then(|res| {
+ let status = res.status();
+
+ res.body().concat2().map(move |chunk| (status, chunk))
+ })
+ .from_err();
+
+ Box::new(res)
+ }
+ Err(e) => Box::new(Err(e).into_future()),
+ }
}
- /// Generates a request, and returns the unprocessed response future.
+ /// Generic method for making a request that expects back a streaming
+ /// response.
///
- fn request_raw<Req>(
+ fn request_stream<Req, Res, F>(
&self,
req: &Req,
form: Option<multipart::Form>,
- ) -> AsyncResponse<(StatusCode, Chunk)>
+ process: F,
+ ) -> AsyncStreamResponse<Res>
where
Req: ApiRequest + Serialize,
+ Res: 'static,
+ F: 'static + Fn(hyper::Response) -> AsyncStreamResponse<Res>,
{
- let res = self.build_base_request(req, form)
- .map(|req| self.send_request(req))
- .into_future()
- .flatten();
+ match self.build_base_request(req, form) {
+ Ok(req) => {
+ let res = self.client
+ .request(req)
+ .from_err()
+ .map(move |res| {
+ let stream: Box<Stream<Item = Res, Error = _>> = match res.status() {
+ StatusCode::Ok => process(res),
+
+ // If the server responded with an error status code, the body
+ // still needs to be read so an error can be built. This block will
+ // read the entire body stream, then immediately return an error.
+ //
+ _ => Box::new(
+ res.body()
+ .concat2()
+ .from_err()
+ .and_then(|chunk| Err(Self::build_error_from_body(chunk)))
+ .into_stream(),
+ ),
+ };
- Box::new(res)
+ stream
+ })
+ .flatten_stream();
+
+ Box::new(res)
+ }
+ Err(e) => Box::new(stream::once(Err(e))),
+ }
}
/// Generic method for making a request to the Ipfs server, and getting
@@ -207,10 +226,9 @@ impl IpfsClient {
Req: ApiRequest + Serialize,
for<'de> Res: 'static + Deserialize<'de>,
{
- let res = self.build_base_request(req, form)
- .map(|req| self.send_request_json(req))
- .into_future()
- .flatten();
+ let res = self.request_raw(req, form).and_then(|(status, chunk)| {
+ IpfsClient::process_json_response(status, chunk)
+ });
Box::new(res)
}
@@ -249,6 +267,7 @@ impl IpfsClient {
Box::new(res)
}
+
/// Generic method for making a request to the Ipfs server, and getting
/// back a raw stream of bytes.
///
@@ -260,41 +279,13 @@ impl IpfsClient {
where
Req: ApiRequest + Serialize,
{
- let res = self.build_base_request(req, form)
- .map(|req| self.client.request(req).from_err())
- .into_future()
- .flatten()
- .map(|res| {
- let stream: Box<Stream<Item = Chunk, Error = _>> = match res.status() {
- // If the server responded OK, the data can be streamed back.
- //
- StatusCode::Ok => Box::new(res.body().map(|chunk| chunk).from_err()),
-
- // If the server responded with an error status code, the body
- // still needs to be read so an error can be built. This block will
- // read the entire body stream, then immediately return an error.
- //
- _ => Box::new(
- res.body()
- .concat2()
- .from_err()
- .and_then(|chunk| Err(Self::build_error_from_body(chunk)))
- .into_stream(),
- ),
-
- };
-
- stream
- })
- .flatten_stream();
-
- Box::new(res)
+ self.request_stream(req, form, |res| Box::new(res.body().from_err()))
}
/// Generic method to return a streaming response of deserialized json
/// objects delineated by new line separators.
///
- fn request_stream<Req, Res>(
+ fn request_stream_json<Req, Res>(
&self,
req: &Req,
form: Option<multipart::Form>,
@@ -303,44 +294,24 @@ impl IpfsClient {
Req: ApiRequest + Serialize,
for<'de> Res: 'static + Deserialize<'de>,
{
- let res = self.build_base_request(req, form)
- .map(|req| self.client.request(req).from_err())
- .into_future()
- .flatten()
- .map(|res| {
- let stream: Box<Stream<Item = Res, Error = _>> = match res.status() {
- StatusCode::Ok => {
- let parse_stream_error = if let Some(trailer) = res.headers().get() {
- // Response has the Trailer header set. The StreamError trailer
- // is used to indicate that there was an error while streaming
- // data with Ipfs.
- //
- match trailer {
- &Trailer::StreamError => true,
- }
- } else {
- false
- };
-
- Box::new(IpfsClient::process_stream_response(
- res,
- JsonLineDecoder::new(parse_stream_error),
- ))
- }
- _ => Box::new(
- res.body()
- .concat2()
- .from_err()
- .and_then(|chunk| Err(Self::build_error_from_body(chunk)))
- .into_stream(),
- ),
- };
-
- stream
- })
- .flatten_stream();
-
- Box::new(res)
+ self.request_stream(req, form, |res| {
+ let parse_stream_error = if let Some(trailer) = res.headers().get() {
+ // Response has the Trailer header set. The StreamError trailer
+ // is used to indicate that there was an error while streaming
+ // data with Ipfs.
+ //
+ match trailer {
+ &Trailer::StreamError => true,
+ }
+ } else {
+ false
+ };
+
+ Box::new(IpfsClient::process_stream_response(
+ res,
+ JsonLineDecoder::new(parse_stream_error),
+ ))
+ })
}
}
@@ -844,7 +815,7 @@ impl IpfsClient {
///
#[inline]
pub fn dht_findpeer(&self, peer: &str) -> AsyncStreamResponse<response::DhtFindPeerResponse> {
- self.request_stream(&request::DhtFindPeer { peer }, None)
+ self.request_stream_json(&request::DhtFindPeer { peer }, None)
}
/// Find peers in the DHT that can provide a specific value given a key.
@@ -869,7 +840,7 @@ impl IpfsClient {
///
#[inline]
pub fn dht_findprovs(&self, key: &str) -> AsyncStreamResponse<response::DhtFindProvsResponse> {
- self.request_stream(&request::DhtFindProvs { key }, None)
+ self.request_stream_json(&request::DhtFindProvs { key }, None)
}
/// Query the DHT for a given key.
@@ -894,7 +865,7 @@ impl IpfsClient {
///
#[inline]
pub fn dht_get(&self, key: &str) -> AsyncStreamResponse<response::DhtGetResponse> {
- self.request_stream(&request::DhtGet { key }, None)
+ self.request_stream_json(&request::DhtGet { key }, None)
}
/// Announce to the network that you are providing a given value.
@@ -919,7 +890,7 @@ impl IpfsClient {
///
#[inline]
pub fn dht_provide(&self, key: &str) -> AsyncStreamResponse<response::DhtProvideResponse> {
- self.request_stream(&request::DhtProvide { key }, None)
+ self.request_stream_json(&request::DhtProvide { key }, None)
}
/// Write a key/value pair to the DHT.
@@ -943,7 +914,7 @@ impl IpfsClient {
///
#[inline]
pub fn dht_put(&self, key: &str, value: &str) -> AsyncStreamResponse<response::DhtPutResponse> {
- self.request_stream(&request::DhtPut { key, value }, None)
+ self.request_stream_json(&request::DhtPut { key, value }, None)
}
/// Find the closest peer given the peer ID by querying the DHT.
@@ -968,7 +939,7 @@ impl IpfsClient {
///
#[inline]
pub fn dht_query(&self, peer: &str) -> AsyncStreamResponse<response::DhtQueryResponse> {
- self.request_stream(&request::DhtQuery { peer }, None)
+ self.request_stream_json(&request::DhtQuery { peer }, None)
}
/// Clear inactive requests from the log.
@@ -1198,7 +1169,7 @@ impl IpfsClient {
///
#[inline]
pub fn filestore_dups(&self) -> AsyncStreamResponse<response::FilestoreDupsResponse> {
- self.request_stream(&request::FilestoreDups, None)
+ self.request_stream_json(&request::FilestoreDups, None)
}
/// List objects in filestore.
@@ -1208,7 +1179,7 @@ impl IpfsClient {
&self,
cid: &Option<&str>,
) -> AsyncStreamResponse<response::FilestoreLsResponse> {
- self.request_stream(&request::FilestoreLs { cid }, None)
+ self.request_stream_json(&request::FilestoreLs { cid }, None)
}
/// Verify objects in filestore.
@@ -1218,7 +1189,7 @@ impl IpfsClient {
&self,
cid: &Option<&str>,
) -> AsyncStreamResponse<response::FilestoreVerifyResponse> {
- self.request_stream(&request::FilestoreVerify { cid }, None)
+ self.request_stream_json(&request::FilestoreVerify { cid }, None)
}
/// Download Ipfs object.
@@ -1356,7 +1327,7 @@ impl IpfsClient {
peer: &str,
count: &Option<usize>,
) -> AsyncStreamResponse<response::PingResponse> {
- self.request_stream(&request::Ping { peer, count }, None)
+ self.request_stream_json(&request::Ping { peer, count }, None)
}
/// List subscribed pubsub topics.
@@ -1395,14 +1366,14 @@ impl IpfsClient {
topic: &str,
discover: &Option<bool>,
) -> AsyncStreamResponse<response::PubsubSubResponse> {
- self.request_stream(&request::PubsubSub { topic, discover }, None)
+ self.request_stream_json(&request::PubsubSub { topic, discover }, None)
}
/// Gets a list of local references.
///
#[inline]
pub fn refs_local(&self) -> AsyncStreamResponse<response::RefsLocalResponse> {
- self.request_stream(&request::RefsLocal, None)
+ self.request_stream_json(&request::RefsLocal, None)
}
/// Returns bitswap stats.