From fb364c1ff338e7a67d717b77d431937888d8c009 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Sun, 4 Apr 2021 13:18:23 +0200 Subject: Import rewrite from distrox repository Signed-off-by: Matthias Beyer --- src/async_dag.rs | 248 +++++++++++++++++++++++++++++++++++++++++++++++++++++ src/dag_backend.rs | 97 +++++++++++++++++++++ src/id.rs | 18 ---- src/lib.rs | 18 +++- src/node.rs | 40 ++------- src/node_id.rs | 2 + src/repository.rs | 23 ----- src/test_impl.rs | 51 +++++++++++ 8 files changed, 418 insertions(+), 79 deletions(-) create mode 100644 src/async_dag.rs create mode 100644 src/dag_backend.rs delete mode 100644 src/id.rs create mode 100644 src/node_id.rs delete mode 100644 src/repository.rs create mode 100644 src/test_impl.rs (limited to 'src') diff --git a/src/async_dag.rs b/src/async_dag.rs new file mode 100644 index 0000000..f882b4d --- /dev/null +++ b/src/async_dag.rs @@ -0,0 +1,248 @@ +use std::pin::Pin; + +use anyhow::Result; +use anyhow::anyhow; +use futures::stream::StreamExt; +use futures::task::Poll; + +use crate::Node; +use crate::NodeId; +use crate::DagBackend; + +/// An async DAG, generic over Node, Node identifier and Backend implementation +pub struct AsyncDag + where Id: NodeId + Send, + N: Node, + Backend: DagBackend +{ + head: N, + backend: Backend, +} + +impl AsyncDag + where Id: NodeId + Send, + N: Node, + Backend: DagBackend +{ + pub async fn new(backend: Backend, head: N) -> Result { + backend + .get(head.id().clone()) + .await? + .map(|node| { + AsyncDag { + head: node, + backend: backend + } + }) + .ok_or_else(|| anyhow!("Head not found in backend")) + } + + pub async fn has_id(&self, id: &Id) -> Result { + self.stream() + .map(|r| -> Result { + r.map(|node| node.id() == id) + }) + .collect::>>() + .await + .into_iter() + .fold(Ok(false), |acc, e| { + match (acc, e) { + (Err(e), _) => Err(e), + (Ok(_), Err(e)) => Err(e), + (Ok(a), Ok(b)) => Ok(a || b), + } + }) + } + + async fn get_next(&self, id: Id) -> Result> { + self.backend + .get(id) + .await? + .ok_or_else(|| anyhow!("ID Not found"))? + .parent_ids() + .into_iter() + .map(|id| async move { + self.backend + .get(id) + .await + .transpose() + }) + .collect::>() + .collect::>() + .await + .into_iter() + .filter_map(|o| o) + .collect() + } + + fn stream(&self) -> Stream { + Stream { + dag: self, + backlog: { + let mut v = Vec::with_capacity(2); + v.push(self.backend.get(self.head.id().clone())); + v + } + } + } + +} + + +pub struct Stream<'a, Id, N, Backend> + where Id: NodeId + Send, + N: Node, + Backend: DagBackend +{ + dag: &'a AsyncDag, + backlog: Vec>> + std::marker::Send + 'a)>>>, +} + +impl<'a, Id, N, Backend> futures::stream::Stream for Stream<'a, Id, N, Backend> + where Id: NodeId + Send, + N: Node, + Backend: DagBackend +{ + + type Item = Result; + + /// Attempt to resolve the next item in the stream. + /// Returns `Poll::Pending` if not ready, `Poll::Ready(Some(x))` if a value + /// is ready, and `Poll::Ready(None)` if the stream has completed. + fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut futures::task::Context<'_>) -> futures::task::Poll> { + if let Some(mut fut) = self.as_mut().backlog.pop() { + match fut.as_mut().poll(cx) { + Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))), + Poll::Ready(Ok(Some(node))) => { + for parent in node.parent_ids().into_iter() { + let fut = self.dag.backend.get(parent); + self.as_mut().backlog.push(fut); + } + Poll::Ready(Some(Ok(node))) + }, + Poll::Ready(Ok(None)) => { + // backend.get() returned Ok(None), so the referenced node seems not to exist + // + // TODO: Decide whether we should return an error here. + cx.waker().wake_by_ref(); + Poll::Pending + }, + Poll::Pending => { + cx.waker().wake_by_ref(); + Poll::Pending + } + } + } else { + Poll::Ready(None) + } + } +} + + +#[cfg(test)] +mod tests { + use std::pin::Pin; + + use anyhow::Result; + use anyhow::anyhow; + use async_trait::async_trait; + use tokio_test::block_on; + + use crate::DagBackend; + use crate::AsyncDag; + use crate::test_impl as test; + + #[test] + fn test_dag_two_nodes() { + let head = test::Node { + id: test::Id(1), + parents: vec![test::Id(0)], + data: 43, + }; + + let b = test::Backend(vec![ + { + Some(test::Node { + id: test::Id(0), + parents: vec![], + data: 42, + }) + }, + { + Some(head.clone()) + }, + ]); + + { + let node = tokio_test::block_on(b.get(test::Id(1))).unwrap().unwrap(); + assert_eq!(node.data, 43); + assert_eq!(node.id, test::Id(1)); + assert!(!node.parents.is_empty()); // to check whether the parent is set + } + + let dag = tokio_test::block_on(AsyncDag::new(b, head)); + assert!(dag.is_ok()); + let dag = dag.unwrap(); + + { + let has_id = tokio_test::block_on(dag.has_id(&test::Id(0))); + assert!(has_id.is_ok()); + let has_id = has_id.unwrap(); + assert!(has_id); + } + { + let has_id = tokio_test::block_on(dag.has_id(&test::Id(1))); + assert!(has_id.is_ok()); + let has_id = has_id.unwrap(); + assert!(has_id); + } + + { + let next = tokio_test::block_on(dag.get_next(test::Id(1))); + assert!(next.is_ok()); + let mut next = next.unwrap(); + assert_eq!(next.len(), 1); + let node = next.pop(); + assert!(node.is_some()); + let node = node.unwrap(); + assert_eq!(node.id, test::Id(0)); + assert_eq!(node.data, 42); + assert!(node.parents.is_empty()); + } + } + #[test] + fn test_dag_two_nodes_stream() { + use futures::StreamExt; + + let head = test::Node { + id: test::Id(1), + parents: vec![test::Id(0)], + data: 43, + }; + + let b = test::Backend(vec![ + { + Some(test::Node { + id: test::Id(0), + parents: vec![], + data: 42, + }) + }, + { + Some(head.clone()) + }, + ]); + + let dag = tokio_test::block_on(AsyncDag::new(b, head)); + assert!(dag.is_ok()); + let dag = dag.unwrap(); + + let v = tokio_test::block_on(dag.stream().collect::>()); + + assert_eq!(v.len(), 2, "Expected two nodes: {:?}", v); + assert_eq!(v[0].as_ref().unwrap().id, test::Id(1)); + assert_eq!(v[1].as_ref().unwrap().id, test::Id(0)); + } + +} + diff --git a/src/dag_backend.rs b/src/dag_backend.rs new file mode 100644 index 0000000..6a49146 --- /dev/null +++ b/src/dag_backend.rs @@ -0,0 +1,97 @@ +use anyhow::Result; +use async_trait::async_trait; + +use crate::NodeId; +use crate::Node; + +#[async_trait] +pub trait DagBackend + where N: Node, + Id: NodeId + Send +{ + async fn get(&self, id: Id) -> Result>; + async fn put(&mut self, node: N) -> Result; +} + +#[cfg(test)] +mod tests { + use std::pin::Pin; + + use anyhow::Result; + use anyhow::anyhow; + use async_trait::async_trait; + use tokio_test::block_on; + + use crate::test_impl as test; + use crate::*; + + #[test] + fn test_backend_get() { + let b = test::Backend(vec![Some(test::Node { + id: test::Id(0), + parents: vec![], + data: 42, + })]); + + let node = tokio_test::block_on(b.get(test::Id(0))); + assert!(node.is_ok()); + let node = node.unwrap(); + + assert!(node.is_some()); + let node = node.unwrap(); + + assert_eq!(node.data, 42); + assert_eq!(node.id, test::Id(0)); + assert!(node.parents.is_empty()); + } + + #[test] + fn test_backend_put() { + let mut b = test::Backend(vec![Some(test::Node { + id: test::Id(0), + parents: vec![], + data: 42, + })]); + + let id = tokio_test::block_on(b.put({ + test::Node { + id: test::Id(1), + parents: vec![], + data: 43, + } + })); + + { + let node = tokio_test::block_on(b.get(test::Id(0))); + assert!(node.is_ok()); + let node = node.unwrap(); + + assert!(node.is_some()); + let node = node.unwrap(); + + assert_eq!(node.data, 42); + assert_eq!(node.id, test::Id(0)); + assert!(node.parents.is_empty()); + } + { + let node = tokio_test::block_on(b.get(test::Id(1))); + assert!(node.is_ok()); + let node = node.unwrap(); + + assert!(node.is_some()); + let node = node.unwrap(); + + assert_eq!(node.data, 43); + assert_eq!(node.id, test::Id(1)); + assert!(node.parents.is_empty()); + } + { + let node = tokio_test::block_on(b.get(test::Id(2))); + assert!(node.is_ok()); + let node = node.unwrap(); + + assert!(node.is_none()); + } + } + +} diff --git a/src/id.rs b/src/id.rs deleted file mode 100644 index 018ae60..0000000 --- a/src/id.rs +++ /dev/null @@ -1,18 +0,0 @@ -// -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. -// - -use std::fmt::Debug; - -/// An "ID" is an object that can be used to uniquely identify a node in the DAG. -/// -/// In git-speak, this would be a SHA1 hash. -/// -pub trait Id : Clone + Debug + PartialEq + Eq { - - /* empty */ - -} - diff --git a/src/lib.rs b/src/lib.rs index bda0996..655cef1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,6 +4,18 @@ // file, You can obtain one at http://mozilla.org/MPL/2.0/. // -pub mod id; -pub mod node; -pub mod repository; +mod async_dag; +pub use async_dag::*; + +mod dag_backend; +pub use dag_backend::*; + +mod node; +pub use node::*; + +mod node_id; +pub use node_id::*; + +#[cfg(test)] +mod test_impl; + diff --git a/src/node.rs b/src/node.rs index b6a088b..3759259 100644 --- a/src/node.rs +++ b/src/node.rs @@ -1,39 +1,9 @@ -// -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. -// +use crate::NodeId; -use std::fmt::Debug; +pub trait Node { + type Id: NodeId; -use crate::id::Id; - -use futures::future::Future; -use futures::stream::Stream; - -/// A "Node" is an object of the (DA)Graph -/// -/// In git-speak, this would be an "Object". -/// -/// -/// # Equality -/// -/// A node might be compared to another node. In git, for example, equality would be the equality -/// of the nodes ids, because objects are non-mutable. -/// -pub trait Node: Debug + PartialEq + Eq { - type Id: Id; - type NodePayload: Debug; - type Error: Debug; - type Payload: Future; - - /// It should be trivial to get the Id of a Node. - fn id(&self) -> Self::Id; - - /// Fetch the payload of the node. - fn payload(&self) -> Self::Payload; - - /// Fetch the Ids of the parents of this node - fn parent_ids(&self) -> Stream; + fn id(&self) -> &Self::Id; + fn parent_ids(&self) -> Vec; } diff --git a/src/node_id.rs b/src/node_id.rs new file mode 100644 index 0000000..e4fe6b4 --- /dev/null +++ b/src/node_id.rs @@ -0,0 +1,2 @@ +pub trait NodeId: Clone + Eq + PartialEq + std::hash::Hash { +} diff --git a/src/repository.rs b/src/repository.rs deleted file mode 100644 index 245b2e4..0000000 --- a/src/repository.rs +++ /dev/null @@ -1,23 +0,0 @@ -// -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. -// - -use std::fmt::Debug; - -use crate::node; -use crate::id; - -use futures::future::Future; - -/// -pub trait Repository: Debug + Sync + Send { - type Id: id::Id; - type Error: Debug; - type Node: node::Node; - - /// It should be trivial to get the Id of a Node. - fn get(&self, id: Self::Id) -> Box>; -} - diff --git a/src/test_impl.rs b/src/test_impl.rs new file mode 100644 index 0000000..b79b90b --- /dev/null +++ b/src/test_impl.rs @@ -0,0 +1,51 @@ +use anyhow::Result; +use async_trait::async_trait; + +#[derive(Copy, Clone, Eq, PartialEq, std::hash::Hash, Debug)] +pub struct Id(pub(crate) usize); + +impl crate::NodeId for Id {} + +#[derive(Clone, Debug)] +pub struct Node { + pub(crate) id: Id, + pub(crate) parents: Vec, + pub(crate) data: u8, +} + +impl crate::Node for Node { + type Id = Id; + + fn id(&self) -> &Self::Id { + &self.id + } + + fn parent_ids(&self) -> Vec { + self.parents.clone() + } +} + +#[derive(Debug)] +pub struct Backend(pub(crate) Vec>); + +#[async_trait] +impl crate::DagBackend for Backend { + async fn get(&self, id: Id) -> Result> { + if self.0.len() < id.0 + 1 { + Ok(None) + } else { + Ok(self.0[id.0].clone()) + } + } + + async fn put(&mut self, node: Node) -> Result { + while self.0.len() < node.id.0 + 1 { + self.0.push(None) + } + + let idx = node.id.0; + self.0[idx] = Some(node); + Ok(Id(idx)) + } +} + -- cgit v1.2.3