summaryrefslogtreecommitdiffstats
path: root/ipfs-api/src/client.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ipfs-api/src/client.rs')
-rw-r--r--ipfs-api/src/client.rs1507
1 files changed, 629 insertions, 878 deletions
diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs
index 5d44c7c..1072a33 100644
--- a/ipfs-api/src/client.rs
+++ b/ipfs-api/src/client.rs
@@ -5,26 +5,25 @@
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.
//
-use crate::header::TRAILER;
-#[cfg(feature = "hyper")]
-use crate::hyper_multipart::client::multipart;
-use crate::read::{JsonLineDecoder, LineDecoder, StreamReader};
-use crate::request::{self, ApiRequest};
-use crate::response::{self, Error};
-#[cfg(feature = "actix")]
-use actix_http::{encoding, Payload, PayloadStream};
+use crate::{
+ header::TRAILER,
+ read::{JsonLineDecoder, LineDecoder, StreamReader},
+ request::{self, ApiRequest},
+ response::{self, Error},
+ Client, Request, Response,
+};
#[cfg(feature = "actix")]
use actix_multipart::client::multipart;
use bytes::Bytes;
-use futures::{
- future,
- stream::{self, Stream},
- Future, IntoFuture,
+use futures::{future, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
+use http::{
+ uri::{InvalidUri, Uri},
+ StatusCode,
};
-use http::uri::{InvalidUri, Uri};
-use http::StatusCode;
#[cfg(feature = "hyper")]
-use hyper::client::{self, Builder, HttpConnector};
+use hyper::{body, client::Builder};
+#[cfg(feature = "hyper")]
+use hyper_multipart::client::multipart;
#[cfg(feature = "hyper")]
use hyper_tls::HttpsConnector;
use multiaddr::{AddrComponent, ToMultiaddr};
@@ -32,40 +31,13 @@ use serde::{Deserialize, Serialize};
use serde_json;
use std::{
fs,
- io::Read,
+ io::{Cursor, Read},
net::{IpAddr, SocketAddr},
path::{Path, PathBuf},
};
-use tokio_codec::{Decoder, FramedRead};
-
-/// A response returned by the HTTP client.
-///
-#[cfg(feature = "actix")]
-type AsyncResponse<T> = Box<dyn Future<Item = T, Error = Error> + 'static>;
-#[cfg(feature = "hyper")]
-type AsyncResponse<T> = Box<dyn Future<Item = T, Error = Error> + Send + 'static>;
-
-/// A future that returns a stream of responses.
-///
-#[cfg(feature = "actix")]
-type AsyncStreamResponse<T> = Box<dyn Stream<Item = T, Error = Error> + 'static>;
-#[cfg(feature = "hyper")]
-type AsyncStreamResponse<T> = Box<dyn Stream<Item = T, Error = Error> + Send + 'static>;
-
-#[cfg(feature = "actix")]
-type Request = awc::ClientRequest;
-#[cfg(feature = "hyper")]
-type Request = http::Request<hyper::Body>;
+use tokio_util::codec::{Decoder, FramedRead};
-#[cfg(feature = "actix")]
-type Response = awc::ClientResponse<encoding::Decoder<Payload<PayloadStream>>>;
-#[cfg(feature = "hyper")]
-type Response = http::Response<hyper::Body>;
-
-#[cfg(feature = "actix")]
-type Client = awc::Client;
-#[cfg(feature = "hyper")]
-type Client = client::Client<HttpsConnector<HttpConnector>, hyper::Body>;
+const FILE_DESCRIPTOR_LIMIT: usize = 127;
/// Asynchronous Ipfs client.
///
@@ -123,19 +95,26 @@ impl IpfsClient {
}
/// Creates a new `IpfsClient` for any given URI.
+ ///
#[inline]
pub fn new_from_uri(uri: &str) -> Result<IpfsClient, InvalidUri> {
let base_path = IpfsClient::build_base_path(uri)?;
+ let client = {
+ #[cfg(feature = "hyper")]
+ {
+ Builder::default()
+ .keep_alive(false)
+ .build(HttpsConnector::new())
+ }
+ #[cfg(feature = "actix")]
+ {
+ Client::default()
+ }
+ };
Ok(IpfsClient {
base: base_path,
- #[cfg(feature = "hyper")]
- client: {
- let connector = HttpsConnector::new(4).unwrap();
- Builder::default().keep_alive(false).build(connector)
- },
- #[cfg(feature = "actix")]
- client: Client::default(),
+ client,
})
}
@@ -149,7 +128,7 @@ impl IpfsClient {
///
fn build_base_request<Req>(
&self,
- req: &Req,
+ req: Req,
form: Option<multipart::Form<'static>>,
) -> Result<Request, Error>
where
@@ -161,39 +140,43 @@ impl IpfsClient {
Req::PATH,
::serde_urlencoded::to_string(req)?
);
+
#[cfg(feature = "hyper")]
- let req = url.parse::<Uri>().map_err(From::from).and_then(move |url| {
- let mut builder = http::Request::builder();
- let mut builder = builder.method(Req::METHOD.clone()).uri(url);
+ {
+ url.parse::<Uri>().map_err(From::from).and_then(move |url| {
+ let builder = http::Request::builder();
+ let builder = builder.method(Req::METHOD.clone()).uri(url);
+ let req = if let Some(form) = form {
+ form.set_body_convert::<hyper::Body, multipart::Body>(builder)
+ } else {
+ builder.body(hyper::Body::empty())
+ };
+
+ req.map_err(From::from)
+ })
+ }
+ #[cfg(feature = "actix")]
+ {
let req = if let Some(form) = form {
- form.set_body_convert::<hyper::Body, multipart::Body>(&mut builder)
+ self.client
+ .request(Req::METHOD.clone(), url)
+ .content_type(form.content_type())
} else {
- builder.body(hyper::Body::empty())
+ self.client.request(Req::METHOD.clone(), url)
};
- req.map_err(From::from)
- });
- #[cfg(feature = "actix")]
- let req = if let Some(form) = form {
- Ok(self
- .client
- .request(Req::METHOD.clone(), url)
- .content_type(form.content_type()))
- } else {
- Ok(self.client.request(Req::METHOD.clone(), url))
- };
-
- req
+ Ok(req.timeout(std::time::Duration::from_secs(90)))
+ }
}
/// Builds an Api error from a response body.
///
#[inline]
- fn build_error_from_body(chunk: Bytes) -> Error {
- match serde_json::from_slice(&chunk) {
+ fn process_error_from_body(body: Bytes) -> Error {
+ match serde_json::from_slice(&body) {
Ok(e) => Error::Api(e),
- Err(_) => match String::from_utf8(chunk.to_vec()) {
+ Err(_) => match String::from_utf8(body.to_vec()) {
Ok(s) => Error::Uncategorized(s),
Err(e) => e.into(),
},
@@ -203,229 +186,206 @@ impl IpfsClient {
/// Processes a response that expects a json encoded body, returning an
/// error or a deserialized json response.
///
- fn process_json_response<Res>(status: StatusCode, chunk: Bytes) -> Result<Res, Error>
+ fn process_json_response<Res>(status: StatusCode, body: Bytes) -> Result<Res, Error>
where
for<'de> Res: 'static + Deserialize<'de>,
{
match status {
- StatusCode::OK => serde_json::from_slice(&chunk).map_err(From::from),
- _ => Err(Self::build_error_from_body(chunk)),
+ StatusCode::OK => serde_json::from_slice(&body).map_err(From::from),
+ _ => Err(Self::process_error_from_body(body)),
}
}
/// Processes a response that returns a stream of json deserializable
/// results.
///
- fn process_stream_response<D, Res>(res: Response, decoder: D) -> AsyncStreamResponse<Res>
+ fn process_stream_response<D, Res>(
+ res: Response,
+ decoder: D,
+ ) -> impl Stream<Item = Result<Res, Error>>
where
- D: 'static + Decoder<Item = Res, Error = Error> + Send,
- Res: 'static,
+ D: Decoder<Item = Res, Error = Error> + Send,
{
#[cfg(feature = "hyper")]
- let stream = FramedRead::new(
- StreamReader::new(res.into_body().map(|c| c.into_bytes()).from_err()),
- decoder,
- );
-
+ {
+ FramedRead::new(StreamReader::new(res.into_body()), decoder)
+ }
#[cfg(feature = "actix")]
- let stream = FramedRead::new(StreamReader::new(res.from_err()), decoder);
-
- Box::new(stream)
+ {
+ FramedRead::new(StreamReader::new(res), decoder)
+ }
}
/// Generates a request, and returns the unprocessed response future.
///
- fn request_raw<Req>(
+ async fn request_raw<Req>(
&self,
- req: &Req,
+ req: Req,
form: Option<multipart::Form<'static>>,
- ) -> AsyncResponse<(StatusCode, Bytes)>
+ ) -> Result<(StatusCode, Bytes), Error>
where
Req: ApiRequest + Serialize,
{
- match self.build_base_request(req, form) {
- Ok(req) => {
- #[cfg(feature = "hyper")]
- let res = self
- .client
- .request(req)
- .and_then(|res| {
- let status = res.status();
-
- res.into_body()
- .concat2()
- .map(move |chunk| (status, chunk.into_bytes()))
- })
- .from_err();
- #[cfg(feature = "actix")]
- let res = req
- .timeout(std::time::Duration::from_secs(90))
- .send()
- .from_err()
- .and_then(|mut x| {
- let status = x.status();
- x.body().map(move |body| (status, body)).from_err()
- });
- Box::new(res)
- }
- Err(e) => Box::new(Err(e).into_future()),
- }
- }
+ let req = self.build_base_request(req, form)?;
- /// Generic method for making a request that expects back a streaming
- /// response.
- ///
- fn request_stream<Req, Res, F>(
- &self,
- req: &Req,
- form: Option<multipart::Form<'static>>,
- process: F,
- ) -> AsyncStreamResponse<Res>
- where
- Req: ApiRequest + Serialize,
- Res: 'static + Send,
- F: 'static + Fn(Response) -> AsyncStreamResponse<Res> + Send,
- {
#[cfg(feature = "hyper")]
- match self.build_base_request(req, form) {
- Ok(req) => {
- let res = self
- .client
- .request(req)
- .from_err()
- .map(move |res| {
- let stream: Box<dyn Stream<Item = Res, Error = _> + Send + 'static> =
- match res.status() {
- StatusCode::OK => process(res),
- // If the server responded with an error status code, the body
- // still needs to be read so an error can be built. This block will
- // read the entire body stream, then immediately return an error.
- //
- _ => Box::new(
- res.into_body()
- .concat2()
- .from_err()
- .and_then(|chunk| {
- Err(Self::build_error_from_body(chunk.into_bytes()))
- })
- .into_stream(),
- ),
- };
-
- stream
- })
- .flatten_stream();
- Box::new(res)
- }
- Err(e) => Box::new(stream::once(Err(e))),
+ {
+ let res = self.client.request(req).await?;
+ let status = res.status();
+ let body = body::to_bytes(res.into_body()).await?;
+
+ Ok((status, body))
}
#[cfg(feature = "actix")]
- match self.build_base_request(req, form) {
- Ok(req) => {
- let res = req
- .timeout(std::time::Duration::from_secs(90))
- .send()
- .from_err();
- Box::new(res.map(process).flatten_stream())
- }
- Err(e) => Box::new(stream::once(Err(e))),
+ {
+ let mut res = req.send().await?;
+ let status = res.status();
+ let body = res.body().await?;
+
+ Ok((status, body))
}
}
/// Generic method for making a request to the Ipfs server, and getting
/// a deserializable response.
///
- fn request<Req, Res>(
+ async fn request<Req, Res>(
&self,
- req: &Req,
+ req: Req,
form: Option<multipart::Form<'static>>,
- ) -> AsyncResponse<Res>
+ ) -> Result<Res, Error>
where
Req: ApiRequest + Serialize,
- for<'de> Res: 'static + Deserialize<'de> + Send,
+ for<'de> Res: 'static + Deserialize<'de>,
{
- let res = self
- .request_raw(req, form)
- .and_then(|(status, chunk)| IpfsClient::process_json_response(status, chunk));
+ let (status, chunk) = self.request_raw(req, form).await?;
- Box::new(res)
+ IpfsClient::process_json_response(status, chunk)
}
/// Generic method for making a request to the Ipfs server, and getting
/// back a response with no body.
///
- fn request_empty<Req>(
+ async fn request_empty<Req>(
&self,
- req: &Req,
+ req: Req,
form: Option<multipart::Form<'static>>,
- ) -> AsyncResponse<()>
+ ) -> Result<(), Error>
where
Req: ApiRequest + Serialize,
{
- let res = self
- .request_raw(req, form)
- .and_then(|(status, chunk)| match status {
- StatusCode::OK => Ok(()),
- _ => Err(Self::build_error_from_body(chunk)),
- });
+ let (status, chunk) = self.request_raw(req, form).await?;
- Box::new(res)
+ match status {
+ StatusCode::OK => Ok(()),
+ _ => Err(Self::process_error_from_body(chunk)),
+ }
}
/// Generic method for making a request to the Ipfs server, and getting
/// back a raw String response.
///
- fn request_string<Req>(
+ async fn request_string<Req>(
&self,
- req: &Req,
+ req: Req,
form: Option<multipart::Form<'static>>,
- ) -> AsyncResponse<String>
+ ) -> Result<String, Error>
where
Req: ApiRequest + Serialize,
{
- let res = self
- .request_raw(req, form)
- .and_then(|(status, chunk)| match status {
- StatusCode::OK => String::from_utf8(chunk.to_vec()).map_err(From::from),
- _ => Err(Self::build_error_from_body(chunk)),
- });
+ let (status, chunk) = self.request_raw(req, form).await?;
- Box::new(res)
+ match status {
+ StatusCode::OK => String::from_utf8(chunk.to_vec()).map_err(From::from),
+ _ => Err(Self::process_error_from_body(chunk)),
+ }
}
+}
- /// Generic method for making a request to the Ipfs server, and getting
- /// back a raw stream of bytes.
+impl IpfsClient {
+ /// Generic method for making a request that expects back a streaming
+ /// response.
///
- fn request_stream_bytes<Req>(
+ fn request_stream<Res, F, OutStream>(
&self,
- req: &Req,
- form: Option<multipart::Form<'static>>,
- ) -> AsyncStreamResponse<Bytes>
+ req: Request,
+ process: F,
+ ) -> impl Stream<Item = Result<Res, Error>>
where
- Req: ApiRequest + Serialize,
+ OutStream: Stream<Item = Result<Res, Error>>,
+ F: 'static + Fn(Response) -> OutStream,
{
#[cfg(feature = "hyper")]
- let res = self.request_stream(req, form, |res| {
- Box::new(res.into_body().from_err().map(|c| c.into_bytes()))
- });
+ {
+ self.client
+ .request(req)
+ .err_into()
+ .map_ok(move |res| {
+ match res.status() {
+ StatusCode::OK => process(res).right_stream(),
+ // If the server responded with an error status code, the body
+ // still needs to be read so an error can be built. This block will
+ // read the entire body stream, then immediately return an error.
+ //
+ _ => body::to_bytes(res.into_body())
+ .boxed()
+ .map(|maybe_body| match maybe_body {
+ Ok(body) => Err(Self::process_error_from_body(body)),
+ Err(e) => Err(e.into()),
+ })
+ .into_stream()
+ .left_stream(),
+ }
+ })
+ .try_flatten_stream()
+ }
#[cfg(feature = "actix")]
- let res = self.request_stream(req, form, |res| Box::new(res.from_err()));
- res
+ {
+ req.send()
+ .err_into()
+ .map_ok(move |mut res| {
+ match res.status() {
+ StatusCode::OK => process(res).right_stream(),
+ // If the server responded with an error status code, the body
+ // still needs to be read so an error can be built. This block will
+ // read the entire body stream, then immediately return an error.
+ //
+ _ => res
+ .body()
+ .map(|maybe_body| match maybe_body {
+ Ok(body) => Err(Self::process_error_from_body(body)),
+ Err(e) => Err(e.into()),
+ })
+ .into_stream()
+ .left_stream(),
+ }
+ })
+ .try_flatten_stream()
+ }
+ }
+
+ /// Generic method for making a request to the Ipfs server, and getting
+ /// back a raw stream of bytes.
+ ///
+ fn request_stream_bytes(&self, req: Request) -> impl Stream<Item = Result<Bytes, Error>> {
+ #[cfg(feature = "hyper")]
+ {
+ self.request_stream(req, |res| res.into_body().err_into())
+ }
+ #[cfg(feature = "actix")]
+ {
+ self.request_stream(req, |res| res.err_into())
+ }
}
/// Generic method to return a streaming response of deserialized json
/// objects delineated by new line separators.
///
- fn request_stream_json<Req, Res>(
- &self,
- req: &Req,
- form: Option<multipart::Form<'static>>,
- ) -> AsyncStreamResponse<Res>
+ fn request_stream_json<Res>(&self, req: Request) -> impl Stream<Item = Result<Res, Error>>
where
- Req: ApiRequest + Serialize,
for<'de> Res: 'static + Deserialize<'de> + Send,
{
- self.request_stream(req, form, |res| {
+ self.request_stream(req, |res| {
let parse_stream_error = if let Some(trailer) = res.headers().get(TRAILER) {
// Response has the Trailer header set. The StreamError trailer
// is used to indicate that there was an error while streaming
@@ -441,48 +401,63 @@ impl IpfsClient {
// There was an unrecognized trailer value. If that is the case,
// create a stream that immediately errors.
//
- return Box::new(stream::once(Err(err)));
+ return future::err(err).into_stream().left_stream();
}
} else {
false
};
- Box::new(IpfsClient::process_stream_response(
- res,
- JsonLineDecoder::new(parse_stream_error),
- ))
+ IpfsClient::process_stream_response(res, JsonLineDecoder::new(parse_stream_error))
+ .right_stream()
})
}
}
+// Implements a call to the IPFS that returns a streaming body response.
+// Implementing this in a macro is necessary because the Rust compiler
+// can't reason about the lifetime of the request instance properly. It
+// thinks that the request needs to live as long as the returned stream,
+// but in reality, the request instance is only used to build the Hyper
+// or Actix request.
+//
+macro_rules! impl_stream_api_response {
+ (($self:ident, $req:expr, $form:expr) => $call:ident) => {
+ impl_stream_api_response! {
+ ($self, $req, $form) |req| => { $self.$call(req) }
+ }
+ };
+ (($self:ident, $req:expr, $form:expr) |$var:ident| => $impl:block) => {
+ match $self.build_base_request($req, $form) {
+ Ok($var) => $impl.right_stream(),
+ Err(e) => return future::err(e).into_stream().left_stream(),
+ }
+ };
+}
+
impl IpfsClient {
/// Add file to Ipfs.
///
/// # Examples
///
/// ```no_run
- /// # extern crate ipfs_api;
- /// #
/// use ipfs_api::IpfsClient;
/// use std::io::Cursor;
///
- /// # fn main() {
/// let client = IpfsClient::default();
/// let data = Cursor::new("Hello World!");
- /// let req = client.add(data);
- /// # }
+ /// let res = client.add(data);
/// ```
///
#[inline]
- pub fn add<R>(&self, data: R) -> AsyncResponse<response::AddResponse>
+ pub async fn add<R>(&self, data: R) -> Result<response::AddResponse, Error>
where
- R: 'static + Read + Send,
+ R: 'static + Read + Send + Sync,
{
let mut form = multipart::Form::default();
form.add_reader("path", data);
- self.request(&request::Add, Some(form))
+ self.request(request::Add, Some(form)).await
}
/// Add a path to Ipfs. Can be a file or directory.
@@ -492,53 +467,45 @@ impl IpfsClient {
/// # Examples
///
/// ```no_run
- /// # extern crate ipfs_api;
- /// #
/// use ipfs_api::IpfsClient;
///
- /// # fn main() {
/// let client = IpfsClient::default();
/// let path = "./src";
- /// let req = client.add_path(path);
- /// # }
+ /// let res = client.add_path(path);
/// ```
///
#[inline]
- pub fn add_path<P>(&self, path: P) -> AsyncResponse<response::AddResponse>
+ pub async fn add_path<P>(&self, path: P) -> Result<Vec<response::AddResponse>, Error>
where
P: AsRef<Path>,
{
- let mut form = multipart::Form::default();
-
let prefix = path.as_ref().parent();
-
let mut paths_to_add: Vec<(PathBuf, u64)> = vec![];
for path in walkdir::WalkDir::new(path.as_ref()) {
match path {
- Ok(entry) => {
+ Ok(entry) if entry.file_type().is_file() => {
if entry.file_type().is_file() {
- let file_size =
- entry.metadata().map(|metadata| metadata.len()).unwrap_or(0);
+ let file_size = entry
+ .metadata()
+ .map(|metadata| metadata.len())
+ .map_err(|e| Error::Io(e.into()))?;
+
paths_to_add.push((entry.path().to_path_buf(), file_size));
}
}
- Err(err) => {
- return Box::new(future::err(Error::Io(err.into())));
- }
+ Ok(_) => (),
+ Err(err) => return Err(Error::Io(err.into())),
}
}
paths_to_add.sort_unstable_by(|(_, a), (_, b)| a.cmp(b).reverse());
let mut it = 0;
- const FILE_DESCRIPTOR_LIMIT: usize = 127;
+ let mut form = multipart::Form::default();
for (path, file_size) in paths_to_add {
- let file = std::fs::File::open(&path);
- if let Err(err) = file {
- return Box::new(future::err(err.into()));
- }
+ let mut file = fs::File::open(&path)?;
let file_name = match prefix {
Some(prefix) => path.strip_prefix(prefix).unwrap(),
None => path.as_path(),
@@ -546,22 +513,20 @@ impl IpfsClient {
.to_string_lossy();
if it < FILE_DESCRIPTOR_LIMIT {
- form.add_reader_file("path", file.unwrap(), file_name);
+ form.add_reader_file("path", file, file_name);
+
it += 1;
} else {
let mut buf = Vec::with_capacity(file_size as usize);
- if let Err(err) = file.unwrap().read_to_end(&mut buf) {
- return Box::new(future::err(err.into()));
- }
- form.add_reader_file("path", std::io::Cursor::new(buf), file_name);
+ let _ = file.read_to_end(&mut buf)?;
+
+ form.add_reader_file("path", Cursor::new(buf), file_name);
}
}
- Box::new(
- self.request_stream_json(&request::Add, Some(form))
- .collect()
- .map(|mut responses: Vec<response::AddResponse>| responses.pop().unwrap()),
- )
+ let req = self.build_base_request(request::Add, Some(form))?;
+
+ self.request_stream_json(req).try_collect().await
}
/// Returns the current ledger for a peer.
@@ -569,19 +534,18 @@ impl IpfsClient {
/// # Examples
///
/// ```no_run
- /// # extern crate ipfs_api;
- /// #
/// use ipfs_api::IpfsClient;
///
- /// # fn main() {
/// let client = IpfsClient::default();
- /// let req = client.bitswap_ledger("QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ");
- /// # }
+ /// let res = client.bitswap_ledger("QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ");
/// ```
///
#[inline]
- pub fn bitswap_ledger(&self, peer: &str) -> AsyncResponse<response::BitswapLedgerResponse> {
- self.request(&request::BitswapLedger { peer }, None)
+ pub async fn bitswap_ledger(
+ &self,
+ peer: &str,
+ ) -> Result<response::BitswapLedgerResponse, Error> {
+ self.request(request::BitswapLedger { peer }, None).await
}
/// Triggers a reprovide.
@@ -589,19 +553,15 @@ impl IpfsClient {
/// # Examples
///
/// ```no_run
- /// # extern crate ipfs_api;
- /// #
/// use ipfs_api::IpfsClient;
///
- /// # fn main() {
/// let client = IpfsClient::default();
- /// let req = client.bitswap_reprovide();
- /// # }
+ /// let res = client.bitswap_reprovide();
/// ```
///
#[inline]
- pub fn bitswap_reprovide(&self) -> AsyncResponse<response::BitswapReprovideResponse> {
- self.request_empty(&request::BitswapReprovide, None)
+ pub async fn bitswap_reprovide(&self) -> Result<response::BitswapReprovideResponse, Error> {
+ self.request_empty(request::BitswapReprovide, None).await
}
/// Returns some stats about the bitswap agent.
@@ -609,19 +569,15 @@ impl IpfsClient {
/// # Examples
///
/// ```no_run
- /// # extern crate ipfs_api;
- /// #
/// use ipfs_api::IpfsClient;
///
- /// # fn main() {
/// let client = IpfsClient::default();
- /// let req = client.bitswap_stat();
- /// # }
+ /// let res = client.bitswap_stat();
/// ```
///
#[inline]
- pub fn bitswap_stat(&self) -> AsyncResponse<response::BitswapStatResponse> {
- self.request(&request::BitswapStat, None)
+ pub async fn bitswap_stat(&self) -> Result<response::BitswapStatResponse, Error> {
+ self.request(request::BitswapStat, None).await
}
/// Remove a given block from your wantlist.
@@ -629,19 +585,19 @@ impl IpfsClient {
/// # Examples
///
/// ```no_run
- /// # extern crate ipfs_api;
- /// #
/// use ipfs_api::IpfsClient;
///
- /// # fn main() {
/// let client = IpfsClient::default();
- /// let req = client.bitswap_unwant("QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA");
- /// # }
+ /// let res = client.bitswap_unwant("QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA");
/// ```
///
#[inline]
- pub fn bitswap_unwant(&self, key: &str) -> AsyncResponse<response::BitswapUnwantResponse> {
- self.request_empty(&request::BitswapUnwant { key }, None)
+ pub async fn bitswap_unwant(
+ &self,
+ key: &str,
+ ) -> Result<response::BitswapUnwantResponse, Error> {
+ self.request_empty(request::BitswapUnwant { key }, None)
+ .await
}
/// Shows blocks on the wantlist for you or the specified peer.
@@ -649,22 +605,20 @@ impl IpfsClient {
/// # Examples
///
/// ```no_run
- /// # extern crate ipfs_api;
- /// #
/// use ipfs_api::IpfsClient;
///
- /// # fn main() {
/// let client = IpfsClient::default();
- /// let req = client.bitswap_wantlist(Some("QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ"));
- /// # }
+ /// let res = client.bitswap_wantlist(
+ /// Some("QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ")
+ /// );
/// ```
///
#[inline]
- pub fn bitswap_wantlist(
+ pub async fn bitswap_wantlist(
&self,
peer: Option<&str>,
- ) -> AsyncResponse<response::BitswapWantlistResponse> {
- self.request(&request::BitswapWantlist { peer }, None)
+ ) -> Result<response::BitswapWantlistResponse, Error> {
+ self.request(request::BitswapWantlist { peer }, None).await
}
/// Gets a raw IPFS block.
@@ -672,22 +626,22 @@ impl IpfsClient {
/// # Examples
///
/// ```no_run
- /// # extern crate futures;
- /// # extern crate ipfs_api;
- /// #
- /// use futures::Stream;
+ /// use futures::TryStreamExt;
/// use ipfs_api::IpfsClient;
///
- /// # fn main() {
/// let client = IpfsClient::default();
/// let hash = "QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA";
- /// let req = client.block_get(hash).concat2();
- /// # }
+ /// let res = client
+ /// .block_get(hash)
+ /// .map_ok(|chunk| chunk.to_vec())
+ /// .try_concat();
/// ```
///
#[inline]
- pub fn block_get(&self, hash: &str) -> AsyncStreamResponse<Bytes> {
- self.request_stream_bytes(&request::BlockGet { hash }, None)
+ pub fn block_get(&self, hash: &str) -> impl Stream<Item = Result<Bytes, Error>> {
+ impl_stream_api_response! {
+ (self, request::BlockGet { hash }, None) => request_stream_bytes
+ }
}
/// Store input as an IPFS block.
@@ -695,28 +649,24 @@ impl IpfsClient {
/// # Examples
///
/// ```no_run
- /// # extern crate ipfs_api;
- /// #
/// use ipfs_api::IpfsClient;
/// use std::io::Cursor;
///
- /// # fn main() {
/// let client = IpfsClient::default();
/// let data = Cursor::new("Hello World!");
- /// let req = client.block_put(data);
- /// # }
+ /// let res = client.block_put(data);
/// ```
///
#[inline]
- pub fn block_put<R>(&self, data: R) -> AsyncResponse<response::BlockPutResponse>
+ pub async fn block_put<R>(&self, data: R) -> Result<response::BlockPutResponse, Error>
where
- R: 'static + Read + Send,
+ R: 'static + Read + Send + Sync,
{
let mut form = multipart::Form::default();
form.add_reader("data", data);
- self.request(&request::BlockPut, Some(form))
+ self.request(request::BlockPut, Some(form)).await
}
/// Removes an IPFS block.
@@ -724,19 +674,15 @@ impl IpfsClient {
/// # Examples
///
/// ```no_run
- /// # extern crate ipfs_api;
- /// #
/// use ipfs_api::IpfsClient;
///
- /// # fn main() {
/// let client = IpfsClient::default();
- /// let req = client.block_rm("QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA");
- /// # }
+ /// let res = client.block_rm("QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA");
/// ```
///
#[inline]
- pub fn block_rm(&self, hash: &str) -> AsyncResponse<response::BlockRmResponse> {
- self.request(&request::BlockRm { hash }, None)
+ pub async fn block_rm(&self, hash: &str) -> Result<response::BlockRmResponse, Error> {
+ self.request(request::BlockRm { hash }, None).await
}
/// Prints information about a raw IPFS block.
@@ -744,19 +690,15 @@ impl IpfsClient {
/// # Examples
///
/// ```no_run
- /// # extern crate ipfs_api;
- /// #
/// use ipfs_api::IpfsClient;
///
- /// # fn main() {
/// let client = IpfsClient::default();
- /// let req = client.block_stat("QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA");
- /// # }
+ /// let res = client.block_stat("QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA");
/// ```
///
#[inline]
- pub fn block_stat(&self, hash: &str) -> AsyncResponse<response::BlockStatResponse> {
- self.request(&request::BlockStat { hash }, None)
+ pub async fn block_stat(&self, hash: &str) -> Result<response::BlockStatResponse, Error> {
+ self.request(request::BlockStat { hash }, None).await
}
/// Add default peers to the bootstrap list.
@@ -764,19 +706,17 @@ impl IpfsClient {
/// # Examples
///
/// ```no_run
- /// # extern crate ipfs_api;
- /// #
/// use ipfs_api::IpfsClient;
///
- /// # fn main() {
/// let client = IpfsClient::default();
- /// let req = client.bootstrap_add_default();
- /// # }
+ /// let res = client.bootstrap_add_default();
/// ```
///
#[inline]
- pub fn bootstrap_add_default(&self) -> AsyncResponse<response::BootstrapAddDefaultResponse> {
- self.request(&request::BootstrapAddDefault, None)
+ pub async fn bootstrap_add_default(
+ &self,
+ ) -> Result<response::BootstrapAddDefaultResponse, Error> {
+ self.request(request::BootstrapAddDefault, None).await
}
/// Lists peers in bootstrap list.
@@ -784,19 +724,15 @@ impl IpfsClient {
/// # Examples
///
/// ```no_run
- /// # extern crate ipfs_api;
- /// #
/// use ipfs_api::IpfsClient;
///
- /// # fn main() {
/// let client = IpfsClient::default();
- /// let req = client.bootstrap_list();
- /// # }
+ /// let res = client.bootstrap_list();
/// ```
///
#[inline]
- pub fn bootstrap_list(&self) -> AsyncResponse<response::BootstrapListResponse> {
- self.request(&request::BootstrapList, None)
+ pub async fn bootstrap_list(&self) -> Result<response::BootstrapListResponse, Error> {
+ self.request(request::BootstrapList, None).await
}
/// Removes all peers in bootstrap list.
@@ -804,19 +740,15 @@ impl IpfsClient {
/// # Examples
///
/// ```no_run
- /// # extern crate ipfs_api;
- /// #
/// use ipfs_api::IpfsClient;
///
- /// # fn main() {
/// let client = IpfsClient::default();
- /// let req = client.bootstrap_rm_all();
- /// # }
+ /// let res = client.bootstrap_rm_all();
/// ```
///
#[inline]
- pub fn bootstrap_rm_all(&self) -> AsyncResponse<response::BootstrapRmAllResponse> {
- self.request(&request::BootstrapRmAll, None)
+ pu