blob: 2dd278113ba65c1a109cf77c5824419931c3bb7d (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
use anyhow::Result;
use crate::client::Client;
use crate::types::Node;
#[derive(Debug)]
pub struct NodeStreamBuilder {
state: Vec<cid::Cid>
}
impl NodeStreamBuilder {
pub fn starting_from(node_cid: cid::Cid) -> Self {
Self {
state: vec![node_cid]
}
}
pub fn into_stream(self, client: Client) -> impl futures::stream::Stream<Item = Result<Node>> {
futures::stream::unfold((client, self.state), move |(client, mut state)| {
async move {
if let Some(node_cid) = state.pop() {
match client
.get_node(node_cid)
.await
.map(move |node| {
node.parents().iter().for_each(|parent| {
state.push(parent.clone())
});
(node, state)
})
.map(Some)
.transpose()
{
Some(Ok((item, state))) => Some((Ok(item), (client, state))),
Some(Err(e)) => Some((Err(e), (client, vec![]))),
None => None,
}
} else {
None
}
}
})
}
}
|