summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFerris Tseng <ferristseng@fastmail.fm>2019-12-22 22:10:07 -0500
committerFerris Tseng <ferristseng@fastmail.fm>2019-12-22 22:10:07 -0500
commite522d1612af5116f8143eff046eb5fa60b86d938 (patch)
treee4d02d7eb27ffaeec34f1eef14aeeeb080979d01
parent43a543840d23d9ee1e9c4baaa6f8066e3ba69eaf (diff)
fleshing out almost all of the core functionality
-rw-r--r--ipfs-api/src/client.rs365
-rw-r--r--ipfs-api/src/lib.rs8
2 files changed, 192 insertions, 181 deletions
diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs
index 1a02fc7..65aba7e 100644
--- a/ipfs-api/src/client.rs
+++ b/ipfs-api/src/client.rs
@@ -43,19 +43,12 @@ use std::{
};
use tokio_util::codec::{Decoder, FramedRead};
-/// A response returned by the HTTP client.
-///
-#[cfg(feature = "actix")]
-type AsyncResponse<T> = Box<dyn Future<Output = Result<T, Error>> + 'static>;
-#[cfg(feature = "hyper")]
-type AsyncResponse<T> = Box<dyn Future<Output = Result<T, Error>> + Send + 'static>;
-
/// A future that returns a stream of responses.
///
#[cfg(feature = "actix")]
-type AsyncStreamResponse<T> = Box<dyn Stream<Item = Result<T, Error>> + 'static>;
+type AsyncStreamResponse<T> = Box<dyn Stream<Item = Result<T, Error>> + Unpin + 'static>;
#[cfg(feature = "hyper")]
-type AsyncStreamResponse<T> = Box<dyn Stream<Item = Result<T, Error>> + Send + 'static>;
+type AsyncStreamResponse<T> = Box<dyn Stream<Item = Result<T, Error>> + Unpin + Send + 'static>;
#[cfg(feature = "actix")]
type Request = awc::ClientRequest;
@@ -165,39 +158,42 @@ impl IpfsClient {
Req::PATH,
::serde_urlencoded::to_string(req)?
);
+
#[cfg(feature = "hyper")]
- let req = url.parse::<Uri>().map_err(From::from).and_then(move |url| {
- let builder = http::Request::builder();
- let builder = builder.method(Req::METHOD.clone()).uri(url);
+ {
+ url.parse::<Uri>().map_err(From::from).and_then(move |url| {
+ let builder = http::Request::builder();
+ let builder = builder.method(Req::METHOD.clone()).uri(url);
- let req = if let Some(form) = form {
- form.set_body_convert::<hyper::Body, multipart::Body>(builder)
- } else {
- builder.body(hyper::Body::empty())
- };
+ let req = if let Some(form) = form {
+ form.set_body_convert::<hyper::Body, multipart::Body>(builder)
+ } else {
+ builder.body(hyper::Body::empty())
+ };
- req.map_err(From::from)
- });
+ req.map_err(From::from)
+ })
+ }
#[cfg(feature = "actix")]
- let req = if let Some(form) = form {
- Ok(self
- .client
- .request(Req::METHOD.clone(), url)
- .content_type(form.content_type()))
- } else {
- Ok(self.client.request(Req::METHOD.clone(), url))
- };
-
- req
+ {
+ if let Some(form) = form {
+ Ok(self
+ .client
+ .request(Req::METHOD.clone(), url)
+ .content_type(form.content_type()))
+ } else {
+ Ok(self.client.request(Req::METHOD.clone(), url))
+ }
+ }
}
/// Builds an Api error from a response body.
///
#[inline]
- fn build_error_from_body(chunk: Bytes) -> Error {
- match serde_json::from_slice(&chunk) {
+ fn process_error_from_body(body: Bytes) -> Error {
+ match serde_json::from_slice(&body) {
Ok(e) => Error::Api(e),
- Err(_) => match String::from_utf8(chunk.to_vec()) {
+ Err(_) => match String::from_utf8(body.to_vec()) {
Ok(s) => Error::Uncategorized(s),
Err(e) => e.into(),
},
@@ -207,13 +203,13 @@ impl IpfsClient {
/// Processes a response that expects a json encoded body, returning an
/// error or a deserialized json response.
///
- fn process_json_response<Res>(status: StatusCode, chunk: Bytes) -> Result<Res, Error>
+ fn process_json_response<Res>(status: StatusCode, body: Bytes) -> Result<Res, Error>
where
for<'de> Res: 'static + Deserialize<'de>,
{
match status {
- StatusCode::OK => serde_json::from_slice(&chunk).map_err(From::from),
- _ => Err(Self::build_error_from_body(chunk)),
+ StatusCode::OK => serde_json::from_slice(&body).map_err(From::from),
+ _ => Err(Self::process_error_from_body(body)),
}
}
@@ -229,12 +225,13 @@ impl IpfsClient {
Res: 'static,
{
#[cfg(feature = "hyper")]
- let stream = FramedRead::new(StreamReader::new(res.into_body()), decoder);
-
+ {
+ FramedRead::new(StreamReader::new(res.into_body()), decoder)
+ }
#[cfg(feature = "actix")]
- let stream = FramedRead::new(StreamReader::new(res), decoder);
-
- stream
+ {
+ FramedRead::new(StreamReader::new(res), decoder)
+ }
}
/// Generates a request, and returns the unprocessed response future.
@@ -243,101 +240,112 @@ impl IpfsClient {
&self,
req: &Req,
form: Option<multipart::Form<'static>>,
- ) -> impl Future<Output = Result<(StatusCode, Bytes), Error>> + 'static
+ ) -> impl Future<Output = Result<(StatusCode, Bytes), Error>>
where
Req: ApiRequest + Serialize,
{
- let client = self.client.clone();
-
- future::ready(self.build_base_request(req, form))
- .and_then(move |req| {
- #[cfg(feature = "hyper")]
- let res = client
- .request(req)
- .and_then(|res| {
- let status = res.status();
-
- body::to_bytes(res.into_body()).map_ok(move |body| (status, body))
- })
- .err_into();
-
- #[cfg(feature = "actix")]
- let res = req
- .timeout(std::time::Duration::from_secs(90))
- .send()
- .err_into()
- .and_then(|mut res| {
- let status = res.status();
+ let request = future::ready(self.build_base_request(req, form));
- res.body().map_ok(move |body| (status, body)).err_into()
- });
+ #[cfg(feature = "hyper")]
+ {
+ let client = self.client.clone();
- res
- })
- .err_into()
+ request
+ .and_then(move |req| client.request(req).err_into())
+ .and_then(|res| {
+ let status = res.status();
+
+ body::to_bytes(res.into_body())
+ .map_ok(move |body| (status, body))
+ .err_into()
+ })
+ }
+ #[cfg(feature = "actix")]
+ {
+ request
+ .and_then(|req| {
+ req.timeout(std::time::Duration::from_secs(90))
+ .send()
+ .err_into()
+ })
+ .and_then(|mut res| {
+ let status = res.status();
+
+ res.body().map_ok(move |body| (status, body)).err_into()
+ })
+ }
}
- /*
- /// Generic method for making a request that expects back a streaming
- /// response.
- ///
- fn request_stream<Req, Res, F>(
- &self,
- req: &Req,
- form: Option<multipart::Form<'static>>,
- process: F,
- ) -> AsyncStreamResponse<Res>
- where
- Req: ApiRequest + Serialize,
- Res: 'static + Send,
- F: 'static + Fn(Response) -> AsyncStreamResponse<Res> + Send,
+ /// Generic method for making a request that expects back a streaming
+ /// response.
+ ///
+ fn request_stream<Req, Res, F>(
+ &self,
+ req: &Req,
+ form: Option<multipart::Form<'static>>,
+ process: F,
+ ) -> impl Stream<Item = Result<Res, Error>> + '_
+ where
+ Req: 'static + ApiRequest + Serialize,
+ Res: 'static + Send,
+ F: 'static + Fn(Response) -> AsyncStreamResponse<Res>,
+ {
+ let request = future::ready(self.build_base_request(req, form));
+
+ #[cfg(feature = "hyper")]
{
- #[cfg(feature = "hyper")]
- match self.build_base_request(req, form) {
- Ok(req) => {
- let res = self
- .client
- .request(req)
- .from_err()
- .map(move |res| {
- let stream: Box<dyn Stream<Item = Res, Error = _> + Send + 'static> =
- match res.status() {
- StatusCode::OK => process(res),
- // If the server responded with an error status code, the body
- // still needs to be read so an error can be built. This block will
- // read the entire body stream, then immediately return an error.
- //
- _ => Box::new(
- res.into_body()
- .concat2()
- .from_err()
- .and_then(|chunk| {
- Err(Self::build_error_from_body(chunk.into_bytes()))
- })
- .into_stream(),
- ),
- };
-
- stream
- })
- .flatten_stream();
- Box::new(res)
- }
- Err(e) => Box::new(stream::once(Err(e))),
- }
- #[cfg(feature = "actix")]
- match self.build_base_request(req, form) {
- Ok(req) => {
- let res = req
- .timeout(std::time::Duration::from_secs(90))
+ let client = self.client.clone();
+
+ request
+ .and_then(move |req| self.client.request(req).err_into())
+ .map_ok(move |res| {
+ match res.status() {
+ StatusCode::OK => process(res),
+ // If the server responded with an error status code, the body
+ // still needs to be read so an error can be built. This block will
+ // read the entire body stream, then immediately return an error.
+ //
+ _ => Box::new(
+ body::to_bytes(res.into_body())
+ .boxed()
+ .map(|maybe_body| match maybe_body {
+ Ok(body) => Err(Self::process_error_from_body(body)),
+ Err(e) => Err(e.into()),
+ })
+ .into_stream(),
+ ),
+ }
+ })
+ .try_flatten_stream()
+ }
+ #[cfg(feature = "actix")]
+ {
+ request
+ .and_then(|req| {
+ req.timeout(std::time::Duration::from_secs(90))
.send()
- .from_err();
- Box::new(res.map(process).flatten_stream())
- }
- Err(e) => Box::new(stream::once(Err(e))),
- }
+ .err_into()
+ })
+ .map_ok(move |mut res| {
+ match res.status() {
+ StatusCode::OK => process(res),
+ // If the server responded with an error status code, the body
+ // still needs to be read so an error can be built. This block will
+ // read the entire body stream, then immediately return an error.
+ //
+ _ => Box::new(
+ res.body()
+ .map(|maybe_body| match maybe_body {
+ Ok(body) => Err(Self::process_error_from_body(body)),
+ Err(e) => Err(e.into()),
+ })
+ .into_stream(),
+ ),
+ }
+ })
+ .try_flatten_stream()
}
- */
+ }
/// Generic method for making a request to the Ipfs server, and getting
/// a deserializable response.
@@ -346,9 +354,9 @@ impl IpfsClient {
&self,
req: &Req,
form: Option<multipart::Form<'static>>,
- ) -> impl Future<Output = Result<Res, Error>> + 'static
+ ) -> impl Future<Output = Result<Res, Error>>
where
- Req: ApiRequest + Serialize + 'static,
+ Req: ApiRequest + Serialize,
for<'de> Res: 'static + Deserialize<'de> + Send,
{
self.request_raw(req, form).map(|res| {
@@ -363,60 +371,59 @@ impl IpfsClient {
&self,
req: &Req,
form: Option<multipart::Form<'static>>,
- ) -> impl Future<Output = Result<(), Error>> + 'static
+ ) -> impl Future<Output = Result<(), Error>>
where
- Req: ApiRequest + Serialize + 'static,
+ Req: ApiRequest + Serialize,
{
self.request_raw(req, form).map(|res| {
res.and_then(|(status, chunk)| match status {
StatusCode::OK => Ok(()),
- _ => Err(Self::build_error_from_body(chunk)),
+ _ => Err(Self::process_error_from_body(chunk)),
})
})
}
- /*
- /// Generic method for making a request to the Ipfs server, and getting
- /// back a raw String response.
- ///
- fn request_string<Req>(
- &self,
- req: &Req,
- form: Option<multipart::Form<'static>>,
- ) -> AsyncResponse<String>
- where
- Req: ApiRequest + Serialize,
- {
- let res = self
- .request_raw(req, form)
- .and_then(|(status, chunk)| match status {
- StatusCode::OK => String::from_utf8(chunk.to_vec()).map_err(From::from),
- _ => Err(Self::build_error_from_body(chunk)),
- });
-
- Box::new(res)
- }
+ /// Generic method for making a request to the Ipfs server, and getting
+ /// back a raw String response.
+ ///
+ fn request_string<Req>(
+ &self,
+ req: &Req,
+ form: Option<multipart::Form<'static>>,
+ ) -> impl Future<Output = Result<String, Error>>
+ where
+ Req: ApiRequest + Serialize,
+ {
+ self.request_raw(req, form).map(|res| {
+ res.and_then(|(status, chunk)| match status {
+ StatusCode::OK => String::from_utf8(chunk.to_vec()).map_err(From::from),
+ _ => Err(Self::process_error_from_body(chunk)),
+ })
+ })
+ }
- /// Generic method for making a request to the Ipfs server, and getting
- /// back a raw stream of bytes.
- ///
- fn request_stream_bytes<Req>(
- &self,
- req: &Req,
- form: Option<multipart::Form<'static>>,
- ) -> AsyncStreamResponse<Bytes>
- where
- Req: ApiRequest + Serialize,
- {
- #[cfg(feature = "hyper")]
- let res = self.request_stream(req, form, |res| {
- Box::new(res.into_body().from_err().map(|c| c.into_bytes()))
- });
- #[cfg(feature = "actix")]
- let res = self.request_stream(req, form, |res| Box::new(res.from_err()));
- res
- }
+ /// Generic method for making a request to the Ipfs server, and getting
+ /// back a raw stream of bytes.
+ ///
+ fn request_stream_bytes<Req>(
+ &self,
+ req: &Req,
+ form: Option<multipart::Form<'static>>,
+ ) -> impl Stream<Item = Result<Bytes, Error>> + '_
+ where
+ Req: 'static + ApiRequest + Serialize,
+ {
+ #[cfg(feature = "hyper")]
+ {
+ self.request_stream(req, form, |res| Box::new(res.into_body().err_into()))
+ }
+ #[cfg(feature = "actix")]
+ {
+ self.request_stream(req, form, |res| Box::new(res.err_into()))
+ }
+ }
+ /*
/// Generic method to return a streaming response of deserialized json
/// objects delineated by new line separators.
///
@@ -461,6 +468,7 @@ impl IpfsClient {
*/
}
+
impl IpfsClient {
/// Add file to Ipfs.
///
@@ -570,7 +578,9 @@ impl IpfsClient {
.map(|mut responses: Vec<response::AddResponse>| responses.pop().unwrap()),
)
}
+ */
+ /*
/// Returns the current ledger for a peer.
///
/// # Examples
@@ -587,9 +597,13 @@ impl IpfsClient {
/// ```
///
#[inline]
- pub fn bitswap_ledger(&self, peer: &str) -> AsyncResponse<response::BitswapLedgerResponse> {
+ pub fn bitswap_ledger(
+ &self,
+ peer: &str,
+ ) -> impl Future<Output = Result<response::BitswapLedgerResponse, Error>> {
self.request(&request::BitswapLedger { peer }, None)
}
+ */
/// Triggers a reprovide.
///
@@ -607,7 +621,9 @@ impl IpfsClient {
/// ```
///
#[inline]
- pub fn bitswap_reprovide(&self) -> AsyncResponse<response::BitswapReprovideResponse> {
+ pub fn bitswap_reprovide(
+ &self,
+ ) -> impl Future<Output = Result<response::BitswapReprovideResponse, Error>> {
self.request_empty(&request::BitswapReprovide, None)
}
@@ -627,10 +643,13 @@ impl IpfsClient {
/// ```
///
#[inline]
- pub fn bitswap_stat(&self) -> AsyncResponse<response::BitswapStatResponse> {
+ pub fn bitswap_stat(
+ &self,
+ ) -> impl Future<Output = Result<response::BitswapStatResponse, Error>> {
self.request(&request::BitswapStat, None)
}
+ /*
/// Remove a given block from your wantlist.
///
/// # Examples
diff --git a/ipfs-api/src/lib.rs b/ipfs-api/src/lib.rs
index f7b7947..03f43c3 100644
--- a/ipfs-api/src/lib.rs
+++ b/ipfs-api/src/lib.rs
@@ -168,22 +168,14 @@
//!
#[cfg(feature = "actix")]
-extern crate actix_http;
-#[cfg(feature = "actix")]
extern crate actix_multipart_rfc7578 as actix_multipart;
#[cfg(feature = "actix")]
-extern crate awc;
-#[cfg(feature = "actix")]
#[macro_use]
extern crate derive_more;
#[cfg(feature = "hyper")]
-extern crate hyper;
-#[cfg(feature = "hyper")]
extern crate hyper_multipart_rfc7578 as hyper_multipart;
#[cfg(feature = "hyper")]
-extern crate hyper_tls;
-#[cfg(feature = "hyper")]
#[macro_use]
extern crate failure;