1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
use anyhow::Result;
use futures::stream::Stream;
use futures::stream::StreamExt;
use ipfs_unixfs::file::adder::FileAdder;
use tokio::io::AsyncReadExt;
use crate::ipfs_client::IpfsClient;
pub fn add<R>(ipfs: IpfsClient, mut input: R) -> impl Stream<Item = Result<cid::Cid>>
where R: tokio::io::AsyncRead + std::marker::Unpin
{
async_stream::try_stream! {
let mut adder = FileAdder::default();
let mut buffer = Vec::with_capacity({
// because AsyncReadExt::read() returns Ok(0) if the buffer is zero sized, which we
// ensure it is not with this bit
let sizehint = adder.size_hint();
if sizehint == 0 {
10
} else {
sizehint
}
});
loop {
let buffer_bytes = input.read(&mut buffer).await?;
if buffer_bytes == 0 {
log::trace!("Finished adder");
let iter = adder.finish();
for (cid, data) in iter {
yield (cid, data)
}
break;
} else {
let mut pushed_bytes = 0;
while pushed_bytes < buffer_bytes {
let (iter, consumed) = adder.push(&buffer[pushed_bytes..]);
pushed_bytes += consumed;
log::trace!("Consumed {} bytes so far", pushed_bytes);
for (cid, data) in iter {
yield (cid, data)
}
}
log::trace!("Added {} bytes", pushed_bytes);
}
}
}
.then(move |r| {
let ipfs = ipfs.clone();
async move {
match r {
Ok((cid, data)) => {
log::trace!("Putting {} with data {:?}", cid, data);
let block = ipfs::Block { cid, data: Box::from(data) };
ipfs.put_block(block).await.map_err(anyhow::Error::from)
}
Err(e) => Err(e)
}
}
})
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::Config;
use crate::client::Client;
use std::convert::TryFrom;
async fn mk_ipfs() -> IpfsClient {
let mut opts = ipfs::IpfsOptions::inmemory_with_generated_keys();
opts.mdns = false;
let (ipfs, fut): (ipfs::Ipfs<ipfs::TestTypes>, _) = ipfs::UninitializedIpfs::new(opts).start().await.unwrap();
tokio::task::spawn(fut);
ipfs
}
#[tokio::test]
async fn test_add_text() {
let _ = env_logger::try_init();
log::debug!("Starting test_add_text test");
let ipfs = mk_ipfs().await;
let text = "test_add_text";
let c = std::io::Cursor::new(text);
let stream = add(ipfs, c);
let mut output: Vec<_> = stream.collect::<Vec<Result<cid::Cid>>>().await;
let first = output.pop();
assert!(first.is_some());
let result = first.unwrap();
assert!(result.is_ok());
let cid = result.unwrap();
assert_eq!(cid, ipfs::Cid::try_from("QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH").unwrap());
assert!(output.pop().is_none());
}
}
|