summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFerris Tseng <ferristseng@fastmail.fm>2019-12-22 02:22:52 -0500
committerFerris Tseng <ferristseng@fastmail.fm>2019-12-22 02:22:52 -0500
commit43a543840d23d9ee1e9c4baaa6f8066e3ba69eaf (patch)
tree2985f4d9d3fc9d05d24b75048f5e94fa4138343e
parent27c25f96ac923bb4d6e02890eae08752f1c8228e (diff)
beginning to upgrade client, but blergh
-rw-r--r--ipfs-api/src/client.rs408
1 files changed, 208 insertions, 200 deletions
diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs
index 5d44c7c..1a02fc7 100644
--- a/ipfs-api/src/client.rs
+++ b/ipfs-api/src/client.rs
@@ -5,26 +5,31 @@
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.
//
-use crate::header::TRAILER;
-#[cfg(feature = "hyper")]
-use crate::hyper_multipart::client::multipart;
-use crate::read::{JsonLineDecoder, LineDecoder, StreamReader};
-use crate::request::{self, ApiRequest};
-use crate::response::{self, Error};
+use crate::{
+ header::TRAILER,
+ read::{JsonLineDecoder, LineDecoder, StreamReader},
+ request::{self, ApiRequest},
+ response::{self, Error},
+};
#[cfg(feature = "actix")]
use actix_http::{encoding, Payload, PayloadStream};
#[cfg(feature = "actix")]
use actix_multipart::client::multipart;
use bytes::Bytes;
use futures::{
- future,
- stream::{self, Stream},
- Future, IntoFuture,
+ future, stream, Future, FutureExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStreamExt,
+};
+use http::{
+ uri::{InvalidUri, Uri},
+ StatusCode,
};
-use http::uri::{InvalidUri, Uri};
-use http::StatusCode;
#[cfg(feature = "hyper")]
-use hyper::client::{self, Builder, HttpConnector};
+use hyper::{
+ body,
+ client::{self, Builder, HttpConnector},
+};
+#[cfg(feature = "hyper")]
+use hyper_multipart::client::multipart;
#[cfg(feature = "hyper")]
use hyper_tls::HttpsConnector;
use multiaddr::{AddrComponent, ToMultiaddr};
@@ -36,21 +41,21 @@ use std::{
net::{IpAddr, SocketAddr},
path::{Path, PathBuf},
};
-use tokio_codec::{Decoder, FramedRead};
+use tokio_util::codec::{Decoder, FramedRead};
/// A response returned by the HTTP client.
///
#[cfg(feature = "actix")]
-type AsyncResponse<T> = Box<dyn Future<Item = T, Error = Error> + 'static>;
+type AsyncResponse<T> = Box<dyn Future<Output = Result<T, Error>> + 'static>;
#[cfg(feature = "hyper")]
-type AsyncResponse<T> = Box<dyn Future<Item = T, Error = Error> + Send + 'static>;
+type AsyncResponse<T> = Box<dyn Future<Output = Result<T, Error>> + Send + 'static>;
/// A future that returns a stream of responses.
///
#[cfg(feature = "actix")]
-type AsyncStreamResponse<T> = Box<dyn Stream<Item = T, Error = Error> + 'static>;
+type AsyncStreamResponse<T> = Box<dyn Stream<Item = Result<T, Error>> + 'static>;
#[cfg(feature = "hyper")]
-type AsyncStreamResponse<T> = Box<dyn Stream<Item = T, Error = Error> + Send + 'static>;
+type AsyncStreamResponse<T> = Box<dyn Stream<Item = Result<T, Error>> + Send + 'static>;
#[cfg(feature = "actix")]
type Request = awc::ClientRequest;
@@ -130,10 +135,9 @@ impl IpfsClient {
Ok(IpfsClient {
base: base_path,
#[cfg(feature = "hyper")]
- client: {
- let connector = HttpsConnector::new(4).unwrap();
- Builder::default().keep_alive(false).build(connector)
- },
+ client: Builder::default()
+ .keep_alive(false)
+ .build(HttpsConnector::new()),
#[cfg(feature = "actix")]
client: Client::default(),
})
@@ -163,11 +167,11 @@ impl IpfsClient {
);
#[cfg(feature = "hyper")]
let req = url.parse::<Uri>().map_err(From::from).and_then(move |url| {
- let mut builder = http::Request::builder();
- let mut builder = builder.method(Req::METHOD.clone()).uri(url);
+ let builder = http::Request::builder();
+ let builder = builder.method(Req::METHOD.clone()).uri(url);
let req = if let Some(form) = form {
- form.set_body_convert::<hyper::Body, multipart::Body>(&mut builder)
+ form.set_body_convert::<hyper::Body, multipart::Body>(builder)
} else {
builder.body(hyper::Body::empty())
};
@@ -216,21 +220,21 @@ impl IpfsClient {
/// Processes a response that returns a stream of json deserializable
/// results.
///
- fn process_stream_response<D, Res>(res: Response, decoder: D) -> AsyncStreamResponse<Res>
+ fn process_stream_response<D, Res>(
+ res: Response,
+ decoder: D,
+ ) -> impl Stream<Item = Result<Res, Error>>
where
D: 'static + Decoder<Item = Res, Error = Error> + Send,
Res: 'static,
{
#[cfg(feature = "hyper")]
- let stream = FramedRead::new(
- StreamReader::new(res.into_body().map(|c| c.into_bytes()).from_err()),
- decoder,
- );
+ let stream = FramedRead::new(StreamReader::new(res.into_body()), decoder);
#[cfg(feature = "actix")]
- let stream = FramedRead::new(StreamReader::new(res.from_err()), decoder);
+ let stream = FramedRead::new(StreamReader::new(res), decoder);
- Box::new(stream)
+ stream
}
/// Generates a request, and returns the unprocessed response future.
@@ -239,98 +243,101 @@ impl IpfsClient {
&self,
req: &Req,
form: Option<multipart::Form<'static>>,
- ) -> AsyncResponse<(StatusCode, Bytes)>
+ ) -> impl Future<Output = Result<(StatusCode, Bytes), Error>> + 'static
where
Req: ApiRequest + Serialize,
{
- match self.build_base_request(req, form) {
- Ok(req) => {
+ let client = self.client.clone();
+
+ future::ready(self.build_base_request(req, form))
+ .and_then(move |req| {
#[cfg(feature = "hyper")]
- let res = self
- .client
+ let res = client
.request(req)
.and_then(|res| {
let status = res.status();
- res.into_body()
- .concat2()
- .map(move |chunk| (status, chunk.into_bytes()))
+ body::to_bytes(res.into_body()).map_ok(move |body| (status, body))
})
- .from_err();
+ .err_into();
+
#[cfg(feature = "actix")]
let res = req
.timeout(std::time::Duration::from_secs(90))
.send()
- .from_err()
- .and_then(|mut x| {
- let status = x.status();
- x.body().map(move |body| (status, body)).from_err()
+ .err_into()
+ .and_then(|mut res| {
+ let status = res.status();
+
+ res.body().map_ok(move |body| (status, body)).err_into()
});
- Box::new(res)
- }
- Err(e) => Box::new(Err(e).into_future()),
- }
- }
- /// Generic method for making a request that expects back a streaming
- /// response.
- ///
- fn request_stream<Req, Res, F>(
- &self,
- req: &Req,
- form: Option<multipart::Form<'static>>,
- process: F,
- ) -> AsyncStreamResponse<Res>
- where
- Req: ApiRequest + Serialize,
- Res: 'static + Send,
- F: 'static + Fn(Response) -> AsyncStreamResponse<Res> + Send,
- {
- #[cfg(feature = "hyper")]
- match self.build_base_request(req, form) {
- Ok(req) => {
- let res = self
- .client
- .request(req)
- .from_err()
- .map(move |res| {
- let stream: Box<dyn Stream<Item = Res, Error = _> + Send + 'static> =
- match res.status() {
- StatusCode::OK => process(res),
- // If the server responded with an error status code, the body
- // still needs to be read so an error can be built. This block will
- // read the entire body stream, then immediately return an error.
- //
- _ => Box::new(
- res.into_body()
- .concat2()
- .from_err()
- .and_then(|chunk| {
- Err(Self::build_error_from_body(chunk.into_bytes()))
- })
- .into_stream(),
- ),
- };
-
- stream
- })
- .flatten_stream();
- Box::new(res)
+ res
+ })
+ .err_into()
+ }
+
+ /*
+ /// Generic method for making a request that expects back a streaming
+ /// response.
+ ///
+ fn request_stream<Req, Res, F>(
+ &self,
+ req: &Req,
+ form: Option<multipart::Form<'static>>,
+ process: F,
+ ) -> AsyncStreamResponse<Res>
+ where
+ Req: ApiRequest + Serialize,
+ Res: 'static + Send,
+ F: 'static + Fn(Response) -> AsyncStreamResponse<Res> + Send,
+ {
+ #[cfg(feature = "hyper")]
+ match self.build_base_request(req, form) {
+ Ok(req) => {
+ let res = self
+ .client
+ .request(req)
+ .from_err()
+ .map(move |res| {
+ let stream: Box<dyn Stream<Item = Res, Error = _> + Send + 'static> =
+ match res.status() {
+ StatusCode::OK => process(res),
+ // If the server responded with an error status code, the body
+ // still needs to be read so an error can be built. This block will
+ // read the entire body stream, then immediately return an error.
+ //
+ _ => Box::new(
+ res.into_body()
+ .concat2()
+ .from_err()
+ .and_then(|chunk| {
+ Err(Self::build_error_from_body(chunk.into_bytes()))
+ })
+ .into_stream(),
+ ),
+ };
+
+ stream
+ })
+ .flatten_stream();
+ Box::new(res)
+ }
+ Err(e) => Box::new(stream::once(Err(e))),
}
- Err(e) => Box::new(stream::once(Err(e))),
- }
- #[cfg(feature = "actix")]
- match self.build_base_request(req, form) {
- Ok(req) => {
- let res = req
- .timeout(std::time::Duration::from_secs(90))
- .send()
- .from_err();
- Box::new(res.map(process).flatten_stream())
+ #[cfg(feature = "actix")]
+ match self.build_base_request(req, form) {
+ Ok(req) => {
+ let res = req
+ .timeout(std::time::Duration::from_secs(90))
+ .send()
+ .from_err();
+ Box::new(res.map(process).flatten_stream())
+ }
+ Err(e) => Box::new(stream::once(Err(e))),
}
- Err(e) => Box::new(stream::once(Err(e))),
}
- }
+ */
/// Generic method for making a request to the Ipfs server, and getting
/// a deserializable response.
@@ -339,16 +346,14 @@ impl IpfsClient {
&self,
req: &Req,
form: Option<multipart::Form<'static>>,
- ) -> AsyncResponse<Res>
+ ) -> impl Future<Output = Result<Res, Error>> + 'static
where
- Req: ApiRequest + Serialize,
+ Req: ApiRequest + Serialize + 'static,
for<'de> Res: 'static + Deserialize<'de> + Send,
{
- let res = self
- .request_raw(req, form)
- .and_then(|(status, chunk)| IpfsClient::process_json_response(status, chunk));
-
- Box::new(res)
+ self.request_raw(req, form).map(|res| {
+ res.and_then(|(status, chunk)| IpfsClient::process_json_response(status, chunk))
+ })
}
/// Generic method for making a request to the Ipfs server, and getting
@@ -358,103 +363,104 @@ impl IpfsClient {
&self,
req: &Req,
form: Option<multipart::Form<'static>>,
- ) -> AsyncResponse<()>
+ ) -> impl Future<Output = Result<(), Error>> + 'static
where
- Req: ApiRequest + Serialize,
+ Req: ApiRequest + Serialize + 'static,
{
- let res = self
- .request_raw(req, form)
- .and_then(|(status, chunk)| match status {
+ self.request_raw(req, form).map(|res| {
+ res.and_then(|(status, chunk)| match status {
StatusCode::OK => Ok(()),
_ => Err(Self::build_error_from_body(chunk)),
- });
-
- Box::new(res)
+ })
+ })
}
- /// Generic method for making a request to the Ipfs server, and getting
- /// back a raw String response.
- ///
- fn request_string<Req>(
- &self,
- req: &Req,
- form: Option<multipart::Form<'static>>,
- ) -> AsyncResponse<String>
- where
- Req: ApiRequest + Serialize,
- {
- let res = self
- .request_raw(req, form)
- .and_then(|(status, chunk)| match status {
- StatusCode::OK => String::from_utf8(chunk.to_vec()).map_err(From::from),
- _ => Err(Self::build_error_from_body(chunk)),
- });
+ /*
+ /// Generic method for making a request to the Ipfs server, and getting
+ /// back a raw String response.
+ ///
+ fn request_string<Req>(
+ &self,
+ req: &Req,
+ form: Option<multipart::Form<'static>>,
+ ) -> AsyncResponse<String>
+ where
+ Req: ApiRequest + Serialize,
+ {
+ let res = self
+ .request_raw(req, form)
+ .and_then(|(status, chunk)| match status {
+ StatusCode::OK => String::from_utf8(chunk.to_vec()).map_err(From::from),
+ _ => Err(Self::build_error_from_body(chunk)),
+ });
- Box::new(res)
- }
+ Box::new(res)
+ }
- /// Generic method for making a request to the Ipfs server, and getting
- /// back a raw stream of bytes.
- ///
- fn request_stream_bytes<Req>(
- &self,
- req: &Req,
- form: Option<multipart::Form<'static>>,
- ) -> AsyncStreamResponse<Bytes>
- where
- Req: ApiRequest + Serialize,
- {
- #[cfg(feature = "hyper")]
- let res = self.request_stream(req, form, |res| {
- Box::new(res.into_body().from_err().map(|c| c.into_bytes()))
- });
- #[cfg(feature = "actix")]
- let res = self.request_stream(req, form, |res| Box::new(res.from_err()));
- res
- }
+ /// Generic method for making a request to the Ipfs server, and getting
+ /// back a raw stream of bytes.
+ ///
+ fn request_stream_bytes<Req>(
+ &self,
+ req: &Req,
+ form: Option<multipart::Form<'static>>,
+ ) -> AsyncStreamResponse<Bytes>
+ where
+ Req: ApiRequest + Serialize,
+ {
+ #[cfg(feature = "hyper")]
+ let res = self.request_stream(req, form, |res| {
+ Box::new(res.into_body().from_err().map(|c| c.into_bytes()))
+ });
+ #[cfg(feature = "actix")]
+ let res = self.request_stream(req, form, |res| Box::new(res.from_err()));
+ res
+ }
- /// Generic method to return a streaming response of deserialized json
- /// objects delineated by new line separators.
- ///
- fn request_stream_json<Req, Res>(
- &self,
- req: &Req,
- form: Option<multipart::Form<'static>>,
- ) -> AsyncStreamResponse<Res>
- where
- Req: ApiRequest + Serialize,
- for<'de> Res: 'static + Deserialize<'de> + Send,
- {
- self.request_stream(req, form, |res| {
- let parse_stream_error = if let Some(trailer) = res.headers().get(TRAILER) {
- // Response has the Trailer header set. The StreamError trailer
- // is used to indicate that there was an error while streaming
- // data with Ipfs.
- //
- if trailer == "X-Stream-Error" {
- true
- } else {
- let err = Error::UnrecognizedTrailerHeader(
- String::from_utf8_lossy(trailer.as_ref()).into(),
- );
-
- // There was an unrecognized trailer value. If that is the case,
- // create a stream that immediately errors.
- //
- return Box::new(stream::once(Err(err)));
- }
- } else {
- false
- };
+ /// Generic method to return a streaming response of deserialized json
+ /// objects delineated by new line separators.
+ ///
+ fn request_stream_json<Req, Res>(
+ &self,
+ req: &Req,
+ form: Option<multipart::Form<'static>>,
+ ) -> AsyncStreamResponse<Res>
+ where
+ Req: ApiRequest + Serialize,
+ for<'de> Res: 'static + Deserialize<'de> + Send,
+ {
+ self.request_stream(req, form, |res| {
+ let parse_stream_error = if let Some(trailer) = res.headers().get(TRAILER) {
+ // Response has the Trailer header set. The StreamError trailer
+ // is used to indicate that there was an error while streaming
+ // data with Ipfs.
+ //
+ if trailer == "X-Stream-Error" {
+ true
+ } else {
+ let err = Error::UnrecognizedTrailerHeader(
+ String::from_utf8_lossy(trailer.as_ref()).into(),
+ );
+
+ // There was an unrecognized trailer value. If that is the case,
+ // create a stream that immediately errors.
+ //
+ return Box::new(stream::once(Err(err)));
+ }
+ } else {
+ false
+ };
+
+ Box::new(IpfsClient::process_stream_response(
+ res,
+ JsonLineDecoder::new(parse_stream_error),
+ ))
+ })
+ }
+ }
- Box::new(IpfsClient::process_stream_response(
- res,
- JsonLineDecoder::new(parse_stream_error),
- ))
- })
- }
+ */
}
-
impl IpfsClient {
/// Add file to Ipfs.
///
@@ -474,9 +480,9 @@ impl IpfsClient {
/// ```
///
#[inline]
- pub fn add<R>(&self, data: R) -> AsyncResponse<response::AddResponse>
+ pub fn add<R>(&self, data: R) -> impl Future<Output = Result<response::AddResponse, Error>>
where
- R: 'static + Read + Send,
+ R: 'static + Read + Send + Sync,
{
let mut form = multipart::Form::default();
@@ -485,6 +491,7 @@ impl IpfsClient {
self.request(&request::Add, Some(form))
}
+ /*
/// Add a path to Ipfs. Can be a file or directory.
/// A hard limit of 128 open file descriptors is set such
/// that any small additional files are stored in-memory.
@@ -2278,4 +2285,5 @@ impl IpfsClient {
pub fn version(&self) -> AsyncResponse<response::VersionResponse> {
self.request(&request::Version, None)
}
+ */
}