diff options
-rw-r--r-- | ipfs-api/src/client/from_uri.rs | 113 | ||||
-rw-r--r-- | ipfs-api/src/client/internal.rs | 2020 | ||||
-rw-r--r-- | ipfs-api/src/client/mod.rs | 2035 |
3 files changed, 2138 insertions, 2030 deletions
diff --git a/ipfs-api/src/client/from_uri.rs b/ipfs-api/src/client/from_uri.rs new file mode 100644 index 0000000..6762e84 --- /dev/null +++ b/ipfs-api/src/client/from_uri.rs @@ -0,0 +1,113 @@ +// Copyright 2020 rust-ipfs-api Developers +// +// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or +// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or +// http://opensource.org/licenses/MIT>, at your option. This file may not be +// copied, modified, or distributed except according to those terms. +// +use http::uri::{Builder, InvalidUri, PathAndQuery, Scheme, Uri}; +use parity_multiaddr::{Multiaddr, Protocol}; +use std::{ + fs, + net::{IpAddr, SocketAddr, SocketAddrV4, SocketAddrV6}, +}; + +const VERSION_PATH_V0: &'static str = "/api/v0"; + +/// Builds the base url path for the Ipfs api. +/// +fn build_base_path(builder: Builder) -> Result<Uri, http::Error> { + builder.path_and_query(VERSION_PATH_V0).build() +} + +pub trait TryFromUri: Sized { + /// Builds a new client from a base URI to the IPFS API. + /// + fn build_with_base_uri(uri: Uri) -> Self; + + /// Creates a new client from a str. + /// + /// Note: This constructor will overwrite the path/query part of the URI. + /// + fn from_str(uri: &str) -> Result<Self, InvalidUri> { + let uri: Uri = uri.parse()?; + let mut parts = uri.into_parts(); + + parts.path_and_query = Some(PathAndQuery::from_static(VERSION_PATH_V0)); + + Ok(Self::build_with_base_uri(Uri::from_parts(parts).unwrap())) + } + + /// Creates a new client from a host name and port. + /// + fn from_host_and_port(host: &str, port: u16) -> Result<Self, http::Error> { + let authority = format!("{}:{}", host, port); + let builder = Builder::new() + .scheme(Scheme::HTTP) + .authority(&authority[..]); + + build_base_path(builder).map(Self::build_with_base_uri) + } + + /// Creates a new client from an IPV4 address and port number. + /// + fn from_ipv4(scheme: Scheme, addr: SocketAddrV4) -> Result<Self, http::Error> { + let authority = format!("{}", addr); + let builder = Builder::new().scheme(scheme).authority(&authority[..]); + + build_base_path(builder).map(Self::build_with_base_uri) + } + + /// Creates a new client from an IPV6 addr and port number. + /// + fn from_ipv6(scheme: Scheme, addr: SocketAddrV6) -> Result<Self, http::Error> { + let authority = format!("{}", addr); + let builder = Builder::new().scheme(scheme).authority(&authority[..]); + + build_base_path(builder).map(Self::build_with_base_uri) + } + + /// Creates a new client from an IP address and port number. + /// + fn from_socket(scheme: Scheme, socket_addr: SocketAddr) -> Result<Self, http::Error> { + match socket_addr { + SocketAddr::V4(addr) => Self::from_ipv4(scheme, addr), + SocketAddr::V6(addr) => Self::from_ipv6(scheme, addr), + } + } + + /// Creates a new client connected to the endpoint specified in ~/.ipfs/api. + /// + fn from_multiaddr(multiaddr: Multiaddr) -> Option<Self> { + let mut addr: Option<IpAddr> = None; + let mut port: Option<u16> = None; + + for addr_component in multiaddr.iter() { + match addr_component { + Protocol::Ip4(v4addr) => addr = Some(v4addr.into()), + Protocol::Ip6(v6addr) => addr = Some(v6addr.into()), + Protocol::Tcp(tcpport) => port = Some(tcpport), + _ => { + return None; + } + } + } + + if let (Some(addr), Some(port)) = (addr, port) { + Some(Self::from_socket(Scheme::HTTP, SocketAddr::new(addr, port)).unwrap()) + } else { + None + } + } + + /// Creates a new client connected to the endpoint specified in ~/.ipfs/api. + /// + #[inline] + fn from_ipfs_config() -> Option<Self> { + dirs::home_dir() + .map(|home_dir| home_dir.join(".ipfs").join("api")) + .and_then(|multiaddr_path| fs::read_to_string(&multiaddr_path).ok()) + .and_then(|multiaddr_str| parity_multiaddr::from_url(&multiaddr_str).ok()) + .and_then(Self::from_multiaddr) + } +} diff --git a/ipfs-api/src/client/internal.rs b/ipfs-api/src/client/internal.rs new file mode 100644 index 0000000..5667ab2 --- /dev/null +++ b/ipfs-api/src/client/internal.rs @@ -0,0 +1,2020 @@ +// Copyright 2017 rust-ipfs-api Developers +// +// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or +// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or +// http://opensource.org/licenses/MIT>, at your option. This file may not be +// copied, modified, or distributed except according to those terms. +// +use crate::{ + client::TryFromUri, + header::TRAILER, + read::{JsonLineDecoder, LineDecoder, StreamReader}, + request::{self, ApiRequest}, + response::{self, Error}, + Client, Request, Response, +}; +#[cfg(feature = "actix")] +use actix_multipart::client::multipart; +use bytes::Bytes; +use futures::{future, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; +use http::{ + uri::{InvalidUri, Scheme, Uri}, + StatusCode, +}; +#[cfg(feature = "hyper")] +use hyper::{body, client::Builder}; +#[cfg(feature = "hyper")] +use hyper_multipart::client::multipart; +#[cfg(feature = "hyper")] +use hyper_tls::HttpsConnector; +use serde::{Deserialize, Serialize}; +use serde_json; +#[cfg(feature = "actix")] +use std::time::Duration; +use std::{ + fs::File, + io::{Cursor, Read}, + net::SocketAddr, + path::{Path, PathBuf}, +}; +use tokio_util::codec::{Decoder, FramedRead}; + +const FILE_DESCRIPTOR_LIMIT: usize = 127; + +#[cfg(feature = "actix")] +const ACTIX_REQUEST_TIMEOUT: Duration = Duration::from_secs(90); + +/// Asynchronous Ipfs client. +/// +#[derive(Clone)] +pub struct IpfsClient { + base: Uri, + client: Client, +} + +impl TryFromUri for IpfsClient { + /// Creates a new `IpfsClient` for any given URI. + /// + fn build_with_base_uri(uri: Uri) -> IpfsClient { + let client = { + #[cfg(feature = "hyper")] + { + Builder::default() + .pool_max_idle_per_host(0) + .build(HttpsConnector::new()) + } + #[cfg(feature = "actix")] + { + Client::default() + } + }; + + IpfsClient { base: uri, client } + } +} + +impl Default for IpfsClient { + /// Creates an `IpfsClient` connected to the endpoint specified in ~/.ipfs/api. + /// If not found, tries to connect to `localhost:5001`. + /// + fn default() -> IpfsClient { + Self::from_ipfs_config() + .unwrap_or_else(|| Self::from_host_and_port("localhost", 5001).unwrap()) + } +} + +impl IpfsClient { + /// Creates a new `IpfsClient`. + /// + #[deprecated( + since = "0.7.2", + note = "Please use [`client::TryFromUri::from_host_and_port`]" + )] + pub fn new(host: &str, port: u16) -> Result<IpfsClient, InvalidUri> { + let uri = format!("http://{}:{}", host, port); + + // Using from_str instead of from_host_and_port internally to preserve the error type. + Self::from_str(&uri[..]) + } + + #[deprecated(since = "0.7.2", note = "Please use [`client::TryFromUri::from_uri`]")] + pub fn new_from_uri(uri: &str) -> Result<IpfsClient, InvalidUri> { + Self::from_str(uri) + } + + #[deprecated( + since = "0.7.2", + note = "Please use [`client::TryFromUri::from_socket`]" + )] + pub fn from(socket_addr: SocketAddr) -> IpfsClient { + Self::from_socket(Scheme::HTTP, socket_addr).unwrap() + } +} + +impl IpfsClient { + /// Builds the url for an api call. + /// + fn build_base_request<Req>( + &self, + req: Req, + form: Option<multipart::Form<'static>>, + ) -> Result<Request, Error> + where + Req: ApiRequest + Serialize, + { + let url = format!( + "{}{}?{}", + self.base, + Req::PATH, + ::serde_urlencoded::to_string(req)? + ); + + #[cfg(feature = "hyper")] + { + url.parse::<Uri>().map_err(From::from).and_then(move |url| { + let builder = http::Request::builder().method(http::Method::POST).uri(url); + + let req = if let Some(form) = form { + form.set_body_convert::<hyper::Body, multipart::Body>(builder) + } else { + builder.body(hyper::Body::empty()) + }; + + req.map_err(From::from) + }) + } + #[cfg(feature = "actix")] + { + let req = if let Some(form) = form { + self.client + .post(url) + .timeout(ACTIX_REQUEST_TIMEOUT) + .content_type(form.content_type()) + .send_body(multipart::Body::from(form)) + } else { + self.client.post(url).timeout(ACTIX_REQUEST_TIMEOUT).send() + }; + + Ok(req) + } + } + + /// Builds an Api error from a response body. + /// + #[inline] + fn process_error_from_body(body: Bytes) -> Error { + match serde_json::from_slice(&body) { + Ok(e) => Error::Api(e), + Err(_) => match String::from_utf8(body.to_vec()) { + Ok(s) => Error::Uncategorized(s), + Err(e) => e.into(), + }, + } + } + + /// Processes a response that expects a json encoded body, returning an + /// error or a deserialized json response. + /// + fn process_json_response<Res>(status: StatusCode, body: Bytes) -> Result<Res, Error> + where + for<'de> Res: 'static + Deserialize<'de>, + { + match status { + StatusCode::OK => serde_json::from_slice(&body).map_err(From::from), + _ => Err(Self::process_error_from_body(body)), + } + } + + /// Processes a response that returns a stream of json deserializable + /// results. + /// + fn process_stream_response<D, Res>( + res: Response, + decoder: D, + ) -> impl Stream<Item = Result<Res, Error>> + where + D: Decoder<Item = Res, Error = Error> + Send, + { + #[cfg(feature = "hyper")] + { + FramedRead::new(StreamReader::new(res.into_body()), decoder) + } + #[cfg(feature = "actix")] + { + FramedRead::new(StreamReader::new(res), decoder) + } + } + + /// Generates a request, and returns the unprocessed response future. + /// + async fn request_raw<Req>( + &self, + req: Req, + form: Option<multipart::Form<'static>>, + ) -> Result<(StatusCode, Bytes), Error> + where + Req: ApiRequest + Serialize, + { + let req = self.build_base_request(req, form)?; + + #[cfg(feature = "hyper")] + { + let res = self.client.request(req).await?; + let status = res.status(); + let body = body::to_bytes(res.into_body()).await?; + + Ok((status, body)) + } + #[cfg(feature = "actix")] + { + let mut res = req.await?; + let status = res.status(); + let body = res.body().await?; + + Ok((status, body)) + } + } + + /// Generic method for making a request to the Ipfs server, and getting + /// a deserializable response. + /// + async fn request<Req, Res>( + &self, + req: Req, + form: Option<multipart::Form<'static>>, + ) -> Result<Res, Error> + where + Req: ApiRequest + Serialize, + for<'de> Res: 'static + Deserialize<'de>, + { + let (status, chunk) = self.request_raw(req, form).await?; + + IpfsClient::process_json_response(status, chunk) + } + + /// Generic method for making a request to the Ipfs server, and getting + /// back a response with no body. + /// + async fn request_empty<Req>( + &self, + req: Req, + form: Option<multipart::Form<'static>>, + ) -> Result<(), Error> + where + Req: ApiRequest + Serialize, + { + let (status, chunk) = self.request_raw(req, form).await?; + + match status { + StatusCode::OK => Ok(()), + _ => Err(Self::process_error_from_body(chunk)), + } + } + + /// Generic method for making a request to the Ipfs server, and getting + /// back a raw String response. + /// + async fn request_string<Req>( + &self, + req: Req, + form: Option<multipart::Form<'static>>, + ) -> Result<String, Error> + where + Req: ApiRequest + Serialize, + { + let (status, chunk) = self.request_raw(req, form).await?; + + match status { + StatusCode::OK => String::from_utf8(chunk.to_vec()).map_err(From::from), + _ => Err(Self::process_error_from_body(chunk)), + } + } +} + +impl IpfsClient { + /// Generic method for making a request that expects back a streaming + /// response. + /// + fn request_stream<Res, F, OutStream>( + &self, + req: Request, + process: F, + ) -> impl Stream<Item = Result<Res, Error>> + where + OutStream: Stream<Item = Result<Res, Error>>, + F: 'static + Fn(Response) -> OutStream, + { + #[cfg(feature = "hyper")] + { + self.client + .request(req) + .err_into() + .map_ok(move |res| { + match res.status() { + StatusCode::OK => process(res).right_stream(), + // If the server responded with an error status code, the body + // still needs to be read so an error can be built. This block will + // read the entire body stream, then immediately return an error. + // + _ => body::to_bytes(res.into_body()) + .boxed() + .map(|maybe_body| match maybe_body { + Ok(body) => Err(Self::process_error_from_body(body)), + Err(e) => Err(e.into()), + }) + .into_stream() + .left_stream(), + } + }) + .try_flatten_stream() + } + #[cfg(feature = "actix")] + { + req.err_into() + .map_ok(move |mut res| { + match res.status() { + StatusCode::OK => process(res).right_stream(), + // If the server responded with an error status code, the body + // still needs to be read so an error can be built. This block will + // read the entire body stream, then immediately return an error. + // + _ => res + .body() + .map(|maybe_body| match maybe_body { + Ok(body) => Err(Self::process_error_from_body(body)), + Err(e) => Err(e.into()), + }) + .into_stream() + .left_stream(), + } + }) + .try_flatten_stream() + } + } + + /// Generic method for making a request to the Ipfs server, and getting + /// back a raw stream of bytes. + /// + fn request_stream_bytes(&self, req: Request) -> impl Stream<Item = Result<Bytes, Error>> { + #[cfg(feature = "hyper")] + { + self.request_stream(req, |res| res.into_body().err_into()) + } + #[cfg(feature = "actix")] + { + self.request_stream(req, |res| res.err_into()) + } + } + + /// Generic method to return a streaming response of deserialized json + /// objects delineated by new line separators. + /// + fn request_stream_json<Res>(&self, req: Request) -> impl Stream<Item = Result<Res, Error>> + where + for<'de> Res: 'static + Deserialize<'de> + Send, + { + self.request_stream(req, |res| { + let parse_stream_error = if let Some(trailer) = res.headers().get(TRAILER) { + // Response has the Trailer header set. The StreamError trailer + // is used to indicate that there was an error while streaming + // data with Ipfs. + // + if trailer == "X-Stream-Error" { + true + } else { + let err = Error::UnrecognizedTrailerHeader( + String::from_utf8_lossy(trailer.as_ref()).into(), + ); + + // There was an unrecognized trailer value. If that is the case, + // create a stream that immediately errors. + // + return future::err(err).into_stream().left_stream(); + } + } else { + false + }; + + IpfsClient::process_stream_response(res, JsonLineDecoder::new(parse_stream_error)) + .right_stream() + }) + } +} + +// Implements a call to the IPFS that returns a streaming body response. +// Implementing this in a macro is necessary because the Rust compiler +// can't reason about the lifetime of the request instance properly. It +// thinks that the request needs to live as long as the returned stream, +// but in reality, the request instance is only used to build the Hyper +// or Actix request. +// +macro_rules! impl_stream_api_response { + (($self:ident, $req:expr, $form:expr) => $call:ident) => { + impl_stream_api_response! { + ($self, $req, $form) |req| => { $self.$call(req) } + } + }; + (($self:ident, $req:expr, $form:expr) |$var:ident| => $impl:block) => { + match $self.build_base_request($req, $form) { + Ok($var) => $impl.right_stream(), + Err(e) => return future::err(e).into_stream().left_stream(), + } + }; +} + +impl IpfsClient { + /// Add file to Ipfs. + /// + /// # Examples + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// use std::io::Cursor; + /// + /// let client = IpfsClient::default(); + /// let data = Cursor::new("Hello World!"); + /// let res = client.add(data); + /// ``` + /// + #[inline] + pub async fn add<R>(&self, data: R) -> Result<response::AddResponse, Error> + where + R: 'static + Read + Send + Sync, + { + let mut form = multipart::Form::default(); + + form.add_reader("path", data); + + self.request(request::Add, Some(form)).await + } + + /// Add a path to Ipfs. Can be a file or directory. + /// A hard limit of 128 open file descriptors is set such + /// that any small additional files are stored in-memory. + /// + /// # Examples + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let path = "./src"; + /// let res = client.add_path(path); + /// ``` + /// + #[inline] + pub async fn add_path<P>(&self, path: P) -> Result<Vec<response::AddResponse>, Error> + where + P: AsRef<Path>, + { + let prefix = path.as_ref().parent(); + let mut paths_to_add: Vec<(PathBuf, u64)> = vec![]; + + for path in walkdir::WalkDir::new(path.as_ref()) { + match path { + Ok(entry) if entry.file_type().is_file() => { + if entry.file_type().is_file() { + let file_size = entry + .metadata() + .map(|metadata| metadata.len()) + .map_err(|e| Error::Io(e.into()))?; + + paths_to_add.push((entry.path().to_path_buf(), file_size)); + } + } + Ok(_) => (), + Err(err) => return Err(Error::Io(err.into())), + } + } + + paths_to_add.sort_unstable_by(|(_, a), (_, b)| a.cmp(b).reverse()); + + let mut it = 0; + let mut form = multipart::Form::default(); + + for (path, file_size) in paths_to_add { + let mut file = File::open(&path)?; + let file_name = match prefix { + Some(prefix) => path.strip_prefix(prefix).unwrap(), + None => path.as_path(), + } + .to_string_lossy(); + + if it < FILE_DESCRIPTOR_LIMIT { + form.add_reader_file("path", file, file_name); + + it += 1; + } else { + let mut buf = Vec::with_capacity(file_size as usize); + let _ = file.read_to_end(&mut buf)?; + + form.add_reader_file("path", Cursor::new(buf), file_name); + } + } + + let req = self.build_base_request(request::Add, Some(form))?; + + self.request_stream_json(req).try_collect().await + } + + /// Returns the current ledger for a peer. + /// + /// # Examples + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let res = client.bitswap_ledger("QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ"); + /// ``` + /// + #[inline] + pub async fn bitswap_ledger( + &self, + peer: &str, + ) -> Result<response::BitswapLedgerResponse, Error> { + self.request(request::BitswapLedger { peer }, None).await + } + + /// Triggers a reprovide. + /// + /// # Examples + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let res = client.bitswap_reprovide(); + /// ``` + /// + #[inline] + pub async fn bitswap_reprovide(&self) -> Result<response::BitswapReprovideResponse, Error> { + self.request_empty(request::BitswapReprovide, None).await + } + + /// Returns some stats about the bitswap agent. + /// + /// # Examples + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let res = client.bitswap_stat(); + /// ``` + /// + #[inline] + pub async fn bitswap_stat(&self) -> Result<response::BitswapStatResponse, Error> { + self.request(request::BitswapStat, None).await + } + + /// Remove a given block from your wantlist. + /// + /// # Examples + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let res = client.bitswap_unwant("QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"); + /// ``` + /// + #[inline] + pub async fn bitswap_unwant( + &self, + key: &str, + ) -> Result<response::BitswapUnwantResponse, Error> { + self.request_empty(request::BitswapUnwant { key }, None) + .await + } + + /// Shows blocks on the wantlist for you or the specified peer. + /// + /// # Examples + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let res = client.bitswap_wantlist( + /// Some("QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ") + /// ); + /// ``` + /// + #[inline] + pub async fn bitswap_wantlist( + &self, + peer: Option<&str>, + ) -> Result<response::BitswapWantlistResponse, Error> { + self.request(request::BitswapWantlist { peer }, None).await + } + + /// Gets a raw IPFS block. + /// + /// # Examples + /// + /// ```no_run + /// use futures::TryStreamExt; + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let hash = "QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"; + /// let res = client + /// .block_get(hash) + /// .map_ok(|chunk| chunk.to_vec()) + /// .try_concat(); + /// ``` + /// + #[inline] + pub fn block_get(&self, hash: &str) -> impl Stream<Item = Result<Bytes, Error>> { + impl_stream_api_response! { + (self, request::BlockGet { hash }, None) => request_stream_bytes + } + } + + /// Store input as an IPFS block. + /// + /// # Examples + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// use std::io::Cursor; + /// + /// let client = IpfsClient::default(); + /// let data = Cursor::new("Hello World!"); + /// let res = client.block_put(data); + /// ``` + /// + #[inline] + pub async fn block_put<R>(&self, data: R) -> Result<response::BlockPutResponse, Error> + where + R: 'static + Read + Send + Sync, + { + let mut form = multipart::Form::default(); + + form.add_reader("data", data); + + self.request(request::BlockPut, Some(form)).await + } + + /// Removes an IPFS block. + /// + /// # Examples + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let res = client.block_rm("QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"); + /// ``` + /// + #[inline] + pub async fn block_rm(&self, hash: &str) -> Result<response::BlockRmResponse, Error> { + self.request(request::BlockRm { hash }, None).await + } + + /// Prints information about a raw IPFS block. + /// + /// # Examples + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let res = client.block_stat("QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"); + /// ``` + /// + #[inline] + pub async fn block_stat(&self, hash: &str) -> Result<response::BlockStatResponse, Error> { + self.request(request::BlockStat { hash }, None).await + } + + /// Add default peers to the bootstrap list. + /// + /// # Examples + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let res = client.bootstrap_add_default(); + /// ``` + /// + #[inline] + pub async fn bootstrap_add_default( + &self, + ) -> Result<response::BootstrapAddDefaultResponse, Error> { + self.request(request::BootstrapAddDefault, None).await + } + + /// Lists peers in bootstrap list. + /// + /// # Examples + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let res = client.bootstrap_list(); + /// ``` + /// + #[inline] + pub async fn bootstrap_list(&self) -> Result<response::BootstrapListResponse, Error> { + self.request(request::BootstrapList, None).await + } + + /// Removes all peers in bootstrap list. + /// + /// # Examples + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let res = client.bootstrap_rm_all(); + /// ``` + /// + #[inline] + pub async fn bootstrap_rm_all(&self) -> Result<response::BootstrapRmAllResponse, Error> { + self.request(request::BootstrapRmAll, None).await + } + + /// Returns the contents of an Ipfs object. + /// + /// # Examples + /// + /// ```no_run + /// use futures::TryStreamExt; + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let hash = "QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"; + /// let res = client + /// .cat(hash) + /// .map_ok(|chunk| chunk.to_vec()) + /// .try_concat(); + /// ``` + /// + #[inline] + pub fn cat(&self, path: &str) -> impl Stream<Item = Result<Bytes, Error>> { + impl_stream_api_response! { + (self, request::Cat { path }, None) => request_stream_bytes + } + } + + /// List available commands that the server accepts. + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let res = client.commands(); + /// ``` + /// + #[inline] + pub async fn commands(&self) -> Result<response::CommandsResponse, Error> { + self.request(request::Commands, None).await + } + + /// Opens the config file for editing (on the server). + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let res = client.config_edit(); + /// ``` + /// + #[inline] + pub async fn config_edit(&self) -> Result<response::ConfigEditResponse, Error> { + self.request(request::ConfigEdit, None).await + } + + /// Replace the config file. + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// use std::io::Cursor; + /// + /// let client = IpfsClient::default(); + /// let config = Cursor::new("{..json..}"); + /// let res = client.config_replace(config); + /// ``` + /// + #[inline] + pub async fn config_replace<R>(&self, data: R) -> Result<response::ConfigReplaceResponse, Error> + where + R: 'static + Read + Send + Sync, + { + let mut form = multipart::Form::default(); + + form.add_reader("file", data); + + self.request_empty(request::ConfigReplace, Some(form)).await + } + + /// Show the current config of the server. + /// + /// Returns an unparsed json string, due to an unclear spec. + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let res = client.config_show(); + /// ``` + /// + #[inline] + pub async fn config_show(&self) -> Result<response::ConfigShowResponse, Error> { + self.request_string(request::ConfigShow, None).await + } + + /// Returns information about a dag node in Ipfs. + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let res = client.dag_get("QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"); + /// ``` + /// + #[inline] + pub async fn dag_get(&self, path: &str) -> Result<response::DagGetResponse, Error> { + self.request(request::DagGet { path }, None).await + } + + // TODO /dag routes are experimental, and there isn't a whole lot of + // documentation available for how this route works. + // + // /// Add a DAG node to Ipfs. + // /// + // #[inline] + // pub fn dag_put<R>(&self, data: R) -> AsyncResponse<response::DagPutResponse> + // where + // R: 'static + Read + Send, + // { + // let mut form = multipart::Form::default(); + // + // form.add_reader("arg", data); + // + // self.request(&request::DagPut, Some(form)) + // } + + // TODO /dag/resolve + + /// Query the DHT for all of the multiaddresses associated with a Peer ID. + /// + /// ```no_run + /// use futures::TryStreamExt; + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let peer = "QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM"; + /// let res = client.dht_findpeer(peer).try_collect::<Vec<_>>(); + /// ``` + /// + #[inline] + pub fn dht_findpeer( + &self, + peer: &str, + ) -> impl Stream<Item = Result<response::DhtFindPeerResponse, Error>> { + impl_stream_api_response! { + (self, request::DhtFindPeer { peer }, None) => request_stream_json + } + } + + /// Find peers in the DHT that can provide a specific value given a key. + /// + /// ```no_run + /// use futures::TryStreamExt; + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let key = "QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"; + /// let res = client.dht_findprovs(key).try_collect::<Vec<_>>(); + /// ``` + /// + #[inline] + pub fn dht_findprovs( + &self, + key: &str, + ) -> impl Stream<Item = Result<response::DhtFindProvsResponse, Error>> { + impl_stream_api_response! { + (self, request::DhtFindProvs { key }, None) => request_stream_json + } + } + + /// Query the DHT for a given key. + /// + /// ```no_run + /// use futures::TryStreamExt; + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let key = "QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"; + /// let res = client.dht_get(key).try_collect::<Vec<_>>(); + /// ``` + /// + #[inline] + pub fn dht_get( + &self, + key: &str, + ) -> impl Stream<Item = Result<response::DhtGetResponse, Error>> { + impl_stream_api_response! { + (self, request::DhtGet { key }, None) => request_stream_json + } + } + + /// Announce to the network that you are providing a given value. + /// + /// ```no_run + /// use futures::TryStreamExt; + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let key = "QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"; + /// let res = client.dht_provide(key).try_collect::<Vec<_>>(); + /// ``` + /// + #[inline] + pub fn dht_provide( + &self, + key: &str, + ) -> impl Stream<Item = Result<response::DhtProvideResponse, Error>> { + impl_stream_api_response! { + (self, request::DhtProvide { key }, None) => request_stream_json + } + } + + /// Write a key/value pair to the DHT. + /// + /// ```no_run + /// use futures::TryStreamExt; + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let res = client.dht_put("test", "Hello World!").try_collect::<Vec<_ |