diff options
Diffstat (limited to 'ipfs-api/src/client.rs')
-rw-r--r-- | ipfs-api/src/client.rs | 1507 |
1 files changed, 629 insertions, 878 deletions
diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs index 5d44c7c..1072a33 100644 --- a/ipfs-api/src/client.rs +++ b/ipfs-api/src/client.rs @@ -5,26 +5,25 @@ // http://opensource.org/licenses/MIT>, at your option. This file may not be // copied, modified, or distributed except according to those terms. // -use crate::header::TRAILER; -#[cfg(feature = "hyper")] -use crate::hyper_multipart::client::multipart; -use crate::read::{JsonLineDecoder, LineDecoder, StreamReader}; -use crate::request::{self, ApiRequest}; -use crate::response::{self, Error}; -#[cfg(feature = "actix")] -use actix_http::{encoding, Payload, PayloadStream}; +use crate::{ + header::TRAILER, + read::{JsonLineDecoder, LineDecoder, StreamReader}, + request::{self, ApiRequest}, + response::{self, Error}, + Client, Request, Response, +}; #[cfg(feature = "actix")] use actix_multipart::client::multipart; use bytes::Bytes; -use futures::{ - future, - stream::{self, Stream}, - Future, IntoFuture, +use futures::{future, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; +use http::{ + uri::{InvalidUri, Uri}, + StatusCode, }; -use http::uri::{InvalidUri, Uri}; -use http::StatusCode; #[cfg(feature = "hyper")] -use hyper::client::{self, Builder, HttpConnector}; +use hyper::{body, client::Builder}; +#[cfg(feature = "hyper")] +use hyper_multipart::client::multipart; #[cfg(feature = "hyper")] use hyper_tls::HttpsConnector; use multiaddr::{AddrComponent, ToMultiaddr}; @@ -32,40 +31,13 @@ use serde::{Deserialize, Serialize}; use serde_json; use std::{ fs, - io::Read, + io::{Cursor, Read}, net::{IpAddr, SocketAddr}, path::{Path, PathBuf}, }; -use tokio_codec::{Decoder, FramedRead}; - -/// A response returned by the HTTP client. -/// -#[cfg(feature = "actix")] -type AsyncResponse<T> = Box<dyn Future<Item = T, Error = Error> + 'static>; -#[cfg(feature = "hyper")] -type AsyncResponse<T> = Box<dyn Future<Item = T, Error = Error> + Send + 'static>; - -/// A future that returns a stream of responses. -/// -#[cfg(feature = "actix")] -type AsyncStreamResponse<T> = Box<dyn Stream<Item = T, Error = Error> + 'static>; -#[cfg(feature = "hyper")] -type AsyncStreamResponse<T> = Box<dyn Stream<Item = T, Error = Error> + Send + 'static>; - -#[cfg(feature = "actix")] -type Request = awc::ClientRequest; -#[cfg(feature = "hyper")] -type Request = http::Request<hyper::Body>; +use tokio_util::codec::{Decoder, FramedRead}; -#[cfg(feature = "actix")] -type Response = awc::ClientResponse<encoding::Decoder<Payload<PayloadStream>>>; -#[cfg(feature = "hyper")] -type Response = http::Response<hyper::Body>; - -#[cfg(feature = "actix")] -type Client = awc::Client; -#[cfg(feature = "hyper")] -type Client = client::Client<HttpsConnector<HttpConnector>, hyper::Body>; +const FILE_DESCRIPTOR_LIMIT: usize = 127; /// Asynchronous Ipfs client. /// @@ -123,19 +95,26 @@ impl IpfsClient { } /// Creates a new `IpfsClient` for any given URI. + /// #[inline] pub fn new_from_uri(uri: &str) -> Result<IpfsClient, InvalidUri> { let base_path = IpfsClient::build_base_path(uri)?; + let client = { + #[cfg(feature = "hyper")] + { + Builder::default() + .keep_alive(false) + .build(HttpsConnector::new()) + } + #[cfg(feature = "actix")] + { + Client::default() + } + }; Ok(IpfsClient { base: base_path, - #[cfg(feature = "hyper")] - client: { - let connector = HttpsConnector::new(4).unwrap(); - Builder::default().keep_alive(false).build(connector) - }, - #[cfg(feature = "actix")] - client: Client::default(), + client, }) } @@ -149,7 +128,7 @@ impl IpfsClient { /// fn build_base_request<Req>( &self, - req: &Req, + req: Req, form: Option<multipart::Form<'static>>, ) -> Result<Request, Error> where @@ -161,39 +140,43 @@ impl IpfsClient { Req::PATH, ::serde_urlencoded::to_string(req)? ); + #[cfg(feature = "hyper")] - let req = url.parse::<Uri>().map_err(From::from).and_then(move |url| { - let mut builder = http::Request::builder(); - let mut builder = builder.method(Req::METHOD.clone()).uri(url); + { + url.parse::<Uri>().map_err(From::from).and_then(move |url| { + let builder = http::Request::builder(); + let builder = builder.method(Req::METHOD.clone()).uri(url); + let req = if let Some(form) = form { + form.set_body_convert::<hyper::Body, multipart::Body>(builder) + } else { + builder.body(hyper::Body::empty()) + }; + + req.map_err(From::from) + }) + } + #[cfg(feature = "actix")] + { let req = if let Some(form) = form { - form.set_body_convert::<hyper::Body, multipart::Body>(&mut builder) + self.client + .request(Req::METHOD.clone(), url) + .content_type(form.content_type()) } else { - builder.body(hyper::Body::empty()) + self.client.request(Req::METHOD.clone(), url) }; - req.map_err(From::from) - }); - #[cfg(feature = "actix")] - let req = if let Some(form) = form { - Ok(self - .client - .request(Req::METHOD.clone(), url) - .content_type(form.content_type())) - } else { - Ok(self.client.request(Req::METHOD.clone(), url)) - }; - - req + Ok(req.timeout(std::time::Duration::from_secs(90))) + } } /// Builds an Api error from a response body. /// #[inline] - fn build_error_from_body(chunk: Bytes) -> Error { - match serde_json::from_slice(&chunk) { + fn process_error_from_body(body: Bytes) -> Error { + match serde_json::from_slice(&body) { Ok(e) => Error::Api(e), - Err(_) => match String::from_utf8(chunk.to_vec()) { + Err(_) => match String::from_utf8(body.to_vec()) { Ok(s) => Error::Uncategorized(s), Err(e) => e.into(), }, @@ -203,229 +186,206 @@ impl IpfsClient { /// Processes a response that expects a json encoded body, returning an /// error or a deserialized json response. /// - fn process_json_response<Res>(status: StatusCode, chunk: Bytes) -> Result<Res, Error> + fn process_json_response<Res>(status: StatusCode, body: Bytes) -> Result<Res, Error> where for<'de> Res: 'static + Deserialize<'de>, { match status { - StatusCode::OK => serde_json::from_slice(&chunk).map_err(From::from), - _ => Err(Self::build_error_from_body(chunk)), + StatusCode::OK => serde_json::from_slice(&body).map_err(From::from), + _ => Err(Self::process_error_from_body(body)), } } /// Processes a response that returns a stream of json deserializable /// results. /// - fn process_stream_response<D, Res>(res: Response, decoder: D) -> AsyncStreamResponse<Res> + fn process_stream_response<D, Res>( + res: Response, + decoder: D, + ) -> impl Stream<Item = Result<Res, Error>> where - D: 'static + Decoder<Item = Res, Error = Error> + Send, - Res: 'static, + D: Decoder<Item = Res, Error = Error> + Send, { #[cfg(feature = "hyper")] - let stream = FramedRead::new( - StreamReader::new(res.into_body().map(|c| c.into_bytes()).from_err()), - decoder, - ); - + { + FramedRead::new(StreamReader::new(res.into_body()), decoder) + } #[cfg(feature = "actix")] - let stream = FramedRead::new(StreamReader::new(res.from_err()), decoder); - - Box::new(stream) + { + FramedRead::new(StreamReader::new(res), decoder) + } } /// Generates a request, and returns the unprocessed response future. /// - fn request_raw<Req>( + async fn request_raw<Req>( &self, - req: &Req, + req: Req, form: Option<multipart::Form<'static>>, - ) -> AsyncResponse<(StatusCode, Bytes)> + ) -> Result<(StatusCode, Bytes), Error> where Req: ApiRequest + Serialize, { - match self.build_base_request(req, form) { - Ok(req) => { - #[cfg(feature = "hyper")] - let res = self - .client - .request(req) - .and_then(|res| { - let status = res.status(); - - res.into_body() - .concat2() - .map(move |chunk| (status, chunk.into_bytes())) - }) - .from_err(); - #[cfg(feature = "actix")] - let res = req - .timeout(std::time::Duration::from_secs(90)) - .send() - .from_err() - .and_then(|mut x| { - let status = x.status(); - x.body().map(move |body| (status, body)).from_err() - }); - Box::new(res) - } - Err(e) => Box::new(Err(e).into_future()), - } - } + let req = self.build_base_request(req, form)?; - /// Generic method for making a request that expects back a streaming - /// response. - /// - fn request_stream<Req, Res, F>( - &self, - req: &Req, - form: Option<multipart::Form<'static>>, - process: F, - ) -> AsyncStreamResponse<Res> - where - Req: ApiRequest + Serialize, - Res: 'static + Send, - F: 'static + Fn(Response) -> AsyncStreamResponse<Res> + Send, - { #[cfg(feature = "hyper")] - match self.build_base_request(req, form) { - Ok(req) => { - let res = self - .client - .request(req) - .from_err() - .map(move |res| { - let stream: Box<dyn Stream<Item = Res, Error = _> + Send + 'static> = - match res.status() { - StatusCode::OK => process(res), - // If the server responded with an error status code, the body - // still needs to be read so an error can be built. This block will - // read the entire body stream, then immediately return an error. - // - _ => Box::new( - res.into_body() - .concat2() - .from_err() - .and_then(|chunk| { - Err(Self::build_error_from_body(chunk.into_bytes())) - }) - .into_stream(), - ), - }; - - stream - }) - .flatten_stream(); - Box::new(res) - } - Err(e) => Box::new(stream::once(Err(e))), + { + let res = self.client.request(req).await?; + let status = res.status(); + let body = body::to_bytes(res.into_body()).await?; + + Ok((status, body)) } #[cfg(feature = "actix")] - match self.build_base_request(req, form) { - Ok(req) => { - let res = req - .timeout(std::time::Duration::from_secs(90)) - .send() - .from_err(); - Box::new(res.map(process).flatten_stream()) - } - Err(e) => Box::new(stream::once(Err(e))), + { + let mut res = req.send().await?; + let status = res.status(); + let body = res.body().await?; + + Ok((status, body)) } } /// Generic method for making a request to the Ipfs server, and getting /// a deserializable response. /// - fn request<Req, Res>( + async fn request<Req, Res>( &self, - req: &Req, + req: Req, form: Option<multipart::Form<'static>>, - ) -> AsyncResponse<Res> + ) -> Result<Res, Error> where Req: ApiRequest + Serialize, - for<'de> Res: 'static + Deserialize<'de> + Send, + for<'de> Res: 'static + Deserialize<'de>, { - let res = self - .request_raw(req, form) - .and_then(|(status, chunk)| IpfsClient::process_json_response(status, chunk)); + let (status, chunk) = self.request_raw(req, form).await?; - Box::new(res) + IpfsClient::process_json_response(status, chunk) } /// Generic method for making a request to the Ipfs server, and getting /// back a response with no body. /// - fn request_empty<Req>( + async fn request_empty<Req>( &self, - req: &Req, + req: Req, form: Option<multipart::Form<'static>>, - ) -> AsyncResponse<()> + ) -> Result<(), Error> where Req: ApiRequest + Serialize, { - let res = self - .request_raw(req, form) - .and_then(|(status, chunk)| match status { - StatusCode::OK => Ok(()), - _ => Err(Self::build_error_from_body(chunk)), - }); + let (status, chunk) = self.request_raw(req, form).await?; - Box::new(res) + match status { + StatusCode::OK => Ok(()), + _ => Err(Self::process_error_from_body(chunk)), + } } /// Generic method for making a request to the Ipfs server, and getting /// back a raw String response. /// - fn request_string<Req>( + async fn request_string<Req>( &self, - req: &Req, + req: Req, form: Option<multipart::Form<'static>>, - ) -> AsyncResponse<String> + ) -> Result<String, Error> where Req: ApiRequest + Serialize, { - let res = self - .request_raw(req, form) - .and_then(|(status, chunk)| match status { - StatusCode::OK => String::from_utf8(chunk.to_vec()).map_err(From::from), - _ => Err(Self::build_error_from_body(chunk)), - }); + let (status, chunk) = self.request_raw(req, form).await?; - Box::new(res) + match status { + StatusCode::OK => String::from_utf8(chunk.to_vec()).map_err(From::from), + _ => Err(Self::process_error_from_body(chunk)), + } } +} - /// Generic method for making a request to the Ipfs server, and getting - /// back a raw stream of bytes. +impl IpfsClient { + /// Generic method for making a request that expects back a streaming + /// response. /// - fn request_stream_bytes<Req>( + fn request_stream<Res, F, OutStream>( &self, - req: &Req, - form: Option<multipart::Form<'static>>, - ) -> AsyncStreamResponse<Bytes> + req: Request, + process: F, + ) -> impl Stream<Item = Result<Res, Error>> where - Req: ApiRequest + Serialize, + OutStream: Stream<Item = Result<Res, Error>>, + F: 'static + Fn(Response) -> OutStream, { #[cfg(feature = "hyper")] - let res = self.request_stream(req, form, |res| { - Box::new(res.into_body().from_err().map(|c| c.into_bytes())) - }); + { + self.client + .request(req) + .err_into() + .map_ok(move |res| { + match res.status() { + StatusCode::OK => process(res).right_stream(), + // If the server responded with an error status code, the body + // still needs to be read so an error can be built. This block will + // read the entire body stream, then immediately return an error. + // + _ => body::to_bytes(res.into_body()) + .boxed() + .map(|maybe_body| match maybe_body { + Ok(body) => Err(Self::process_error_from_body(body)), + Err(e) => Err(e.into()), + }) + .into_stream() + .left_stream(), + } + }) + .try_flatten_stream() + } #[cfg(feature = "actix")] - let res = self.request_stream(req, form, |res| Box::new(res.from_err())); - res + { + req.send() + .err_into() + .map_ok(move |mut res| { + match res.status() { + StatusCode::OK => process(res).right_stream(), + // If the server responded with an error status code, the body + // still needs to be read so an error can be built. This block will + // read the entire body stream, then immediately return an error. + // + _ => res + .body() + .map(|maybe_body| match maybe_body { + Ok(body) => Err(Self::process_error_from_body(body)), + Err(e) => Err(e.into()), + }) + .into_stream() + .left_stream(), + } + }) + .try_flatten_stream() + } + } + + /// Generic method for making a request to the Ipfs server, and getting + /// back a raw stream of bytes. + /// + fn request_stream_bytes(&self, req: Request) -> impl Stream<Item = Result<Bytes, Error>> { + #[cfg(feature = "hyper")] + { + self.request_stream(req, |res| res.into_body().err_into()) + } + #[cfg(feature = "actix")] + { + self.request_stream(req, |res| res.err_into()) + } } /// Generic method to return a streaming response of deserialized json /// objects delineated by new line separators. /// - fn request_stream_json<Req, Res>( - &self, - req: &Req, - form: Option<multipart::Form<'static>>, - ) -> AsyncStreamResponse<Res> + fn request_stream_json<Res>(&self, req: Request) -> impl Stream<Item = Result<Res, Error>> where - Req: ApiRequest + Serialize, for<'de> Res: 'static + Deserialize<'de> + Send, { - self.request_stream(req, form, |res| { + self.request_stream(req, |res| { let parse_stream_error = if let Some(trailer) = res.headers().get(TRAILER) { // Response has the Trailer header set. The StreamError trailer // is used to indicate that there was an error while streaming @@ -441,48 +401,63 @@ impl IpfsClient { // There was an unrecognized trailer value. If that is the case, // create a stream that immediately errors. // - return Box::new(stream::once(Err(err))); + return future::err(err).into_stream().left_stream(); } } else { false }; - Box::new(IpfsClient::process_stream_response( - res, - JsonLineDecoder::new(parse_stream_error), - )) + IpfsClient::process_stream_response(res, JsonLineDecoder::new(parse_stream_error)) + .right_stream() }) } } +// Implements a call to the IPFS that returns a streaming body response. +// Implementing this in a macro is necessary because the Rust compiler +// can't reason about the lifetime of the request instance properly. It +// thinks that the request needs to live as long as the returned stream, +// but in reality, the request instance is only used to build the Hyper +// or Actix request. +// +macro_rules! impl_stream_api_response { + (($self:ident, $req:expr, $form:expr) => $call:ident) => { + impl_stream_api_response! { + ($self, $req, $form) |req| => { $self.$call(req) } + } + }; + (($self:ident, $req:expr, $form:expr) |$var:ident| => $impl:block) => { + match $self.build_base_request($req, $form) { + Ok($var) => $impl.right_stream(), + Err(e) => return future::err(e).into_stream().left_stream(), + } + }; +} + impl IpfsClient { /// Add file to Ipfs. /// /// # Examples /// /// ```no_run - /// # extern crate ipfs_api; - /// # /// use ipfs_api::IpfsClient; /// use std::io::Cursor; /// - /// # fn main() { /// let client = IpfsClient::default(); /// let data = Cursor::new("Hello World!"); - /// let req = client.add(data); - /// # } + /// let res = client.add(data); /// ``` /// #[inline] - pub fn add<R>(&self, data: R) -> AsyncResponse<response::AddResponse> + pub async fn add<R>(&self, data: R) -> Result<response::AddResponse, Error> where - R: 'static + Read + Send, + R: 'static + Read + Send + Sync, { let mut form = multipart::Form::default(); form.add_reader("path", data); - self.request(&request::Add, Some(form)) + self.request(request::Add, Some(form)).await } /// Add a path to Ipfs. Can be a file or directory. @@ -492,53 +467,45 @@ impl IpfsClient { /// # Examples /// /// ```no_run - /// # extern crate ipfs_api; - /// # /// use ipfs_api::IpfsClient; /// - /// # fn main() { /// let client = IpfsClient::default(); /// let path = "./src"; - /// let req = client.add_path(path); - /// # } + /// let res = client.add_path(path); /// ``` /// #[inline] - pub fn add_path<P>(&self, path: P) -> AsyncResponse<response::AddResponse> + pub async fn add_path<P>(&self, path: P) -> Result<Vec<response::AddResponse>, Error> where P: AsRef<Path>, { - let mut form = multipart::Form::default(); - let prefix = path.as_ref().parent(); - let mut paths_to_add: Vec<(PathBuf, u64)> = vec![]; for path in walkdir::WalkDir::new(path.as_ref()) { match path { - Ok(entry) => { + Ok(entry) if entry.file_type().is_file() => { if entry.file_type().is_file() { - let file_size = - entry.metadata().map(|metadata| metadata.len()).unwrap_or(0); + let file_size = entry + .metadata() + .map(|metadata| metadata.len()) + .map_err(|e| Error::Io(e.into()))?; + paths_to_add.push((entry.path().to_path_buf(), file_size)); } } - Err(err) => { - return Box::new(future::err(Error::Io(err.into()))); - } + Ok(_) => (), + Err(err) => return Err(Error::Io(err.into())), } } paths_to_add.sort_unstable_by(|(_, a), (_, b)| a.cmp(b).reverse()); let mut it = 0; - const FILE_DESCRIPTOR_LIMIT: usize = 127; + let mut form = multipart::Form::default(); for (path, file_size) in paths_to_add { - let file = std::fs::File::open(&path); - if let Err(err) = file { - return Box::new(future::err(err.into())); - } + let mut file = fs::File::open(&path)?; let file_name = match prefix { Some(prefix) => path.strip_prefix(prefix).unwrap(), None => path.as_path(), @@ -546,22 +513,20 @@ impl IpfsClient { .to_string_lossy(); if it < FILE_DESCRIPTOR_LIMIT { - form.add_reader_file("path", file.unwrap(), file_name); + form.add_reader_file("path", file, file_name); + it += 1; } else { let mut buf = Vec::with_capacity(file_size as usize); - if let Err(err) = file.unwrap().read_to_end(&mut buf) { - return Box::new(future::err(err.into())); - } - form.add_reader_file("path", std::io::Cursor::new(buf), file_name); + let _ = file.read_to_end(&mut buf)?; + + form.add_reader_file("path", Cursor::new(buf), file_name); } } - Box::new( - self.request_stream_json(&request::Add, Some(form)) - .collect() - .map(|mut responses: Vec<response::AddResponse>| responses.pop().unwrap()), - ) + let req = self.build_base_request(request::Add, Some(form))?; + + self.request_stream_json(req).try_collect().await } /// Returns the current ledger for a peer. @@ -569,19 +534,18 @@ impl IpfsClient { /// # Examples /// /// ```no_run - /// # extern crate ipfs_api; - /// # /// use ipfs_api::IpfsClient; /// - /// # fn main() { /// let client = IpfsClient::default(); - /// let req = client.bitswap_ledger("QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ"); - /// # } + /// let res = client.bitswap_ledger("QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ"); /// ``` /// #[inline] - pub fn bitswap_ledger(&self, peer: &str) -> AsyncResponse<response::BitswapLedgerResponse> { - self.request(&request::BitswapLedger { peer }, None) + pub async fn bitswap_ledger( + &self, + peer: &str, + ) -> Result<response::BitswapLedgerResponse, Error> { + self.request(request::BitswapLedger { peer }, None).await } /// Triggers a reprovide. @@ -589,19 +553,15 @@ impl IpfsClient { /// # Examples /// /// ```no_run - /// # extern crate ipfs_api; - /// # /// use ipfs_api::IpfsClient; /// - /// # fn main() { /// let client = IpfsClient::default(); - /// let req = client.bitswap_reprovide(); - /// # } + /// let res = client.bitswap_reprovide(); /// ``` /// #[inline] - pub fn bitswap_reprovide(&self) -> AsyncResponse<response::BitswapReprovideResponse> { - self.request_empty(&request::BitswapReprovide, None) + pub async fn bitswap_reprovide(&self) -> Result<response::BitswapReprovideResponse, Error> { + self.request_empty(request::BitswapReprovide, None).await } /// Returns some stats about the bitswap agent. @@ -609,19 +569,15 @@ impl IpfsClient { /// # Examples /// /// ```no_run - /// # extern crate ipfs_api; - /// # /// use ipfs_api::IpfsClient; /// - /// # fn main() { /// let client = IpfsClient::default(); - /// let req = client.bitswap_stat(); - /// # } + /// let res = client.bitswap_stat(); /// ``` /// #[inline] - pub fn bitswap_stat(&self) -> AsyncResponse<response::BitswapStatResponse> { - self.request(&request::BitswapStat, None) + pub async fn bitswap_stat(&self) -> Result<response::BitswapStatResponse, Error> { + self.request(request::BitswapStat, None).await } /// Remove a given block from your wantlist. @@ -629,19 +585,19 @@ impl IpfsClient { /// # Examples /// /// ```no_run - /// # extern crate ipfs_api; - /// # /// use ipfs_api::IpfsClient; /// - /// # fn main() { /// let client = IpfsClient::default(); - /// let req = client.bitswap_unwant("QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"); - /// # } + /// let res = client.bitswap_unwant("QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"); /// ``` /// #[inline] - pub fn bitswap_unwant(&self, key: &str) -> AsyncResponse<response::BitswapUnwantResponse> { - self.request_empty(&request::BitswapUnwant { key }, None) + pub async fn bitswap_unwant( + &self, + key: &str, + ) -> Result<response::BitswapUnwantResponse, Error> { + self.request_empty(request::BitswapUnwant { key }, None) + .await } /// Shows blocks on the wantlist for you or the specified peer. @@ -649,22 +605,20 @@ impl IpfsClient { /// # Examples /// /// ```no_run - /// # extern crate ipfs_api; - /// # /// use ipfs_api::IpfsClient; /// - /// # fn main() { /// let client = IpfsClient::default(); - /// let req = client.bitswap_wantlist(Some("QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ")); - /// # } + /// let res = client.bitswap_wantlist( + /// Some("QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ") + /// ); /// ``` /// #[inline] - pub fn bitswap_wantlist( + pub async fn bitswap_wantlist( &self, peer: Option<&str>, - ) -> AsyncResponse<response::BitswapWantlistResponse> { - self.request(&request::BitswapWantlist { peer }, None) + ) -> Result<response::BitswapWantlistResponse, Error> { + self.request(request::BitswapWantlist { peer }, None).await } /// Gets a raw IPFS block. @@ -672,22 +626,22 @@ impl IpfsClient { /// # Examples /// /// ```no_run - /// # extern crate futures; - /// # extern crate ipfs_api; - /// # - /// use futures::Stream; + /// use futures::TryStreamExt; /// use ipfs_api::IpfsClient; /// - /// # fn main() { /// let client = IpfsClient::default(); /// let hash = "QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"; - /// let req = client.block_get(hash).concat2(); - /// # } + /// let res = client + /// .block_get(hash) + /// .map_ok(|chunk| chunk.to_vec()) + /// .try_concat(); /// ``` /// #[inline] - pub fn block_get(&self, hash: &str) -> AsyncStreamResponse<Bytes> { - self.request_stream_bytes(&request::BlockGet { hash }, None) + pub fn block_get(&self, hash: &str) -> impl Stream<Item = Result<Bytes, Error>> { + impl_stream_api_response! { + (self, request::BlockGet { hash }, None) => request_stream_bytes + } } /// Store input as an IPFS block. @@ -695,28 +649,24 @@ impl IpfsClient { /// # Examples /// /// ```no_run - /// # extern crate ipfs_api; - /// # /// use ipfs_api::IpfsClient; /// use std::io::Cursor; /// - /// # fn main() { /// let client = IpfsClient::default(); /// let data = Cursor::new("Hello World!"); - /// let req = client.block_put(data); - /// # } + /// let res = client.block_put(data); /// ``` /// #[inline] - pub fn block_put<R>(&self, data: R) -> AsyncResponse<response::BlockPutResponse> + pub async fn block_put<R>(&self, data: R) -> Result<response::BlockPutResponse, Error> where - R: 'static + Read + Send, + R: 'static + Read + Send + Sync, { let mut form = multipart::Form::default(); form.add_reader("data", data); - self.request(&request::BlockPut, Some(form)) + self.request(request::BlockPut, Some(form)).await } /// Removes an IPFS block. @@ -724,19 +674,15 @@ impl IpfsClient { /// # Examples /// /// ```no_run - /// # extern crate ipfs_api; - /// # /// use ipfs_api::IpfsClient; /// - /// # fn main() { /// let client = IpfsClient::default(); - /// let req = client.block_rm("QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"); - /// # } + /// let res = client.block_rm("QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"); /// ``` /// #[inline] - pub fn block_rm(&self, hash: &str) -> AsyncResponse<response::BlockRmResponse> { - self.request(&request::BlockRm { hash }, None) + pub async fn block_rm(&self, hash: &str) -> Result<response::BlockRmResponse, Error> { + self.request(request::BlockRm { hash }, None).await } /// Prints information about a raw IPFS block. @@ -744,19 +690,15 @@ impl IpfsClient { /// # Examples /// /// ```no_run - /// # extern crate ipfs_api; - /// # /// use ipfs_api::IpfsClient; /// - /// # fn main() { /// let client = IpfsClient::default(); - /// let req = client.block_stat("QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"); - /// # } + /// let res = client.block_stat("QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"); /// ``` /// #[inline] - pub fn block_stat(&self, hash: &str) -> AsyncResponse<response::BlockStatResponse> { - self.request(&request::BlockStat { hash }, None) + pub async fn block_stat(&self, hash: &str) -> Result<response::BlockStatResponse, Error> { + self.request(request::BlockStat { hash }, None).await } /// Add default peers to the bootstrap list. @@ -764,19 +706,17 @@ impl IpfsClient { /// # Examples /// /// ```no_run - /// # extern crate ipfs_api; - /// # /// use ipfs_api::IpfsClient; /// - /// # fn main() { /// let client = IpfsClient::default(); - /// let req = client.bootstrap_add_default(); - /// # } + /// let res = client.bootstrap_add_default(); /// ``` /// #[inline] - pub fn bootstrap_add_default(&self) -> AsyncResponse<response::BootstrapAddDefaultResponse> { - self.request(&request::BootstrapAddDefault, None) + pub async fn bootstrap_add_default( + &self, + ) -> Result<response::BootstrapAddDefaultResponse, Error> { + self.request(request::BootstrapAddDefault, None).await } /// Lists peers in bootstrap list. @@ -784,19 +724,15 @@ impl IpfsClient { /// # Examples /// /// ```no_run - /// # extern crate ipfs_api; - /// # /// use ipfs_api::IpfsClient; /// - /// # fn main() { /// let client = IpfsClient::default(); - /// let req = client.bootstrap_list(); - /// # } + /// let res = client.bootstrap_list(); /// ``` /// #[inline] - pub fn bootstrap_list(&self) -> AsyncResponse<response::BootstrapListResponse> { - self.request(&request::BootstrapList, None) + pub async fn bootstrap_list(&self) -> Result<response::BootstrapListResponse, Error> { + self.request(request::BootstrapList, None).await } /// Removes all peers in bootstrap list. @@ -804,19 +740,15 @@ impl IpfsClient { /// # Examples /// /// ```no_run - /// # extern crate ipfs_api; - /// # /// use ipfs_api::IpfsClient; /// - /// # fn main() { /// let client = IpfsClient::default(); - /// let req = client.bootstrap_rm_all(); - /// # } + /// let res = client.bootstrap_rm_all(); /// ``` /// #[inline] - pub fn bootstrap_rm_all(&self) -> AsyncResponse<response::BootstrapRmAllResponse> { - self.request(&request::BootstrapRmAll, None) + pu |