summaryrefslogtreecommitdiffstats
path: root/lib/src/add.rs
blob: 18773d8a01c22d6cb3c971c5a0a34c294dc1f3a1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
use anyhow::Result;
use futures::stream::Stream;
use futures::stream::StreamExt;
use ipfs_unixfs::file::adder::FileAdder;
use tokio::io::AsyncReadExt;

use crate::ipfs_client::IpfsClient;

pub fn add<R>(ipfs: IpfsClient, mut input: R) -> impl Stream<Item = Result<cid::Cid>>
    where R: tokio::io::AsyncRead + std::marker::Unpin
{
    async_stream::try_stream! {
        let mut adder = FileAdder::default();
        let mut buffer = Vec::with_capacity({
            // because AsyncReadExt::read() returns Ok(0) if the buffer is zero sized, which we
            // ensure it is not with this bit
            let sizehint = adder.size_hint();
            if sizehint == 0 {
                10
            } else {
                sizehint
            }
        });

        loop {
            let buffer_bytes = input.read(&mut buffer).await?;

            if buffer_bytes == 0 {
                log::trace!("Finished adder");
                let iter = adder.finish();
                for (cid, data) in iter {
                    yield (cid, data)
                }
                break;
            } else {
                let mut pushed_bytes = 0;

                while pushed_bytes < buffer_bytes {
                    let (iter, consumed) = adder.push(&buffer[pushed_bytes..]);
                    pushed_bytes += consumed;
                    log::trace!("Consumed {} bytes so far", pushed_bytes);

                    for (cid, data) in iter {
                        yield (cid, data)
                    }
                }
                log::trace!("Added {} bytes", pushed_bytes);
            }
        }
    }
    .then(move |r| {
        let ipfs = ipfs.clone();
        async move {
            match r {
                Ok((cid, data)) => {
                    log::trace!("Putting {} with data {:?}", cid, data);
                    let block = ipfs::Block { cid, data: Box::from(data) };
                    ipfs.put_block(block).await.map_err(anyhow::Error::from)
                }
                Err(e) => Err(e)
            }
        }
    })
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::Config;
    use crate::client::Client;

    use std::convert::TryFrom;

    async fn mk_ipfs() -> IpfsClient {
        let mut opts = ipfs::IpfsOptions::inmemory_with_generated_keys();
        opts.mdns = false;
        let (ipfs, fut): (ipfs::Ipfs<ipfs::TestTypes>, _) = ipfs::UninitializedIpfs::new(opts).start().await.unwrap();
        tokio::task::spawn(fut);
        ipfs
    }

    #[tokio::test]
    async fn test_add_text() {
        let _ = env_logger::try_init();
        log::debug!("Starting test_add_text test");
        let ipfs  = mk_ipfs().await;

        let text = "test_add_text";
        let c = std::io::Cursor::new(text);

        let stream = add(ipfs, c);

        let mut output: Vec<_> = stream.collect::<Vec<Result<cid::Cid>>>().await;

        let first = output.pop();
        assert!(first.is_some());
        let result = first.unwrap();
        assert!(result.is_ok());
        let cid = result.unwrap();
        assert_eq!(cid, ipfs::Cid::try_from("QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH").unwrap());

        assert!(output.pop().is_none());
    }
}