summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFerris Tseng <ferristseng@fastmail.fm>2019-12-22 23:15:27 -0500
committerFerris Tseng <ferristseng@fastmail.fm>2019-12-22 23:15:27 -0500
commit3065848300cc6fd19693ab86344d99c06a55a6fa (patch)
tree4b75b27e97b730c7f392140557f18ca83d71c801
parent143e072e4f6e107950ab6e7eb2de1bd57de65221 (diff)
get all the core methods working!
-rw-r--r--ipfs-api/src/client.rs191
1 files changed, 96 insertions, 95 deletions
diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs
index 0e4c0c2..7ddc205 100644
--- a/ipfs-api/src/client.rs
+++ b/ipfs-api/src/client.rs
@@ -17,7 +17,7 @@ use actix_http::{encoding, Payload, PayloadStream};
use actix_multipart::client::multipart;
use bytes::Bytes;
use futures::{
- future, stream, Future, FutureExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStreamExt,
+ future, Future, FutureExt, Stream, TryFutureExt, TryStreamExt,
};
use http::{
uri::{InvalidUri, Uri},
@@ -121,6 +121,7 @@ impl IpfsClient {
}
/// Creates a new `IpfsClient` for any given URI.
+ ///
#[inline]
pub fn new_from_uri(uri: &str) -> Result<IpfsClient, InvalidUri> {
let base_path = IpfsClient::build_base_path(uri)?;
@@ -287,62 +288,66 @@ impl IpfsClient {
req: Req,
form: Option<multipart::Form<'static>>,
process: F,
- ) -> impl Stream<Item = Result<Res, Error>>
+ ) -> AsyncStreamResponse<Res>
where
Req: ApiRequest + Serialize,
- F: 'static + Fn(Response) -> AsyncStreamResponse<Res>,
+ F: 'static + Send + Fn(Response) -> AsyncStreamResponse<Res>,
{
let request = future::ready(self.build_base_request(req, form));
- #[cfg(feature = "hyper")]
- {
- let client = self.client.clone();
+ let response = {
+ #[cfg(feature = "hyper")]
+ {
+ let client = self.client.clone();
+
+ request
+ .and_then(move |req| client.request(req).err_into())
+ .map_ok(move |res| {
+ match res.status() {
+ StatusCode::OK => process(res),
+ // If the server responded with an error status code, the body
+ // still needs to be read so an error can be built. This block will
+ // read the entire body stream, then immediately return an error.
+ //
+ _ => Box::new(
+ body::to_bytes(res.into_body())
+ .boxed()
+ .map(|maybe_body| match maybe_body {
+ Ok(body) => Err(Self::process_error_from_body(body)),
+ Err(e) => Err(e.into()),
+ })
+ .into_stream(),
+ ),
+ }
+ })
+ .try_flatten_stream()
+ }
+ #[cfg(feature = "actix")]
+ {
+ request
+ .and_then(|req| req.send().err_into())
+ .map_ok(move |mut res| {
+ match res.status() {
+ StatusCode::OK => process(res),
+ // If the server responded with an error status code, the body
+ // still needs to be read so an error can be built. This block will
+ // read the entire body stream, then immediately return an error.
+ //
+ _ => Box::new(
+ res.body()
+ .map(|maybe_body| match maybe_body {
+ Ok(body) => Err(Self::process_error_from_body(body)),
+ Err(e) => Err(e.into()),
+ })
+ .into_stream(),
+ ),
+ }
+ })
+ .try_flatten_stream()
+ }
+ };
- request
- .and_then(move |req| client.request(req).err_into())
- .map_ok(move |res| {
- match res.status() {
- StatusCode::OK => process(res),
- // If the server responded with an error status code, the body
- // still needs to be read so an error can be built. This block will
- // read the entire body stream, then immediately return an error.
- //
- _ => Box::new(
- body::to_bytes(res.into_body())
- .boxed()
- .map(|maybe_body| match maybe_body {
- Ok(body) => Err(Self::process_error_from_body(body)),
- Err(e) => Err(e.into()),
- })
- .into_stream(),
- ),
- }
- })
- .try_flatten_stream()
- }
- #[cfg(feature = "actix")]
- {
- request
- .and_then(|req| req.send().err_into())
- .map_ok(move |mut res| {
- match res.status() {
- StatusCode::OK => process(res),
- // If the server responded with an error status code, the body
- // still needs to be read so an error can be built. This block will
- // read the entire body stream, then immediately return an error.
- //
- _ => Box::new(
- res.body()
- .map(|maybe_body| match maybe_body {
- Ok(body) => Err(Self::process_error_from_body(body)),
- Err(e) => Err(e.into()),
- })
- .into_stream(),
- ),
- }
- })
- .try_flatten_stream()
- }
+ Box::new(response)
}
/// Generic method for making a request to the Ipfs server, and getting
@@ -407,7 +412,7 @@ impl IpfsClient {
&self,
req: Req,
form: Option<multipart::Form<'static>>,
- ) -> impl Stream<Item = Result<Bytes, Error>>
+ ) -> AsyncStreamResponse<Bytes>
where
Req: ApiRequest + Serialize,
{
@@ -421,50 +426,46 @@ impl IpfsClient {
}
}
- /*
- /// Generic method to return a streaming response of deserialized json
- /// objects delineated by new line separators.
- ///
- fn request_stream_json<Req, Res>(
- &self,
- req: &Req,
- form: Option<multipart::Form<'static>>,
- ) -> AsyncStreamResponse<Res>
- where
- Req: ApiRequest + Serialize,
- for<'de> Res: 'static + Deserialize<'de> + Send,
- {
- self.request_stream(req, form, |res| {
- let parse_stream_error = if let Some(trailer) = res.headers().get(TRAILER) {
- // Response has the Trailer header set. The StreamError trailer
- // is used to indicate that there was an error while streaming
- // data with Ipfs.
- //
- if trailer == "X-Stream-Error" {
- true
- } else {
- let err = Error::UnrecognizedTrailerHeader(
- String::from_utf8_lossy(trailer.as_ref()).into(),
- );
-
- // There was an unrecognized trailer value. If that is the case,
- // create a stream that immediately errors.
- //
- return Box::new(stream::once(Err(err)));
- }
- } else {
- false
- };
-
- Box::new(IpfsClient::process_stream_response(
- res,
- JsonLineDecoder::new(parse_stream_error),
- ))
- })
- }
- }
+ /// Generic method to return a streaming response of deserialized json
+ /// objects delineated by new line separators.
+ ///
+ fn request_stream_json<Req, Res>(
+ &self,
+ req: Req,
+ form: Option<multipart::Form<'static>>,
+ ) -> AsyncStreamResponse<Res>
+ where
+ Req: ApiRequest + Serialize,
+ for<'de> Res: 'static + Deserialize<'de> + Send,
+ {
+ self.request_stream(req, form, |res| {
+ let parse_stream_error = if let Some(trailer) = res.headers().get(TRAILER) {
+ // Response has the Trailer header set. The StreamError trailer
+ // is used to indicate that there was an error while streaming
+ // data with Ipfs.
+ //
+ if trailer == "X-Stream-Error" {
+ true
+ } else {
+ let err = Error::UnrecognizedTrailerHeader(
+ String::from_utf8_lossy(trailer.as_ref()).into(),
+ );
+
+ // There was an unrecognized trailer value. If that is the case,
+ // create a stream that immediately errors.
+ //
+ return Box::new(future::err(err).into_stream());
+ }
+ } else {
+ false
+ };
- */
+ Box::new(IpfsClient::process_stream_response(
+ res,
+ JsonLineDecoder::new(parse_stream_error),
+ ))
+ })
+ }
}
impl IpfsClient {