summaryrefslogtreecommitdiffstats
path: root/src/async_dag.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/async_dag.rs')
-rw-r--r--src/async_dag.rs79
1 files changed, 34 insertions, 45 deletions
diff --git a/src/async_dag.rs b/src/async_dag.rs
index 66d79fc..651b91d 100644
--- a/src/async_dag.rs
+++ b/src/async_dag.rs
@@ -32,25 +32,26 @@ impl<Id, N, Backend> AsyncDag<Id, N, Backend>
Backend: DagBackend<Id, N>
{
/// Create a new DAG with a backend and a HEAD node
- pub async fn new(backend: Backend, head: N) -> Result<Self> {
+ ///
+ /// The head node is `DagBackend::put` into the backend before the AsyncDag object is created.
+ pub async fn new(mut backend: Backend, head: N) -> Result<Self> {
backend
- .get(head.id().clone())
- .await?
- .map(|node| {
+ .put(head)
+ .await
+ .map(|id| {
AsyncDag {
- head: node.id().clone(),
+ head: id,
backend: backend,
_node: std::marker::PhantomData,
}
})
- .ok_or_else(|| anyhow!("Head not found in backend"))
}
/// Check whether an `id` is in the DAG.
pub async fn has_id(&self, id: &Id) -> Result<bool> {
self.stream()
.map(|r| -> Result<bool> {
- r.map(|node| node.id() == id)
+ r.map(|(ref node_id, _)| node_id == id)
})
.collect::<Vec<Result<bool>>>()
.await
@@ -68,6 +69,7 @@ impl<Id, N, Backend> AsyncDag<Id, N, Backend>
self.backend
.get(id)
.await?
+ .map(|tpl| tpl.1)
.ok_or_else(|| anyhow!("ID Not found"))?
.parent_ids()
.into_iter()
@@ -82,6 +84,7 @@ impl<Id, N, Backend> AsyncDag<Id, N, Backend>
.await
.into_iter()
.filter_map(|o| o)
+ .map(|r| r.map(|tpl| tpl.1))
.collect()
}
@@ -170,7 +173,7 @@ pub struct Stream<'a, Id, N, Backend>
Backend: DagBackend<Id, N>
{
dag: &'a AsyncDag<Id, N, Backend>,
- backlog: Vec<Pin<Box<(dyn futures::future::Future<Output = Result<Option<N>>> + std::marker::Send + 'a)>>>,
+ backlog: Vec<Pin<Box<(dyn futures::future::Future<Output = Result<Option<(Id, N)>>> + std::marker::Send + 'a)>>>,
}
impl<'a, Id, N, Backend> futures::stream::Stream for Stream<'a, Id, N, Backend>
@@ -178,18 +181,18 @@ impl<'a, Id, N, Backend> futures::stream::Stream for Stream<'a, Id, N, Backend>
N: Node<Id = Id>,
Backend: DagBackend<Id, N>
{
- type Item = Result<N>;
+ type Item = Result<(Id, N)>;
fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut futures::task::Context<'_>) -> futures::task::Poll<Option<Self::Item>> {
if let Some(mut fut) = self.as_mut().backlog.pop() {
match fut.as_mut().poll(cx) {
Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
- Poll::Ready(Ok(Some(node))) => {
+ Poll::Ready(Ok(Some((node_id, node)))) => {
for parent in node.parent_ids().into_iter() {
- let fut = self.dag.backend.get(parent);
+ let fut = self.dag.backend.get(parent.clone());
self.as_mut().backlog.push(fut);
}
- Poll::Ready(Some(Ok(node)))
+ Poll::Ready(Some(Ok((node_id, node))))
},
Poll::Ready(Ok(None)) => {
// backend.get() returned Ok(None), so the referenced node seems not to exist
@@ -227,17 +230,15 @@ mod tests {
#[test]
fn test_dag_two_nodes() {
let head = test::Node {
- id: test::Id(1),
parents: vec![test::Id(0)],
- data: 43,
+ data: 1,
};
let b = test::Backend::new(vec![
{
Some(test::Node {
- id: test::Id(0),
parents: vec![],
- data: 42,
+ data: 0,
})
},
{
@@ -247,9 +248,8 @@ mod tests {
{
let node = tokio_test::block_on(b.get(test::Id(1))).unwrap().unwrap();
- assert_eq!(node.data, 43);
- assert_eq!(node.id, test::Id(1));
- assert!(!node.parents.is_empty()); // to check whether the parent is set
+ assert_eq!(node.1.data, 1);
+ assert!(!node.1.parents.is_empty()); // to check whether the parent is set
}
let dag = tokio_test::block_on(AsyncDag::new(b, head));
@@ -277,8 +277,7 @@ mod tests {
let node = next.pop();
assert!(node.is_some());
let node = node.unwrap();
- assert_eq!(node.id, test::Id(0));
- assert_eq!(node.data, 42);
+ assert_eq!(node.data, 0);
assert!(node.parents.is_empty());
}
}
@@ -286,17 +285,15 @@ mod tests {
#[test]
fn test_dag_two_nodes_stream() {
let head = test::Node {
- id: test::Id(1),
parents: vec![test::Id(0)],
- data: 43,
+ data: 1,
};
let b = test::Backend::new(vec![
{
Some(test::Node {
- id: test::Id(0),
parents: vec![],
- data: 42,
+ data: 0,
})
},
{
@@ -311,16 +308,15 @@ mod tests {
let v = tokio_test::block_on(dag.stream().collect::<Vec<_>>());
assert_eq!(v.len(), 2, "Expected two nodes: {:?}", v);
- assert_eq!(v[0].as_ref().unwrap().id, test::Id(1));
- assert_eq!(v[1].as_ref().unwrap().id, test::Id(0));
+ assert_eq!(v[0].as_ref().unwrap().0, test::Id(1));
+ assert_eq!(v[1].as_ref().unwrap().0, test::Id(0));
}
#[test]
fn test_adding_head() {
let head = test::Node {
- id: test::Id(0),
parents: vec![],
- data: 42,
+ data: 0,
};
let b = test::Backend::new(vec![Some(head.clone())]);
@@ -329,9 +325,8 @@ mod tests {
let mut dag = dag.unwrap();
let new_head = test::Node {
- id: test::Id(1),
parents: vec![test::Id(0)],
- data: 43,
+ data: 1,
};
assert_eq!(dag.backend.0.read().unwrap().len(), 1);
@@ -344,10 +339,10 @@ mod tests {
assert_eq!(dag.backend.0.read().unwrap().len(), 2);
assert_eq!(dag.head, test::Id(1));
- assert_eq!(dag.backend.0.read().unwrap()[0].as_ref().unwrap().id, test::Id(0));
+ assert_eq!(dag.backend.0.read().unwrap()[0].as_ref().unwrap().data, 0);
assert!(dag.backend.0.read().unwrap()[0].as_ref().unwrap().parents.is_empty());
- assert_eq!(dag.backend.0.read().unwrap()[1].as_ref().unwrap().id, test::Id(1));
+ assert_eq!(dag.backend.0.read().unwrap()[1].as_ref().unwrap().data, 1);
assert_eq!(dag.backend.0.read().unwrap()[1].as_ref().unwrap().parents.len(), 1);
assert_eq!(dag.backend.0.read().unwrap()[1].as_ref().unwrap().parents[0], test::Id(0));
}
@@ -356,9 +351,8 @@ mod tests {
fn test_branch() {
let mut dag = {
let head = test::Node {
- id: test::Id(0),
parents: vec![],
- data: 42,
+ data: 0,
};
let b = test::Backend::new(vec![Some(head.clone())]);
let dag = tokio_test::block_on(AsyncDag::new(b, head));
@@ -372,9 +366,8 @@ mod tests {
assert_eq!(dag.backend.0.read().unwrap().len(), 1);
assert_eq!(dag.head, test::Id(0));
let new_head = test::Node {
- id: test::Id(1),
parents: vec![test::Id(0)],
- data: 43,
+ data: 1,
};
let id = tokio_test::block_on(dag.update_head(new_head));
@@ -393,9 +386,8 @@ mod tests {
fn test_merging() {
let mut dag = {
let head = test::Node {
- id: test::Id(0),
parents: vec![],
- data: 42,
+ data: 0,
};
let b = test::Backend::new(vec![Some(head.clone())]);
let dag = tokio_test::block_on(AsyncDag::new(b, head));
@@ -409,9 +401,8 @@ mod tests {
assert_eq!(dag.backend.0.read().unwrap().len(), 1);
assert_eq!(dag.head, test::Id(0));
let new_head = test::Node {
- id: test::Id(1),
parents: vec![test::Id(0)],
- data: 43,
+ data: 1,
};
let id = tokio_test::block_on(dag.update_head(new_head));
@@ -426,9 +417,8 @@ mod tests {
assert_eq!(branched.backend.0.read().unwrap().len(), 2);
assert_eq!(branched.head, test::Id(0));
let new_head = test::Node {
- id: test::Id(2),
parents: vec![test::Id(0)],
- data: 44,
+ data: 2,
};
let id = tokio_test::block_on(branched.update_head(new_head));
@@ -443,9 +433,8 @@ mod tests {
impl super::Merger<test::Id, test::Node> for M {
fn create_merge_node(&self, left_id: &test::Id, right_id: &test::Id) -> Result<test::Node> {
Ok(test::Node {
- id: test::Id(3),
parents: vec![left_id.clone(), right_id.clone()],
- data: 45,
+ data: 3,
})
}
}