summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2021-12-09 18:52:50 +0100
committerMatthias Beyer <mail@beyermatthias.de>2021-12-18 17:59:08 +0100
commit19233255c0b4266895c36a7d5d52eca5f9d55b83 (patch)
treef8eb1a185a6cad2ab8228d2b8f612c73306b94bd
parent000b41ecfab76a1133245659e284e53dd815bf79 (diff)
Use ipfs_unixfs::FileAdder to build file adding mechanism
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
-rw-r--r--lib/Cargo.toml5
-rw-r--r--lib/src/add.rs104
-rw-r--r--lib/src/lib.rs1
3 files changed, 110 insertions, 0 deletions
diff --git a/lib/Cargo.toml b/lib/Cargo.toml
index a8a90fa..0b36cbd 100644
--- a/lib/Cargo.toml
+++ b/lib/Cargo.toml
@@ -37,6 +37,7 @@ getset = "0.1"
xdg = "2.4"
tracing = "0.1"
ctrlc = "3.2"
+async-stream = "0.3"
[dependencies.libp2p]
version = "0.39.1"
@@ -59,5 +60,9 @@ features = [
git = "https://github.com/rs-ipfs/rust-ipfs/"
rev = "ad3ab49b4d9236363969b0f74f14aabc7c906b3b"
+[dependencies.ipfs-unixfs]
+git = "https://github.com/rs-ipfs/rust-ipfs/"
+rev = "ad3ab49b4d9236363969b0f74f14aabc7c906b3b"
+
[dev-dependencies]
multibase = "0.8"
diff --git a/lib/src/add.rs b/lib/src/add.rs
new file mode 100644
index 0000000..18773d8
--- /dev/null
+++ b/lib/src/add.rs
@@ -0,0 +1,104 @@
+use anyhow::Result;
+use futures::stream::Stream;
+use futures::stream::StreamExt;
+use ipfs_unixfs::file::adder::FileAdder;
+use tokio::io::AsyncReadExt;
+
+use crate::ipfs_client::IpfsClient;
+
+pub fn add<R>(ipfs: IpfsClient, mut input: R) -> impl Stream<Item = Result<cid::Cid>>
+ where R: tokio::io::AsyncRead + std::marker::Unpin
+{
+ async_stream::try_stream! {
+ let mut adder = FileAdder::default();
+ let mut buffer = Vec::with_capacity({
+ // because AsyncReadExt::read() returns Ok(0) if the buffer is zero sized, which we
+ // ensure it is not with this bit
+ let sizehint = adder.size_hint();
+ if sizehint == 0 {
+ 10
+ } else {
+ sizehint
+ }
+ });
+
+ loop {
+ let buffer_bytes = input.read(&mut buffer).await?;
+
+ if buffer_bytes == 0 {
+ log::trace!("Finished adder");
+ let iter = adder.finish();
+ for (cid, data) in iter {
+ yield (cid, data)
+ }
+ break;
+ } else {
+ let mut pushed_bytes = 0;
+
+ while pushed_bytes < buffer_bytes {
+ let (iter, consumed) = adder.push(&buffer[pushed_bytes..]);
+ pushed_bytes += consumed;
+ log::trace!("Consumed {} bytes so far", pushed_bytes);
+
+ for (cid, data) in iter {
+ yield (cid, data)
+ }
+ }
+ log::trace!("Added {} bytes", pushed_bytes);
+ }
+ }
+ }
+ .then(move |r| {
+ let ipfs = ipfs.clone();
+ async move {
+ match r {
+ Ok((cid, data)) => {
+ log::trace!("Putting {} with data {:?}", cid, data);
+ let block = ipfs::Block { cid, data: Box::from(data) };
+ ipfs.put_block(block).await.map_err(anyhow::Error::from)
+ }
+ Err(e) => Err(e)
+ }
+ }
+ })
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::config::Config;
+ use crate::client::Client;
+
+ use std::convert::TryFrom;
+
+ async fn mk_ipfs() -> IpfsClient {
+ let mut opts = ipfs::IpfsOptions::inmemory_with_generated_keys();
+ opts.mdns = false;
+ let (ipfs, fut): (ipfs::Ipfs<ipfs::TestTypes>, _) = ipfs::UninitializedIpfs::new(opts).start().await.unwrap();
+ tokio::task::spawn(fut);
+ ipfs
+ }
+
+ #[tokio::test]
+ async fn test_add_text() {
+ let _ = env_logger::try_init();
+ log::debug!("Starting test_add_text test");
+ let ipfs = mk_ipfs().await;
+
+ let text = "test_add_text";
+ let c = std::io::Cursor::new(text);
+
+ let stream = add(ipfs, c);
+
+ let mut output: Vec<_> = stream.collect::<Vec<Result<cid::Cid>>>().await;
+
+ let first = output.pop();
+ assert!(first.is_some());
+ let result = first.unwrap();
+ assert!(result.is_ok());
+ let cid = result.unwrap();
+ assert_eq!(cid, ipfs::Cid::try_from("QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH").unwrap());
+
+ assert!(output.pop().is_none());
+ }
+}
diff --git a/lib/src/lib.rs b/lib/src/lib.rs
index b7b05e2..a1db6fd 100644
--- a/lib/src/lib.rs
+++ b/lib/src/lib.rs
@@ -1,3 +1,4 @@
+mod add;
pub mod client;
pub mod consts;
pub mod ipfs_client;