From 1faee58a3c7d1376d220a7d44824d74043625f99 Mon Sep 17 00:00:00 2001 From: Ferris Tseng Date: Sat, 16 May 2020 13:29:45 -0400 Subject: move client to separate module --- ipfs-api/src/client.rs | 2037 -------------------------------------------- ipfs-api/src/client/mod.rs | 2037 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 2037 insertions(+), 2037 deletions(-) delete mode 100644 ipfs-api/src/client.rs create mode 100644 ipfs-api/src/client/mod.rs diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs deleted file mode 100644 index 0cd7796..0000000 --- a/ipfs-api/src/client.rs +++ /dev/null @@ -1,2037 +0,0 @@ -// Copyright 2017 rust-ipfs-api Developers -// -// Licensed under the Apache License, Version 2.0, or the MIT license , at your option. This file may not be -// copied, modified, or distributed except according to those terms. -// -use crate::{ - header::TRAILER, - read::{JsonLineDecoder, LineDecoder, StreamReader}, - request::{self, ApiRequest}, - response::{self, Error}, - Client, Request, Response, -}; -#[cfg(feature = "actix")] -use actix_multipart::client::multipart; -use bytes::Bytes; -use futures::{future, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; -use http::{ - uri::{InvalidUri, Uri}, - StatusCode, -}; -#[cfg(feature = "hyper")] -use hyper::{body, client::Builder}; -#[cfg(feature = "hyper")] -use hyper_multipart::client::multipart; -#[cfg(feature = "hyper")] -use hyper_tls::HttpsConnector; -use parity_multiaddr::Protocol; -use serde::{Deserialize, Serialize}; -use serde_json; -#[cfg(feature = "actix")] -use std::time::Duration; -use std::{ - fs::{self, File}, - io::{Cursor, Read}, - net::{IpAddr, SocketAddr}, - path::{Path, PathBuf}, -}; -use tokio_util::codec::{Decoder, FramedRead}; - -const FILE_DESCRIPTOR_LIMIT: usize = 127; - -#[cfg(feature = "actix")] -const ACTIX_REQUEST_TIMEOUT: Duration = Duration::from_secs(90); - -/// Asynchronous Ipfs client. -/// -#[derive(Clone)] -pub struct IpfsClient { - base: Uri, - client: Client, -} - -impl Default for IpfsClient { - /// Creates an `IpfsClient` connected to the endpoint specified in ~/.ipfs/api. - /// If not found, tries to connect to `localhost:5001`. - /// - fn default() -> IpfsClient { - dirs::home_dir() - .map(|home_dir| home_dir.join(".ipfs").join("api")) - .and_then(|multiaddr_path| fs::read_to_string(&multiaddr_path).ok()) - .and_then(|multiaddr_str| parity_multiaddr::from_url(&multiaddr_str).ok()) - .and_then(|multiaddr| { - let mut addr: Option = None; - let mut port: Option = None; - for addr_component in multiaddr.iter() { - match addr_component { - Protocol::Ip4(v4addr) => addr = Some(v4addr.into()), - Protocol::Ip6(v6addr) => addr = Some(v6addr.into()), - Protocol::Tcp(tcpport) => port = Some(tcpport), - _ => { - return None; - } - } - } - if let (Some(addr), Some(port)) = (addr, port) { - Some(SocketAddr::new(addr, port)) - } else { - None - } - }) - .map(IpfsClient::from) - .unwrap_or_else(|| IpfsClient::new("localhost", 5001).unwrap()) - } -} - -impl From for IpfsClient { - fn from(socket_addr: SocketAddr) -> Self { - IpfsClient::new(&socket_addr.ip().to_string(), socket_addr.port()).unwrap() - } -} - -impl IpfsClient { - /// Creates a new `IpfsClient`. - /// - #[inline] - pub fn new(host: &str, port: u16) -> Result { - Self::new_from_uri(format!("http://{}:{}", host, port).as_str()) - } - - /// Creates a new `IpfsClient` for any given URI. - /// - #[inline] - pub fn new_from_uri(uri: &str) -> Result { - let base_path = IpfsClient::build_base_path(uri)?; - let client = { - #[cfg(feature = "hyper")] - { - Builder::default() - .pool_max_idle_per_host(0) - .build(HttpsConnector::new()) - } - #[cfg(feature = "actix")] - { - Client::default() - } - }; - - Ok(IpfsClient { - base: base_path, - client, - }) - } - - /// Builds the base url path for the Ipfs api. - /// - fn build_base_path(uri: &str) -> Result { - format!("{}/api/v0", uri).parse() - } - - /// Builds the url for an api call. - /// - fn build_base_request( - &self, - req: Req, - form: Option>, - ) -> Result - where - Req: ApiRequest + Serialize, - { - let url = format!( - "{}{}?{}", - self.base, - Req::PATH, - ::serde_urlencoded::to_string(req)? - ); - - #[cfg(feature = "hyper")] - { - url.parse::().map_err(From::from).and_then(move |url| { - let builder = http::Request::builder().method(http::Method::POST).uri(url); - - let req = if let Some(form) = form { - form.set_body_convert::(builder) - } else { - builder.body(hyper::Body::empty()) - }; - - req.map_err(From::from) - }) - } - #[cfg(feature = "actix")] - { - let req = if let Some(form) = form { - self.client - .post(url) - .timeout(ACTIX_REQUEST_TIMEOUT) - .content_type(form.content_type()) - .send_body(multipart::Body::from(form)) - } else { - self.client.post(url).timeout(ACTIX_REQUEST_TIMEOUT).send() - }; - - Ok(req) - } - } - - /// Builds an Api error from a response body. - /// - #[inline] - fn process_error_from_body(body: Bytes) -> Error { - match serde_json::from_slice(&body) { - Ok(e) => Error::Api(e), - Err(_) => match String::from_utf8(body.to_vec()) { - Ok(s) => Error::Uncategorized(s), - Err(e) => e.into(), - }, - } - } - - /// Processes a response that expects a json encoded body, returning an - /// error or a deserialized json response. - /// - fn process_json_response(status: StatusCode, body: Bytes) -> Result - where - for<'de> Res: 'static + Deserialize<'de>, - { - match status { - StatusCode::OK => serde_json::from_slice(&body).map_err(From::from), - _ => Err(Self::process_error_from_body(body)), - } - } - - /// Processes a response that returns a stream of json deserializable - /// results. - /// - fn process_stream_response( - res: Response, - decoder: D, - ) -> impl Stream> - where - D: Decoder + Send, - { - #[cfg(feature = "hyper")] - { - FramedRead::new(StreamReader::new(res.into_body()), decoder) - } - #[cfg(feature = "actix")] - { - FramedRead::new(StreamReader::new(res), decoder) - } - } - - /// Generates a request, and returns the unprocessed response future. - /// - async fn request_raw( - &self, - req: Req, - form: Option>, - ) -> Result<(StatusCode, Bytes), Error> - where - Req: ApiRequest + Serialize, - { - let req = self.build_base_request(req, form)?; - - #[cfg(feature = "hyper")] - { - let res = self.client.request(req).await?; - let status = res.status(); - let body = body::to_bytes(res.into_body()).await?; - - Ok((status, body)) - } - #[cfg(feature = "actix")] - { - let mut res = req.await?; - let status = res.status(); - let body = res.body().await?; - - Ok((status, body)) - } - } - - /// Generic method for making a request to the Ipfs server, and getting - /// a deserializable response. - /// - async fn request( - &self, - req: Req, - form: Option>, - ) -> Result - where - Req: ApiRequest + Serialize, - for<'de> Res: 'static + Deserialize<'de>, - { - let (status, chunk) = self.request_raw(req, form).await?; - - IpfsClient::process_json_response(status, chunk) - } - - /// Generic method for making a request to the Ipfs server, and getting - /// back a response with no body. - /// - async fn request_empty( - &self, - req: Req, - form: Option>, - ) -> Result<(), Error> - where - Req: ApiRequest + Serialize, - { - let (status, chunk) = self.request_raw(req, form).await?; - - match status { - StatusCode::OK => Ok(()), - _ => Err(Self::process_error_from_body(chunk)), - } - } - - /// Generic method for making a request to the Ipfs server, and getting - /// back a raw String response. - /// - async fn request_string( - &self, - req: Req, - form: Option>, - ) -> Result - where - Req: ApiRequest + Serialize, - { - let (status, chunk) = self.request_raw(req, form).await?; - - match status { - StatusCode::OK => String::from_utf8(chunk.to_vec()).map_err(From::from), - _ => Err(Self::process_error_from_body(chunk)), - } - } -} - -impl IpfsClient { - /// Generic method for making a request that expects back a streaming - /// response. - /// - fn request_stream( - &self, - req: Request, - process: F, - ) -> impl Stream> - where - OutStream: Stream>, - F: 'static + Fn(Response) -> OutStream, - { - #[cfg(feature = "hyper")] - { - self.client - .request(req) - .err_into() - .map_ok(move |res| { - match res.status() { - StatusCode::OK => process(res).right_stream(), - // If the server responded with an error status code, the body - // still needs to be read so an error can be built. This block will - // read the entire body stream, then immediately return an error. - // - _ => body::to_bytes(res.into_body()) - .boxed() - .map(|maybe_body| match maybe_body { - Ok(body) => Err(Self::process_error_from_body(body)), - Err(e) => Err(e.into()), - }) - .into_stream() - .left_stream(), - } - }) - .try_flatten_stream() - } - #[cfg(feature = "actix")] - { - req.err_into() - .map_ok(move |mut res| { - match res.status() { - StatusCode::OK => process(res).right_stream(), - // If the server responded with an error status code, the body - // still needs to be read so an error can be built. This block will - // read the entire body stream, then immediately return an error. - // - _ => res - .body() - .map(|maybe_body| match maybe_body { - Ok(body) => Err(Self::process_error_from_body(body)), - Err(e) => Err(e.into()), - }) - .into_stream() - .left_stream(), - } - }) - .try_flatten_stream() - } - } - - /// Generic method for making a request to the Ipfs server, and getting - /// back a raw stream of bytes. - /// - fn request_stream_bytes(&self, req: Request) -> impl Stream> { - #[cfg(feature = "hyper")] - { - self.request_stream(req, |res| res.into_body().err_into()) - } - #[cfg(feature = "actix")] - { - self.request_stream(req, |res| res.err_into()) - } - } - - /// Generic method to return a streaming response of deserialized json - /// objects delineated by new line separators. - /// - fn request_stream_json(&self, req: Request) -> impl Stream> - where - for<'de> Res: 'static + Deserialize<'de> + Send, - { - self.request_stream(req, |res| { - let parse_stream_error = if let Some(trailer) = res.headers().get(TRAILER) { - // Response has the Trailer header set. The StreamError trailer - // is used to indicate that there was an error while streaming - // data with Ipfs. - // - if trailer == "X-Stream-Error" { - true - } else { - let err = Error::UnrecognizedTrailerHeader( - String::from_utf8_lossy(trailer.as_ref()).into(), - ); - - // There was an unrecognized trailer value. If that is the case, - // create a stream that immediately errors. - // - return future::err(err).into_stream().left_stream(); - } - } else { - false - }; - - IpfsClient::process_stream_response(res, JsonLineDecoder::new(parse_stream_error)) - .right_stream() - }) - } -} - -// Implements a call to the IPFS that returns a streaming body response. -// Implementing this in a macro is necessary because the Rust compiler -// can't reason about the lifetime of the request instance properly. It -// thinks that the request needs to live as long as the returned stream, -// but in reality, the request instance is only used to build the Hyper -// or Actix request. -// -macro_rules! impl_stream_api_response { - (($self:ident, $req:expr, $form:expr) => $call:ident) => { - impl_stream_api_response! { - ($self, $req, $form) |req| => { $self.$call(req) } - } - }; - (($self:ident, $req:expr, $form:expr) |$var:ident| => $impl:block) => { - match $self.build_base_request($req, $form) { - Ok($var) => $impl.right_stream(), - Err(e) => return future::err(e).into_stream().left_stream(), - } - }; -} - -impl IpfsClient { - /// Add file to Ipfs. - /// - /// # Examples - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// use std::io::Cursor; - /// - /// let client = IpfsClient::default(); - /// let data = Cursor::new("Hello World!"); - /// let res = client.add(data); - /// ``` - /// - #[inline] - pub async fn add(&self, data: R) -> Result - where - R: 'static + Read + Send + Sync, - { - let mut form = multipart::Form::default(); - - form.add_reader("path", data); - - self.request(request::Add, Some(form)).await - } - - /// Add a path to Ipfs. Can be a file or directory. - /// A hard limit of 128 open file descriptors is set such - /// that any small additional files are stored in-memory. - /// - /// # Examples - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let path = "./src"; - /// let res = client.add_path(path); - /// ``` - /// - #[inline] - pub async fn add_path

(&self, path: P) -> Result, Error> - where - P: AsRef, - { - let prefix = path.as_ref().parent(); - let mut paths_to_add: Vec<(PathBuf, u64)> = vec![]; - - for path in walkdir::WalkDir::new(path.as_ref()) { - match path { - Ok(entry) if entry.file_type().is_file() => { - if entry.file_type().is_file() { - let file_size = entry - .metadata() - .map(|metadata| metadata.len()) - .map_err(|e| Error::Io(e.into()))?; - - paths_to_add.push((entry.path().to_path_buf(), file_size)); - } - } - Ok(_) => (), - Err(err) => return Err(Error::Io(err.into())), - } - } - - paths_to_add.sort_unstable_by(|(_, a), (_, b)| a.cmp(b).reverse()); - - let mut it = 0; - let mut form = multipart::Form::default(); - - for (path, file_size) in paths_to_add { - let mut file = File::open(&path)?; - let file_name = match prefix { - Some(prefix) => path.strip_prefix(prefix).unwrap(), - None => path.as_path(), - } - .to_string_lossy(); - - if it < FILE_DESCRIPTOR_LIMIT { - form.add_reader_file("path", file, file_name); - - it += 1; - } else { - let mut buf = Vec::with_capacity(file_size as usize); - let _ = file.read_to_end(&mut buf)?; - - form.add_reader_file("path", Cursor::new(buf), file_name); - } - } - - let req = self.build_base_request(request::Add, Some(form))?; - - self.request_stream_json(req).try_collect().await - } - - /// Returns the current ledger for a peer. - /// - /// # Examples - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.bitswap_ledger("QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ"); - /// ``` - /// - #[inline] - pub async fn bitswap_ledger( - &self, - peer: &str, - ) -> Result { - self.request(request::BitswapLedger { peer }, None).await - } - - /// Triggers a reprovide. - /// - /// # Examples - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.bitswap_reprovide(); - /// ``` - /// - #[inline] - pub async fn bitswap_reprovide(&self) -> Result { - self.request_empty(request::BitswapReprovide, None).await - } - - /// Returns some stats about the bitswap agent. - /// - /// # Examples - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.bitswap_stat(); - /// ``` - /// - #[inline] - pub async fn bitswap_stat(&self) -> Result { - self.request(request::BitswapStat, None).await - } - - /// Remove a given block from your wantlist. - /// - /// # Examples - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.bitswap_unwant("QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"); - /// ``` - /// - #[inline] - pub async fn bitswap_unwant( - &self, - key: &str, - ) -> Result { - self.request_empty(request::BitswapUnwant { key }, None) - .await - } - - /// Shows blocks on the wantlist for you or the specified peer. - /// - /// # Examples - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.bitswap_wantlist( - /// Some("QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ") - /// ); - /// ``` - /// - #[inline] - pub async fn bitswap_wantlist( - &self, - peer: Option<&str>, - ) -> Result { - self.request(request::BitswapWantlist { peer }, None).await - } - - /// Gets a raw IPFS block. - /// - /// # Examples - /// - /// ```no_run - /// use futures::TryStreamExt; - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let hash = "QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"; - /// let res = client - /// .block_get(hash) - /// .map_ok(|chunk| chunk.to_vec()) - /// .try_concat(); - /// ``` - /// - #[inline] - pub fn block_get(&self, hash: &str) -> impl Stream> { - impl_stream_api_response! { - (self, request::BlockGet { hash }, None) => request_stream_bytes - } - } - - /// Store input as an IPFS block. - /// - /// # Examples - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// use std::io::Cursor; - /// - /// let client = IpfsClient::default(); - /// let data = Cursor::new("Hello World!"); - /// let res = client.block_put(data); - /// ``` - /// - #[inline] - pub async fn block_put(&self, data: R) -> Result - where - R: 'static + Read + Send + Sync, - { - let mut form = multipart::Form::default(); - - form.add_reader("data", data); - - self.request(request::BlockPut, Some(form)).await - } - - /// Removes an IPFS block. - /// - /// # Examples - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.block_rm("QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"); - /// ``` - /// - #[inline] - pub async fn block_rm(&self, hash: &str) -> Result { - self.request(request::BlockRm { hash }, None).await - } - - /// Prints information about a raw IPFS block. - /// - /// # Examples - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.block_stat("QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"); - /// ``` - /// - #[inline] - pub async fn block_stat(&self, hash: &str) -> Result { - self.request(request::BlockStat { hash }, None).await - } - - /// Add default peers to the bootstrap list. - /// - /// # Examples - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.bootstrap_add_default(); - /// ``` - /// - #[inline] - pub async fn bootstrap_add_default( - &self, - ) -> Result { - self.request(request::BootstrapAddDefault, None).await - } - - /// Lists peers in bootstrap list. - /// - /// # Examples - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.bootstrap_list(); - /// ``` - /// - #[inline] - pub async fn bootstrap_list(&self) -> Result { - self.request(request::BootstrapList, None).await - } - - /// Removes all peers in bootstrap list. - /// - /// # Examples - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.bootstrap_rm_all(); - /// ``` - /// - #[inline] - pub async fn bootstrap_rm_all(&self) -> Result { - self.request(request::BootstrapRmAll, None).await - } - - /// Returns the contents of an Ipfs object. - /// - /// # Examples - /// - /// ```no_run - /// use futures::TryStreamExt; - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let hash = "QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"; - /// let res = client - /// .cat(hash) - /// .map_ok(|chunk| chunk.to_vec()) - /// .try_concat(); - /// ``` - /// - #[inline] - pub fn cat(&self, path: &str) -> impl Stream> { - impl_stream_api_response! { - (self, request::Cat { path }, None) => request_stream_bytes - } - } - - /// List available commands that the server accepts. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.commands(); - /// ``` - /// - #[inline] - pub async fn commands(&self) -> Result { - self.request(request::Commands, None).await - } - - /// Opens the config file for editing (on the server). - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.config_edit(); - /// ``` - /// - #[inline] - pub async fn config_edit(&self) -> Result { - self.request(request::ConfigEdit, None).await - } - - /// Replace the config file. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// use std::io::Cursor; - /// - /// let client = IpfsClient::default(); - /// let config = Cursor::new("{..json..}"); - /// let res = client.config_replace(config); - /// ``` - /// - #[inline] - pub async fn config_replace(&self, data: R) -> Result - where - R: 'static + Read + Send + Sync, - { - let mut form = multipart::Form::default(); - - form.add_reader("file", data); - - self.request_empty(request::ConfigReplace, Some(form)).await - } - - /// Show the current config of the server. - /// - /// Returns an unparsed json string, due to an unclear spec. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.config_show(); - /// ``` - /// - #[inline] - pub async fn config_show(&self) -> Result { - self.request_string(request::ConfigShow, None).await - } - - /// Returns information about a dag node in Ipfs. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.dag_get("QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"); - /// ``` - /// - #[inline] - pub async fn dag_get(&self, path: &str) -> Result { - self.request(request::DagGet { path }, None).await - } - - // TODO /dag routes are experimental, and there isn't a whole lot of - // documentation available for how this route works. - // - // /// Add a DAG node to Ipfs. - // /// - // #[inline] - // pub fn dag_put(&self, data: R) -> AsyncResponse - // where - // R: 'static + Read + Send, - // { - // let mut form = multipart::Form::default(); - // - // form.add_reader("arg", data); - // - // self.request(&request::DagPut, Some(form)) - // } - - // TODO /dag/resolve - - /// Query the DHT for all of the multiaddresses associated with a Peer ID. - /// - /// ```no_run - /// use futures::TryStreamExt; - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let peer = "QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM"; - /// let res = client.dht_findpeer(peer).try_collect::>(); - /// ``` - /// - #[inline] - pub fn dht_findpeer( - &self, - peer: &str, - ) -> impl Stream> { - impl_stream_api_response! { - (self, request::DhtFindPeer { peer }, None) => request_stream_json - } - } - - /// Find peers in the DHT that can provide a specific value given a key. - /// - /// ```no_run - /// use futures::TryStreamExt; - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let key = "QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"; - /// let res = client.dht_findprovs(key).try_collect::>(); - /// ``` - /// - #[inline] - pub fn dht_findprovs( - &self, - key: &str, - ) -> impl Stream> { - impl_stream_api_response! { - (self, request::DhtFindProvs { key }, None) => request_stream_json - } - } - - /// Query the DHT for a given key. - /// - /// ```no_run - /// use futures::TryStreamExt; - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let key = "QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"; - /// let res = client.dht_get(key).try_collect::>(); - /// ``` - /// - #[inline] - pub fn dht_get( - &self, - key: &str, - ) -> impl Stream> { - impl_stream_api_response! { - (self, request::DhtGet { key }, None) => request_stream_json - } - } - - /// Announce to the network that you are providing a given value. - /// - /// ```no_run - /// use futures::TryStreamExt; - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let key = "QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"; - /// let res = client.dht_provide(key).try_collect::>(); - /// ``` - /// - #[inline] - pub fn dht_provide( - &self, - key: &str, - ) -> impl Stream> { - impl_stream_api_response! { - (self, request::DhtProvide { key }, None) => request_stream_json - } - } - - /// Write a key/value pair to the DHT. - /// - /// ```no_run - /// use futures::TryStreamExt; - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.dht_put("test", "Hello World!").try_collect::>(); - /// ``` - /// - #[inline] - pub fn dht_put( - &self, - key: &str, - value: &str, - ) -> impl Stream> { - impl_stream_api_response! { - (self, request::DhtPut { key, value }, None) => request_stream_json - } - } - - /// Find the closest peer given the peer ID by querying the DHT. - /// - /// ```no_run - /// use futures::TryStreamExt; - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let peer = "QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM"; - /// let res = client.dht_query(peer).try_collect::>(); - /// ``` - /// - #[inline] - pub fn dht_query( - &self, - peer: &str, - ) -> impl Stream> { - impl_stream_api_response! { - (self, request::DhtQuery { peer }, None) => request_stream_json - } - } - - /// Clear inactive requests from the log. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.diag_cmds_clear(); - /// ``` - /// - #[inline] - pub async fn diag_cmds_clear(&self) -> Result { - self.request_empty(request::DiagCmdsClear, None).await - } - - /// Set how long to keep inactive requests in the log. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.diag_cmds_set_time("1m"); - /// ``` - /// - #[inline] - pub async fn diag_cmds_set_time( - &self, - time: &str, - ) -> Result { - self.request_empty(request::DiagCmdsSetTime { time }, None) - .await - } - - /// Print system diagnostic information. - /// - /// Note: There isn't good documentation on what this call is supposed to return. - /// It might be platform dependent, but if it isn't, this can be fixed to return - /// an actual object. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.diag_sys(); - /// ``` - /// - #[inline] - pub async fn diag_sys(&self) -> Result { - self.request_string(request::DiagSys, None).await - } - - /// Resolve DNS link. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.dns("ipfs.io", true); - /// ``` - /// - #[inline] - pub async fn dns(&self, link: &str, recursive: bool) -> Result { - self.request(request::Dns { link, recursive }, None).await - } - - /// List directory for Unix filesystem objects. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.file_ls("/ipns/ipfs.io"); - /// ``` - /// - #[inline] - pub async fn file_ls(&self, path: &str) -> Result { - self.request(request::FileLs { path }, None).await - } - - /// Copy files into MFS. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.files_cp("/path/to/file", "/dest"); - /// ``` - /// - #[inline] - pub async fn files_cp( - &self, - path: &str, - dest: &str, - ) -> Result { - self.request_empty(request::FilesCp { path, dest }, None) - .await - } - - /// Flush a path's data to disk. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.files_flush(None); - /// let res = client.files_flush(Some("/tmp")); - /// ``` - /// - #[inline] - pub async fn files_flush( - &self, - path: Option<&str>, - ) -> Result { - self.request_empty(request::FilesFlush { path }, None).await - } - - /// List directories in MFS. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.files_ls(None); - /// let res = client.files_ls(Some("/tmp")); - /// ``` - /// - #[inline] - pub async fn files_ls(&self, path: Option<&str>) -> Result { - self.request(request::FilesLs { path }, None).await - } - - /// Make directories in MFS. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.files_mkdir("/test", false); - /// let res = client.files_mkdir("/test/nested/dir", true); - /// ``` - /// - #[inline] - pub async fn files_mkdir( - &self, - path: &str, - parents: bool, - ) -> Result { - self.request_empty(request::FilesMkdir { path, parents }, None) - .await - } - - /// Copy files into MFS. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.files_mv("/test/tmp.json", "/test/file.json"); - /// ``` - /// - #[inline] - pub async fn files_mv( - &self, - path: &str, - dest: &str, - ) -> Result { - self.request_empty(request::FilesMv { path, dest }, None) - .await - } - - /// Read a file in MFS. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.files_read("/test/file.json"); - /// ``` - /// - #[inline] - pub fn files_read(&self, path: &str) -> impl Stream> { - impl_stream_api_response! { - (self, request::FilesRead { path }, None) => request_stream_bytes - } - } - - /// Remove a file in MFS. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.files_rm("/test/dir", true); - /// let res = client.files_rm("/test/file.json", false); - /// ``` - /// - #[inline] - pub async fn files_rm( - &self, - path: &str, - recursive: bool, - ) -> Result { - self.request_empty(request::FilesRm { path, recursive }, None) - .await - } - - /// Display a file's status in MDFS. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.files_stat("/test/file.json"); - /// ``` - /// - #[inline] - pub async fn files_stat(&self, path: &str) -> Result { - self.request(request::FilesStat { path }, None).await - } - - /// Write to a mutable file in the filesystem. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// use std::fs::File; - /// - /// let client = IpfsClient::default(); - /// let file = File::open("test.json").unwrap(); - /// let res = client.files_write("/test/file.json", true, true, file); - /// ``` - /// - #[inline] - pub async fn files_write( - &self, - path: &str, - create: bool, - truncate: bool, - data: R, - ) -> Result - where - R: 'static + Read + Send + Sync, - { - let mut form = multipart::Form::default(); - - form.add_reader("data", data); - - self.request_empty( - request::FilesWrite { - path, - create, - truncate, - }, - Some(form), - ) - .await - } - - /// List blocks that are both in the filestore and standard block storage. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.filestore_dups(); - /// ``` - /// - #[inline] - pub fn filestore_dups( - &self, - ) -> impl Stream> { - impl_stream_api_response! { - (self, request::FilestoreDups, None) => request_stream_json - } - } - - /// List objects in filestore. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.filestore_ls( - /// Some("QmYPP3BovR2m8UqCZxFbdXSit6SKgExxDkFAPLqiGsap4X") - /// ); - /// ``` - /// - #[inline] - pub fn filestore_ls( - &self, - cid: Option<&str>, - ) -> impl Stream> { - impl_stream_api_response! { - (self, request::FilestoreLs { cid }, None) => request_stream_json - } - } - - /// Verify objects in filestore. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.filestore_verify(None); - /// ``` - /// - #[inline] - pub fn filestore_verify( - &self, - cid: Option<&str>, - ) -> impl Stream> { - impl_stream_api_response! { - (self, request::FilestoreVerify{ cid }, None) => request_stream_json - } - } - - /// Download Ipfs object. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.get("/test/file.json"); - /// ``` - /// - #[inline] - pub fn get(&self, path: &str) -> impl Stream> { - impl_stream_api_response! { - (self, request::Get { path }, None) => request_stream_bytes - } - } - - /// Returns information about a peer. - /// - /// If `peer` is `None`, returns information about you. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.id(None); - /// let res = client.id(Some("QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM")); - /// ``` - /// - #[inline] - pub async fn id(&self, peer: Option<&str>) -> Result { - self.request(request::Id { peer }, None).await - } - - /// Create a new keypair. - /// - /// ```no_run - /// use ipfs_api::{IpfsClient, KeyType}; - /// - /// let client = IpfsClient::default(); - /// let res = client.key_gen("test", KeyType::Rsa, 64); - /// ``` - /// - #[inline] - pub async fn key_gen( - &self, - name: &str, - kind: request::KeyType, - size: i32, - ) -> Result { - self.request(request::KeyGen { name, kind, size }, None) - .await - } - - /// List all local keypairs. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.key_list(); - /// ``` - /// - #[inline] - pub async fn key_list(&self) -> Result { - self.request(request::KeyList, None).await - } - - /// Rename a keypair. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.key_rename("key_0", "new_name", false); - /// ``` - /// - #[inline] - pub async fn key_rename( - &self, - name: &str, - new: &str, - force: bool, - ) -> Result { - self.request(request::KeyRename { name, new, force }, None) - .await - } - - /// Remove a keypair. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.key_rm("key_0"); - /// ``` - /// - #[inline] - pub async fn key_rm(&self, name: &str) -> Result { - self.request(request::KeyRm { name }, None).await - } - - /// Change the logging level for a logger. - /// - /// ```no_run - /// use ipfs_api::{IpfsClient, Logger, LoggingLevel}; - /// use std::borrow::Cow; - /// - /// let client = IpfsClient::default(); - /// let res = client.log_level(Logger::All, LoggingLevel::Debug); - /// let res = client.log_level( - /// Logger::Specific(Cow::Borrowed("web")), - /// LoggingLevel::Warning - /// ); - /// ``` - /// - #[inline] - pub async fn log_level( - &self, - logger: request::Logger<'_>, - level: request::LoggingLevel, - ) -> Result { - self.request(request::LogLevel { logger, level }, None) - .await - } - - /// List all logging subsystems. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.log_ls(); - /// ``` - /// - #[inline] - pub async fn log_ls(&self) -> Result { - self.request(request::LogLs, None).await - } - - /// Read the event log. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.log_tail(); - /// ``` - /// - pub fn log_tail(&self) -> impl Stream> { - impl_stream_api_response! { - (self, request::LogTail, None) |req| => { - self.request_stream(req, |res| { - IpfsClient::process_stream_response(res, LineDecoder) - }) - } - } - } - - /// List the contents of an Ipfs multihash. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.ls(None); - /// let res = client.ls(Some("/ipfs/QmVrLsEDn27sScp3k23sgZNefVTjSAL3wpgW1iWPi4MgoY")); - /// ``` - /// - #[inline] - pub async fn ls(&self, path: Option<&str>) -> Result { - self.request(request::Ls { path }, None).await - } - - // TODO /mount - - /// Publish an IPFS path to IPNS. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.name_publish( - /// "/ipfs/QmVrLsEDn27sScp3k23sgZNefVTjSAL3wpgW1iWPi4MgoY", - /// false, - /// Some("12h"), - /// None, - /// None - /// ); - /// ``` - /// - pub async fn name_publish( - &self, - path: &str, - resolve: bool, - lifetime: Option<&str>, - ttl: Option<&str>, - key: Option<&str>, - ) -> Result { - self.request( - request::NamePublish { - path, - resolve, - lifetime, - ttl, - key, - }, - None, - ) - .await - } - - /// Resolve an IPNS name. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.name_resolve( - /// Some("/ipns/ipfs.io"), - /// true, - /// false - /// ); - /// ``` - /// - pub async fn name_resolve( - &self, - name: Option<&str>, - recursive: bool, - nocache: bool, - ) -> Result { - self.request( - request::NameResolve { - name, - recursive, - nocache, - }, - None, - ) - .await - } - - /// Output the raw bytes of an Ipfs object. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.object_data("/ipfs/QmVrLsEDn27sScp3k23sgZNefVTjSAL3wpgW1iWPi4MgoY"); - /// ``` - /// - #[inline] - pub fn object_data(&self, key: &str) -> impl Stream> { - impl_stream_api_response! { - (self, request::ObjectData { key }, None) => request_stream_bytes - } - } - - /// Returns the diff of two Ipfs objects. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.object_diff( - /// "/ipfs/QmVrLsEDn27sScp3k23sgZNefVTjSAL3wpgW1iWPi4MgoY", - /// "/ipfs/QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA" - /// ); - /// ``` - /// - #[inline] - pub async fn object_diff( - &self, - key0: &str, - key1: &str, - ) -> Result { - self.request(request::ObjectDiff { key0, key1 }, None).await - } - - /// Returns the data in an object. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.object_get("/ipfs/QmVrLsEDn27sScp3k23sgZNefVTjSAL3wpgW1iWPi4MgoY"); - /// ``` - /// - #[inline] - pub async fn object_get(&self, key: &str) -> Result { - self.request(request::ObjectGet { key }, None).await - } - - /// Returns the links that an object points to. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.object_links("/ipfs/QmVrLsEDn27sScp3k23sgZNefVTjSAL3wpgW1iWPi4MgoY"); - /// ``` - /// - #[inline] - pub async fn object_links(&self, key: &str) -> Result { - self.request(request::ObjectLinks { key }, None).await - } - - /// Create a new object. - /// - /// ```no_run - /// use ipfs_api::{IpfsClient, ObjectTemplate}; - /// - /// let client = IpfsClient::default(); - /// let res = client.object_new(None); - /// let res = client.object_new(Some(ObjectTemplate::UnixFsDir)); - /// ``` - /// - #[inline] - pub async fn object_new( - &self, - template: Option, - ) -> Result { - self.request(request::ObjectNew { template }, None).await - } - - // TODO /object/patch/add-link - - // TODO /object/patch/append-data - - // TODO /object/patch/rm-link - - // TODO /object/patch/set-data - - // TODO /object/put - - /// Returns the stats for an object. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.object_stat("/ipfs/QmVrLsEDn27sScp3k23sgZNefVTjSAL3wpgW1iWPi4MgoY"); - /// ``` - /// - #[inline] - pub async fn object_stat(&self, key: &str) -> Result { - self.request(request::ObjectStat { key }, None).await - } - - // TODO /p2p/listener/close - - // TODO /p2p/listener/ls - - // TODO /p2p/listener/open - - // TODO /p2p/stream/close - - // TODO /p2p/stream/dial - - // TODO /p2p/stream/ls - - /// Pins a new object. - /// - /// The "recursive" option tells the server whether to - /// pin just the top-level object, or all sub-objects - /// it depends on. For most cases you want it to be `true`. - /// - /// Does not yet implement the "progress" agument because - /// reading it is kinda squirrelly. - /// - /// # Examples - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.pin_add("QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ", true); - /// ``` - #[inline] - pub async fn pin_add( - &self, - key: &str, - recursive: bool, - ) -> Result { - self.request( - request::PinAdd { - key, - recursive: Some(recursive), - progress: false, - }, - None, - ) - .await - } - - /// Returns a list of pinned objects in local storage. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.pin_ls(None, None); - /// let res = client.pin_ls( - /// Some("/ipfs/QmVrLsEDn27sScp3k23sgZNefVTjSAL3wpgW1iWPi4MgoY"), - /// None - /// ); - /// let res = client.pin_ls(None, Some("direct")); - /// ``` - /// - #[inline] - pub async fn pin_ls( - &self, - key: Option<&str>, - typ: Option<&str>, - ) -> Result { - self.request(request::PinLs { key, typ }, None).await - } - - /// Removes a pinned object from local storage. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.pin_rm( - /// "/ipfs/QmVrLsEDn27sScp3k23sgZNefVTjSAL3wpgW1iWPi4MgoY", - /// false - /// ); - /// let res = client.pin_rm( - /// "/ipfs/QmVrLsEDn27sScp3k23sgZNefVTjSAL3wpgW1iWPi4MgoY", - /// true - /// ); - /// ``` - /// - #[inline] - pub async fn pin_rm( - &self, - key: &str, - recursive: bool, - ) -> Result { - self.request(request::PinRm { key, recursive }, None).await - } - - // TODO /pin/update - - // TODO /pin/verify - - /// Pings a peer. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.ping("QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64", None); - /// let res = client.ping("QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64", Some(15)); - /// ``` - /// - #[inline] - pub fn ping( - &self, - peer: &str, - count: Option, - ) -> impl Stream> { - impl_stream_api_response! { - (self, request::Ping { peer, count }, None) => request_stream_json - } - } - - /// List subscribed pubsub topics. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.pubsub_ls(); - /// ``` - /// - #[inline] - pub async fn pubsub_ls(&self) -> Result { - self.request(request::PubsubLs, None).await - } - - /// List peers that are being published to. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.pubsub_peers(None); - /// let res = client.pubsub_peers(Some("feed")); - /// ``` - /// - #[inline] - pub async fn pubsub_peers( - &self, - topic: Option<&str>, - ) -> Result { - self.request(request::PubsubPeers { topic }, None).await - } - - /// Publish a message to a topic. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.pubsub_pub("feed", "Hello World!"); - /// ``` - /// - #[inline] - pub async fn pubsub_pub( - &self, - topic: &str, - payload: &str, - ) -> Result { - self.request_empty(request::PubsubPub { topic, payload }, None) - .await - } - - /// Subscribes to a pubsub topic. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.pubsub_sub("feed", false); - /// let res = client.pubsub_sub("feed", true); - /// ``` - /// - #[inline] - pub fn pubsub_sub( - &self, - topic: &str, - discover: bool, - ) -> impl Stream> { - impl_stream_api_response! { - (self, request::PubsubSub { topic, discover }, None) => request_stream_json - } - } - - /// Gets a list of local references. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.refs_local(); - /// ``` - /// - #[inline] - pub fn refs_local(&self) -> impl Stream> { - impl_stream_api_response! { - (self, request::RefsLocal, None) => request_stream_json - } - } - - // TODO /repo/fsck - - // TODO /repo/gc - - // TODO /repo/stat - - // TODO /repo/verify - - // TODO /repo/version - - // TODO /resolve - - /// Shutdown the Ipfs daemon. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.shutdown(); - /// ``` - /// - pub async fn shutdown(&self) -> Result { - self.request_empty(request::Shutdown, None).await - } - - /// Returns bitswap stats. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.stats_bitswap(); - /// ``` - /// - #[inline] - pub async fn stats_bitswap(&self) -> Result { - self.request(request::StatsBitswap, None).await - } - - /// Returns bandwidth stats. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.stats_bw(); - /// ``` - /// - #[inline] - pub async fn stats_bw(&self) -> Result { - self.request(request::StatsBw, None).await - } - - /// Returns repo stats. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.stats_repo(); - /// ``` - /// - #[inline] - pub async fn stats_repo(&self) -> Result { - self.request(request::StatsRepo, None).await - } - - // TODO /swarm/addrs/listen - - /// Return a list of local addresses. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.swarm_addrs_local(); - /// ``` - /// - #[inline] - pub async fn swarm_addrs_local(&self) -> Result { - self.request(request::SwarmAddrsLocal, None).await - } - - // TODO /swarm/connect - - // TODO /swarm/disconnect - - // TODO /swarm/filters/add - - // TODO /swarm/filters/rm - - /// Return a list of peers with open connections. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.swarm_peers(); - /// ``` - /// - #[inline] - pub async fn swarm_peers(&self) -> Result { - self.request(request::SwarmPeers, None).await - } - - /// Add a tar file to Ipfs. - /// - /// Note: `data` should already be a tar file. If it isn't the Api will return - /// an error. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// use std::fs::File; - /// - /// let client = IpfsClient::default(); - /// let tar = File::open("/path/to/file.tar").unwrap(); - /// let res = client.tar_add(tar); - /// ``` - /// - #[inline] - pub async fn tar_add(&self, data: R) -> Result - where - R: 'static + Read + Send + Sync, - { - let mut form = multipart::Form::default(); - - form.add_reader("file", data); - - self.request(request::TarAdd, Some(form)).await - } - - /// Export a tar file from Ipfs. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.tar_cat("/ipfs/QmVrLsEDn27sScp3k23sgZNefVTjSAL3wpgW1iWPi4MgoY"); - /// ``` - /// - #[inline] - pub fn tar_cat(&self, path: &str) -> impl Stream> { - impl_stream_api_response! { - (self, request::TarCat { path }, None) => request_stream_bytes - } - } - - /// Returns information about the Ipfs server version. - /// - /// ```no_run - /// use ipfs_api::IpfsClient; - /// - /// let client = IpfsClient::default(); - /// let res = client.version(); - /// ``` - /// - #[inline] - pub async fn version(&self) -> Result { - self.request(request::Version, None).await - } -} diff --git a/ipfs-api/src/client/mod.rs b/ipfs-api/src/client/mod.rs new file mode 100644 index 0000000..0cd7796 --- /dev/null +++ b/ipfs-api/src/client/mod.rs @@ -0,0 +1,2037 @@ +// Copyright 2017 rust-ipfs-api Developers +// +// Licensed under the Apache License, Version 2.0, or the MIT license , at your option. This file may not be +// copied, modified, or distributed except according to those terms. +// +use crate::{ + header::TRAILER, + read::{JsonLineDecoder, LineDecoder, StreamReader}, + request::{self, ApiRequest}, + response::{self, Error}, + Client, Request, Response, +}; +#[cfg(feature = "actix")] +use actix_multipart::client::multipart; +use bytes::Bytes; +use futures::{future, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; +use http::{ + uri::{InvalidUri, Uri}, + StatusCode, +}; +#[cfg(feature = "hyper")] +use hyper::{body, client::Builder}; +#[cfg(feature = "hyper")] +use hyper_multipart::client::multipart; +#[cfg(feature = "hyper")] +use hyper_tls::HttpsConnector; +use parity_multiaddr::Protocol; +use serde::{Deserialize, Serialize}; +use serde_json; +#[cfg(feature = "actix")] +use std::time::Duration; +use std::{ + fs::{self, File}, + io::{Cursor, Read}, + net::{IpAddr, SocketAddr}, + path::{Path, PathBuf}, +}; +use tokio_util::codec::{Decoder, FramedRead}; + +const FILE_DESCRIPTOR_LIMIT: usize = 127; + +#[cfg(feature = "actix")] +const ACTIX_REQUEST_TIMEOUT: Duration = Duration::from_secs(90); + +/// Asynchronous Ipfs client. +/// +#[derive(Clone)] +pub struct IpfsClient { + base: Uri, + client: Client, +} + +impl Default for IpfsClient { + /// Creates an `IpfsClient` connected to the endpoint specified in ~/.ipfs/api. + /// If not found, tries to connect to `localhost:5001`. + /// + fn default() -> IpfsClient { + dirs::home_dir() + .map(|home_dir| home_dir.join(".ipfs").join("api")) + .and_then(|multiaddr_path| fs::read_to_string(&multiaddr_path).ok()) + .and_then(|multiaddr_str| parity_multiaddr::from_url(&multiaddr_str).ok()) + .and_then(|multiaddr| { + let mut addr: Option = None; + let mut port: Option = None; + for addr_component in multiaddr.iter() { + match addr_component { + Protocol::Ip4(v4addr) => addr = Some(v4addr.into()), + Protocol::Ip6(v6addr) => addr = Some(v6addr.into()), + Protocol::Tcp(tcpport) => port = Some(tcpport), + _ => { + return None; + } + } + } + if let (Some(addr), Some(port)) = (addr, port) { + Some(SocketAddr::new(addr, port)) + } else { + None + } + }) + .map(IpfsClient::from) + .unwrap_or_else(|| IpfsClient::new("localhost", 5001).unwrap()) + } +} + +impl From for IpfsClient { + fn from(socket_addr: SocketAddr) -> Self { + IpfsClient::new(&socket_addr.ip().to_string(), socket_addr.port()).unwrap() + } +} + +impl IpfsClient { + /// Creates a new `IpfsClient`. + /// + #[inline] + pub fn new(host: &str, port: u16) -> Result { + Self::new_from_uri(format!("http://{}:{}", host, port).as_str()) + } + + /// Creates a new `IpfsClient` for any given URI. + /// + #[inline] + pub fn new_from_uri(uri: &str) -> Result { + let base_path = IpfsClient::build_base_path(uri)?; + let client = { + #[cfg(feature = "hyper")] + { + Builder::default() + .pool_max_idle_per_host(0) + .build(HttpsConnector::new()) + } + #[cfg(feature = "actix")] + { + Client::default() + } + }; + + Ok(IpfsClient { + base: base_path, + client, + }) + } + + /// Builds the base url path for the Ipfs api. + /// + fn build_base_path(uri: &str) -> Result { + format!("{}/api/v0", uri).parse() + } + + /// Builds the url for an api call