diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2021-12-09 18:52:50 +0100 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2021-12-18 17:59:08 +0100 |
commit | 19233255c0b4266895c36a7d5d52eca5f9d55b83 (patch) | |
tree | f8eb1a185a6cad2ab8228d2b8f612c73306b94bd | |
parent | 000b41ecfab76a1133245659e284e53dd815bf79 (diff) |
Use ipfs_unixfs::FileAdder to build file adding mechanism
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
-rw-r--r-- | lib/Cargo.toml | 5 | ||||
-rw-r--r-- | lib/src/add.rs | 104 | ||||
-rw-r--r-- | lib/src/lib.rs | 1 |
3 files changed, 110 insertions, 0 deletions
diff --git a/lib/Cargo.toml b/lib/Cargo.toml index a8a90fa..0b36cbd 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -37,6 +37,7 @@ getset = "0.1" xdg = "2.4" tracing = "0.1" ctrlc = "3.2" +async-stream = "0.3" [dependencies.libp2p] version = "0.39.1" @@ -59,5 +60,9 @@ features = [ git = "https://github.com/rs-ipfs/rust-ipfs/" rev = "ad3ab49b4d9236363969b0f74f14aabc7c906b3b" +[dependencies.ipfs-unixfs] +git = "https://github.com/rs-ipfs/rust-ipfs/" +rev = "ad3ab49b4d9236363969b0f74f14aabc7c906b3b" + [dev-dependencies] multibase = "0.8" diff --git a/lib/src/add.rs b/lib/src/add.rs new file mode 100644 index 0000000..18773d8 --- /dev/null +++ b/lib/src/add.rs @@ -0,0 +1,104 @@ +use anyhow::Result; +use futures::stream::Stream; +use futures::stream::StreamExt; +use ipfs_unixfs::file::adder::FileAdder; +use tokio::io::AsyncReadExt; + +use crate::ipfs_client::IpfsClient; + +pub fn add<R>(ipfs: IpfsClient, mut input: R) -> impl Stream<Item = Result<cid::Cid>> + where R: tokio::io::AsyncRead + std::marker::Unpin +{ + async_stream::try_stream! { + let mut adder = FileAdder::default(); + let mut buffer = Vec::with_capacity({ + // because AsyncReadExt::read() returns Ok(0) if the buffer is zero sized, which we + // ensure it is not with this bit + let sizehint = adder.size_hint(); + if sizehint == 0 { + 10 + } else { + sizehint + } + }); + + loop { + let buffer_bytes = input.read(&mut buffer).await?; + + if buffer_bytes == 0 { + log::trace!("Finished adder"); + let iter = adder.finish(); + for (cid, data) in iter { + yield (cid, data) + } + break; + } else { + let mut pushed_bytes = 0; + + while pushed_bytes < buffer_bytes { + let (iter, consumed) = adder.push(&buffer[pushed_bytes..]); + pushed_bytes += consumed; + log::trace!("Consumed {} bytes so far", pushed_bytes); + + for (cid, data) in iter { + yield (cid, data) + } + } + log::trace!("Added {} bytes", pushed_bytes); + } + } + } + .then(move |r| { + let ipfs = ipfs.clone(); + async move { + match r { + Ok((cid, data)) => { + log::trace!("Putting {} with data {:?}", cid, data); + let block = ipfs::Block { cid, data: Box::from(data) }; + ipfs.put_block(block).await.map_err(anyhow::Error::from) + } + Err(e) => Err(e) + } + } + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::config::Config; + use crate::client::Client; + + use std::convert::TryFrom; + + async fn mk_ipfs() -> IpfsClient { + let mut opts = ipfs::IpfsOptions::inmemory_with_generated_keys(); + opts.mdns = false; + let (ipfs, fut): (ipfs::Ipfs<ipfs::TestTypes>, _) = ipfs::UninitializedIpfs::new(opts).start().await.unwrap(); + tokio::task::spawn(fut); + ipfs + } + + #[tokio::test] + async fn test_add_text() { + let _ = env_logger::try_init(); + log::debug!("Starting test_add_text test"); + let ipfs = mk_ipfs().await; + + let text = "test_add_text"; + let c = std::io::Cursor::new(text); + + let stream = add(ipfs, c); + + let mut output: Vec<_> = stream.collect::<Vec<Result<cid::Cid>>>().await; + + let first = output.pop(); + assert!(first.is_some()); + let result = first.unwrap(); + assert!(result.is_ok()); + let cid = result.unwrap(); + assert_eq!(cid, ipfs::Cid::try_from("QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH").unwrap()); + + assert!(output.pop().is_none()); + } +} diff --git a/lib/src/lib.rs b/lib/src/lib.rs index b7b05e2..a1db6fd 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -1,3 +1,4 @@ +mod add; pub mod client; pub mod consts; pub mod ipfs_client; |