summaryrefslogtreecommitdiffstats
path: root/src/async_dag.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/async_dag.rs')
-rw-r--r--src/async_dag.rs74
1 files changed, 43 insertions, 31 deletions
diff --git a/src/async_dag.rs b/src/async_dag.rs
index 8b4c595..7abc6c5 100644
--- a/src/async_dag.rs
+++ b/src/async_dag.rs
@@ -6,8 +6,6 @@
use std::pin::Pin;
-use anyhow::anyhow;
-use anyhow::Result;
use futures::stream::StreamExt;
use futures::task::Poll;
@@ -27,16 +25,17 @@ where
_node: std::marker::PhantomData<N>,
}
-impl<Id, N, Backend> AsyncDag<Id, N, Backend>
+impl<Id, N, Backend, Error> AsyncDag<Id, N, Backend>
where
Id: NodeId + Send,
N: Node<Id = Id>,
- Backend: DagBackend<Id, N>,
+ Backend: DagBackend<Id, N, Error = Error>,
+ Error: From<crate::Error<Id>>,
{
/// Create a new DAG with a backend and a HEAD node
///
/// The head node is `DagBackend::put` into the backend before the AsyncDag object is created.
- pub async fn new(mut backend: Backend, head: N) -> Result<Self> {
+ pub async fn new(mut backend: Backend, head: N) -> Result<Self, Error> {
backend.put(head).await.map(|id| AsyncDag {
head: id,
backend,
@@ -49,16 +48,16 @@ where
/// # Warning
///
/// This fails if backend.get(head) fails.
- pub async fn load(backend: Backend, head: Id) -> Result<Self> {
+ pub async fn load(backend: Backend, head: Id) -> Result<Self, Error> {
backend
- .get(head)
+ .get(head.clone())
.await?
.map(|(id, _)| AsyncDag {
head: id,
backend,
_node: std::marker::PhantomData,
})
- .ok_or_else(|| anyhow!("Node not found"))
+ .ok_or_else(|| crate::Error::NodeNotFound(head).into())
}
pub fn head(&self) -> &Id {
@@ -74,10 +73,10 @@ where
}
/// Check whether an `id` is in the DAG.
- pub async fn has_id(&self, id: &Id) -> Result<bool> {
+ pub async fn has_id(&self, id: &Id) -> Result<bool, Error> {
self.stream()
- .map(|r| -> Result<bool> { r.map(|(ref node_id, _)| node_id == id) })
- .collect::<Vec<Result<bool>>>()
+ .map(|r| -> Result<bool, _> { r.map(|(ref node_id, _)| node_id == id) })
+ .collect::<Vec<Result<bool, _>>>()
.await
.into_iter()
.fold(Ok(false), |acc, e| match (acc, e) {
@@ -94,7 +93,7 @@ where
/// # Warning
///
/// The order of the nodes is not (yet) guaranteed.
- pub fn stream(&self) -> Stream<Id, N, Backend> {
+ pub fn stream(&self) -> Stream<Id, N, Backend, Error> {
Stream {
dag: self,
backlog: {
@@ -110,11 +109,11 @@ where
/// # Warning
///
/// fails if the passed node does not point to the current HEAD in its parents.
- pub async fn update_head(&mut self, node: N) -> Result<Id> {
+ pub async fn update_head(&mut self, node: N) -> Result<Id, Error> {
if node.parent_ids().iter().any(|id| id == &self.head) {
self.update_head_unchecked(node).await
} else {
- Err(anyhow!("Node does not have HEAD as parent"))
+ Err(Error::from(crate::Error::HeadNotAParent))
}
}
@@ -124,7 +123,7 @@ where
///
/// Does not check whether the passed `node` does point to the current (then old) HEAD in its
/// parents. Be careful to not lose nodes from the DAG with this.
- pub async fn update_head_unchecked(&mut self, node: N) -> Result<Id> {
+ pub async fn update_head_unchecked(&mut self, node: N) -> Result<Id, Error> {
let id = self.backend.put(node).await?;
self.head = id.clone();
Ok(id)
@@ -149,9 +148,13 @@ where
///
/// Use the `merger` function to merge the two head IDs and generate a new Node instance for
/// the new HEAD of `self`.
- pub async fn merge<M>(&mut self, other: &AsyncDag<Id, N, Backend>, merger: M) -> Result<Id>
+ pub async fn merge<M>(
+ &mut self,
+ other: &AsyncDag<Id, N, Backend>,
+ merger: M,
+ ) -> Result<Id, Error>
where
- M: Merger<Id, N>,
+ M: Merger<Id, N, Error = Error>,
{
let node = merger.create_merge_node(&self.head, &other.head)?;
let id = self.backend.put(node).await?;
@@ -165,41 +168,49 @@ where
Id: NodeId,
N: Node<Id = Id>,
{
- fn create_merge_node(&self, left_id: &Id, right_id: &Id) -> Result<N>;
+ type Error;
+
+ fn create_merge_node(&self, left_id: &Id, right_id: &Id) -> Result<N, Self::Error>;
}
-impl<Id, N, F> Merger<Id, N> for F
+impl<Id, N, F, Error> Merger<Id, N> for F
where
Id: NodeId,
N: Node<Id = Id>,
- F: Fn(&Id, &Id) -> Result<N>,
+ F: Fn(&Id, &Id) -> Result<N, Error>,
+ Error: From<crate::Error<Id>>,
{
- fn create_merge_node(&self, left_id: &Id, right_id: &Id) -> Result<N> {
+ type Error = Error;
+
+ fn create_merge_node(&self, left_id: &Id, right_id: &Id) -> Result<N, Self::Error> {
(self)(left_id, right_id)
}
}
/// Stream adapter for streaming all nodes in a DAG
-pub struct Stream<'a, Id, N, Backend>
+pub struct Stream<'a, Id, N, Backend, Error>
where
Id: NodeId + Send,
N: Node<Id = Id>,
Backend: DagBackend<Id, N>,
+ Error: From<crate::Error<Id>>,
{
dag: &'a AsyncDag<Id, N, Backend>,
- backlog: Vec<Pin<Backlog<'a, Id, N>>>,
+ backlog: Vec<Pin<Backlog<'a, Id, N, Error>>>,
}
-pub type Backlog<'a, Id, N> =
- Box<(dyn futures::future::Future<Output = Result<Option<(Id, N)>>> + std::marker::Send + 'a)>;
+pub type Backlog<'a, Id, N, Error> = Box<
+ (dyn futures::future::Future<Output = Result<Option<(Id, N)>, Error>> + std::marker::Send + 'a),
+>;
-impl<'a, Id, N, Backend> futures::stream::Stream for Stream<'a, Id, N, Backend>
+impl<'a, Id, N, Backend, Error> futures::stream::Stream for Stream<'a, Id, N, Backend, Error>
where
Id: NodeId + Send,
N: Node<Id = Id>,
- Backend: DagBackend<Id, N>,
+ Backend: DagBackend<Id, N, Error = Error>,
+ Error: From<crate::Error<Id>>,
{
- type Item = Result<(Id, N)>;
+ type Item = Result<(Id, N), Error>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
@@ -235,11 +246,10 @@ where
#[cfg(test)]
mod tests {
-
- use anyhow::Result;
use futures::StreamExt;
use crate::test_impl as test;
+ use crate::test_impl::TestError;
use crate::AsyncDag;
use crate::DagBackend;
@@ -445,11 +455,13 @@ mod tests {
struct M;
impl super::Merger<test::Id, test::Node> for M {
+ type Error = TestError;
+
fn create_merge_node(
&self,
left_id: &test::Id,
right_id: &test::Id,
- ) -> Result<test::Node> {
+ ) -> Result<test::Node, Self::Error> {
Ok(test::Node {
parents: vec![*left_id, *right_id],
data: 3,