// Copyright 2021 rust-ipfs-api Developers
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.
//
use crate::{request, response, Backend};
use async_trait::async_trait;
use bytes::Bytes;
use common_multipart_rfc7578::client::multipart;
use futures::{future, FutureExt, Stream, StreamExt, TryStreamExt};
use std::{
fs::File,
io::{Cursor, Read},
path::{Path, PathBuf},
};
// Upper bound used when deciding how many files may be kept open at once.
// NOTE(review): the `add_path` documentation speaks of a hard limit of 128
// open file descriptors while this value is 127 — confirm that the comparison
// using this constant accounts for the difference.
const FILE_DESCRIPTOR_LIMIT: usize = 127;
// Implements a call to the IPFS API that returns a streaming body response.
// Implementing this in a macro is necessary because the Rust compiler
// can't reason about the lifetime of the request instance properly. It
// thinks that the request needs to live as long as the returned stream,
// but in reality, the request instance is only used to build the Hyper
// or Actix request.
//
// Two invocation forms are accepted:
//
//   (self, req, form) => method
//       Shorthand that forwards to the second form with a body of
//       `self.method(req)`, where `req` is the built base request.
//
//   (self, req, form) |var| => { ... }
//       Calls `self.build_base_request(&req, form)`. On `Ok` the value is
//       bound to `var` and the block is evaluated; on `Err` the error is
//       wrapped in an already-failed future, turned into a stream and boxed.
//
macro_rules! impl_stream_api_response {
// Shorthand arm: re-invoke the macro using the general arm below.
(($self:ident, $req:expr, $form:expr) => $call:ident) => {
impl_stream_api_response! {
($self, $req, $form) |req| => { $self.$call(req) }
}
};
// General arm: build the base request, then either run the caller's block
// or surface the build error through the returned stream.
(($self:ident, $req:expr, $form:expr) |$var:ident| => $impl:block) => {
match $self.build_base_request(&$req, $form) {
Ok($var) => $impl,
// The error is delivered through the stream rather than returned directly.
Err(e) => Box::new(future::err(e).into_stream()),
}
};
}
#[async_trait(?Send)]
pub trait IpfsApi: Backend {
/// Add the contents of a reader to IPFS using the default options.
///
/// This is a convenience wrapper: `data` is forwarded to
/// `add_with_options` together with `request::Add::default()`, and the
/// result of that call is returned unchanged.
///
/// # Examples
///
/// ```no_run
/// use ipfs_api::{IpfsApi, IpfsClient};
/// use std::io::Cursor;
///
/// let client = IpfsClient::default();
/// let data = Cursor::new("Hello World!");
/// let res = client.add(data);
/// ```
///
async fn add<R>(&self, data: R) -> Result<response::AddResponse, Self::Error>
where
    R: 'static + Read + Send + Sync,
{
    // No caller-supplied options here: fall back to the defaults.
    let default_options = request::Add::default();

    self.add_with_options(data, default_options).await
}
/// Add the contents of a reader to IPFS, customising the request with `add`.
///
/// `data` is attached to a multipart form under the `"path"` field, and that
/// form is sent together with the `add` request options.
///
/// # Examples
///
/// ```no_run
/// # extern crate ipfs_api;
/// #
/// use ipfs_api::{IpfsApi, IpfsClient};
/// use std::io::Cursor;
///
/// # fn main() {
/// let client = IpfsClient::default();
/// let data = Cursor::new("Hello World!");
/// #[cfg(feature = "with-builder")]
/// let add = ipfs_api::request::Add::builder()
///     .chunker("rabin-512-1024-2048")
///     .build();
/// #[cfg(not(feature = "with-builder"))]
/// let add = ipfs_api::request::Add {
///     chunker: Some("rabin-512-1024-2048"),
///     ..Default::default()
/// };
/// let req = client.add_with_options(data, add);
/// # }
/// ```
///
async fn add_with_options<R>(
    &self,
    data: R,
    add: request::Add<'_>,
) -> Result<response::AddResponse, Self::Error>
where
    R: 'static + Read + Send + Sync,
{
    // The reader travels as the multipart field named "path".
    let mut body = multipart::Form::default();
    body.add_reader("path", data);

    let payload = Some(body);
    self.request(add, payload).await
}
/// Add a path to IPFS. The path can be a file or a directory.
/// A hard limit of 128 open file descriptors is set, so that
/// any additional small files are stored in memory instead.
///
/// # Examples
///
/// ```no_run
/// use ipfs_api::{IpfsApi, IpfsClient};
///
/// let client = IpfsClient::default();
/// let path = "./src";
/// let res = client.add_path(path);
/// ```
///
async fn add_path<P>(&self, path: P) -> Result<Vec<response::AddResponse>, Self::Error>
where
P: AsRef<Path>,
{
let prefix = path.as_ref().parent();
let mut paths_to_add: Vec<(PathBuf, u64)> = vec![];
for path in walkdir::WalkDir::new(path.as_ref()) {
match path {
Ok(entry) if entry.file_type().is_file() => {
let file_size = entry
.metadata()
.map(|metadata| metadata.len())
.map_err(|e| crate::Error::Io(e.into()))?;
paths_to_add.push((entry.path().to_path_buf(), file_size));
}
Ok(_) => (),
Err(e) => return Err(crate::Error::Io(e.into()).into()),
}
}
paths_to_add.sort_unstable_by(|(_, a), (_, b)| a.cmp(b).reverse());
let mut it = 0;
let mut form = multipart::Form::default();
for (path, file_size) in paths_to_add {
let mut file = File::open(&path).map_err(|e| crate::Error::Io(e))?;
let file_name = match prefix {
Some(prefix) => path.strip_prefix(prefix).unwrap(),
None => path.