summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFerris Tseng <ferris@navapbc.com>2019-12-24 17:04:55 -0500
committerFerris Tseng <ferris@navapbc.com>2019-12-24 17:04:55 -0500
commit789e14fc384be4a8cac9d7dd8416cb9c77c7aa73 (patch)
tree8e35e8cc95e6eef6611f5d095d1b058895d91e3a
parentaf714812893d05d530e0e7168e9b1255f38632e7 (diff)
implement last 2 methods on client
-rw-r--r--ipfs-api/src/client.rs79
1 files changed, 25 insertions, 54 deletions
diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs
index f686b18..fda5e95 100644
--- a/ipfs-api/src/client.rs
+++ b/ipfs-api/src/client.rs
@@ -31,12 +31,14 @@ use serde::{Deserialize, Serialize};
use serde_json;
use std::{
fs,
- io::Read,
+ io::{Cursor, Read},
net::{IpAddr, SocketAddr},
path::{Path, PathBuf},
};
use tokio_util::codec::{Decoder, FramedRead};
+const FILE_DESCRIPTOR_LIMIT: usize = 127;
+
/// Asynchronous Ipfs client.
///
#[derive(Clone)]
@@ -466,7 +468,6 @@ impl IpfsClient {
self.request(request::Add, Some(form)).await
}
- /*
/// Add a path to Ipfs. Can be a file or directory.
/// A hard limit of 128 open file descriptors is set such
/// that any small additional files are stored in-memory.
@@ -482,41 +483,37 @@ impl IpfsClient {
/// ```
///
#[inline]
- pub fn add_path<P>(&self, path: P) -> AsyncResponse<response::AddResponse>
+ pub async fn add_path<P>(&self, path: P) -> Result<Vec<response::AddResponse>, Error>
where
P: AsRef<Path>,
{
- let mut form = multipart::Form::default();
-
let prefix = path.as_ref().parent();
-
let mut paths_to_add: Vec<(PathBuf, u64)> = vec![];
for path in walkdir::WalkDir::new(path.as_ref()) {
match path {
- Ok(entry) => {
+ Ok(entry) if entry.file_type().is_file() => {
if entry.file_type().is_file() {
- let file_size =
- entry.metadata().map(|metadata| metadata.len()).unwrap_or(0);
+ let file_size = entry
+ .metadata()
+ .map(|metadata| metadata.len())
+ .map_err(|e| Error::Io(e.into()))?;
+
paths_to_add.push((entry.path().to_path_buf(), file_size));
}
}
- Err(err) => {
- return Box::new(future::err(Error::Io(err.into())));
- }
+ Ok(_) => (),
+ Err(err) => return Err(Error::Io(err.into())),
}
}
paths_to_add.sort_unstable_by(|(_, a), (_, b)| a.cmp(b).reverse());
let mut it = 0;
- const FILE_DESCRIPTOR_LIMIT: usize = 127;
+ let mut form = multipart::Form::default();
for (path, file_size) in paths_to_add {
- let file = std::fs::File::open(&path);
- if let Err(err) = file {
- return Box::new(future::err(err.into()));
- }
+ let mut file = fs::File::open(&path)?;
let file_name = match prefix {
Some(prefix) => path.strip_prefix(prefix).unwrap(),
None => path.as_path(),
@@ -524,24 +521,21 @@ impl IpfsClient {
.to_string_lossy();
if it < FILE_DESCRIPTOR_LIMIT {
- form.add_reader_file("path", file.unwrap(), file_name);
+ form.add_reader_file("path", file, file_name);
+
it += 1;
} else {
let mut buf = Vec::with_capacity(file_size as usize);
- if let Err(err) = file.unwrap().read_to_end(&mut buf) {
- return Box::new(future::err(err.into()));
- }
- form.add_reader_file("path", std::io::Cursor::new(buf), file_name);
+ let _ = file.read_to_end(&mut buf)?;
+
+ form.add_reader_file("path", Cursor::new(buf), file_name);
}
}
- Box::new(
- self.request_stream_json(&request::Add, Some(form))
- .collect()
- .map(|mut responses: Vec<response::AddResponse>| responses.pop().unwrap()),
- )
+ self.request_stream_json(request::Add, Some(form))
+ .try_collect()
+ .await
}
- */
/// Returns the current ledger for a peer.
///
@@ -1420,43 +1414,20 @@ impl IpfsClient {
self.request(request::LogLs, None).await
}
- /*
/// Read the event log.
///
/// ```no_run
- /// # extern crate ipfs_api;
- /// #
/// use ipfs_api::IpfsClient;
///
- /// # fn main() {
/// let client = IpfsClient::default();
/// let res = client.log_tail();
- /// # }
/// ```
///
pub fn log_tail(&self) -> AsyncStreamResponse<String> {
- #[cfg(feature = "hyper")]
- let res = self
- .build_base_request(&request::LogTail, None)
- .map(|req| self.client.request(req).from_err())
- .into_future()
- .flatten()
- .map(|res| IpfsClient::process_stream_response(res, LineDecoder))
- .flatten_stream();
- #[cfg(feature = "actix")]
- let res = self
- .build_base_request(&request::LogTail, None)
- .into_future()
- .and_then(|req| {
- req.timeout(std::time::Duration::from_secs(90))
- .send()
- .from_err()
- })
- .map(|res| IpfsClient::process_stream_response(res, LineDecoder))
- .flatten_stream();
- Box::new(res)
+ self.request_stream(request::LogTail, None, |res| {
+ Box::new(IpfsClient::process_stream_response(res, LineDecoder))
+ })
}
- */
/// List the contents of an Ipfs multihash.
///