From c429ee51d8c29678c216f44a48ae596f99decc4b Mon Sep 17 00:00:00 2001 From: Ferris Tseng Date: Mon, 22 Feb 2021 20:32:31 -0500 Subject: declare new backend crates --- Cargo.toml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index e37572d..44c7cba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,8 @@ [workspace] members = [ - "ipfs-api" + "ipfs-api", + "ipfs-api-prelude", + "ipfs-api-backend-actix", + "ipfs-api-backend-hyper" ] -- cgit v1.2.3 From 7dddbde3a1abcb96b75aa4636a07498193f7c551 Mon Sep 17 00:00:00 2001 From: Ferris Tseng Date: Mon, 22 Feb 2021 20:41:44 -0500 Subject: migrate common code to ipfs-api-prelude --- ipfs-api-prelude/Cargo.toml | 28 + ipfs-api-prelude/src/api.rs | 886 ++++++++++++ ipfs-api-prelude/src/backend.rs | 220 +++ ipfs-api-prelude/src/error.rs | 28 + ipfs-api-prelude/src/from_uri.rs | 203 +++ ipfs-api-prelude/src/header.rs | 12 + ipfs-api-prelude/src/lib.rs | 23 + ipfs-api-prelude/src/read.rs | 227 +++ ipfs-api-prelude/src/request/add.rs | 61 + ipfs-api-prelude/src/request/bitswap.rs | 55 + ipfs-api-prelude/src/request/block.rs | 48 + ipfs-api-prelude/src/request/bootstrap.rs | 33 + ipfs-api-prelude/src/request/cat.rs | 20 + ipfs-api-prelude/src/request/commands.rs | 17 + ipfs-api-prelude/src/request/config.rs | 53 + ipfs-api-prelude/src/request/dag.rs | 28 + ipfs-api-prelude/src/request/dht.rs | 73 + ipfs-api-prelude/src/request/diag.rs | 36 + ipfs-api-prelude/src/request/dns.rs | 22 + ipfs-api-prelude/src/request/file.rs | 20 + ipfs-api-prelude/src/request/files.rs | 201 +++ ipfs-api-prelude/src/request/filestore.rs | 38 + ipfs-api-prelude/src/request/get.rs | 20 + ipfs-api-prelude/src/request/id.rs | 20 + ipfs-api-prelude/src/request/key.rs | 78 ++ ipfs-api-prelude/src/request/log.rs | 85 ++ ipfs-api-prelude/src/request/ls.rs | 55 + ipfs-api-prelude/src/request/mod.rs | 133 ++ ipfs-api-prelude/src/request/name.rs | 42 + ipfs-api-prelude/src/request/object.rs | 105 ++ ipfs-api-prelude/src/request/pin.rs | 48 + ipfs-api-prelude/src/request/ping.rs | 22 + ipfs-api-prelude/src/request/pubsub.rs | 53 + ipfs-api-prelude/src/request/refs.rs | 17 + ipfs-api-prelude/src/request/shutdown.rs | 17 + ipfs-api-prelude/src/request/stats.rs | 33 + ipfs-api-prelude/src/request/swarm.rs | 25 + ipfs-api-prelude/src/request/tar.rs | 28 + ipfs-api-prelude/src/request/version.rs | 17 + ipfs-api-prelude/src/response/add.rs | 17 + ipfs-api-prelude/src/response/bitswap.rs | 55 + ipfs-api-prelude/src/response/block.rs | 35 + ipfs-api-prelude/src/response/bootstrap.rs | 36 + ipfs-api-prelude/src/response/commands.rs | 34 + ipfs-api-prelude/src/response/config.rs | 23 + ipfs-api-prelude/src/response/dag.rs | 46 + ipfs-api-prelude/src/response/dht.rs | 85 ++ ipfs-api-prelude/src/response/diag.rs | 13 + ipfs-api-prelude/src/response/dns.rs | 15 + ipfs-api-prelude/src/response/error.rs | 23 + ipfs-api-prelude/src/response/file.rs | 40 + ipfs-api-prelude/src/response/files.rs | 68 + ipfs-api-prelude/src/response/filestore.rs | 33 + ipfs-api-prelude/src/response/id.rs | 30 + ipfs-api-prelude/src/response/key.rs | 46 + ipfs-api-prelude/src/response/log.rs | 28 + ipfs-api-prelude/src/response/ls.rs | 43 + ipfs-api-prelude/src/response/mod.rs | 106 ++ ipfs-api-prelude/src/response/mount.rs | 26 + ipfs-api-prelude/src/response/name.rs | 27 + ipfs-api-prelude/src/response/object.rs | 123 ++ ipfs-api-prelude/src/response/pin.rs | 47 + ipfs-api-prelude/src/response/ping.rs | 24 + ipfs-api-prelude/src/response/pubsub.rs | 48 + ipfs-api-prelude/src/response/refs.rs | 23 + ipfs-api-prelude/src/response/repo.rs | 58 + ipfs-api-prelude/src/response/resolve.rs | 20 + ipfs-api-prelude/src/response/serde.rs | 171 +++ ipfs-api-prelude/src/response/shutdown.rs | 9 + ipfs-api-prelude/src/response/stats.rs | 28 + ipfs-api-prelude/src/response/swarm.rs | 78 ++ ipfs-api-prelude/src/response/tar.rs | 21 + .../src/response/tests/v0_bitswap_stat_0.json | 352 +++++ .../src/response/tests/v0_block_stat_0.json | 4 + .../src/response/tests/v0_bootstrap_list_0.json | 22 + .../src/response/tests/v0_commands_0.json | 1465 ++++++++++++++++++++ .../src/response/tests/v0_dag_get_0.json | 19 + .../src/response/tests/v0_file_ls_0.json | 50 + .../src/response/tests/v0_file_ls_1.json | 4 + .../src/response/tests/v0_files_ls_0.json | 3 + .../src/response/tests/v0_files_stat_0.json | 7 + ipfs-api-prelude/src/response/tests/v0_id_0.json | 9 + .../src/response/tests/v0_key_gen_0.json | 4 + .../src/response/tests/v0_key_list_0.json | 16 + .../src/response/tests/v0_key_rename_0.json | 6 + .../src/response/tests/v0_key_rm_0.json | 8 + .../src/response/tests/v0_log_ls_0.json | 71 + ipfs-api-prelude/src/response/tests/v0_ls_0.json | 75 + ipfs-api-prelude/src/response/tests/v0_ls_1.json | 45 + .../src/response/tests/v0_mount_0.json | 5 + .../src/response/tests/v0_name_resolve_0.json | 3 + .../src/response/tests/v0_object_diff_0.json | 14 + .../src/response/tests/v0_object_links_0.json | 40 + .../src/response/tests/v0_object_stat_0.json | 8 + .../src/response/tests/v0_pin_add_0.json | 5 + .../src/response/tests/v0_pin_ls_0.json | 7 + ipfs-api-prelude/src/response/tests/v0_ping_0.json | 5 + ipfs-api-prelude/src/response/tests/v0_ping_1.json | 5 + ipfs-api-prelude/src/response/tests/v0_ping_2.json | 5 + .../src/response/tests/v0_pubsub_ls_0.json | 3 + .../src/response/tests/v0_pubsub_ls_1.json | 5 + .../src/response/tests/v0_pubsub_peers_0.json | 1 + .../src/response/tests/v0_pubsub_sub_0.json | 8 + .../src/response/tests/v0_pubsub_sub_1.json | 1 + .../src/response/tests/v0_refs_local_0.json | 4 + .../src/response/tests/v0_repo_gc_0.json | 5 + .../src/response/tests/v0_repo_stat_0.json | 1 + .../src/response/tests/v0_repo_verify_0.json | 4 + .../src/response/tests/v0_repo_verify_1.json | 4 + .../src/response/tests/v0_repo_version_0.json | 3 + .../src/response/tests/v0_resolve_0.json | 3 + .../src/response/tests/v0_stats_bw_0.json | 6 + .../src/response/tests/v0_swarm_addrs_local_0.json | 8 + .../src/response/tests/v0_swarm_peers_0.json | 88 ++ .../src/response/tests/v0_swarm_peers_1.json | 18 + .../src/response/tests/v0_swarm_peers_2.json | 32 + .../src/response/tests/v0_tar_add_0.json | 5 + .../src/response/tests/v0_version_0.json | 7 + .../src/response/tests/v0_version_1.json | 5 + ipfs-api-prelude/src/response/version.rs | 29 + 120 files changed, 7106 insertions(+) create mode 100644 ipfs-api-prelude/Cargo.toml create mode 100644 ipfs-api-prelude/src/api.rs create mode 100644 ipfs-api-prelude/src/backend.rs create mode 100644 ipfs-api-prelude/src/error.rs create mode 100644 ipfs-api-prelude/src/from_uri.rs create mode 100644 ipfs-api-prelude/src/header.rs create mode 100644 ipfs-api-prelude/src/lib.rs create mode 100644 ipfs-api-prelude/src/read.rs create mode 100644 ipfs-api-prelude/src/request/add.rs create mode 100644 ipfs-api-prelude/src/request/bitswap.rs create mode 100644 ipfs-api-prelude/src/request/block.rs create mode 100644 ipfs-api-prelude/src/request/bootstrap.rs create mode 100644 ipfs-api-prelude/src/request/cat.rs create mode 100644 ipfs-api-prelude/src/request/commands.rs create mode 100644 ipfs-api-prelude/src/request/config.rs create mode 100644 ipfs-api-prelude/src/request/dag.rs create mode 100644 ipfs-api-prelude/src/request/dht.rs create mode 100644 ipfs-api-prelude/src/request/diag.rs create mode 100644 ipfs-api-prelude/src/request/dns.rs create mode 100644 ipfs-api-prelude/src/request/file.rs create mode 100644 ipfs-api-prelude/src/request/files.rs create mode 100644 ipfs-api-prelude/src/request/filestore.rs create mode 100644 ipfs-api-prelude/src/request/get.rs create mode 100644 ipfs-api-prelude/src/request/id.rs create mode 100644 ipfs-api-prelude/src/request/key.rs create mode 100644 ipfs-api-prelude/src/request/log.rs create mode 100644 ipfs-api-prelude/src/request/ls.rs create mode 100644 ipfs-api-prelude/src/request/mod.rs create mode 100644 ipfs-api-prelude/src/request/name.rs create mode 100644 ipfs-api-prelude/src/request/object.rs create mode 100644 ipfs-api-prelude/src/request/pin.rs create mode 100644 ipfs-api-prelude/src/request/ping.rs create mode 100644 ipfs-api-prelude/src/request/pubsub.rs create mode 100644 ipfs-api-prelude/src/request/refs.rs create mode 100644 ipfs-api-prelude/src/request/shutdown.rs create mode 100644 ipfs-api-prelude/src/request/stats.rs create mode 100644 ipfs-api-prelude/src/request/swarm.rs create mode 100644 ipfs-api-prelude/src/request/tar.rs create mode 100644 ipfs-api-prelude/src/request/version.rs create mode 100644 ipfs-api-prelude/src/response/add.rs create mode 100644 ipfs-api-prelude/src/response/bitswap.rs create mode 100644 ipfs-api-prelude/src/response/block.rs create mode 100644 ipfs-api-prelude/src/response/bootstrap.rs create mode 100644 ipfs-api-prelude/src/response/commands.rs create mode 100644 ipfs-api-prelude/src/response/config.rs create mode 100644 ipfs-api-prelude/src/response/dag.rs create mode 100644 ipfs-api-prelude/src/response/dht.rs create mode 100644 ipfs-api-prelude/src/response/diag.rs create mode 100644 ipfs-api-prelude/src/response/dns.rs create mode 100644 ipfs-api-prelude/src/response/error.rs create mode 100644 ipfs-api-prelude/src/response/file.rs create mode 100644 ipfs-api-prelude/src/response/files.rs create mode 100644 ipfs-api-prelude/src/response/filestore.rs create mode 100644 ipfs-api-prelude/src/response/id.rs create mode 100644 ipfs-api-prelude/src/response/key.rs create mode 100644 ipfs-api-prelude/src/response/log.rs create mode 100644 ipfs-api-prelude/src/response/ls.rs create mode 100644 ipfs-api-prelude/src/response/mod.rs create mode 100644 ipfs-api-prelude/src/response/mount.rs create mode 100644 ipfs-api-prelude/src/response/name.rs create mode 100644 ipfs-api-prelude/src/response/object.rs create mode 100644 ipfs-api-prelude/src/response/pin.rs create mode 100644 ipfs-api-prelude/src/response/ping.rs create mode 100644 ipfs-api-prelude/src/response/pubsub.rs create mode 100644 ipfs-api-prelude/src/response/refs.rs create mode 100644 ipfs-api-prelude/src/response/repo.rs create mode 100644 ipfs-api-prelude/src/response/resolve.rs create mode 100644 ipfs-api-prelude/src/response/serde.rs create mode 100644 ipfs-api-prelude/src/response/shutdown.rs create mode 100644 ipfs-api-prelude/src/response/stats.rs create mode 100644 ipfs-api-prelude/src/response/swarm.rs create mode 100644 ipfs-api-prelude/src/response/tar.rs create mode 100644 ipfs-api-prelude/src/response/tests/v0_bitswap_stat_0.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_block_stat_0.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_bootstrap_list_0.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_commands_0.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_dag_get_0.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_file_ls_0.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_file_ls_1.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_files_ls_0.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_files_stat_0.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_id_0.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_key_gen_0.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_key_list_0.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_key_rename_0.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_key_rm_0.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_log_ls_0.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_ls_0.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_ls_1.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_mount_0.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_name_resolve_0.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_object_diff_0.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_object_links_0.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_object_stat_0.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_pin_add_0.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_pin_ls_0.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_ping_0.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_ping_1.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_ping_2.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_pubsub_ls_0.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_pubsub_ls_1.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_pubsub_peers_0.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_pubsub_sub_0.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_pubsub_sub_1.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_refs_local_0.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_repo_gc_0.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_repo_stat_0.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_repo_verify_0.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_repo_verify_1.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_repo_version_0.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_resolve_0.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_stats_bw_0.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_swarm_addrs_local_0.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_swarm_peers_0.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_swarm_peers_1.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_swarm_peers_2.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_tar_add_0.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_version_0.json create mode 100644 ipfs-api-prelude/src/response/tests/v0_version_1.json create mode 100644 ipfs-api-prelude/src/response/version.rs diff --git a/ipfs-api-prelude/Cargo.toml b/ipfs-api-prelude/Cargo.toml new file mode 100644 index 0000000..61590a6 --- /dev/null +++ b/ipfs-api-prelude/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "ipfs-api-prelude" +description = "Shared code for IPFS HTTP API client" +authors = ["Ferris Tseng "] +edition = "2018" +documentation = "https://docs.rs/ipfs-api" +repository = "https://github.com/ferristseng/rust-ipfs-api" +keywords = ["ipfs"] +categories = ["filesystem", "web-programming"] +version = "0.1.0" +readme = "../README.md" +license = "MIT OR Apache-2.0" + +[dependencies] +async-trait = "0.1" +bytes = "1.0" +dirs = "3.0" +futures = "0.3" +http = "0.2" +parity-multiaddr = "0.11" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +serde_urlencoded = "0.7" +thiserror = "1.0" +tokio = "1.2" +tokio-util = { version = "0.6", features = ["codec"] } +tracing = "0.1" +walkdir = "2.3" diff --git a/ipfs-api-prelude/src/api.rs b/ipfs-api-prelude/src/api.rs new file mode 100644 index 0000000..237165e --- /dev/null +++ b/ipfs-api-prelude/src/api.rs @@ -0,0 +1,886 @@ +// Copyright 2021 rust-ipfs-api Developers +// +// Licensed under the Apache License, Version 2.0, or the MIT license , at your option. This file may not be +// copied, modified, or distributed except according to those terms. +// + +use crate::{request, response, Backend}; +use async_trait::async_trait; +use bytes::Bytes; +use futures::{future, FutureExt, Stream, StreamExt, TryStreamExt}; +use std::{ + fs::File, + io::{Cursor, Read}, + path::{Path, PathBuf}, +}; + +const FILE_DESCRIPTOR_LIMIT: usize = 127; + +// Implements a call to the IPFS that returns a streaming body response. +// Implementing this in a macro is necessary because the Rust compiler +// can't reason about the lifetime of the request instance properly. It +// thinks that the request needs to live as long as the returned stream, +// but in reality, the request instance is only used to build the Hyper +// or Actix request. +// +macro_rules! impl_stream_api_response { + (($self:ident, $req:expr, $form:expr) => $call:ident) => { + impl_stream_api_response! { + ($self, $req, $form) |req| => { $self.$call(req) } + } + }; + (($self:ident, $req:expr, $form:expr) |$var:ident| => $impl:block) => { + match $self.build_base_request(&$req, $form) { + Ok($var) => Box::new($impl), + Err(e) => Box::new(future::err(e).into_stream()), + } + }; +} + +#[async_trait(?Send)] +pub trait IpfsApi: Backend { + /// Add file to Ipfs. + /// + /// # Examples + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// use std::io::Cursor; + /// + /// let client = IpfsClient::default(); + /// let data = Cursor::new("Hello World!"); + /// let res = client.add(data); + /// ``` + /// + async fn add(&self, data: R) -> Result + where + R: 'static + Read + Send + Sync, + { + self.add_with_options(data, request::Add::default()).await + } + + /// Add a file to IPFS with options. + /// + /// # Examples + /// + /// ```no_run + /// # extern crate ipfs_api; + /// # + /// use ipfs_api::IpfsClient; + /// use std::io::Cursor; + /// + /// # fn main() { + /// let client = IpfsClient::default(); + /// let data = Cursor::new("Hello World!"); + /// #[cfg(feature = "with-builder")] + /// let add = ipfs_api::request::Add::builder() + /// .chunker("rabin-512-1024-2048") + /// .build(); + /// #[cfg(not(feature = "with-builder"))] + /// let add = ipfs_api::request::Add { + /// chunker: Some("rabin-512-1024-2048"), + /// ..Default::default() + /// }; + /// let req = client.add_with_options(data, add); + /// # } + /// ``` + /// + async fn add_with_options( + &self, + data: R, + add: request::Add<'_>, + ) -> Result + where + R: 'static + Read + Send + Sync, + { + let mut form = Self::MultipartForm::default(); + + //form.add_reader("path", data); + + self.request(add, Some(form)).await + } + + /// Add a path to Ipfs. Can be a file or directory. + /// A hard limit of 128 open file descriptors is set such + /// that any small additional files are stored in-memory. + /// + /// # Examples + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let path = "./src"; + /// let res = client.add_path(path); + /// ``` + /// + async fn add_path

(&self, path: P) -> Result, Self::Error> + where + P: AsRef, + { + let prefix = path.as_ref().parent(); + let mut paths_to_add: Vec<(PathBuf, u64)> = vec![]; + + for path in walkdir::WalkDir::new(path.as_ref()) { + match path { + Ok(entry) if entry.file_type().is_file() => { + let file_size = entry + .metadata() + .map(|metadata| metadata.len()) + .map_err(|e| crate::Error::Io(e.into()))?; + + paths_to_add.push((entry.path().to_path_buf(), file_size)); + } + Ok(_) => (), + Err(e) => return Err(crate::Error::Io(e.into()).into()), + } + } + + paths_to_add.sort_unstable_by(|(_, a), (_, b)| a.cmp(b).reverse()); + + let mut it = 0; + let mut form = Self::MultipartForm::default(); + + for (path, file_size) in paths_to_add { + let mut file = File::open(&path).map_err(|e| crate::Error::Io(e))?; + let file_name = match prefix { + Some(prefix) => path.strip_prefix(prefix).unwrap(), + None => path.as_path(), + } + .to_string_lossy(); + + if it < FILE_DESCRIPTOR_LIMIT { + //form.add_reader_file("path", file, file_name); + + it += 1; + } else { + let mut buf = Vec::with_capacity(file_size as usize); + let _ = file + .read_to_end(&mut buf) + .map_err(|e| crate::Error::Io(e))?; + + //form.add_reader_file("path", Cursor::new(buf), file_name); + } + } + + let req = self.build_base_request(&request::Add::default(), Some(form))?; + + self.request_stream_json(req).try_collect().await + } + + /// Returns the current ledger for a peer. + /// + /// # Examples + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let res = client.bitswap_ledger("QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ"); + /// ``` + /// + async fn bitswap_ledger( + &self, + peer: &str, + ) -> Result { + self.request(request::BitswapLedger { peer }, None).await + } + + /// Triggers a reprovide. + /// + /// # Examples + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let res = client.bitswap_reprovide(); + /// ``` + /// + async fn bitswap_reprovide(&self) -> Result { + self.request_empty(request::BitswapReprovide, None).await + } + + /// Returns some stats about the bitswap agent. + /// + /// # Examples + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let res = client.bitswap_stat(); + /// ``` + /// + async fn bitswap_stat(&self) -> Result { + self.request(request::BitswapStat, None).await + } + + /// Remove a given block from your wantlist. + /// + /// # Examples + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let res = client.bitswap_unwant("QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"); + /// ``` + /// + async fn bitswap_unwant( + &self, + key: &str, + ) -> Result { + self.request_empty(request::BitswapUnwant { key }, None) + .await + } + + /// Shows blocks on the wantlist for you or the specified peer. + /// + /// # Examples + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let res = client.bitswap_wantlist( + /// Some("QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ") + /// ); + /// ``` + /// + async fn bitswap_wantlist( + &self, + peer: Option<&str>, + ) -> Result { + self.request(request::BitswapWantlist { peer }, None).await + } + + /// Gets a raw IPFS block. + /// + /// # Examples + /// + /// ```no_run + /// use futures::TryStreamExt; + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let hash = "QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"; + /// let res = client + /// .block_get(hash) + /// .map_ok(|chunk| chunk.to_vec()) + /// .try_concat(); + /// ``` + /// + fn block_get(&self, hash: &str) -> Box> + Unpin> { + impl_stream_api_response! { + (self, request::BlockGet { hash }, None) => request_stream_bytes + } + } + + /// Store input as an IPFS block. + /// + /// # Examples + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// use std::io::Cursor; + /// + /// let client = IpfsClient::default(); + /// let data = Cursor::new("Hello World!"); + /// let res = client.block_put(data); + /// ``` + /// + async fn block_put(&self, data: R) -> Result + where + R: 'static + Read + Send + Sync, + { + let mut form = Self::MultipartForm::default(); + + //form.add_reader("data", data); + + self.request(request::BlockPut, Some(form)).await + } + + /// Removes an IPFS block. + /// + /// # Examples + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let res = client.block_rm("QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"); + /// ``` + /// + async fn block_rm(&self, hash: &str) -> Result { + self.request(request::BlockRm { hash }, None).await + } + + /// Prints information about a raw IPFS block. + /// + /// # Examples + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let res = client.block_stat("QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"); + /// ``` + /// + async fn block_stat(&self, hash: &str) -> Result { + self.request(request::BlockStat { hash }, None).await + } + + /// Add default peers to the bootstrap list. + /// + /// # Examples + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let res = client.bootstrap_add_default(); + /// ``` + /// + async fn bootstrap_add_default( + &self, + ) -> Result { + self.request(request::BootstrapAddDefault, None).await + } + + /// Lists peers in bootstrap list. + /// + /// # Examples + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let res = client.bootstrap_list(); + /// ``` + /// + async fn bootstrap_list(&self) -> Result { + self.request(request::BootstrapList, None).await + } + + /// Removes all peers in bootstrap list. + /// + /// # Examples + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let res = client.bootstrap_rm_all(); + /// ``` + /// + async fn bootstrap_rm_all(&self) -> Result { + self.request(request::BootstrapRmAll, None).await + } + + /// Returns the contents of an Ipfs object. + /// + /// # Examples + /// + /// ```no_run + /// use futures::TryStreamExt; + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let hash = "QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"; + /// let res = client + /// .cat(hash) + /// .map_ok(|chunk| chunk.to_vec()) + /// .try_concat(); + /// ``` + /// + fn cat(&self, path: &str) -> Box>> { + impl_stream_api_response! { + (self, request::Cat { path }, None) => request_stream_bytes + } + } + + /// List available commands that the server accepts. + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let res = client.commands(); + /// ``` + /// + async fn commands(&self) -> Result { + self.request(request::Commands, None).await + } + + /// Get ipfs config strings. + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let res = client.config_get_string("Identity.PeerID"); + /// ``` + /// + async fn config_get_string(&self, key: &str) -> Result { + self.request( + request::Config { + key, + value: None, + boolean: None, + stringified_json: None, + }, + None, + ) + .await + } + + /// Get ipfs config booleans. + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let res = client.config_get_bool("Datastore.HashOnRead"); + /// ``` + /// + async fn config_get_bool(&self, key: &str) -> Result { + self.request( + request::Config { + key, + value: None, + boolean: None, + stringified_json: None, + }, + None, + ) + .await + } + + /// Get ipfs config json. + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let res = client.config_get_json("Mounts"); + /// ``` + /// + async fn config_get_json(&self, key: &str) -> Result { + self.request( + request::Config { + key, + value: None, + boolean: None, + stringified_json: None, + }, + None, + ) + .await + } + + /// Set ipfs config string. + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let res = client.config_set_string("Routing.Type", "dht"); + /// ``` + /// + async fn config_set_string( + &self, + key: &str, + value: &str, + ) -> Result { + self.request( + request::Config { + key, + value: Some(value), + boolean: None, + stringified_json: None, + }, + None, + ) + .await + } + + /// Set ipfs config boolean. + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let res = client.config_set_bool("Pubsub.DisableSigning", false); + /// ``` + /// + async fn config_set_bool( + &self, + key: &str, + value: bool, + ) -> Result { + self.request( + request::Config { + key, + value: Some(&value.to_string()), + boolean: Some(true), + stringified_json: None, + }, + None, + ) + .await + } + + /// Set ipfs config json. + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let res = client.config_set_json("Discovery", r#"{"MDNS":{"Enabled":true,"Interval":10}}"#); + /// ``` + /// + async fn config_set_json( + &self, + key: &str, + value: &str, + ) -> Result { + self.request( + request::Config { + key, + value: Some(value), + boolean: None, + stringified_json: Some(true), + }, + None, + ) + .await + } + + /// Opens the config file for editing (on the server). + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let res = client.config_edit(); + /// ``` + /// + async fn config_edit(&self) -> Result { + self.request(request::ConfigEdit, None).await + } + + /// Replace the config file. + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// use std::io::Cursor; + /// + /// let client = IpfsClient::default(); + /// let config = Cursor::new("{..json..}"); + /// let res = client.config_replace(config); + /// ``` + /// + async fn config_replace(&self, data: R) -> Result + where + R: 'static + Read + Send + Sync, + { + let mut form = Self::MultipartForm::default(); + + //form.add_reader("file", data); + + self.request_empty(request::ConfigReplace, Some(form)).await + } + + /// Show the current config of the server. + /// + /// Returns an unparsed json string, due to an unclear spec. + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let res = client.config_show(); + /// ``` + /// + async fn config_show(&self) -> Result { + self.request_string(request::ConfigShow, None).await + } + + /// Returns information about a dag node in Ipfs. + /// + /// ```no_run + /// use futures::TryStreamExt; + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let hash = "QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"; + /// let res = client + /// .dag_get(hash) + /// .map_ok(|chunk| chunk.to_vec()) + /// .try_concat(); + /// ``` + /// + fn dag_get(&self, path: &str) -> Box>> { + impl_stream_api_response! { + (self, request::DagGet { path }, None) => request_stream_bytes + } + } + + /// Add a DAG node to Ipfs. + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// use std::io::Cursor; + /// + /// let client = IpfsClient::default(); + /// let data = Cursor::new(r#"{ "hello" : "world" }"#); + /// let res = client.dag_put(data); + /// ``` + /// + async fn dag_put(&self, data: R) -> Result + where + R: 'static + Read + Send + Sync, + { + let mut form = Self::MultipartForm::default(); + + //form.add_reader("object data", data); + + self.request(request::DagPut, Some(form)).await + } + + // TODO /dag/resolve + + /// Query the DHT for all of the multiaddresses associated with a Peer ID. + /// + /// ```no_run + /// use futures::TryStreamExt; + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let peer = "QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM"; + /// let res = client.dht_findpeer(peer).try_collect::>(); + /// ``` + /// + fn dht_findpeer( + &self, + peer: &str, + ) -> Box> + Unpin> { + impl_stream_api_response! { + (self, request::DhtFindPeer { peer }, None) => request_stream_json + } + } + + /// Find peers in the DHT that can provide a specific value given a key. + /// + /// ```no_run + /// use futures::TryStreamExt; + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let key = "QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"; + /// let res = client.dht_findprovs(key).try_collect::>(); + /// ``` + /// + fn dht_findprovs( + &self, + key: &str, + ) -> Box> + Unpin> { + impl_stream_api_response! { + (self, request::DhtFindProvs { key }, None) => request_stream_json + } + } + + /// Query the DHT for a given key. + /// + /// ```no_run + /// use futures::TryStreamExt; + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let key = "QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"; + /// let res = client.dht_get(key).try_collect::>(); + /// ``` + /// + fn dht_get( + &self, + key: &str, + ) -> Box>> { + impl_stream_api_response! { + (self, request::DhtGet { key }, None) => request_stream_json + } + } + + /// Announce to the network that you are providing a given value. + /// + /// ```no_run + /// use futures::TryStreamExt; + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let key = "QmXdNSQx7nbdRvkjGCEQgVjVtVwsHvV8NmV2a8xzQVwuFA"; + /// let res = client.dht_provide(key).try_collect::>(); + /// ``` + /// + fn dht_provide( + &self, + key: &str, + ) -> Box>> { + impl_stream_api_response! { + (self, request::DhtProvide { key }, None) => request_stream_json + } + } + + /// Write a key/value pair to the DHT. + /// + /// ```no_run + /// use futures::TryStreamExt; + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let res = client.dht_put("test", "Hello World!").try_collect::>(); + /// ``` + /// + fn dht_put( + &self, + key: &str, + value: &str, + ) -> Box>> { + impl_stream_api_response! { + (self, request::DhtPut { key, value }, None) => request_stream_json + } + } + + /// Find the closest peer given the peer ID by querying the DHT. + /// + /// ```no_run + /// use futures::TryStreamExt; + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let peer = "QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM"; + /// let res = client.dht_query(peer).try_collect::>(); + /// ``` + /// + fn dht_query( + &self, + peer: &str, + ) -> Box>> { + impl_stream_api_response! { + (self, request::DhtQuery { peer }, None) => request_stream_json + } + } + + /// Clear inactive requests from the log. + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let res = client.diag_cmds_clear(); + /// ``` + /// + async fn diag_cmds_clear(&self) -> Result { + self.request_empty(request::DiagCmdsClear, None).await + } + + /// Set how long to keep inactive requests in the log. + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let res = client.diag_cmds_set_time("1m"); + /// ``` + /// + async fn diag_cmds_set_time( + &self, + time: &str, + ) -> Result { + self.request_empty(request::DiagCmdsSetTime { time }, None) + .await + } + + /// Print system diagnostic information. + /// + /// Note: There isn't good documentation on what this call is supposed to return. + /// It might be platform dependent, but if it isn't, this can be fixed to return + /// an actual object. + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let res = client.diag_sys(); + /// ``` + /// + async fn diag_sys(&self) -> Result { + self.request_string(request::DiagSys, None).await + } + + /// Resolve DNS link. + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let res = client.dns("ipfs.io", true); + /// ``` + /// + async fn dns(&self, link: &str, recursive: bool) -> Result { + self.request(request::Dns { link, recursive }, None).await + } + + /// List directory for Unix filesystem objects. + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let res = client.file_ls("/ipns/ipfs.io"); + /// ``` + /// + async fn file_ls(&self, path: &str) -> Result { + self.request(request::FileLs { path }, None).await + } + + /// Copy files into MFS. + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let res = client.files_cp("/path/to/file", "/dest"); + /// ``` + /// + async fn files_cp( + &self, + path: &str, + dest: &str, + ) -> Result { + self.files_cp_with_options(request::FilesCp { + path, + dest, + ..Default::default() + }) + .await + } + + /// Copy files into MFS. + /// + /// ```no_run + /// use ipfs_api::IpfsClient; + /// + /// let client = IpfsClient::default(); + /// let res = client.files_cp("/path/to/file", "/dest"); + /// ``` + /// + async fn files_cp_with_options( + &self, + options: request::FilesCp<'_>, + ) -> Result { + self.request_empty(options, None).await + } +} diff --git a/ipfs-api-prelude/src/backend.rs b/ipfs-api-prelude/src/backend.rs new file mode 100644 index 0000000..bcc32e9 --- /dev/null +++ b/ipfs-api-prelude/src/backend.rs @@ -0,0 +1,220 @@ +// Copyright 2021 rust-ipfs-api Developers +// +// Licensed under the Apache License, Version 2.0, or the MIT license , at your option. This file may not be +// copied, modified, or distributed except according to those terms. +// + +use crate::{ + header::{TRAILER, X_STREAM_ERROR_KEY}, + read::{JsonLineDecoder, StreamReader}, + ApiError, ApiRequest, +}; +use async_trait::async_trait; +use bytes::Bytes; +use futures::{future, FutureExt, Stream, StreamExt, TryStreamExt}; +use http::{ + header::{HeaderName, HeaderValue}, + StatusCode, +}; +use serde::{Deserialize, Serialize}; +use std::{fmt::Display, string::FromUtf8Error}; +use tokio_util::codec::{Decoder, FramedRead}; + +#[async_trait(?Send)] +pub trait Backend: Default { + /// HTTP request type. + /// + type HttpRequest; + + /// HTTP response type. + /// + type HttpResponse; + + /// HTTP multipart form type. + /// + type MultipartForm: Default; + + /// Error type for Result. + /// + type Error: Display + + From + + From + + From + + From + + 'static; + + fn build_base_request( + &self, + req: &Req, + form: Option, + ) -> Result + where + Req: ApiRequest; + + fn get_header<'a>(res: &'a Self::HttpResponse, key: HeaderName) -> Option<&'a HeaderValue>; + + async fn request_raw( + &self, + req: Req, + form: Option, + ) -> Result<(StatusCode, Bytes), Self::Error> + where + Req: ApiRequest + Serialize; + + fn response_to_byte_stream( + res: Self::HttpResponse, + ) -> Box> + Unpin>; + + fn request_stream( + &self, + req: Self::HttpRequest, + process: F, + ) -> Box> + Unpin> + where + OutStream: Stream> + Unpin, + F: 'static + Fn(Self::HttpResponse) -> OutStream; + + /// Builds an Api error from a response body. + /// + #[inline] + fn process_error_from_body(body: Bytes) -> Self::Error { + match serde_json::from_slice::(&body) { + Ok(e) => e.into(), + Err(_) => match String::from_utf8(body.to_vec()) { + Ok(s) => crate::Error::UnrecognizedApiError(s).into(), + Err(e) => e.into(), + }, + } + } + + /// Processes a response that expects a json encoded body, returning an + /// error or a deserialized json response. + /// + fn process_json_response(status: StatusCode, body: Bytes) -> Result + where + for<'de> Res: 'static + Deserialize<'de>, + { + match status { + StatusCode::OK => serde_json::from_slice(&body).map_err(From::from), + _ => Err(Self::process_error_from_body(body)), + } + } + + /// Processes a response that returns a stream of json deserializable + /// results. + /// + fn process_stream_response( + res: Self::HttpResponse, + decoder: D, + ) -> FramedRead> + Unpin>>, D> + where + D: Decoder, + { + FramedRead::new( + StreamReader::new(Self::response_to_byte_stream(res)), + decoder, + ) + } + + /// Generic method for making a request to the Ipfs server, and getting + /// a deserializable response. + /// + async fn request( + &self, + req: Req, + form: Option, + ) -> Result + where + Req: ApiRequest + Serialize, + for<'de> Res: 'static + Deserialize<'de>, + { + let (status, chunk) = self.request_raw(req, form).await?; + + Self::process_json_response(status, chunk) + } + + /// Generic method for making a request to the Ipfs server, and getting + /// back a response with no body. + /// + async fn request_empty( + &self, + req: Req, + form: Option, + ) -> Result<(), Self::Error> + where + Req: ApiRequest + Serialize, + { + let (status, chunk) = self.request_raw(req, form).await?; + + match status { + StatusCode::OK => Ok(()), + _ => Err(Self::process_error_from_body(chunk)), + } + } + + /// Generic method for making a request to the Ipfs server, and getting + /// back a raw String response. + /// + async fn request_string( + &self, + req: Req, + form: Option, + ) -> Result + where + Req: ApiRequest + Serialize, + { + let (status, chunk) = self.request_raw(req, form).await?; + + match status { + StatusCode::OK => String::from_utf8(chunk.to_vec()).map_err(Self::Error::from), + _ => Err(Self::process_error_from_body(chunk)), + } + } + + fn request_stream_bytes( + &self, + req: Self::HttpRequest, + ) -> Box> + Unpin> { + self.request_stream(req, |res| Self::response_to_byte_stream(res)) + } + + /// Generic method to return a streaming response of deserialized json + /// objects delineated by new line separators. + /// + fn request_stream_json( + &self, + req: Self::HttpRequest, + ) -> Box> + Unpin> + where + for<'de> Res: 'static + Deserialize<'de>, + { + self.request_stream(req, |res| { + let parse_stream_error = if let Some(trailer) = Self::get_header(&res, TRAILER) { + // Response has the Trailer header set. The StreamError trailer + // is used to indicate that there was an error while streaming + // data with Ipfs. + // + if trailer == X_STREAM_ERROR_KEY { + true + } else { + let err = crate::Error::UnrecognizedTrailerHeader( + String::from_utf8_lossy(trailer.as_ref()).into(), + ); + + // There was an unrecognized trailer value. If that is the case, + // create a stream that immediately errors. + // + return future::err(err).into_stream().err_into().left_stream(); + } + } else { + false + }; + + Self::process_stream_response(res, JsonLineDecoder::new(parse_stream_error)) + .err_into() + .right_stream() + }) + } +} diff --git a/ipfs-api-prelude/src/error.rs b/ipfs-api-prelude/src/error.rs new file mode 100644 index 0000000..1b9bcaa --- /dev/null +++ b/ipfs-api-prelude/src/error.rs @@ -0,0 +1,28 @@ +// Copyright 2021 rust-ipfs-api Developers +// +// Licensed under the Apache License, Version 2.0, or the MIT license , at your option. This file may not be +// copied, modified, or distributed except according to those terms. +// + +use std::io; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum Error { + #[error("io error `{0}`")] + Io(#[from] io::Error), + + #[error("utf8 decoding error `{0}`")] + Parse(#[from] serde_json::Error), + + #[error("api returned an error while streaming: `{0}`")] + StreamError(String), + + #[error("api got unrecognized trailer header: `{0}`")] + UnrecognizedTrailerHeader(String), + + #[error("api returned an unknown error: `{0}`")] + UnrecognizedApiError(String), +} diff --git a/ipfs-api-prelude/src/from_uri.rs b/ipfs-api-prelude/src/from_uri.rs new file mode 100644 index 0000000..a52a61a --- /dev/null +++ b/ipfs-api-prelude/src/from_uri.rs @@ -0,0 +1,203 @@ +// Copyright 2020 rust-ipfs-api Developers +// +// Licensed under the Apache License, Version 2.0, or the MIT license , at your option. This file may not be +// copied, modified, or distributed except according to those terms. +// + +use http::uri::{Builder, InvalidUri, PathAndQuery, Scheme, Uri}; +use parity_multiaddr::{self as multiaddr, Multiaddr, Protocol}; +use std::{ + fs, + net::{SocketAddr, SocketAddrV4, SocketAddrV6}, + str::FromStr, +}; + +const VERSION_PATH_V0: &str = "/api/v0"; + +/// Builds the base url path for the Ipfs api. +/// +fn build_base_path(builder: Builder) -> Result { + builder.path_and_query(VERSION_PATH_V0).build() +} + +pub trait TryFromUri: Sized { + /// Builds a new client from a base URI to the IPFS API. + /// + fn build_with_base_uri(uri: Uri) -> Self; + + /// Creates a new client from a str. + /// + /// Note: This constructor will overwrite the path/query part of the URI. + /// + fn from_str(uri: &str) -> Result { + let uri: Uri = uri.parse()?; + let mut parts = uri.into_parts(); + + parts.path_and_query = Some(PathAndQuery::from_static(VERSION_PATH_V0)); + + Ok(Self::build_with_base_uri(Uri::from_parts(parts).unwrap())) + } + + /// Creates a new client from a host name and port. + /// + fn from_host_and_port(scheme: Scheme, host: &str, port: u16) -> Result { + let authority = format!("{}:{}", host, port); + let builder = Builder::new().scheme(scheme).authority(&authority[..]); + + build_base_path(builder).map(Self::build_with_base_uri) + } + + /// Creates a new client from an IPV4 address and port number. + /// + fn from_ipv4(scheme: Scheme, addr: SocketAddrV4) -> Result { + let authority = format!("{}", addr); + let builder = Builder::new().scheme(scheme).authority(&authority[..]); + + build_base_path(builder).map(Self::build_with_base_uri) + } + + /// Creates a new client from an IPV6 addr and port number. + /// + fn from_ipv6(scheme: Scheme, addr: SocketAddrV6) -> Result { + let authority = format!("{}", addr); + let builder = Builder::new().scheme(scheme).authority(&authority[..]); + + build_base_path(builder).map(Self::build_with_base_uri) + } + + /// Creates a new client from an IP address and port number. + /// + fn from_socket(scheme: Scheme, socket_addr: SocketAddr) -> Result { + match socket_addr { + SocketAddr::V4(addr) => Self::from_ipv4(scheme, addr), + SocketAddr::V6(addr) => Self::from_ipv6(scheme, addr), + } + } + + /// Creates a new client from a multiaddr. + /// + fn from_multiaddr(multiaddr: Multiaddr) -> Result { + let mut scheme: Option = None; + let mut port: Option = None; + + for addr_component in multiaddr.iter() { + match addr_component { + Protocol::Tcp(tcpport) => port = Some(tcpport), + Protocol::Http => scheme = Some(Scheme::HTTP), + Protocol::Https => scheme = Some(Scheme::HTTPS), + _ => (), + } + } + + let scheme = scheme.unwrap_or(Scheme::HTTP); + + if let Some(port) = port { + for addr_component in multiaddr.iter() { + match addr_component { + Protocol::Tcp(_) | Protocol::Http | Protocol::Https => (), + Protocol::Ip4(v4addr) => { + return Ok(Self::from_ipv4(scheme, SocketAddrV4::new(v4addr, port)).unwrap()) + } + Protocol::Ip6(v6addr) => { + return Ok( + Self::from_ipv6(scheme, SocketAddrV6::new(v6addr, port, 0, 0)).unwrap(), + ) + } + Protocol::Dns(ref hostname) => { + return Ok(Self::from_host_and_port(scheme, hostname, port).unwrap()) + } + Protocol::Dns4(ref v4host) => { + return Ok(Self::from_host_and_port(scheme, v4host, port).unwrap()) + } + Protocol::Dns6(ref v6host) => { + return Ok(Self::from_host_and_port(scheme, v6host, port).unwrap()) + } + _ => { + return Err(multiaddr::Error::InvalidMultiaddr); + } + } + } + } + + Err(multiaddr::Error::InvalidMultiaddr) + } + + /// Creates a new client from a multiaddr. + /// + fn from_multiaddr_str(multiaddr: &str) -> Result { + parity_multiaddr::from_url(multiaddr) + .map_err(|e| multiaddr::Error::ParsingError(Box::new(e))) + .or_else(|_| Multiaddr::from_str(multiaddr)) + .and_then(Self::from_multiaddr) + } + + /// Creates a new client connected to the endpoint specified in ~/.ipfs/api. + /// + #[inline] + fn from_ipfs_config() -> Option { + dirs::home_dir() + .map(|home_dir| home_dir.join(".ipfs").join("api")) + .and_then(|multiaddr_path| fs::read_to_string(&multiaddr_path).ok()) + .and_then(|multiaddr_str| Self::from_multiaddr_str(&multiaddr_str).ok()) + } +} + +#[cfg(test)] +mod tests { + use crate::TryFromUri; + use http::uri::{Scheme, Uri}; + + #[derive(Debug)] + struct StringWrapper(String); + + impl TryFromUri for StringWrapper { + fn build_with_base_uri(uri: Uri) -> Self { + StringWrapper(uri.to_string()) + } + } + + macro_rules! test_from_value_fn_ok { + ([$method: path]: $($f: ident ($($args: expr),+) => $output: expr),+) => { + $( + #[test] + fn $f() { + let result: Result = $method($($args),+); + + assert!( + result.is_ok(), + format!("should be ok but failed with error: {:?}", result.unwrap_err()) + ); + + let StringWrapper(result) = result.unwrap(); + + assert!( + result == $output, + format!("got: ({}) expected: ({})", result, $output) + ); + } + )+ + }; + } + + test_from_value_fn_ok!( + [TryFromUri::from_str]: + test_from_str_0_ok ("http://localhost:5001") => "http://localhost:5001/api/v0", + test_from_str_1_ok ("https://ipfs.io:9001") => "https://ipfs.io:9001/api/v0" + ); + + test_from_value_fn_ok!( + [TryFromUri::from_host_and_port]: + test_from_host_and_port_0_ok (Scheme::HTTP, "localhost", 5001) => "http://localhost:5001/api/v0", + test_from_host_and_port_1_ok (Scheme::HTTP, "ipfs.io", 9001) => "http://ipfs.io:9001/api/v0" + ); + + test_from_value_fn_ok!( + [TryFromUri::from_multiaddr_str]: + test_from_multiaddr_str_0_ok ("http://localhost:5001/") => "http://localhost:5001/api/v0", + test_from_multiaddr_str_1_ok ("https://ipfs.io:9001/") => "https://ipfs.io:9001/api/v0", + test_from_multiaddr_str_2_ok ("/ip4/127.0.0.1/tcp/5001/http") => "http://127.0.0.1:5001/api/v0", + test_from_multiaddr_str_3_ok ("/ip6/0:0:0:0:0:0:0:0/tcp/5001/http") => "http://[::]:5001/api/v0" + ); +} diff --git a/ipfs-api-prelude/src/header.rs b/ipfs-api-prelude/src/header.rs new file mode 100644 index 0000000..39bb3c8 --- /dev/null +++ b/ipfs-api-prelude/src/header.rs @@ -0,0 +1,12 @@ +// Copyright 2017 rust-ipfs-api Developers +// +// Licensed under the Apache License, Version 2.0, or the MIT license , at your option. This file may not be +// copied, modified, or distributed except according to those terms. +// + +pub use http::header::TRAILER; + +pub const X_STREAM_ERROR: &str = "x-stream-error"; +pub const X_STREAM_ERROR_KEY: &str = "X-Stream-Error"; diff --git a/ipfs-api-prelude/src/lib.rs b/ipfs-api-prelude/src/lib.rs new file mode 100644 index 0000000..8455db7 --- /dev/null +++ b/ipfs-api-prelude/src/lib.rs @@ -0,0 +1,23 @@ +// Copyright 2021 rust-ipfs-api Developers +// +// Licensed under the Apache License, Version 2.0, or the MIT license , at your option. This file may not be +// copied, modified, or distributed except according to those terms. +// + +extern crate serde; + +mod api; +mod backend; +mod error; +mod from_uri; +mod header; +mod read; +pub mod request; +pub mod response; + +pub use { + api::IpfsApi, backend::Backend, error::Error, from_uri::TryFromUri, request::ApiRequest, + response::ApiError, +}; diff --git a/ipfs-api-prelude/src/read.rs b/ipfs-api-prelude/src/read.rs new file mode 100644 index 0000000..f0a887e --- /dev/null +++ b/ipfs-api-prelude/src/read.rs @@ -0,0 +1,227 @@ +// Copyright 2017 rust-ipfs-api Developers +// +// Licensed under the Apache License, Version 2.0, or the MIT license , at your option. This file may not be +// copied, modified, or distributed except according to those terms. +// + +use crate::header::X_STREAM_ERROR; +use bytes::{Bytes, BytesMut}; +use futures::{ + task::{Context, Poll}, + Stream, +}; +use serde::Deserialize; +use serde_json; +use std::{cmp, fmt::Display, io, marker::PhantomData, pin::Pin}; +use tokio::io::{AsyncRead, ReadBuf}; +use tokio_util::codec::Decoder; +use tracing::{event, instrument, Level}; + +/// A decoder for a response where each line is a full json object. +/// +pub struct JsonLineDecoder { + /// Set to true if the stream can contain a X-Stream-Error header, + /// which indicates an error while streaming. + /// + parse_stream_error: bool, + + ty: PhantomData, +} + +impl JsonLineDecoder { + #[inline] + pub fn new(parse_stream_error: bool) -> JsonLineDecoder { + JsonLineDecoder { + parse_stream_error, + ty: PhantomData, + } + } +} + +impl Decoder for JsonLineDecoder +where + for<'de> T: Deserialize<'de>, +{ + type Item = T; + + type Error = crate::Error; + + /// Tries to find a new line character. If it does, it will split the buffer, + /// and parse the first slice. + /// + #[instrument(skip(self, src), fields(stream_trailer = self.parse_stream_error))] + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + let nl_index = src.iter().position(|b| *b == b'\n'); + + if let Some(pos) = nl_index { + event!(Level::INFO, "Found new line delimeter in buffer"); + + let slice = src.split_to(pos + 1); + let slice = &slice[..slice.len() - 1]; + + match serde_json::from_slice(slice) { + Ok(json) => Ok(json), + // If a JSON object couldn't be parsed from the response, it is possible + // that a stream error trailing header was returned. If the JSON decoder + // was configured to parse these kinds of error, it should try. If a header + // couldn't be parsed, it will return the original error. + // + Err(e) => { + if self.parse_stream_error { + match slice.iter().position(|&x| x == b':') { + Some(colon) if &slice[..colon] == X_STREAM_ERROR.as_bytes() => { + let e = crate::Error::StreamError( + String::from_utf8_lossy(&slice[colon + 2..]).into(), + ); + + Err(e) + } + _ => Err(e.into()), + } + } else { + Err(e.into()) + } + } + } + } else { + event!(Level::INFO, "Waiting for more data to decode JSON"); + + Ok(None) + } + } +} + +/// A decoder that reads a line at a time. +/// +pub struct LineDecoder; + +impl Decoder for LineDecoder { + type Item = String; + + type Error = crate::Error; + + /// Attempts to find a new line character,