diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2021-04-04 13:18:23 +0200 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2021-04-04 13:18:35 +0200 |
commit | fb364c1ff338e7a67d717b77d431937888d8c009 (patch) | |
tree | 0698dbd412322ec9d2c09cf599ef1fe863be9c0e | |
parent | 2d25173aeafe9357f7cbd5c1edf6225d9ccf1114 (diff) |
Import rewrite from distrox repository
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
-rw-r--r-- | Cargo.toml | 20 | ||||
-rw-r--r-- | README.md | 5 | ||||
-rw-r--r-- | src/async_dag.rs | 248 | ||||
-rw-r--r-- | src/dag_backend.rs | 97 | ||||
-rw-r--r-- | src/id.rs | 18 | ||||
-rw-r--r-- | src/lib.rs | 18 | ||||
-rw-r--r-- | src/node.rs | 40 | ||||
-rw-r--r-- | src/node_id.rs | 2 | ||||
-rw-r--r-- | src/repository.rs | 23 | ||||
-rw-r--r-- | src/test_impl.rs | 51 |
10 files changed, 436 insertions, 86 deletions
@@ -1,8 +1,18 @@ [package] -name = "daglib" -version = "0.1.0" -authors = ["Matthias Beyer <mail@beyermatthias.de>"] -edition = "2018" +name = "daglib" +version = "0.1.0" +authors = ["Matthias Beyer <mail@beyermatthias.de>"] +edition = "2018" +description = "Generic async DAG library" [dependencies] -futures = "0.1" +# TODO: Replace with thiserror +anyhow = "1" + +async-trait = "0.1" +futures = "0.3" +tokio = { version = "1", features = ["full"] } + +[dev-dependencies] +tokio-test = "0.4" + @@ -1,7 +1,8 @@ # daglib -Generic library for working with DAG data structures. Based on futures, so that -it can be used in a async context. +Async DAG library generic over Node type, Node Identifier Type and Backend +(storage of the DAG). + ## Status diff --git a/src/async_dag.rs b/src/async_dag.rs new file mode 100644 index 0000000..f882b4d --- /dev/null +++ b/src/async_dag.rs @@ -0,0 +1,248 @@ +use std::pin::Pin; + +use anyhow::Result; +use anyhow::anyhow; +use futures::stream::StreamExt; +use futures::task::Poll; + +use crate::Node; +use crate::NodeId; +use crate::DagBackend; + +/// An async DAG, generic over Node, Node identifier and Backend implementation +pub struct AsyncDag<Id, N, Backend> + where Id: NodeId + Send, + N: Node<Id = Id>, + Backend: DagBackend<Id, N> +{ + head: N, + backend: Backend, +} + +impl<Id, N, Backend> AsyncDag<Id, N, Backend> + where Id: NodeId + Send, + N: Node<Id = Id>, + Backend: DagBackend<Id, N> +{ + pub async fn new(backend: Backend, head: N) -> Result<Self> { + backend + .get(head.id().clone()) + .await? + .map(|node| { + AsyncDag { + head: node, + backend: backend + } + }) + .ok_or_else(|| anyhow!("Head not found in backend")) + } + + pub async fn has_id(&self, id: &Id) -> Result<bool> { + self.stream() + .map(|r| -> Result<bool> { + r.map(|node| node.id() == id) + }) + .collect::<Vec<Result<bool>>>() + .await + .into_iter() + .fold(Ok(false), |acc, e| { + match (acc, e) { + (Err(e), _) => Err(e), + (Ok(_), Err(e)) => Err(e), + (Ok(a), Ok(b)) => Ok(a || b), + } + }) + } + + async fn get_next(&self, id: Id) -> Result<Vec<N>> { + self.backend + .get(id) + .await? + .ok_or_else(|| anyhow!("ID Not found"))? + .parent_ids() + .into_iter() + .map(|id| async move { + self.backend + .get(id) + .await + .transpose() + }) + .collect::<futures::stream::FuturesUnordered<_>>() + .collect::<Vec<_>>() + .await + .into_iter() + .filter_map(|o| o) + .collect() + } + + fn stream(&self) -> Stream<Id, N, Backend> { + Stream { + dag: self, + backlog: { + let mut v = Vec::with_capacity(2); + v.push(self.backend.get(self.head.id().clone())); + v + } + } + } + +} + + +pub struct Stream<'a, Id, N, Backend> + where Id: NodeId + Send, + N: Node<Id = Id>, + Backend: DagBackend<Id, N> +{ + dag: &'a AsyncDag<Id, N, Backend>, + backlog: Vec<Pin<Box<(dyn futures::future::Future<Output = Result<Option<N>>> + std::marker::Send + 'a)>>>, +} + +impl<'a, Id, N, Backend> futures::stream::Stream for Stream<'a, Id, N, Backend> + where Id: NodeId + Send, + N: Node<Id = Id>, + Backend: DagBackend<Id, N> +{ + + type Item = Result<N>; + + /// Attempt to resolve the next item in the stream. + /// Returns `Poll::Pending` if not ready, `Poll::Ready(Some(x))` if a value + /// is ready, and `Poll::Ready(None)` if the stream has completed. + fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut futures::task::Context<'_>) -> futures::task::Poll<Option<Self::Item>> { + if let Some(mut fut) = self.as_mut().backlog.pop() { + match fut.as_mut().poll(cx) { + Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))), + Poll::Ready(Ok(Some(node))) => { + for parent in node.parent_ids().into_iter() { + let fut = self.dag.backend.get(parent); + self.as_mut().backlog.push(fut); + } + Poll::Ready(Some(Ok(node))) + }, + Poll::Ready(Ok(None)) => { + // backend.get() returned Ok(None), so the referenced node seems not to exist + // + // TODO: Decide whether we should return an error here. + cx.waker().wake_by_ref(); + Poll::Pending + }, + Poll::Pending => { + cx.waker().wake_by_ref(); + Poll::Pending + } + } + } else { + Poll::Ready(None) + } + } +} + + +#[cfg(test)] +mod tests { + use std::pin::Pin; + + use anyhow::Result; + use anyhow::anyhow; + use async_trait::async_trait; + use tokio_test::block_on; + + use crate::DagBackend; + use crate::AsyncDag; + use crate::test_impl as test; + + #[test] + fn test_dag_two_nodes() { + let head = test::Node { + id: test::Id(1), + parents: vec![test::Id(0)], + data: 43, + }; + + let b = test::Backend(vec![ + { + Some(test::Node { + id: test::Id(0), + parents: vec![], + data: 42, + }) + }, + { + Some(head.clone()) + }, + ]); + + { + let node = tokio_test::block_on(b.get(test::Id(1))).unwrap().unwrap(); + assert_eq!(node.data, 43); + assert_eq!(node.id, test::Id(1)); + assert!(!node.parents.is_empty()); // to check whether the parent is set + } + + let dag = tokio_test::block_on(AsyncDag::new(b, head)); + assert!(dag.is_ok()); + let dag = dag.unwrap(); + + { + let has_id = tokio_test::block_on(dag.has_id(&test::Id(0))); + assert!(has_id.is_ok()); + let has_id = has_id.unwrap(); + assert!(has_id); + } + { + let has_id = tokio_test::block_on(dag.has_id(&test::Id(1))); + assert!(has_id.is_ok()); + let has_id = has_id.unwrap(); + assert!(has_id); + } + + { + let next = tokio_test::block_on(dag.get_next(test::Id(1))); + assert!(next.is_ok()); + let mut next = next.unwrap(); + assert_eq!(next.len(), 1); + let node = next.pop(); + assert!(node.is_some()); + let node = node.unwrap(); + assert_eq!(node.id, test::Id(0)); + assert_eq!(node.data, 42); + assert!(node.parents.is_empty()); + } + } + #[test] + fn test_dag_two_nodes_stream() { + use futures::StreamExt; + + let head = test::Node { + id: test::Id(1), + parents: vec![test::Id(0)], + data: 43, + }; + + let b = test::Backend(vec![ + { + Some(test::Node { + id: test::Id(0), + parents: vec![], + data: 42, + }) + }, + { + Some(head.clone()) + }, + ]); + + let dag = tokio_test::block_on(AsyncDag::new(b, head)); + assert!(dag.is_ok()); + let dag = dag.unwrap(); + + let v = tokio_test::block_on(dag.stream().collect::<Vec<_>>()); + + assert_eq!(v.len(), 2, "Expected two nodes: {:?}", v); + assert_eq!(v[0].as_ref().unwrap().id, test::Id(1)); + assert_eq!(v[1].as_ref().unwrap().id, test::Id(0)); + } + +} + diff --git a/src/dag_backend.rs b/src/dag_backend.rs new file mode 100644 index 0000000..6a49146 --- /dev/null +++ b/src/dag_backend.rs @@ -0,0 +1,97 @@ +use anyhow::Result; +use async_trait::async_trait; + +use crate::NodeId; +use crate::Node; + +#[async_trait] +pub trait DagBackend<Id, N> + where N: Node, + Id: NodeId + Send +{ + async fn get(&self, id: Id) -> Result<Option<N>>; + async fn put(&mut self, node: N) -> Result<Id>; +} + +#[cfg(test)] +mod tests { + use std::pin::Pin; + + use anyhow::Result; + use anyhow::anyhow; + use async_trait::async_trait; + use tokio_test::block_on; + + use crate::test_impl as test; + use crate::*; + + #[test] + fn test_backend_get() { + let b = test::Backend(vec![Some(test::Node { + id: test::Id(0), + parents: vec![], + data: 42, + })]); + + let node = tokio_test::block_on(b.get(test::Id(0))); + assert!(node.is_ok()); + let node = node.unwrap(); + + assert!(node.is_some()); + let node = node.unwrap(); + + assert_eq!(node.data, 42); + assert_eq!(node.id, test::Id(0)); + assert!(node.parents.is_empty()); + } + + #[test] + fn test_backend_put() { + let mut b = test::Backend(vec![Some(test::Node { + id: test::Id(0), + parents: vec![], + data: 42, + })]); + + let id = tokio_test::block_on(b.put({ + test::Node { + id: test::Id(1), + parents: vec![], + data: 43, + } + })); + + { + let node = tokio_test::block_on(b.get(test::Id(0))); + assert!(node.is_ok()); + let node = node.unwrap(); + + assert!(node.is_some()); + let node = node.unwrap(); + + assert_eq!(node.data, 42); + assert_eq!(node.id, test::Id(0)); + assert!(node.parents.is_empty()); + } + { + let node = tokio_test::block_on(b.get(test::Id(1))); + assert!(node.is_ok()); + let node = node.unwrap(); + + assert!(node.is_some()); + let node = node.unwrap(); + + assert_eq!(node.data, 43); + assert_eq!(node.id, test::Id(1)); + assert!(node.parents.is_empty()); + } + { + let node = tokio_test::block_on(b.get(test::Id(2))); + assert!(node.is_ok()); + let node = node.unwrap(); + + assert!(node.is_none()); + } + } + +} diff --git a/src/id.rs b/src/id.rs deleted file mode 100644 index 018ae60..0000000 --- a/src/id.rs +++ /dev/null @@ -1,18 +0,0 @@ -// -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. -// - -use std::fmt::Debug; - -/// An "ID" is an object that can be used to uniquely identify a node in the DAG. -/// -/// In git-speak, this would be a SHA1 hash. -/// -pub trait Id : Clone + Debug + PartialEq + Eq { - - /* empty */ - -} - @@ -4,6 +4,18 @@ // file, You can obtain one at http://mozilla.org/MPL/2.0/. // -pub mod id; -pub mod node; -pub mod repository; +mod async_dag; +pub use async_dag::*; + +mod dag_backend; +pub use dag_backend::*; + +mod node; +pub use node::*; + +mod node_id; +pub use node_id::*; + +#[cfg(test)] +mod test_impl; + diff --git a/src/node.rs b/src/node.rs index b6a088b..3759259 100644 --- a/src/node.rs +++ b/src/node.rs @@ -1,39 +1,9 @@ -// -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. -// +use crate::NodeId; -use std::fmt::Debug; +pub trait Node { + type Id: NodeId; -use crate::id::Id; - -use futures::future::Future; -use futures::stream::Stream; - -/// A "Node" is an object of the (DA)Graph -/// -/// In git-speak, this would be an "Object". -/// -/// -/// # Equality -/// -/// A node might be compared to another node. In git, for example, equality would be the equality -/// of the nodes ids, because objects are non-mutable. -/// -pub trait Node: Debug + PartialEq + Eq { - type Id: Id; - type NodePayload: Debug; - type Error: Debug; - type Payload: Future<Item = Self::NodePayload, Error = Self::Error>; - - /// It should be trivial to get the Id of a Node. - fn id(&self) -> Self::Id; - - /// Fetch the payload of the node. - fn payload(&self) -> Self::Payload; - - /// Fetch the Ids of the parents of this node - fn parent_ids(&self) -> Stream<Item = Self::Id, Error = Self::Error>; + fn id(&self) -> &Self::Id; + fn parent_ids(&self) -> Vec<Self::Id>; } diff --git a/src/node_id.rs b/src/node_id.rs new file mode 100644 index 0000000..e4fe6b4 --- /dev/null +++ b/src/node_id.rs @@ -0,0 +1,2 @@ +pub trait NodeId: Clone + Eq + PartialEq + std::hash::Hash { +} diff --git a/src/repository.rs b/src/repository.rs deleted file mode 100644 index 245b2e4..0000000 --- a/src/repository.rs +++ /dev/null @@ -1,23 +0,0 @@ -// -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. -// - -use std::fmt::Debug; - -use crate::node; -use crate::id; - -use futures::future::Future; - -/// -pub trait Repository: Debug + Sync + Send { - type Id: id::Id; - type Error: Debug; - type Node: node::Node<Id = Self::Id, Error = Self::Error>; - - /// It should be trivial to get the Id of a Node. - fn get(&self, id: Self::Id) -> Box<Future<Item = Self::Node, Error = Self::Error>>; -} - diff --git a/src/test_impl.rs b/src/test_impl.rs new file mode 100644 index 0000000..b79b90b --- /dev/null +++ b/src/test_impl.rs @@ -0,0 +1,51 @@ +use anyhow::Result; +use async_trait::async_trait; + +#[derive(Copy, Clone, Eq, PartialEq, std::hash::Hash, Debug)] +pub struct Id(pub(crate) usize); + +impl crate::NodeId for Id {} + +#[derive(Clone, Debug)] +pub struct Node { + pub(crate) id: Id, + pub(crate) parents: Vec<Id>, + pub(crate) data: u8, +} + +impl crate::Node for Node { + type Id = Id; + + fn id(&self) -> &Self::Id { + &self.id + } + + fn parent_ids(&self) -> Vec<Self::Id> { + self.parents.clone() + } +} + +#[derive(Debug)] +pub struct Backend(pub(crate) Vec<Option<Node>>); + +#[async_trait] +impl crate::DagBackend<Id, Node> for Backend { + async fn get(&self, id: Id) -> Result<Option<Node>> { + if self.0.len() < id.0 + 1 { + Ok(None) + } else { + Ok(self.0[id.0].clone()) + } + } + + async fn put(&mut self, node: Node) -> Result<Id> { + while self.0.len() < node.id.0 + 1 { + self.0.push(None) + } + + let idx = node.id.0; + self.0[idx] = Some(node); + Ok(Id(idx)) + } +} + |