diff options
Diffstat (limited to 'lib/src/add.rs')
-rw-r--r-- | lib/src/add.rs | 104 |
1 files changed, 104 insertions, 0 deletions
diff --git a/lib/src/add.rs b/lib/src/add.rs new file mode 100644 index 0000000..18773d8 --- /dev/null +++ b/lib/src/add.rs @@ -0,0 +1,104 @@ +use anyhow::Result; +use futures::stream::Stream; +use futures::stream::StreamExt; +use ipfs_unixfs::file::adder::FileAdder; +use tokio::io::AsyncReadExt; + +use crate::ipfs_client::IpfsClient; + +pub fn add<R>(ipfs: IpfsClient, mut input: R) -> impl Stream<Item = Result<cid::Cid>> + where R: tokio::io::AsyncRead + std::marker::Unpin +{ + async_stream::try_stream! { + let mut adder = FileAdder::default(); + let mut buffer = Vec::with_capacity({ + // because AsyncReadExt::read() returns Ok(0) if the buffer is zero sized, which we + // ensure it is not with this bit + let sizehint = adder.size_hint(); + if sizehint == 0 { + 10 + } else { + sizehint + } + }); + + loop { + let buffer_bytes = input.read(&mut buffer).await?; + + if buffer_bytes == 0 { + log::trace!("Finished adder"); + let iter = adder.finish(); + for (cid, data) in iter { + yield (cid, data) + } + break; + } else { + let mut pushed_bytes = 0; + + while pushed_bytes < buffer_bytes { + let (iter, consumed) = adder.push(&buffer[pushed_bytes..]); + pushed_bytes += consumed; + log::trace!("Consumed {} bytes so far", pushed_bytes); + + for (cid, data) in iter { + yield (cid, data) + } + } + log::trace!("Added {} bytes", pushed_bytes); + } + } + } + .then(move |r| { + let ipfs = ipfs.clone(); + async move { + match r { + Ok((cid, data)) => { + log::trace!("Putting {} with data {:?}", cid, data); + let block = ipfs::Block { cid, data: Box::from(data) }; + ipfs.put_block(block).await.map_err(anyhow::Error::from) + } + Err(e) => Err(e) + } + } + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::config::Config; + use crate::client::Client; + + use std::convert::TryFrom; + + async fn mk_ipfs() -> IpfsClient { + let mut opts = ipfs::IpfsOptions::inmemory_with_generated_keys(); + opts.mdns = false; + let (ipfs, fut): (ipfs::Ipfs<ipfs::TestTypes>, _) = ipfs::UninitializedIpfs::new(opts).start().await.unwrap(); + tokio::task::spawn(fut); + ipfs + } + + #[tokio::test] + async fn test_add_text() { + let _ = env_logger::try_init(); + log::debug!("Starting test_add_text test"); + let ipfs = mk_ipfs().await; + + let text = "test_add_text"; + let c = std::io::Cursor::new(text); + + let stream = add(ipfs, c); + + let mut output: Vec<_> = stream.collect::<Vec<Result<cid::Cid>>>().await; + + let first = output.pop(); + assert!(first.is_some()); + let result = first.unwrap(); + assert!(result.is_ok()); + let cid = result.unwrap(); + assert_eq!(cid, ipfs::Cid::try_from("QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH").unwrap()); + + assert!(output.pop().is_none()); + } +} |