summaryrefslogtreecommitdiffstats
path: root/src/async_dag.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/async_dag.rs')
-rw-r--r--src/async_dag.rs26
1 files changed, 21 insertions, 5 deletions
diff --git a/src/async_dag.rs b/src/async_dag.rs
index 20a3239..66d79fc 100644
--- a/src/async_dag.rs
+++ b/src/async_dag.rs
@@ -31,6 +31,7 @@ impl<Id, N, Backend> AsyncDag<Id, N, Backend>
N: Node<Id = Id>,
Backend: DagBackend<Id, N>
{
+ /// Create a new DAG with a backend and a HEAD node
pub async fn new(backend: Backend, head: N) -> Result<Self> {
backend
.get(head.id().clone())
@@ -45,6 +46,7 @@ impl<Id, N, Backend> AsyncDag<Id, N, Backend>
.ok_or_else(|| anyhow!("Head not found in backend"))
}
+ /// Check whether an `id` is in the DAG.
pub async fn has_id(&self, id: &Id) -> Result<bool> {
self.stream()
.map(|r| -> Result<bool> {
@@ -83,6 +85,13 @@ impl<Id, N, Backend> AsyncDag<Id, N, Backend>
.collect()
}
+ /// Iterate over the DAG
+ ///
+ /// This function returns a Stream over all nodes in the DAG.
+ ///
+ /// # Warning
+ ///
+ /// The order of the nodes is not (yet) guaranteed.
pub fn stream(&self) -> Stream<Id, N, Backend> {
Stream {
dag: self,
@@ -94,6 +103,11 @@ impl<Id, N, Backend> AsyncDag<Id, N, Backend>
}
}
+ /// Update the HEAD pointer of the DAG
+ ///
+ /// # Warning
+ ///
+ /// fails if the passed node does not point to the current HEAD in its parents.
pub async fn update_head(&mut self, node: N) -> Result<Id> {
if node.parent_ids().iter().any(|id| id == &self.head) {
self.update_head_unchecked(node).await
@@ -102,6 +116,12 @@ impl<Id, N, Backend> AsyncDag<Id, N, Backend>
}
}
+ /// Update the HEAD pointer of the DAG, unchecked
+ ///
+ /// # Warning
+ ///
+ /// Does not check whether the passed `node` does point to the current (then old) HEAD in its
+ /// parents. Be careful to not lose nodes from the DAG with this.
pub async fn update_head_unchecked(&mut self, node: N) -> Result<Id> {
let id = self.backend.put(node).await?;
self.head = id.clone();
@@ -143,7 +163,7 @@ pub trait Merger<Id, N>
fn create_merge_node(&self, left_id: &Id, right_id: &Id) -> Result<N>;
}
-
+/// Stream adapter for streaming all nodes in a DAG
pub struct Stream<'a, Id, N, Backend>
where Id: NodeId + Send,
N: Node<Id = Id>,
@@ -158,12 +178,8 @@ impl<'a, Id, N, Backend> futures::stream::Stream for Stream<'a, Id, N, Backend>
N: Node<Id = Id>,
Backend: DagBackend<Id, N>
{
-
type Item = Result<N>;
- /// Attempt to resolve the next item in the stream.
- /// Returns `Poll::Pending` if not ready, `Poll::Ready(Some(x))` if a value
- /// is ready, and `Poll::Ready(None)` if the stream has completed.
fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut futures::task::Context<'_>) -> futures::task::Poll<Option<Self::Item>> {
if let Some(mut fut) = self.as_mut().backlog.pop() {
match fut.as_mut().poll(cx) {