summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2021-12-09 10:14:47 +0100
committerMatthias Beyer <mail@beyermatthias.de>2021-12-09 12:37:55 +0100
commit57f04b73c4e52ec022eae80433603337e3a53dd2 (patch)
tree976a9839e2c42670780c647365b0de11e5b4fda8
parent7513d002fab0e9564a037033659c837211feae13 (diff)
Add type to stream Nodes starting from a CID
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
-rw-r--r--lib/src/lib.rs1
-rw-r--r--lib/src/stream.rs46
2 files changed, 47 insertions, 0 deletions
diff --git a/lib/src/lib.rs b/lib/src/lib.rs
index 759ea14..0cccef9 100644
--- a/lib/src/lib.rs
+++ b/lib/src/lib.rs
@@ -3,4 +3,5 @@ pub mod config;
pub mod consts;
pub mod ipfs_client;
pub mod profile;
+pub mod stream;
pub mod types;
diff --git a/lib/src/stream.rs b/lib/src/stream.rs
new file mode 100644
index 0000000..01548e6
--- /dev/null
+++ b/lib/src/stream.rs
@@ -0,0 +1,46 @@
+use anyhow::Result;
+
+use crate::client::Client;
+use crate::types::Node;
+
+#[derive(Debug)]
+pub struct NodeStreamBuilder {
+ state: Vec<cid::Cid>
+}
+
+impl NodeStreamBuilder {
+ pub fn starting_from(node_cid: cid::Cid) -> Self {
+ Self {
+ state: vec![node_cid]
+ }
+ }
+
+ pub fn into_stream<'a>(self, client: &'a Client) -> impl futures::stream::Stream<Item = Result<Node>> + 'a {
+ futures::stream::unfold((client, self.state), move |(client, mut state)| {
+ async move {
+ if let Some(node_cid) = state.pop() {
+ match client
+ .get_node(node_cid)
+ .await
+ .map(move |node| {
+ node.parents().iter().for_each(|parent| {
+ state.push(parent.clone())
+ });
+
+ (node, state)
+ })
+ .map(Some)
+ .transpose()
+ {
+ Some(Ok((item, state))) => Some((Ok(item), (client, state))),
+ Some(Err(e)) => Some((Err(e), (client, vec![]))),
+ None => None,
+ }
+ } else {
+ None
+ }
+ }
+ })
+ }
+
+}