diff options
Diffstat (limited to 'src/async_dag.rs')
-rw-r--r-- | src/async_dag.rs | 74 |
1 files changed, 43 insertions, 31 deletions
diff --git a/src/async_dag.rs b/src/async_dag.rs index 8b4c595..7abc6c5 100644 --- a/src/async_dag.rs +++ b/src/async_dag.rs @@ -6,8 +6,6 @@ use std::pin::Pin; -use anyhow::anyhow; -use anyhow::Result; use futures::stream::StreamExt; use futures::task::Poll; @@ -27,16 +25,17 @@ where _node: std::marker::PhantomData<N>, } -impl<Id, N, Backend> AsyncDag<Id, N, Backend> +impl<Id, N, Backend, Error> AsyncDag<Id, N, Backend> where Id: NodeId + Send, N: Node<Id = Id>, - Backend: DagBackend<Id, N>, + Backend: DagBackend<Id, N, Error = Error>, + Error: From<crate::Error<Id>>, { /// Create a new DAG with a backend and a HEAD node /// /// The head node is `DagBackend::put` into the backend before the AsyncDag object is created. - pub async fn new(mut backend: Backend, head: N) -> Result<Self> { + pub async fn new(mut backend: Backend, head: N) -> Result<Self, Error> { backend.put(head).await.map(|id| AsyncDag { head: id, backend, @@ -49,16 +48,16 @@ where /// # Warning /// /// This fails if backend.get(head) fails. - pub async fn load(backend: Backend, head: Id) -> Result<Self> { + pub async fn load(backend: Backend, head: Id) -> Result<Self, Error> { backend - .get(head) + .get(head.clone()) .await? .map(|(id, _)| AsyncDag { head: id, backend, _node: std::marker::PhantomData, }) - .ok_or_else(|| anyhow!("Node not found")) + .ok_or_else(|| crate::Error::NodeNotFound(head).into()) } pub fn head(&self) -> &Id { @@ -74,10 +73,10 @@ where } /// Check whether an `id` is in the DAG. - pub async fn has_id(&self, id: &Id) -> Result<bool> { + pub async fn has_id(&self, id: &Id) -> Result<bool, Error> { self.stream() - .map(|r| -> Result<bool> { r.map(|(ref node_id, _)| node_id == id) }) - .collect::<Vec<Result<bool>>>() + .map(|r| -> Result<bool, _> { r.map(|(ref node_id, _)| node_id == id) }) + .collect::<Vec<Result<bool, _>>>() .await .into_iter() .fold(Ok(false), |acc, e| match (acc, e) { @@ -94,7 +93,7 @@ where /// # Warning /// /// The order of the nodes is not (yet) guaranteed. - pub fn stream(&self) -> Stream<Id, N, Backend> { + pub fn stream(&self) -> Stream<Id, N, Backend, Error> { Stream { dag: self, backlog: { @@ -110,11 +109,11 @@ where /// # Warning /// /// fails if the passed node does not point to the current HEAD in its parents. - pub async fn update_head(&mut self, node: N) -> Result<Id> { + pub async fn update_head(&mut self, node: N) -> Result<Id, Error> { if node.parent_ids().iter().any(|id| id == &self.head) { self.update_head_unchecked(node).await } else { - Err(anyhow!("Node does not have HEAD as parent")) + Err(Error::from(crate::Error::HeadNotAParent)) } } @@ -124,7 +123,7 @@ where /// /// Does not check whether the passed `node` does point to the current (then old) HEAD in its /// parents. Be careful to not lose nodes from the DAG with this. - pub async fn update_head_unchecked(&mut self, node: N) -> Result<Id> { + pub async fn update_head_unchecked(&mut self, node: N) -> Result<Id, Error> { let id = self.backend.put(node).await?; self.head = id.clone(); Ok(id) @@ -149,9 +148,13 @@ where /// /// Use the `merger` function to merge the two head IDs and generate a new Node instance for /// the new HEAD of `self`. - pub async fn merge<M>(&mut self, other: &AsyncDag<Id, N, Backend>, merger: M) -> Result<Id> + pub async fn merge<M>( + &mut self, + other: &AsyncDag<Id, N, Backend>, + merger: M, + ) -> Result<Id, Error> where - M: Merger<Id, N>, + M: Merger<Id, N, Error = Error>, { let node = merger.create_merge_node(&self.head, &other.head)?; let id = self.backend.put(node).await?; @@ -165,41 +168,49 @@ where Id: NodeId, N: Node<Id = Id>, { - fn create_merge_node(&self, left_id: &Id, right_id: &Id) -> Result<N>; + type Error; + + fn create_merge_node(&self, left_id: &Id, right_id: &Id) -> Result<N, Self::Error>; } -impl<Id, N, F> Merger<Id, N> for F +impl<Id, N, F, Error> Merger<Id, N> for F where Id: NodeId, N: Node<Id = Id>, - F: Fn(&Id, &Id) -> Result<N>, + F: Fn(&Id, &Id) -> Result<N, Error>, + Error: From<crate::Error<Id>>, { - fn create_merge_node(&self, left_id: &Id, right_id: &Id) -> Result<N> { + type Error = Error; + + fn create_merge_node(&self, left_id: &Id, right_id: &Id) -> Result<N, Self::Error> { (self)(left_id, right_id) } } /// Stream adapter for streaming all nodes in a DAG -pub struct Stream<'a, Id, N, Backend> +pub struct Stream<'a, Id, N, Backend, Error> where Id: NodeId + Send, N: Node<Id = Id>, Backend: DagBackend<Id, N>, + Error: From<crate::Error<Id>>, { dag: &'a AsyncDag<Id, N, Backend>, - backlog: Vec<Pin<Backlog<'a, Id, N>>>, + backlog: Vec<Pin<Backlog<'a, Id, N, Error>>>, } -pub type Backlog<'a, Id, N> = - Box<(dyn futures::future::Future<Output = Result<Option<(Id, N)>>> + std::marker::Send + 'a)>; +pub type Backlog<'a, Id, N, Error> = Box< + (dyn futures::future::Future<Output = Result<Option<(Id, N)>, Error>> + std::marker::Send + 'a), +>; -impl<'a, Id, N, Backend> futures::stream::Stream for Stream<'a, Id, N, Backend> +impl<'a, Id, N, Backend, Error> futures::stream::Stream for Stream<'a, Id, N, Backend, Error> where Id: NodeId + Send, N: Node<Id = Id>, - Backend: DagBackend<Id, N>, + Backend: DagBackend<Id, N, Error = Error>, + Error: From<crate::Error<Id>>, { - type Item = Result<(Id, N)>; + type Item = Result<(Id, N), Error>; fn poll_next( mut self: std::pin::Pin<&mut Self>, @@ -235,11 +246,10 @@ where #[cfg(test)] mod tests { - - use anyhow::Result; use futures::StreamExt; use crate::test_impl as test; + use crate::test_impl::TestError; use crate::AsyncDag; use crate::DagBackend; @@ -445,11 +455,13 @@ mod tests { struct M; impl super::Merger<test::Id, test::Node> for M { + type Error = TestError; + fn create_merge_node( &self, left_id: &test::Id, right_id: &test::Id, - ) -> Result<test::Node> { + ) -> Result<test::Node, Self::Error> { Ok(test::Node { parents: vec![*left_id, *right_id], data: 3, |