From 789e14fc384be4a8cac9d7dd8416cb9c77c7aa73 Mon Sep 17 00:00:00 2001 From: Ferris Tseng Date: Tue, 24 Dec 2019 17:04:55 -0500 Subject: implement last 2 methods on client --- ipfs-api/src/client.rs | 79 ++++++++++++++++---------------------------------- 1 file changed, 25 insertions(+), 54 deletions(-) diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs index f686b18..fda5e95 100644 --- a/ipfs-api/src/client.rs +++ b/ipfs-api/src/client.rs @@ -31,12 +31,14 @@ use serde::{Deserialize, Serialize}; use serde_json; use std::{ fs, - io::Read, + io::{Cursor, Read}, net::{IpAddr, SocketAddr}, path::{Path, PathBuf}, }; use tokio_util::codec::{Decoder, FramedRead}; +const FILE_DESCRIPTOR_LIMIT: usize = 127; + /// Asynchronous Ipfs client. /// #[derive(Clone)] @@ -466,7 +468,6 @@ impl IpfsClient { self.request(request::Add, Some(form)).await } - /* /// Add a path to Ipfs. Can be a file or directory. /// A hard limit of 128 open file descriptors is set such /// that any small additional files are stored in-memory. @@ -482,41 +483,37 @@ impl IpfsClient { /// ``` /// #[inline] - pub fn add_path

(&self, path: P) -> AsyncResponse + pub async fn add_path

(&self, path: P) -> Result, Error> where P: AsRef, { - let mut form = multipart::Form::default(); - let prefix = path.as_ref().parent(); - let mut paths_to_add: Vec<(PathBuf, u64)> = vec![]; for path in walkdir::WalkDir::new(path.as_ref()) { match path { - Ok(entry) => { + Ok(entry) if entry.file_type().is_file() => { if entry.file_type().is_file() { - let file_size = - entry.metadata().map(|metadata| metadata.len()).unwrap_or(0); + let file_size = entry + .metadata() + .map(|metadata| metadata.len()) + .map_err(|e| Error::Io(e.into()))?; + paths_to_add.push((entry.path().to_path_buf(), file_size)); } } - Err(err) => { - return Box::new(future::err(Error::Io(err.into()))); - } + Ok(_) => (), + Err(err) => return Err(Error::Io(err.into())), } } paths_to_add.sort_unstable_by(|(_, a), (_, b)| a.cmp(b).reverse()); let mut it = 0; - const FILE_DESCRIPTOR_LIMIT: usize = 127; + let mut form = multipart::Form::default(); for (path, file_size) in paths_to_add { - let file = std::fs::File::open(&path); - if let Err(err) = file { - return Box::new(future::err(err.into())); - } + let mut file = fs::File::open(&path)?; let file_name = match prefix { Some(prefix) => path.strip_prefix(prefix).unwrap(), None => path.as_path(), @@ -524,24 +521,21 @@ impl IpfsClient { .to_string_lossy(); if it < FILE_DESCRIPTOR_LIMIT { - form.add_reader_file("path", file.unwrap(), file_name); + form.add_reader_file("path", file, file_name); + it += 1; } else { let mut buf = Vec::with_capacity(file_size as usize); - if let Err(err) = file.unwrap().read_to_end(&mut buf) { - return Box::new(future::err(err.into())); - } - form.add_reader_file("path", std::io::Cursor::new(buf), file_name); + let _ = file.read_to_end(&mut buf)?; + + form.add_reader_file("path", Cursor::new(buf), file_name); } } - Box::new( - self.request_stream_json(&request::Add, Some(form)) - .collect() - .map(|mut responses: Vec| responses.pop().unwrap()), - ) + self.request_stream_json(request::Add, Some(form)) + .try_collect() + .await } - */ /// Returns the current ledger for a peer. /// @@ -1420,43 +1414,20 @@ impl IpfsClient { self.request(request::LogLs, None).await } - /* /// Read the event log. /// /// ```no_run - /// # extern crate ipfs_api; - /// # /// use ipfs_api::IpfsClient; /// - /// # fn main() { /// let client = IpfsClient::default(); /// let res = client.log_tail(); - /// # } /// ``` /// pub fn log_tail(&self) -> AsyncStreamResponse { - #[cfg(feature = "hyper")] - let res = self - .build_base_request(&request::LogTail, None) - .map(|req| self.client.request(req).from_err()) - .into_future() - .flatten() - .map(|res| IpfsClient::process_stream_response(res, LineDecoder)) - .flatten_stream(); - #[cfg(feature = "actix")] - let res = self - .build_base_request(&request::LogTail, None) - .into_future() - .and_then(|req| { - req.timeout(std::time::Duration::from_secs(90)) - .send() - .from_err() - }) - .map(|res| IpfsClient::process_stream_response(res, LineDecoder)) - .flatten_stream(); - Box::new(res) + self.request_stream(request::LogTail, None, |res| { + Box::new(IpfsClient::process_stream_response(res, LineDecoder)) + }) } - */ /// List the contents of an Ipfs multihash. /// -- cgit v1.2.3