diff options
Diffstat (limited to 'lib/src/stream.rs')
-rw-r--r-- | lib/src/stream.rs | 46 |
1 files changed, 46 insertions, 0 deletions
diff --git a/lib/src/stream.rs b/lib/src/stream.rs new file mode 100644 index 0000000..2dd2781 --- /dev/null +++ b/lib/src/stream.rs @@ -0,0 +1,46 @@ +use anyhow::Result; + +use crate::client::Client; +use crate::types::Node; + +#[derive(Debug)] +pub struct NodeStreamBuilder { + state: Vec<cid::Cid> +} + +impl NodeStreamBuilder { + pub fn starting_from(node_cid: cid::Cid) -> Self { + Self { + state: vec![node_cid] + } + } + + pub fn into_stream(self, client: Client) -> impl futures::stream::Stream<Item = Result<Node>> { + futures::stream::unfold((client, self.state), move |(client, mut state)| { + async move { + if let Some(node_cid) = state.pop() { + match client + .get_node(node_cid) + .await + .map(move |node| { + node.parents().iter().for_each(|parent| { + state.push(parent.clone()) + }); + + (node, state) + }) + .map(Some) + .transpose() + { + Some(Ok((item, state))) => Some((Ok(item), (client, state))), + Some(Err(e)) => Some((Err(e), (client, vec![]))), + None => None, + } + } else { + None + } + } + }) + } + +} |