summaryrefslogtreecommitdiffstats
path: root/ipfs-api/src/client/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ipfs-api/src/client/mod.rs')
-rw-r--r--ipfs-api/src/client/mod.rs2037
1 files changed, 2037 insertions, 0 deletions
diff --git a/ipfs-api/src/client/mod.rs b/ipfs-api/src/client/mod.rs
new file mode 100644
index 0000000..0cd7796
--- /dev/null
+++ b/ipfs-api/src/client/mod.rs
@@ -0,0 +1,2037 @@
+// Copyright 2017 rust-ipfs-api Developers
+//
+// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
+// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
+// http://opensource.org/licenses/MIT>, at your option. This file may not be
+// copied, modified, or distributed except according to those terms.
+//
+use crate::{
+ header::TRAILER,
+ read::{JsonLineDecoder, LineDecoder, StreamReader},
+ request::{self, ApiRequest},
+ response::{self, Error},
+ Client, Request, Response,
+};
+#[cfg(feature = "actix")]
+use actix_multipart::client::multipart;
+use bytes::Bytes;
+use futures::{future, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
+use http::{
+ uri::{InvalidUri, Uri},
+ StatusCode,
+};
+#[cfg(feature = "hyper")]
+use hyper::{body, client::Builder};
+#[cfg(feature = "hyper")]
+use hyper_multipart::client::multipart;
+#[cfg(feature = "hyper")]
+use hyper_tls::HttpsConnector;
+use parity_multiaddr::Protocol;
+use serde::{Deserialize, Serialize};
+use serde_json;
+#[cfg(feature = "actix")]
+use std::time::Duration;
+use std::{
+ fs::{self, File},
+ io::{Cursor, Read},
+ net::{IpAddr, SocketAddr},
+ path::{Path, PathBuf},
+};
+use tokio_util::codec::{Decoder, FramedRead};
+
+const FILE_DESCRIPTOR_LIMIT: usize = 127;
+
+#[cfg(feature = "actix")]
+const ACTIX_REQUEST_TIMEOUT: Duration = Duration::from_secs(90);
+
+/// Asynchronous Ipfs client.
+///
+#[derive(Clone)]
+pub struct IpfsClient {
+ base: Uri,
+ client: Client,
+}
+
+impl Default for IpfsClient {
+ /// Creates an `IpfsClient` connected to the endpoint specified in ~/.ipfs/api.
+ /// If not found, tries to connect to `localhost:5001`.
+ ///
+ fn default() -> IpfsClient {
+ dirs::home_dir()
+ .map(|home_dir| home_dir.join(".ipfs").join("api"))
+ .and_then(|multiaddr_path| fs::read_to_string(&multiaddr_path).ok())
+ .and_then(|multiaddr_str| parity_multiaddr::from_url(&multiaddr_str).ok())
+ .and_then(|multiaddr| {
+ let mut addr: Option<IpAddr> = None;
+ let mut port: Option<u16> = None;
+ for addr_component in multiaddr.iter() {
+ match addr_component {
+ Protocol::Ip4(v4addr) => addr = Some(v4addr.into()),
+ Protocol::Ip6(v6addr) => addr = Some(v6addr.into()),
+ Protocol::Tcp(tcpport) => port = Some(tcpport),
+ _ => {
+ return None;
+ }
+ }
+ }
+ if let (Some(addr), Some(port)) = (addr, port) {
+ Some(SocketAddr::new(addr, port))
+ } else {
+ None
+ }
+ })
+ .map(IpfsClient::from)
+ .unwrap_or_else(|| IpfsClient::new("localhost", 5001).unwrap())
+ }
+}
+
+impl From<SocketAddr> for IpfsClient {
+ fn from(socket_addr: SocketAddr) -> Self {
+ IpfsClient::new(&socket_addr.ip().to_string(), socket_addr.port()).unwrap()
+ }
+}
+
+impl IpfsClient {
+ /// Creates a new `IpfsClient`.
+ ///
+ #[inline]
+ pub fn new(host: &str, port: u16) -> Result<IpfsClient, InvalidUri> {
+ Self::new_from_uri(format!("http://{}:{}", host, port).as_str())
+ }
+
+ /// Creates a new `IpfsClient` for any given URI.
+ ///
+ #[inline]
+ pub fn new_from_uri(uri: &str) -> Result<IpfsClient, InvalidUri> {
+ let base_path = IpfsClient::build_base_path(uri)?;
+ let client = {
+ #[cfg(feature = "hyper")]
+ {
+ Builder::default()
+ .pool_max_idle_per_host(0)
+ .build(HttpsConnector::new())
+ }
+ #[cfg(feature = "actix")]
+ {
+ Client::default()
+ }
+ };
+
+ Ok(IpfsClient {
+ base: base_path,
+ client,
+ })
+ }
+
+ /// Builds the base url path for the Ipfs api.
+ ///
+ fn build_base_path(uri: &str) -> Result<Uri, InvalidUri> {
+ format!("{}/api/v0", uri).parse()
+ }
+
+ /// Builds the url for an api call.
+ ///
+ fn build_base_request<Req>(
+ &self,
+ req: Req,
+ form: Option<multipart::Form<'static>>,
+ ) -> Result<Request, Error>
+ where
+ Req: ApiRequest + Serialize,
+ {
+ let url = format!(
+ "{}{}?{}",
+ self.base,
+ Req::PATH,
+ ::serde_urlencoded::to_string(req)?
+ );
+
+ #[cfg(feature = "hyper")]
+ {
+ url.parse::<Uri>().map_err(From::from).and_then(move |url| {
+ let builder = http::Request::builder().method(http::Method::POST).uri(url);
+
+ let req = if let Some(form) = form {
+ form.set_body_convert::<hyper::Body, multipart::Body>(builder)
+ } else {
+ builder.body(hyper::Body::empty())
+ };
+
+ req.map_err(From::from)
+ })
+ }
+ #[cfg(feature = "actix")]
+ {
+ let req = if let Some(form) = form {
+ self.client
+ .post(url)
+ .timeout(ACTIX_REQUEST_TIMEOUT)
+ .content_type(form.content_type())
+ .send_body(multipart::Body::from(form))
+ } else {
+ self.client.post(url).timeout(ACTIX_REQUEST_TIMEOUT).send()
+ };
+
+ Ok(req)
+ }
+ }
+
+ /// Builds an Api error from a response body.
+ ///
+ #[inline]
+ fn process_error_from_body(body: Bytes) -> Error {
+ match serde_json::from_slice(&body) {
+ Ok(e) => Error::Api(e),
+ Err(_) => match String::from_utf8(body.to_vec()) {
+ Ok(s) => Error::Uncategorized(s),
+ Err(e) => e.into(),
+ },
+ }
+ }
+
+ /// Processes a response that expects a json encoded body, returning an
+ /// error or a deserialized json response.
+ ///
+ fn process_json_response<Res>(status: StatusCode, body: Bytes) -> Result<Res, Error>
+ where
+ for<'de> Res: 'static + Deserialize<'de>,
+ {
+ match status {
+ StatusCode::OK => serde_json::from_slice(&body).map_err(From::from),
+ _ => Err(Self::process_error_from_body(body)),
+ }
+ }
+
+ /// Processes a response that returns a stream of json deserializable
+ /// results.
+ ///
+ fn process_stream_response<D, Res>(
+ res: Response,
+ decoder: D,
+ ) -> impl Stream<Item = Result<Res, Error>>
+ where
+ D: Decoder<Item = Res, Error = Error> + Send,
+ {
+ #[cfg(feature = "hyper")]
+ {
+ FramedRead::new(StreamReader::new(res.into_body()), decoder)
+ }
+ #[cfg(feature = "actix")]
+ {
+ FramedRead::new(StreamReader::new(res), decoder)
+ }
+ }
+
+ /// Generates a request, and returns the unprocessed response future.
+ ///
+ async fn request_raw<Req>(
+ &self,
+ req: Req,
+ form: Option<multipart::Form<'static>>,
+ ) -> Result<(StatusCode, Bytes), Error>
+ where
+ Req: ApiRequest + Serialize,
+ {
+ let req = self.build_base_request(req, form)?;
+
+ #[cfg(feature = "hyper")]
+ {
+ let res = self.client.request(req).await?;
+ let status = res.status();
+ let body = body::to_bytes(res.into_body()).await?;
+
+ Ok((status, body))
+ }
+ #[cfg(feature = "actix")]
+ {
+ let mut res = req.await?;
+ let status = res.status();
+ let body = res.body().await?;
+
+ Ok((status, body))
+ }
+ }
+
+ /// Generic method for making a request to the Ipfs server, and getting
+ /// a deserializable response.
+ ///
+ async fn request<Req, Res>(
+ &self,
+ req: Req,
+ form: Option<multipart::Form<'static>>,
+ ) -> Result<Res, Error>
+ where
+ Req: ApiRequest + Serialize,
+ for<'de> Res: 'static + Deserialize<'de>,
+ {
+ let (status, chunk) = self.request_raw(req, form).await?;
+
+ IpfsClient::process_json_response(status, chunk)
+ }
+
+ /// Generic method for making a request to the Ipfs server, and getting
+ /// back a response with no body.
+ ///
+ async fn request_empty<Req>(
+ &self,
+ req: Req,
+ form: Option<multipart::Form<'static>>,
+ ) -> Result<(), Error>
+ where
+ Req: ApiRequest + Serialize,
+ {
+ let (status, chunk) = self.request_raw(req, form).await?;
+
+ match status {
+ StatusCode::OK => Ok(()),
+ _ => Err(Self::process_error_from_body(chunk)),
+ }
+ }
+
+ /// Generic method for making a request to the Ipfs server, and getting
+ /// back a raw String response.
+ ///
+ async fn request_string<Req>(
+ &self,
+ req: Req,
+ form: Option<multipart::Form<'static>>,
+ ) -> Result<String, Error>
+ where
+ Req: ApiRequest + Serialize,
+ {
+ let (status, chunk) = self.request_raw(req, form).await?;
+
+ match status {
+ StatusCode::OK => String::from_utf8(chunk.to_vec()).map_err(From::from),
+ _ => Err(Self::process_error_from_body(chunk)),
+ }
+ }
+}
+
+impl IpfsClient {
+ /// Generic method for making a request that expects back a streaming
+ /// response.
+ ///
+ fn request_stream<Res, F, OutStream>(
+ &self,
+ req: Request,
+ process: F,
+ ) -> impl Stream<Item = Result<Res, Error>>
+ where
+ OutStream: Stream<Item = Result<Res, Error>>,
+ F: 'static + Fn(Response) -> OutStream,
+ {
+ #[cfg(feature = "hyper")]
+ {
+ self.client
+ .request(req)
+ .err_into()
+ .map_ok(move |res| {
+ match res.status() {
+ StatusCode::OK => process(res).right_stream(),
+ // If the server responded with an error status code, the body
+ // still needs to be read so an error can be built. This block will
+ // read the entire body stream, then immediately return an error.
+ //
+ _ => body::to_bytes(res.into_body())
+ .boxed()
+ .map(|maybe_body| match maybe_body {
+ Ok(body) => Err(Self::process_error_from_body(body)),
+ Err(e) => Err(e.into()),
+ })
+ .into_stream()
+ .left_stream(),
+ }
+ })
+ .try_flatten_stream()
+ }
+ #[cfg(feature = "actix")]
+ {
+ req.err_into()
+ .map_ok(move |mut res| {
+ match res.status() {
+ StatusCode::OK => process(res).right_stream(),
+ // If the server responded with an error status code, the body
+ // still needs to be read so an error can be built. This block will
+ // read the entire body stream, then immediately return an error.
+ //
+ _ => res
+ .body()
+ .map(|maybe_body| match maybe_body {
+ Ok(body) => Err(Self::process_error_from_body(body)),
+ Err(e) => Err(e.into()),
+ })
+ .into_stream()
+ .left_stream(),
+ }
+ })
+ .try_flatten_stream()
+ }
+ }
+
+ /// Generic method for making a request to the Ipfs server, and getting
+ /// back a raw stream of bytes.
+ ///
+ fn request_stream_bytes(&self, req: Request) -> impl Stream<Item = Result<Bytes, Error>> {
+ #[cfg(feature = "hyper")]
+ {
+ self.request_stream(req, |res| res.into_body().err_into())
+ }
+ #[cfg(feature = "actix")]
+ {
+ self.request_stream(req, |res| res.err_into())
+ }
+ }
+
+ /// Generic method to return a streaming response of deserialized json
+ /// objects delineated by new line separators.
+ ///
+ fn request_stream_json<Res>(&self, req: Request) -> impl Stream<Item = Result<Res, Error>>
+ where
+ for<'de> Res: 'static + Deserialize<'de> + Send,
+ {
+ self.request_stream(req, |res| {
+ let parse_stream_error = if let Some(trailer) = res.headers().get(TRAILER) {
+ // Response has the Trailer header set. The StreamError trailer
+ // is used to indicate that there was an error while streaming
+ // data with Ipfs.
+ //
+ if trailer == "X-Stream-Error" {
+ true
+ } else {
+ let err = Error::UnrecognizedTrailerHeader(
+ String::from_utf8_lossy(trailer.as_ref()).into(),
+ );
+
+ // There was an unrecognized trailer value. If that is the case,
+ // create a stream that immediately errors.
+ //
+ return future::err(err).into_stream().left_stream();
+ }
+ } else {
+ false
+ };
+
+ IpfsClient::process_stream_response(res, JsonLineDecoder::new(parse_stream_error))
+ .right_stream()
+ })
+ }
+}
+
+// Implements a call to the IPFS that returns a streaming body response.
+// Implementing this in a macro is necessary because the Rust compiler
+// can't reason about the lifetime of the request instance properly. It
+// thinks that the request needs to live as long as the returned stream,
+// but in reality, the request instance is only used to build the Hyper
+// or Actix request.
+//
+macro_rules! impl_stream_api_response {
+ (($self:ident, $req:expr, $form:expr) => $call:ident) => {
+ impl_stream_api_response! {
+ ($self, $req, $form) |req| => { $self.$call(req) }
+ }
+ };
+ (($self:ident, $req:expr, $form:expr) |$var:ident| => $impl:block) => {
+ match $self.build_base_request($req, $form) {
+ Ok($var) => $impl.right_stream(),
+ Err(e) => return future::err(e).into_stream().left_stream(),
+ }
+ };
+}
+
+impl IpfsClient {
+ /// Add file to Ipfs.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use ipfs_api::IpfsClient;
+ /// use std::io::Cursor;
+ ///
+ /// let client = IpfsClient::default();
+ /// let data = Cursor::new("Hello World!");
+ /// let res = client.add(data);
+ /// ```
+ ///
+ #[inline]
+ pub async fn add<R>(&self, data: R) -> Result<response::AddResponse, Error>
+ where
+ R: 'static + Read + Send + Sync,
+ {
+ let mut form = multipart::Form::default();
+
+ form.add_reader("path", data);
+
+ self.request(request::Add, Some(form)).await
+ }
+
+ /// Add a path to Ipfs. Can be a file or directory.
+ /// A hard limit of 128 open file descriptors is set such
+ /// that any small additional files are stored in-memory.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use ipfs_api::IpfsClient;
+ ///
+ /// let client = IpfsClient::default();
+ /// let path = "./src";
+ /// let res = client.add_path(path);
+ /// ```
+ ///
+ #[inline]
+ pub async fn add_path<P>(&self, path: P) -> Result<Vec<response::AddResponse>, Error>
+ where
+ P: AsRef<Path>,
+ {
+ let prefix = path.as_ref().parent();
+ let mut paths_to_add: Vec<(PathBuf, u64)> = vec![];
+
+ for path in walkdir::WalkDir::new(path.as_ref()) {
+ match path {
+ Ok(entry) if entry.file_type().is_file() => {
+ if entry.file_type().is_file() {
+ let file_size = entry
+ .metadata()
+ .map(|metadata| metadata.len())
+ .map_err(|e| Error::Io(e.into()))?;
+
+ paths_to_add.push((entry.path().to_path_buf(), file_size));
+ }
+ }
+ Ok(_) => (),
+ Err(err) => return Err(Error::Io(err.into())),
+ }
+ }
+
+ paths_to_add.sort_unstable_by(|(_, a), (_, b)| a.cmp(b).reverse());
+
+ let mut it = 0;
+ let mut form = multipart::Form::default();
+
+ for (path, file_size) in paths_to_add {
+ let mut file = File::open(&path)?;
+ let file_name = match prefix {
+ Some(prefix) => path.strip_prefix(prefix).unwrap(),
+ None => path.as_path(),
+ }
+ .to_string_lossy();
+
+ if it < FILE_DESCRIPTOR_LIMIT {
+ form.add_reader_file("path", file, file_name);
+
+ it += 1;
+ } else {
+ let mut buf = Vec::with_capacity(file_size as usize);
+ let _ = file.read_to_end(&mut buf)?;
+
+ form.add_reader_file("path", Cursor::new(buf), file_name);
+ }
+ }
+
+ let req = self.build_base_request(request::Add, Some(form))?;
+
+ self.request_stream_json(req).try_collect().await
+ }
+
+ /// Returns the current ledger for a peer.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use ipfs_api::IpfsClient;
+ ///
+ /// let client = IpfsClient::default();
+ /// let res = client.bitswap_ledger("QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ");
+ /// ```
+ ///
+ #[inline]
+ pub async fn bitswap_ledger(
+ &self,
+ peer: &str,
+ ) -> Result<response::BitswapLedgerResponse, Error> {
+ self.request(request::BitswapLedger { peer }, None).await
+ }
+
+ /// Triggers a reprovide.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use ipfs_api::IpfsClient;
+ ///
+ /// let client = IpfsClient::default();
+ /// let res = client.bitswap_reprovide();
+ /// ```
+ ///
+ #[inline]
+ pub async fn bitswap_reprovide(&self) -> Result<response::BitswapReprovideResponse, Error> {
+ self.request_empty(request::BitswapReprovide, None).await
+ }
+
+ /// Returns some stats about the bitswap agent.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use ipfs_api::IpfsClient;
+ ///
+ /// let client = IpfsClient::default();
+ /// let res = client.bitswap_stat();
+ /// ```
+ ///
+ #[inline]
+ pub async fn bitswap_stat(&self) -> Result<response::BitswapStatResponse, Error> {
+ self.request(request::BitswapStat, None).await
+ }
+
+ /// Remove a given block from your wantlist.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use ipfs_api::IpfsClient;
+ ///
+ /// let client = IpfsClient::default();
+ /// let res = client.bitswap_unwant("QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA");
+ /// ```
+ ///
+ #[inline]
+ pub async fn bitswap_unwant(
+ &self,
+ key: &str,
+ ) -> Result<response::BitswapUnwantResponse, Error> {
+ self.request_empty(request::BitswapUnwant { key }, None)
+ .await
+ }
+
+ /// Shows blocks on the wantlist for you or the specified peer.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use ipfs_api::IpfsClient;
+ ///
+ /// let client = IpfsClient::default();
+ /// let res = client.bitswap_wantlist(
+ /// Some("QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ")
+ /// );
+ /// ```
+ ///
+ #[inline]
+ pub async fn bitswap_wantlist(
+ &self,
+ peer: Option<&str>,
+ ) -> Result<response::BitswapWantlistResponse, Error> {
+ self.request(request::BitswapWantlist { peer }, None).await
+ }
+
+ /// Gets a raw IPFS block.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use futures::TryStreamExt;
+ /// use ipfs_api::IpfsClient;
+ ///
+ /// let client = IpfsClient::default();
+ /// let hash = "QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA";
+ /// let res = client
+ /// .block_get(hash)
+ /// .map_ok(|chunk| chunk.to_vec())
+ /// .try_concat();
+ /// ```
+ ///
+ #[inline]
+ pub fn block_get(&self, hash: &str) -> impl Stream<Item = Result<Bytes, Error>> {
+ impl_stream_api_response! {
+ (self, request::BlockGet { hash }, None) => request_stream_bytes
+ }
+ }
+
+ /// Store input as an IPFS block.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use ipfs_api::IpfsClient;
+ /// use std::io::Cursor;
+ ///
+ /// let client = IpfsClient::default();
+ /// let data = Cursor::new("Hello World!");
+ /// let res = client.block_put(data);
+ /// ```
+ ///
+ #[inline]
+ pub async fn block_put<R>(&self, data: R) -> Result<response::BlockPutResponse, Error>
+ where
+ R: 'static + Read + Send + Sync,
+ {
+ let mut form = multipart::Form::default();
+
+ form.add_reader("data", data);
+
+ self.request(request::BlockPut, Some(form)).await
+ }
+
+ /// Removes an IPFS block.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use ipfs_api::IpfsClient;
+ ///
+ /// let client = IpfsClient::default();
+ /// let res = client.block_rm("QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA");
+ /// ```
+ ///
+ #[inline]
+ pub async fn block_rm(&self, hash: &str) -> Result<response::BlockRmResponse, Error> {
+ self.request(request::BlockRm { hash }, None).await
+ }
+
+ /// Prints information about a raw IPFS block.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use ipfs_api::IpfsClient;
+ ///
+ /// let client = IpfsClient::default();
+ /// let res = client.block_stat("QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA");
+ /// ```
+ ///
+ #[inline]
+ pub async fn block_stat(&self, hash: &str) -> Result<response::BlockStatResponse, Error> {
+ self.request(request::BlockStat { hash }, None).await
+ }
+
+ /// Add default peers to the bootstrap list.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use ipfs_api::IpfsClient;
+ ///
+ /// let client = IpfsClient::default();
+ /// let res = client.bootstrap_add_default();
+ /// ```
+ ///
+ #[inline]
+ pub async fn bootstrap_add_default(
+ &self,
+ ) -> Result<response::BootstrapAddDefaultResponse, Error> {
+ self.request(request::BootstrapAddDefault, None).await
+ }
+
+ /// Lists peers in bootstrap list.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use ipfs_api::IpfsClient;
+ ///
+ /// let client = IpfsClient::default();
+ /// let res = client.bootstrap_list();
+ /// ```
+ ///
+ #[inline]
+ pub async fn bootstrap_list(&self) -> Result<response::BootstrapListResponse, Error> {
+ self.request(request::BootstrapList, None).await
+ }
+
+ /// Removes all peers in bootstrap list.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use ipfs_api::IpfsClient;
+ ///
+ /// let client = IpfsClient::default();
+ /// let res = client.bootstrap_rm_all();
+ /// ```
+ ///
+ #[inline]
+ pub async fn bootstrap_rm_all(&self) -> Result<response::BootstrapRmAllResponse, Error> {
+ self.request(request::BootstrapRmAll, None).await
+ }
+
+ /// Returns the contents of an Ipfs object.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use futures::TryStreamExt;
+ /// use ipfs_api::IpfsClient;
+ ///
+ /// let client = IpfsClient::default();
+ /// let hash = "QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA";
+ /// let res = client
+ /// .cat(hash)
+ /// .map_ok(|chunk| chunk.to_vec())
+ /// .try_concat();
+ /// ```
+ ///
+ #[inline]
+ pub fn cat(&self, path: &str) -> impl Stream<Item = Result<Bytes, Error>> {
+ impl_stream_api_response! {
+ (self, request::Cat { path }, None) => request_stream_bytes
+ }
+ }
+
+ /// List available commands that the server accepts.
+ ///
+ /// ```no_run
+ /// use ipfs_api::IpfsClient;
+ ///
+ /// let client = IpfsClient::default();
+ /// let res = client.commands();
+ /// ```
+ ///
+ #[inline]
+ pub async fn commands(&self) -> Result<response::CommandsResponse, Error> {
+ self.request(request::Commands, None).await
+ }
+
+ /// Opens the config file for editing (on the server).
+ ///
+ /// ```no_run
+ /// use ipfs_api::IpfsClient;
+ ///
+ /// let client = IpfsClient::default();
+ /// let res = client.config_edit();
+ /// ```
+ ///
+ #[inline]
+ pub async fn config_edit(&self) -> Result<response::ConfigEditResponse, Error> {
+ self.request(request::ConfigEdit, None).await
+ }
+
+ /// Replace the config file.
+ ///
+ /// ```no_run
+ /// use ipfs_api::IpfsClient;
+ /// use std::io::Cursor;
+ ///
+ /// let client = IpfsClient::default();
+ /// let config = Cursor::new("{..json..}");
+ /// let res = client.config_replace(config);
+ /// ```
+ ///
+ #[inline]
+ pub async fn config_replace<R>(&self, data: R) -> Result<response::ConfigReplaceResponse, Error>
+ where
+ R: 'static + Read + Send + Sync,
+ {
+ let mut form = multipart::Form::default();
+
+ form.add_reader("file", data);
+
+ self.request_empty(request::ConfigReplace, Some(form)).await
+ }
+
+ /// Show the current config of the server.
+ ///
+ /// Returns an unparsed json string, due to an unclear spec.
+ ///
+ /// ```no_run
+ /// use ipfs_api::IpfsClient;
+ ///
+ /// let client = IpfsClient::default();
+ /// let res = client.config_show();
+ /// ```
+ ///
+ #[inline]
+ pub async fn config_show(&self) -> Result<response::ConfigShowResponse, Error> {
+ self.request_string(request::ConfigShow, None).await
+ }
+
+ /// Returns information about a dag node in Ipfs.
+ ///
+ /// ```no_run
+ /// use ipfs_api::IpfsClient;
+ ///
+ /// let client = IpfsClient::default();
+ /// let res = client.dag_get("QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA");
+ /// ```
+ ///
+ #[inline]
+ pub async fn dag_get(&self, path: &str) -> Result<response::DagGetResponse, Error> {
+ self.request(request::DagGet { path }, None).await
+ }
+
+ // TODO /dag routes are experimental, and there isn't a whole lot of
+ // documentation available for how this route works.
+ //
+ // /// Add a DAG node to Ipfs.
+ // ///
+ // #[inline]
+ // pub fn dag_put<R>(&self, data: R) -> AsyncResponse<response::DagPutResponse>
+ // where
+ // R: 'static + Read + Send,
+ // {
+ // let mut form = multipart::Form::default();
+ //
+ // form.add_reader("arg", data);
+ //
+ // self.request(&request::DagPut, Some(form))
+ // }
+
+ // TODO /dag/resolve
+
+ /// Query the DHT for all of the multiaddresses associated with a Peer ID.
+ ///
+ /// ```no_run
+ /// use futures::TryStreamExt;
+ /// use ipfs_api::IpfsClient;
+ ///
+ /// let client = IpfsClient::default();
+ /// let peer = "QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM";
+ /// let res = client.dht_findpeer(peer).try_collect::<Vec<_>>();
+ /// ```
+ ///
+ #[inline]
+ pub fn dht_findpeer(
+ &self,
+ peer: &str,
+ ) -> impl Stream<Item = Result<response::DhtFindPeerResponse, Error>> {
+ impl_stream_api_response! {
+ (self, request::DhtFindPeer { peer }, None) => request_stream_json
+ }
+ }
+
+ /// Find peers in the DHT that can provide a specific value given a key.
+ ///
+ /// ```no_run
+ /// use futures::TryStreamExt;
+ /// use ipfs_api::IpfsClient;
+ ///
+ /// let client = IpfsClient::default();
+ /// let key = "QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA";
+ /// let res = client.dht_findprovs(key).try_collect::<Vec<_>>();
+ /// ```
+ ///
+ #[inline]
+ pub fn dht_findprovs(
+ &self,
+ key: &str,
+ ) -> impl Stream<Item = Result<response::DhtFindProvsResponse, Error>> {
+ impl_stream_api_response! {
+ (self, request::DhtFindProvs { key }, None) => request_stream_json
+ }
+ }
+
+ /// Query the DHT for a given key.
+ ///
+ /// ```no_run
+ /// use futures::TryStreamExt;
+ /// use ipfs_api::IpfsClient;
+ ///
+ /// let client = IpfsClient::default();
+ /// let key = "QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA";
+ /// let res = client.dht_get(key).try_collect::<Vec<_>>();
+ /// ```
+ ///
+ #[inline]
+ pub fn dht_get(
+ &self,
+ key: &str,
+ ) -> impl Stream<Item = Result<response::DhtGetResponse, Error>> {
+ impl_stream_api_response! {
+ (self, request::DhtGet { key }, None) => request_stream_json
+ }
+ }
+
+ /// Announce to the network that you are providing a given value.
+ ///
+ /// ```no_run
+ /// use futures::TryStreamExt;
+ /// use ipfs_api::IpfsClient;
+ ///
+ /// let client = IpfsClient::default();
+ /// let key = "QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA";
+ /// let res = client.dht_provide(key).try_collect::<Vec<_>>();
+ /// ```
+ ///
+ #[inline]
+ pub fn dht_provide(
+ &self,
+ key: &str,
+ ) -> impl Stream<Item = Result<response::DhtProvideResponse, Error>> {
+ impl_stream_api_response! {
+ (self, request::DhtProvide { key }, None) => request_stream_json
+ }
+ }
+
+ /// Write a key/value pair to the DHT.
+ ///
+ /// ```no_run
+ /// use futures::TryStreamExt;
+ /// use ipfs_api::IpfsClient;
+ ///
+ /// let client = IpfsClient::default();
+ /// let res = client.dht_put("test", "Hello World!").try_collect::<Vec<_>>();
+ /// ```
+ ///
+ #[inline]
+ pub fn dht_put(
+ &self,
+ key: &str,
+ value: &str,
+ ) -> impl Stream<Item = Result<response::DhtPutResponse, Error>> {
+ impl_stream_api_response! {
+ (self, request::DhtPut { key, value }, None) => request_stream_json
+ }
+ }
+
+ /// Find the closest peer given the peer ID by querying the DHT.
+ ///
+ /// ```no_run
+ /// use futures::TryStreamExt;
+ /// use ipfs_api::IpfsClient;
+ ///
+ /// let client = IpfsClient::default();
+ /// let peer = "QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM";
+ /// let res = client.dht_query(peer).try_collect::<Vec<_>>();
+ /// ```
+ ///
+ #[inline]
+ pub fn dht_query(
+ &self,
+ peer: &str,
+ ) -> impl Stream<Item = Result<response::DhtQueryResponse, Error>> {
+ impl_stream_api_response! {
+ (self, request::DhtQuery { peer }, None) => request_stream_json
+ }
+ }
+
+ /// Clear inactive requests from the log.
+ ///
+ /// ```no_run
+ /// use ipfs_api::IpfsClient;
+ ///
+ /// let client = IpfsClient::default();
+ /// let res = client.diag_cmds_clear();
+ /// ```
+ ///
+ #[inline]
+ pub async fn diag_cmds_clear(&self) -> Result<response::DiagCmdsClearResponse, Error> {
+ self.request_empty(request::DiagCmdsClear, None).await
+ }
+
+ /// Set how long to keep inactive requests in the log.
+ ///
+ /// ```no_run
+ /// use ipfs_api::IpfsClient;
+ ///
+ /// let client = IpfsClient::default();
+ /// let res = client.diag_cmds_set_time("1m");
+ /// ```
+ ///
+ #[inline]
+ pub async fn diag_cmds_set_time(
+ &self,
+ time: &str,
+ ) -> Result<response::DiagCmdsSetTimeResponse, Error> {
+ self.request_empty(request::DiagCmdsSetTime { time }, None)
+ .await
+ }
+
+ /// Print system diagnostic information.
+ ///
+ /// Note: There isn't good documentation on what this call is supposed to return.
+ /// It might be platform dependent, but if it isn't, this can be fixed to return
+ /// an actual object.
+ ///
+ /// ```no_run
+ /// use ipfs_api::IpfsClient;
+ ///
+ /// let client = IpfsClient::default();
+ /// let res = client.diag_sys();
+ /// ```
+ ///
+ #[inline]
+ pub async fn diag_sys(&self) -> Result<response::DiagSysResponse, Error> {
+ self.request_string(request::DiagSys, None).await
+ }
+
+ /// Resolve DNS link.
+ ///
+ /// ```no_run
+ /// use ipfs_api::IpfsClient;
+ ///
+ /// let client = IpfsClient::default();
+ /// let res = client.dns("ipfs.io", true);
+ /// ```
+ ///
+ #[inline]
+ pub async fn dns(&self, link: &str, recursive: bool) -> Result<response::DnsResponse, Error> {
+ self.request(request::Dns { link, recursive }, None).await
+ }
+
+ /// List directory for Unix filesystem objects.
+ ///
+ /// ```no_run
+ /// use ipfs_api::IpfsClient;
+ ///
+ /// let client = IpfsClient::default();
+ /// let res = client.file_ls("/ipns/ipfs.io");
+ /// ```
+ ///
+ #[inline]
+ pub async fn file_ls(&self, path: &str) -> Result<response::FileLsResponse, Error> {
+ self.request(request::FileLs { path }, None).await
+ }
+
+ /// Copy files into MFS.
+ ///
+ /// ```no_run
+ /// use ipfs_api::IpfsClient;
+ ///
+ /// let client = IpfsClient::default();
+ /// let res = client.files_cp("/path/to/file", "/dest");
+ /// ```
+ ///
+ #[inline]
+ pub async fn files_cp(
+ &self,