// Copyright 2017 rust-ipfs-api Developers // // Licensed under the Apache License, Version 2.0, or the MIT license , at your option. This file may not be // copied, modified, or distributed except according to those terms. // use crate::{ header::TRAILER, read::{JsonLineDecoder, LineDecoder, StreamReader}, request::{self, ApiRequest}, response::{self, Error}, Client, Request, Response, }; #[cfg(feature = "actix")] use actix_multipart::client::multipart; use bytes::Bytes; use futures::{future, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; use http::{ uri::{InvalidUri, Uri}, StatusCode, }; #[cfg(feature = "hyper")] use hyper::{body, client::Builder}; #[cfg(feature = "hyper")] use hyper_multipart::client::multipart; #[cfg(feature = "hyper")] use hyper_tls::HttpsConnector; use parity_multiaddr::Protocol; use serde::{Deserialize, Serialize}; use serde_json; #[cfg(feature = "actix")] use std::time::Duration; use std::{ fs::{self, File}, io::{Cursor, Read}, net::{IpAddr, SocketAddr}, path::{Path, PathBuf}, }; use tokio_util::codec::{Decoder, FramedRead}; const FILE_DESCRIPTOR_LIMIT: usize = 127; #[cfg(feature = "actix")] const ACTIX_REQUEST_TIMEOUT: Duration = Duration::from_secs(90); /// Asynchronous Ipfs client. /// #[derive(Clone)] pub struct IpfsClient { base: Uri, client: Client, } impl Default for IpfsClient { /// Creates an `IpfsClient` connected to the endpoint specified in ~/.ipfs/api. /// If not found, tries to connect to `localhost:5001`. /// fn default() -> IpfsClient { dirs::home_dir() .map(|home_dir| home_dir.join(".ipfs").join("api")) .and_then(|multiaddr_path| fs::read_to_string(&multiaddr_path).ok()) .and_then(|multiaddr_str| parity_multiaddr::from_url(&multiaddr_str).ok()) .and_then(|multiaddr| { let mut addr: Option = None; let mut port: Option = None; for addr_component in multiaddr.iter() { match addr_component { Protocol::Ip4(v4addr) => addr = Some(v4addr.into()), Protocol::Ip6(v6addr) => addr = Some(v6addr.into()), Protocol::Tcp(tcpport) => port = Some(tcpport), _ => { return None; } } } if let (Some(addr), Some(port)) = (addr, port) { Some(SocketAddr::new(addr, port)) } else { None } }) .map(IpfsClient::from) .unwrap_or_else(|| IpfsClient::new("localhost", 5001).unwrap()) } } impl From for IpfsClient { fn from(socket_addr: SocketAddr) -> Self { IpfsClient::new(&socket_addr.ip().to_string(), socket_addr.port()).unwrap() } } impl IpfsClient { /// Creates a new `IpfsClient`. /// #[inline] pub fn new(host: &str, port: u16) -> Result { Self::new_from_uri(format!("http://{}:{}", host, port).as_str()) } /// Creates a new `IpfsClient` for any given URI. /// #[inline] pub fn new_from_uri(uri: &str) -> Result { let base_path = IpfsClient::build_base_path(uri)?; let client = { #[cfg(feature = "hyper")] { Builder::default() .pool_max_idle_per_host(0) .build(HttpsConnector::new()) } #[cfg(feature = "actix")] { Client::default() } }; Ok(IpfsClient { base: base_path, client, }) } /// Builds the base url path for the Ipfs api. /// fn build_base_path(uri: &str) -> Result { format!("{}/api/v0", uri).parse() } /// Builds the url for an api call. /// fn build_base_request( &self, req: Req, form: Option>, ) -> Result where Req: ApiRequest + Serialize, { let url = format!( "{}{}?{}", self.base, Req::PATH, ::serde_urlencoded::to_string(req)? ); #[cfg(feature = "hyper")] { url.parse::().map_err(From::from).and_then(move |url| { let builder = http::Request::builder().method(http::Method::POST).uri(url); let req = if let Some(form) = form { form.set_body_convert::(builder) } else { builder.body(hyper::Body::empty()) }; req.map_err(From::from) }) } #[cfg(feature = "actix")] { let req = if let Some(form) = form { self.client .post(url) .timeout(ACTIX_REQUEST_TIMEOUT) .content_type(form.content_type()) .send_body(multipart::Body::from(form)) } else { self.client.post(url).timeout(ACTIX_REQUEST_TIMEOUT).send() }; Ok(req) } } /// Builds an Api error from a response body. /// #[inline] fn process_error_from_body(body: Bytes) -> Error { match serde_json::from_slice(&body) { Ok(e) => Error::Api(e), Err(_) => match String::from_utf8(body.to_vec()) { Ok(s) => Error::Uncategorized(s), Err(e) => e.into(), }, } } /// Processes a response that expects a json encoded body, returning an /// error or a deserialized json response. /// fn process_json_response(status: StatusCode, body: Bytes) -> Result where for<'de> Res: 'static + Deserialize<'de>, { match status { StatusCode::OK => serde_json::from_slice(&body).map_err(From::from), _ => Err(Self::process_error_from_body(body)), } } /// Processes a response that returns a stream of json deserializable /// results. /// fn process_stream_response( res: Response, decoder: D, ) -> impl Stream> where D: Decoder + Send, { #[cfg(feature = "hyper")] { FramedRead::new(StreamReader::new(res.into_body()), decoder) } #[cfg(feature = "actix")] { FramedRead::new(StreamReader::new(res), decoder) } } /// Generates a request, and returns the unprocessed response future. /// async fn request_raw( &self, req: Req, form: Option>, ) -> Result<(StatusCode, Bytes), Error> where Req: ApiRequest + Serialize, { let req = self.build_base_request(req, form)?; #[cfg(feature = "hyper")] { let res = self.client.request(req).await?; let status = res.status(); let body = body::to_bytes(res.into_body()).await?; Ok((status, body)) } #[cfg(feature = "actix")] { let mut res = req.await?; let status = res.status(); let body = res.body().await?; Ok((status, body)) } } /// Generic method for making a request to the Ipfs server, and getting /// a deserializable response. /// async fn request( &self, req: Req, form: Option>, ) -> Result where Req: ApiRequest + Serialize, for<'de> Res: 'static + Deserialize<'de>, { let (status, chunk) = self.request_raw(req, form).await?; IpfsClient::process_json_response(status, chunk) } /// Generic method for making a request to the Ipfs server, and getting /// back a response with no body. /// async fn request_empty( &self, req: Req, form: Option>, ) -> Result<(), Error> where Req: ApiRequest + Serialize, { let (status, chunk) = self.request_raw(req, form).await?; match status { StatusCode::OK => Ok(()), _ => Err(Self::process_error_from_body(chunk)), } } /// Generic method for making a request to the Ipfs server, and getting /// back a raw String response. /// async fn request_string( &self, req: Req, form: Option>, ) -> Result where Req: ApiRequest + Serialize, { let (status, chunk) = self.request_raw(req, form).await?; match status { StatusCode::OK => String::from_utf8(chunk.to_vec()).map_err(From::from), _ => Err(Self::process_error_from_body(chunk)), } } } impl IpfsClient { /// Generic method for making a request that expects back a streaming /// response. /// fn request_stream( &self, req: Request, process: F, ) -> impl Stream> where OutStream: Stream>, F: 'static + Fn(Response) -> OutStream, { #[cfg(feature = "hyper")] { self.client .request(req) .err_into() .map_ok(move |res| { match res.status() { StatusCode::OK => process(res).right_stream(), // If the server responded with an error status code, the body // still needs to be read so an error can be built. This block will // read the entire body stream, then immediately return an error. // _ => body::to_bytes(res.into_body()) .boxed() .map(|maybe_body| match maybe_body { Ok(body) => Err(Self::process_error_from_body(body)), Err(e) => Err(e.into()), }) .into_stream() .left_stream(), } }) .try_flatten_stream() } #[cfg(feature = "actix")] { req.err_into() .map_ok(move |mut res| { match res.status() { StatusCode::OK => process(res).right_stream(), // If the server responded with an error status code, the body // still needs to be read so an error can be built. This block will // read the entire body stream, then immediately return an error. // _ => res .body() .map(|maybe_body| match maybe_body { Ok(body) => Err(Self::process_error_from_body(body)), Err(e) => Err(e.into()), }) .into_stream() .left_stream(), } }) .try_flatten_stream() } } /// Generic method for making a request to the Ipfs server, and getting /// back a raw stream of bytes. /// fn request_stream_bytes(&self, req: Request) -> impl Stream> { #[cfg(feature = "hyper")] { self.request_stream(req, |res| res.into_body().err_into()) } #[cfg(feature = "actix")] { self.request_stream(req, |res| res.err_into()) } } /// Generic method to return a streaming response of deserialized json /// objects delineated by new line separators. /// fn request_stream_json(&self, req: Request) -> impl Stream> where for<'de> Res: 'static + Deserialize<'de> + Send, { self.request_stream(req, |res| { let parse_stream_error = if let Some(trailer) = res.headers().get(TRAILER) { // Response has the Trailer header set. The StreamError trailer // is used to indicate that there was an error while streaming // data with Ipfs. // if trailer == "X-Stream-Error" { true } else { let err = Error::UnrecognizedTrailerHeader( String::from_utf8_lossy(trailer.as_ref()).into(), ); // There was an unrecognized trailer value. If that is the case, // create a stream that immediately errors. // return future::err(err).into_stream().left_stream(); } } else { false }; IpfsClient::process_stream_response(res, JsonLineDecoder::new(parse_stream_error)) .right_stream() }) } } // Implements a call to the IPFS that returns a streaming body response. // Implementing this in a macro is necessary because the Rust compiler // can't reason about the lifetime of the request instance properly. It // thinks that the request needs to live as long as the returned stream, // but in reality, the request instance is only used to build the Hyper // or Actix request. // macro_rules! impl_stream_api_response { (($self:ident, $req:expr, $form:expr) => $call:ident) => { impl_stream_api_response! { ($self, $req, $form) |req| => { $self.$call(req) } } }; (($self:ident, $req:expr, $form:expr) |$var:ident| => $impl:block) => { match $self.build_base_request($req, $form) { Ok($var) => $impl.right_stream(), Err(e) => return future::err(e).into_stream().left_stream(), } }; } impl IpfsClient { /// Add file to Ipfs. /// /// # Examples /// /// ```no_run /// use ipfs_api::IpfsClient; /// use std::io::Cursor; /// /// let client = IpfsClient::default(); /// let data = Cursor::new("Hello World!"); /// let res = client.add(data); /// ``` /// #[inline] pub async fn add(&self, data: R) -> Result where R: 'static + Read + Send + Sync, { let mut form = multipart::Form::default(); form.add_reader("path", data); self.request(request::Add, Some(form)).await } /// Add a path to Ipfs. Can be a file or directory. /// A hard limit of 128 open file descriptors is set such /// that any small additional files are stored in-memory. /// /// # Examples /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let path = "./src"; /// let res = client.add_path(path); /// ``` /// #[inline] pub async fn add_path

(&self, path: P) -> Result, Error> where P: AsRef, { let prefix = path.as_ref().parent(); let mut paths_to_add: Vec<(PathBuf, u64)> = vec![]; for path in walkdir::WalkDir::new(path.as_ref()) { match path { Ok(entry) if entry.file_type().is_file() => { if entry.file_type().is_file() { let file_size = entry .metadata() .map(|metadata| metadata.len()) .map_err(|e| Error::Io(e.into()))?; paths_to_add.push((entry.path().to_path_buf(), file_size)); } } Ok(_) => (), Err(err) => return Err(Error::Io(err.into())), } } paths_to_add.sort_unstable_by(|(_, a), (_, b)| a.cmp(b).reverse()); let mut it = 0; let mut form = multipart::Form::default(); for (path, file_size) in paths_to_add { let mut file = File::open(&path)?; let file_name = match prefix { Some(prefix) => path.strip_prefix(prefix).unwrap(), None => path.as_path(), } .to_string_lossy(); if it < FILE_DESCRIPTOR_LIMIT { form.add_reader_file("path", file, file_name); it += 1; } else { let mut buf = Vec::with_capacity(file_size as usize); let _ = file.read_to_end(&mut buf)?; form.add_reader_file("path", Cursor::new(buf), file_name); } } let req = self.build_base_request(request::Add, Some(form))?; self.request_stream_json(req).try_collect().await } /// Returns the current ledger for a peer. /// /// # Examples /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.bitswap_ledger("QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ"); /// ``` /// #[inline] pub async fn bitswap_ledger( &self, peer: &str, ) -> Result { self.request(request::BitswapLedger { peer }, None).await } /// Triggers a reprovide. /// /// # Examples /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.bitswap_reprovide(); /// ``` /// #[inline] pub async fn bitswap_reprovide(&self) -> Result { self.request_empty(request::BitswapReprovide, None).await } /// Returns some stats about the bitswap agent. /// /// # Examples /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.bitswap_stat(); /// ``` /// #[inline] pub async fn bitswap_stat(&self) -> Result { self.request(request::BitswapStat, None).await } /// Remove a given block from your wantlist. /// /// # Examples /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.bitswap_unwant("QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"); /// ``` /// #[inline] pub async fn bitswap_unwant( &self, key: &str, ) -> Result { self.request_empty(request::BitswapUnwant { key }, None) .await } /// Shows blocks on the wantlist for you or the specified peer. /// /// # Examples /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.bitswap_wantlist( /// Some("QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ") /// ); /// ``` /// #[inline] pub async fn bitswap_wantlist( &self, peer: Option<&str>, ) -> Result { self.request(request::BitswapWantlist { peer }, None).await } /// Gets a raw IPFS block. /// /// # Examples /// /// ```no_run /// use futures::TryStreamExt; /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let hash = "QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"; /// let res = client /// .block_get(hash) /// .map_ok(|chunk| chunk.to_vec()) /// .try_concat(); /// ``` /// #[inline] pub fn block_get(&self, hash: &str) -> impl Stream> { impl_stream_api_response! { (self, request::BlockGet { hash }, None) => request_stream_bytes } } /// Store input as an IPFS block. /// /// # Examples /// /// ```no_run /// use ipfs_api::IpfsClient; /// use std::io::Cursor; /// /// let client = IpfsClient::default(); /// let data = Cursor::new("Hello World!"); /// let res = client.block_put(data); /// ``` /// #[inline] pub async fn block_put(&self, data: R) -> Result where R: 'static + Read + Send + Sync, { let mut form = multipart::Form::default(); form.add_reader("data", data); self.request(request::BlockPut, Some(form)).await } /// Removes an IPFS block. /// /// # Examples /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.block_rm("QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"); /// ``` /// #[inline] pub async fn block_rm(&self, hash: &str) -> Result { self.request(request::BlockRm { hash }, None).await } /// Prints information about a raw IPFS block. /// /// # Examples /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.block_stat("QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"); /// ``` /// #[inline] pub async fn block_stat(&self, hash: &str) -> Result { self.request(request::BlockStat { hash }, None).await } /// Add default peers to the bootstrap list. /// /// # Examples /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.bootstrap_add_default(); /// ``` /// #[inline] pub async fn bootstrap_add_default( &self, ) -> Result { self.request(request::BootstrapAddDefault, None).await } /// Lists peers in bootstrap list. /// /// # Examples /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.bootstrap_list(); /// ``` /// #[inline] pub async fn bootstrap_list(&self) -> Result { self.request(request::BootstrapList, None).await } /// Removes all peers in bootstrap list. /// /// # Examples /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.bootstrap_rm_all(); /// ``` /// #[inline] pub async fn bootstrap_rm_all(&self) -> Result { self.request(request::BootstrapRmAll, None).await } /// Returns the contents of an Ipfs object. /// /// # Examples /// /// ```no_run /// use futures::TryStreamExt; /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let hash = "QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"; /// let res = client /// .cat(hash) /// .map_ok(|chunk| chunk.to_vec()) /// .try_concat(); /// ``` /// #[inline] pub fn cat(&self, path: &str) -> impl Stream> { impl_stream_api_response! { (self, request::Cat { path }, None) => request_stream_bytes } } /// List available commands that the server accepts. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.commands(); /// ``` /// #[inline] pub async fn commands(&self) -> Result { self.request(request::Commands, None).await } /// Opens the config file for editing (on the server). /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.config_edit(); /// ``` /// #[inline] pub async fn config_edit(&self) -> Result { self.request(request::ConfigEdit, None).await } /// Replace the config file. /// /// ```no_run /// use ipfs_api::IpfsClient; /// use std::io::Cursor; /// /// let client = IpfsClient::default(); /// let config = Cursor::new("{..json..}"); /// let res = client.config_replace(config); /// ``` /// #[inline] pub async fn config_replace(&self, data: R) -> Result where R: 'static + Read + Send + Sync, { let mut form = multipart::Form::default(); form.add_reader("file", data); self.request_empty(request::ConfigReplace, Some(form)).await } /// Show the current config of the server. /// /// Returns an unparsed json string, due to an unclear spec. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.config_show(); /// ``` /// #[inline] pub async fn config_show(&self) -> Result { self.request_string(request::ConfigShow, None).await } /// Returns information about a dag node in Ipfs. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.dag_get("QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"); /// ``` /// #[inline] pub async fn dag_get(&self, path: &str) -> Result { self.request(request::DagGet { path }, None).await } // TODO /dag routes are experimental, and there isn't a whole lot of // documentation available for how this route works. // // /// Add a DAG node to Ipfs. // /// // #[inline] // pub fn dag_put(&self, data: R) -> AsyncResponse // where // R: 'static + Read + Send, // { // let mut form = multipart::Form::default(); // // form.add_reader("arg", data); // // self.request(&request::DagPut, Some(form)) // } // TODO /dag/resolve /// Query the DHT for all of the multiaddresses associated with a Peer ID. /// /// ```no_run /// use futures::TryStreamExt; /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let peer = "QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM"; /// let res = client.dht_findpeer(peer).try_collect::>(); /// ``` /// #[inline] pub fn dht_findpeer( &self, peer: &str, ) -> impl Stream> { impl_stream_api_response! { (self, request::DhtFindPeer { peer }, None) => request_stream_json } } /// Find peers in the DHT that can provide a specific value given a key. /// /// ```no_run /// use futures::TryStreamExt; /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let key = "QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"; /// let res = client.dht_findprovs(key).try_collect::>(); /// ``` /// #[inline] pub fn dht_findprovs( &self, key: &str, ) -> impl Stream> { impl_stream_api_response! { (self, request::DhtFindProvs { key }, None) => request_stream_json } } /// Query the DHT for a given key. /// /// ```no_run /// use futures::TryStreamExt; /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let key = "QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"; /// let res = client.dht_get(key).try_collect::>(); /// ``` /// #[inline] pub fn dht_get( &self, key: &str, ) -> impl Stream> { impl_stream_api_response! { (self, request::DhtGet { key }, None) => request_stream_json } } /// Announce to the network that you are providing a given value. /// /// ```no_run /// use futures::TryStreamExt; /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let key = "QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"; /// let res = client.dht_provide(key).try_collect::>(); /// ``` /// #[inline] pub fn dht_provide( &self, key: &str, ) -> impl Stream> { impl_stream_api_response! { (self, request::DhtProvide { key }, None) => request_stream_json } } /// Write a key/value pair to the DHT. /// /// ```no_run /// use futures::TryStreamExt; /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.dht_put("test", "Hello World!").try_collect::>(); /// ``` /// #[inline] pub fn dht_put( &self, key: &str, value: &str, ) -> impl Stream> { impl_stream_api_response! { (self, request::DhtPut { key, value }, None) => request_stream_json } } /// Find the closest peer given the peer ID by querying the DHT. /// /// ```no_run /// use futures::TryStreamExt; /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let peer = "QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM"; /// let res = client.dht_query(peer).try_collect::>(); /// ``` /// #[inline] pub fn dht_query( &self, peer: &str, ) -> impl Stream> { impl_stream_api_response! { (self, request::DhtQuery { peer }, None) => request_stream_json } } /// Clear inactive requests from the log. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.diag_cmds_clear(); /// ``` /// #[inline] pub async fn diag_cmds_clear(&self) -> Result { self.request_empty(request::DiagCmdsClear, None).await } /// Set how long to keep inactive requests in the log. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.diag_cmds_set_time("1m"); /// ``` /// #[inline] pub async fn diag_cmds_set_time( &self, time: &str, ) -> Result { self.request_empty(request::DiagCmdsSetTime { time }, None) .await } /// Print system diagnostic information. /// /// Note: There isn't good documentation on what this call is supposed to return. /// It might be platform dependent, but if it isn't, this can be fixed to return /// an actual object. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.diag_sys(); /// ``` /// #[inline] pub async fn diag_sys(&self) -> Result { self.request_string(request::DiagSys, None).await } /// Resolve DNS link. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.dns("ipfs.io", true); /// ``` /// #[inline] pub async fn dns(&self, link: &str, recursive: bool) -> Result { self.request(request::Dns { link, recursive }, None).await } /// List directory for Unix filesystem objects. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.file_ls("/ipns/ipfs.io"); /// ``` /// #[inline] pub async fn file_ls(&self, path: &str) -> Result { self.request(request::FileLs { path }, None).await } /// Copy files into MFS. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.files_cp("/path/to/file", "/dest"); /// ``` /// #[inline] pub async fn files_cp( &self, path: &str, dest: &str, ) -> Result { self.request_empty(request::FilesCp { path, dest }, None) .await } /// Flush a path's data to disk. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.files_flush(None); /// let res = client.files_flush(Some("/tmp")); /// ``` /// #[inline] pub async fn files_flush( &self, path: Option<&str>, ) -> Result { self.request_empty(request::FilesFlush { path }, None).await } /// List directories in MFS. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.files_ls(None); /// let res = client.files_ls(Some("/tmp")); /// ``` /// #[inline] pub async fn files_ls(&self, path: Option<&str>) -> Result { self.request(request::FilesLs { path }, None).await } /// Make directories in MFS. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.files_mkdir("/test", false); /// let res = client.files_mkdir("/test/nested/dir", true); /// ``` /// #[inline] pub async fn files_mkdir( &self, path: &str, parents: bool, ) -> Result { self.request_empty(request::FilesMkdir { path, parents }, None) .await } /// Copy files into MFS. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.files_mv("/test/tmp.json", "/test/file.json"); /// ``` /// #[inline] pub async fn files_mv( &self, path: &str, dest: &str, ) -> Result { self.request_empty(request::FilesMv { path, dest }, None) .await } /// Read a file in MFS. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.files_read("/test/file.json"); /// ``` /// #[inline] pub fn files_read(&self, path: &str) -> impl Stream> { impl_stream_api_response! { (self, request::FilesRead { path }, None) => request_stream_bytes } } /// Remove a file in MFS. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.files_rm("/test/dir", true); /// let res = client.files_rm("/test/file.json", false); /// ``` /// #[inline] pub async fn files_rm( &self, path: &str, recursive: bool, ) -> Result { self.request_empty(request::FilesRm { path, recursive }, None) .await } /// Display a file's status in MDFS. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.files_stat("/test/file.json"); /// ``` /// #[inline] pub async fn files_stat(&self, path: &str) -> Result { self.request(request::FilesStat { path }, None).await } /// Write to a mutable file in the filesystem. /// /// ```no_run /// use ipfs_api::IpfsClient; /// use std::fs::File; /// /// let client = IpfsClient::default(); /// let file = File::open("test.json").unwrap(); /// let res = client.files_write("/test/file.json", true, true, file); /// ``` /// #[inline] pub async fn files_write( &self, path: &str, create: bool, truncate: bool, data: R, ) -> Result where R: 'static + Read + Send + Sync, { let mut form = multipart::Form::default(); form.add_reader("data", data); self.request_empty( request::FilesWrite { path, create, truncate, }, Some(form), ) .await } /// List blocks that are both in the filestore and standard block storage. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.filestore_dups(); /// ``` /// #[inline] pub fn filestore_dups( &self, ) -> impl Stream> { impl_stream_api_response! { (self, request::FilestoreDups, None) => request_stream_json } } /// List objects in filestore. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.filestore_ls( /// Some("QmYPP3BovR2m8UqCZxFbdXSit6SKgExxDkFAPLqiGsap4X") /// ); /// ``` /// #[inline] pub fn filestore_ls( &self, cid: Option<&str>, ) -> impl Stream> { impl_stream_api_response! { (self, request::FilestoreLs { cid }, None) => request_stream_json } } /// Verify objects in filestore. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.filestore_verify(None); /// ``` /// #[inline] pub fn filestore_verify( &self, cid: Option<&str>, ) -> impl Stream> { impl_stream_api_response! { (self, request::FilestoreVerify{ cid }, None) => request_stream_json } } /// Download Ipfs object. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.get("/test/file.json"); /// ``` /// #[inline] pub fn get(&self, path: &str) -> impl Stream> { impl_stream_api_response! { (self, request::Get { path }, None) => request_stream_bytes } } /// Returns information about a peer. /// /// If `peer` is `None`, returns information about you. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.id(None); /// let res = client.id(Some("QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM")); /// ``` /// #[inline] pub async fn id(&self, peer: Option<&str>) -> Result { self.request(request::Id { peer }, None).await } /// Create a new keypair. /// /// ```no_run /// use ipfs_api::{IpfsClient, KeyType}; /// /// let client = IpfsClient::default(); /// let res = client.key_gen("test", KeyType::Rsa, 64); /// ``` /// #[inline] pub async fn key_gen( &self, name: &str, kind: request::KeyType, size: i32, ) -> Result { self.request(request::KeyGen { name, kind, size }, None) .await } /// List all local keypairs. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.key_list(); /// ``` /// #[inline] pub async fn key_list(&self) -> Result { self.request(request::KeyList, None).await } /// Rename a keypair. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.key_rename("key_0", "new_name", false); /// ``` /// #[inline] pub async fn key_rename( &self, name: &str, new: &str, force: bool, ) -> Result { self.request(request::KeyRename { name, new, force }, None) .await } /// Remove a keypair. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.key_rm("key_0"); /// ``` /// #[inline] pub async fn key_rm(&self, name: &str) -> Result { self.request(request::KeyRm { name }, None).await } /// Change the logging level for a logger. /// /// ```no_run /// use ipfs_api::{IpfsClient, Logger, LoggingLevel}; /// use std::borrow::Cow; /// /// let client = IpfsClient::default(); /// let res = client.log_level(Logger::All, LoggingLevel::Debug); /// let res = client.log_level( /// Logger::Specific(Cow::Borrowed("web")), /// LoggingLevel::Warning /// ); /// ``` /// #[inline] pub async fn log_level( &self, logger: request::Logger<'_>, level: request::LoggingLevel, ) -> Result { self.request(request::LogLevel { logger, level }, None) .await } /// List all logging subsystems. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.log_ls(); /// ``` /// #[inline] pub async fn log_ls(&self) -> Result { self.request(request::LogLs, None).await } /// Read the event log. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.log_tail(); /// ``` /// pub fn log_tail(&self) -> impl Stream> { impl_stream_api_response! { (self, request::LogTail, None) |req| => { self.request_stream(req, |res| { IpfsClient::process_stream_response(res, LineDecoder) }) } } } /// List the contents of an Ipfs multihash. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.ls(None); /// let res = client.ls(Some("/ipfs/QmVrLsEDn27sScp3k23sgZNefVTjSAL3wpgW1iWPi4MgoY")); /// ``` /// #[inline] pub async fn ls(&self, path: Option<&str>) -> Result { self.request(request::Ls { path }, None).await } // TODO /mount /// Publish an IPFS path to IPNS. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.name_publish( /// "/ipfs/QmVrLsEDn27sScp3k23sgZNefVTjSAL3wpgW1iWPi4MgoY", /// false, /// Some("12h"), /// None, /// None /// ); /// ``` /// pub async fn name_publish( &self, path: &str, resolve: bool, lifetime: Option<&str>, ttl: Option<&str>, key: Option<&str>, ) -> Result { self.request( request::NamePublish { path, resolve, lifetime, ttl, key, }, None, ) .await } /// Resolve an IPNS name. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.name_resolve( /// Some("/ipns/ipfs.io"), /// true, /// false /// ); /// ``` /// pub async fn name_resolve( &self, name: Option<&str>, recursive: bool, nocache: bool, ) -> Result { self.request( request::NameResolve { name, recursive, nocache, }, None, ) .await } /// Output the raw bytes of an Ipfs object. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.object_data("/ipfs/QmVrLsEDn27sScp3k23sgZNefVTjSAL3wpgW1iWPi4MgoY"); /// ``` /// #[inline] pub fn object_data(&self, key: &str) -> impl Stream> { impl_stream_api_response! { (self, request::ObjectData { key }, None) => request_stream_bytes } } /// Returns the diff of two Ipfs objects. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.object_diff( /// "/ipfs/QmVrLsEDn27sScp3k23sgZNefVTjSAL3wpgW1iWPi4MgoY", /// "/ipfs/QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA" /// ); /// ``` /// #[inline] pub async fn object_diff( &self, key0: &str, key1: &str, ) -> Result { self.request(request::ObjectDiff { key0, key1 }, None).await } /// Returns the data in an object. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.object_get("/ipfs/QmVrLsEDn27sScp3k23sgZNefVTjSAL3wpgW1iWPi4MgoY"); /// ``` /// #[inline] pub async fn object_get(&self, key: &str) -> Result { self.request(request::ObjectGet { key }, None).await } /// Returns the links that an object points to. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.object_links("/ipfs/QmVrLsEDn27sScp3k23sgZNefVTjSAL3wpgW1iWPi4MgoY"); /// ``` /// #[inline] pub async fn object_links(&self, key: &str) -> Result { self.request(request::ObjectLinks { key }, None).await } /// Create a new object. /// /// ```no_run /// use ipfs_api::{IpfsClient, ObjectTemplate}; /// /// let client = IpfsClient::default(); /// let res = client.object_new(None); /// let res = client.object_new(Some(ObjectTemplate::UnixFsDir)); /// ``` /// #[inline] pub async fn object_new( &self, template: Option, ) -> Result { self.request(request::ObjectNew { template }, None).await } // TODO /object/patch/add-link // TODO /object/patch/append-data // TODO /object/patch/rm-link // TODO /object/patch/set-data // TODO /object/put /// Returns the stats for an object. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.object_stat("/ipfs/QmVrLsEDn27sScp3k23sgZNefVTjSAL3wpgW1iWPi4MgoY"); /// ``` /// #[inline] pub async fn object_stat(&self, key: &str) -> Result { self.request(request::ObjectStat { key }, None).await } // TODO /p2p/listener/close // TODO /p2p/listener/ls // TODO /p2p/listener/open // TODO /p2p/stream/close // TODO /p2p/stream/dial // TODO /p2p/stream/ls /// Pins a new object. /// /// The "recursive" option tells the server whether to /// pin just the top-level object, or all sub-objects /// it depends on. For most cases you want it to be `true`. /// /// Does not yet implement the "progress" agument because /// reading it is kinda squirrelly. /// /// # Examples /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.pin_add("QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ", true); /// ``` #[inline] pub async fn pin_add( &self, key: &str, recursive: bool, ) -> Result { self.request( request::PinAdd { key, recursive: Some(recursive), progress: false, }, None, ) .await } /// Returns a list of pinned objects in local storage. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.pin_ls(None, None); /// let res = client.pin_ls( /// Some("/ipfs/QmVrLsEDn27sScp3k23sgZNefVTjSAL3wpgW1iWPi4MgoY"), /// None /// ); /// let res = client.pin_ls(None, Some("direct")); /// ``` /// #[inline] pub async fn pin_ls( &self, key: Option<&str>, typ: Option<&str>, ) -> Result { self.request(request::PinLs { key, typ }, None).await } /// Removes a pinned object from local storage. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.pin_rm( /// "/ipfs/QmVrLsEDn27sScp3k23sgZNefVTjSAL3wpgW1iWPi4MgoY", /// false /// ); /// let res = client.pin_rm( /// "/ipfs/QmVrLsEDn27sScp3k23sgZNefVTjSAL3wpgW1iWPi4MgoY", /// true /// ); /// ``` /// #[inline] pub async fn pin_rm( &self, key: &str, recursive: bool, ) -> Result { self.request(request::PinRm { key, recursive }, None).await } // TODO /pin/update // TODO /pin/verify /// Pings a peer. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.ping("QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64", None); /// let res = client.ping("QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64", Some(15)); /// ``` /// #[inline] pub fn ping( &self, peer: &str, count: Option, ) -> impl Stream> { impl_stream_api_response! { (self, request::Ping { peer, count }, None) => request_stream_json } } /// List subscribed pubsub topics. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.pubsub_ls(); /// ``` /// #[inline] pub async fn pubsub_ls(&self) -> Result { self.request(request::PubsubLs, None).await } /// List peers that are being published to. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.pubsub_peers(None); /// let res = client.pubsub_peers(Some("feed")); /// ``` /// #[inline] pub async fn pubsub_peers( &self, topic: Option<&str>, ) -> Result { self.request(request::PubsubPeers { topic }, None).await } /// Publish a message to a topic. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.pubsub_pub("feed", "Hello World!"); /// ``` /// #[inline] pub async fn pubsub_pub( &self, topic: &str, payload: &str, ) -> Result { self.request_empty(request::PubsubPub { topic, payload }, None) .await } /// Subscribes to a pubsub topic. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.pubsub_sub("feed", false); /// let res = client.pubsub_sub("feed", true); /// ``` /// #[inline] pub fn pubsub_sub( &self, topic: &str, discover: bool, ) -> impl Stream> { impl_stream_api_response! { (self, request::PubsubSub { topic, discover }, None) => request_stream_json } } /// Gets a list of local references. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.refs_local(); /// ``` /// #[inline] pub fn refs_local(&self) -> impl Stream> { impl_stream_api_response! { (self, request::RefsLocal, None) => request_stream_json } } // TODO /repo/fsck // TODO /repo/gc // TODO /repo/stat // TODO /repo/verify // TODO /repo/version // TODO /resolve /// Shutdown the Ipfs daemon. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.shutdown(); /// ``` /// pub async fn shutdown(&self) -> Result { self.request_empty(request::Shutdown, None).await } /// Returns bitswap stats. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.stats_bitswap(); /// ``` /// #[inline] pub async fn stats_bitswap(&self) -> Result { self.request(request::StatsBitswap, None).await } /// Returns bandwidth stats. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.stats_bw(); /// ``` /// #[inline] pub async fn stats_bw(&self) -> Result { self.request(request::StatsBw, None).await } /// Returns repo stats. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.stats_repo(); /// ``` /// #[inline] pub async fn stats_repo(&self) -> Result { self.request(request::StatsRepo, None).await } // TODO /swarm/addrs/listen /// Return a list of local addresses. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.swarm_addrs_local(); /// ``` /// #[inline] pub async fn swarm_addrs_local(&self) -> Result { self.request(request::SwarmAddrsLocal, None).await } // TODO /swarm/connect // TODO /swarm/disconnect // TODO /swarm/filters/add // TODO /swarm/filters/rm /// Return a list of peers with open connections. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.swarm_peers(); /// ``` /// #[inline] pub async fn swarm_peers(&self) -> Result { self.request(request::SwarmPeers, None).await } /// Add a tar file to Ipfs. /// /// Note: `data` should already be a tar file. If it isn't the Api will return /// an error. /// /// ```no_run /// use ipfs_api::IpfsClient; /// use std::fs::File; /// /// let client = IpfsClient::default(); /// let tar = File::open("/path/to/file.tar").unwrap(); /// let res = client.tar_add(tar); /// ``` /// #[inline] pub async fn tar_add(&self, data: R) -> Result where R: 'static + Read + Send + Sync, { let mut form = multipart::Form::default(); form.add_reader("file", data); self.request(request::TarAdd, Some(form)).await } /// Export a tar file from Ipfs. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.tar_cat("/ipfs/QmVrLsEDn27sScp3k23sgZNefVTjSAL3wpgW1iWPi4MgoY"); /// ``` /// #[inline] pub fn tar_cat(&self, path: &str) -> impl Stream> { impl_stream_api_response! { (self, request::TarCat { path }, None) => request_stream_bytes } } /// Returns information about the Ipfs server version. /// /// ```no_run /// use ipfs_api::IpfsClient; /// /// let client = IpfsClient::default(); /// let res = client.version(); /// ``` /// #[inline] pub async fn version(&self) -> Result { self.request(request::Version, None).await } }