summaryrefslogtreecommitdiffstats
path: root/lib/src/stream.rs
diff options
context:
space:
mode:
Diffstat (limited to 'lib/src/stream.rs')
-rw-r--r--lib/src/stream.rs46
1 files changed, 46 insertions, 0 deletions
diff --git a/lib/src/stream.rs b/lib/src/stream.rs
new file mode 100644
index 0000000..2dd2781
--- /dev/null
+++ b/lib/src/stream.rs
@@ -0,0 +1,46 @@
+use anyhow::Result;
+
+use crate::client::Client;
+use crate::types::Node;
+
+#[derive(Debug)]
+pub struct NodeStreamBuilder {
+ state: Vec<cid::Cid>
+}
+
+impl NodeStreamBuilder {
+ pub fn starting_from(node_cid: cid::Cid) -> Self {
+ Self {
+ state: vec![node_cid]
+ }
+ }
+
+ pub fn into_stream(self, client: Client) -> impl futures::stream::Stream<Item = Result<Node>> {
+ futures::stream::unfold((client, self.state), move |(client, mut state)| {
+ async move {
+ if let Some(node_cid) = state.pop() {
+ match client
+ .get_node(node_cid)
+ .await
+ .map(move |node| {
+ node.parents().iter().for_each(|parent| {
+ state.push(parent.clone())
+ });
+
+ (node, state)
+ })
+ .map(Some)
+ .transpose()
+ {
+ Some(Ok((item, state))) => Some((Ok(item), (client, state))),
+ Some(Err(e)) => Some((Err(e), (client, vec![]))),
+ None => None,
+ }
+ } else {
+ None
+ }
+ }
+ })
+ }
+
+}