summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2021-04-04 13:18:23 +0200
committerMatthias Beyer <mail@beyermatthias.de>2021-04-04 13:18:35 +0200
commitfb364c1ff338e7a67d717b77d431937888d8c009 (patch)
tree0698dbd412322ec9d2c09cf599ef1fe863be9c0e /src
parent2d25173aeafe9357f7cbd5c1edf6225d9ccf1114 (diff)
Import rewrite from distrox repository
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
Diffstat (limited to 'src')
-rw-r--r--src/async_dag.rs248
-rw-r--r--src/dag_backend.rs97
-rw-r--r--src/id.rs18
-rw-r--r--src/lib.rs18
-rw-r--r--src/node.rs40
-rw-r--r--src/node_id.rs2
-rw-r--r--src/repository.rs23
-rw-r--r--src/test_impl.rs51
8 files changed, 418 insertions, 79 deletions
diff --git a/src/async_dag.rs b/src/async_dag.rs
new file mode 100644
index 0000000..f882b4d
--- /dev/null
+++ b/src/async_dag.rs
@@ -0,0 +1,248 @@
+use std::pin::Pin;
+
+use anyhow::Result;
+use anyhow::anyhow;
+use futures::stream::StreamExt;
+use futures::task::Poll;
+
+use crate::Node;
+use crate::NodeId;
+use crate::DagBackend;
+
+/// An async DAG, generic over Node, Node identifier and Backend implementation
+pub struct AsyncDag<Id, N, Backend>
+ where Id: NodeId + Send,
+ N: Node<Id = Id>,
+ Backend: DagBackend<Id, N>
+{
+ head: N,
+ backend: Backend,
+}
+
+impl<Id, N, Backend> AsyncDag<Id, N, Backend>
+ where Id: NodeId + Send,
+ N: Node<Id = Id>,
+ Backend: DagBackend<Id, N>
+{
+ pub async fn new(backend: Backend, head: N) -> Result<Self> {
+ backend
+ .get(head.id().clone())
+ .await?
+ .map(|node| {
+ AsyncDag {
+ head: node,
+ backend: backend
+ }
+ })
+ .ok_or_else(|| anyhow!("Head not found in backend"))
+ }
+
+ pub async fn has_id(&self, id: &Id) -> Result<bool> {
+ self.stream()
+ .map(|r| -> Result<bool> {
+ r.map(|node| node.id() == id)
+ })
+ .collect::<Vec<Result<bool>>>()
+ .await
+ .into_iter()
+ .fold(Ok(false), |acc, e| {
+ match (acc, e) {
+ (Err(e), _) => Err(e),
+ (Ok(_), Err(e)) => Err(e),
+ (Ok(a), Ok(b)) => Ok(a || b),
+ }
+ })
+ }
+
+ async fn get_next(&self, id: Id) -> Result<Vec<N>> {
+ self.backend
+ .get(id)
+ .await?
+ .ok_or_else(|| anyhow!("ID Not found"))?
+ .parent_ids()
+ .into_iter()
+ .map(|id| async move {
+ self.backend
+ .get(id)
+ .await
+ .transpose()
+ })
+ .collect::<futures::stream::FuturesUnordered<_>>()
+ .collect::<Vec<_>>()
+ .await
+ .into_iter()
+ .filter_map(|o| o)
+ .collect()
+ }
+
+ fn stream(&self) -> Stream<Id, N, Backend> {
+ Stream {
+ dag: self,
+ backlog: {
+ let mut v = Vec::with_capacity(2);
+ v.push(self.backend.get(self.head.id().clone()));
+ v
+ }
+ }
+ }
+
+}
+
+
+pub struct Stream<'a, Id, N, Backend>
+ where Id: NodeId + Send,
+ N: Node<Id = Id>,
+ Backend: DagBackend<Id, N>
+{
+ dag: &'a AsyncDag<Id, N, Backend>,
+ backlog: Vec<Pin<Box<(dyn futures::future::Future<Output = Result<Option<N>>> + std::marker::Send + 'a)>>>,
+}
+
+impl<'a, Id, N, Backend> futures::stream::Stream for Stream<'a, Id, N, Backend>
+ where Id: NodeId + Send,
+ N: Node<Id = Id>,
+ Backend: DagBackend<Id, N>
+{
+
+ type Item = Result<N>;
+
+ /// Attempt to resolve the next item in the stream.
+ /// Returns `Poll::Pending` if not ready, `Poll::Ready(Some(x))` if a value
+ /// is ready, and `Poll::Ready(None)` if the stream has completed.
+ fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut futures::task::Context<'_>) -> futures::task::Poll<Option<Self::Item>> {
+ if let Some(mut fut) = self.as_mut().backlog.pop() {
+ match fut.as_mut().poll(cx) {
+ Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
+ Poll::Ready(Ok(Some(node))) => {
+ for parent in node.parent_ids().into_iter() {
+ let fut = self.dag.backend.get(parent);
+ self.as_mut().backlog.push(fut);
+ }
+ Poll::Ready(Some(Ok(node)))
+ },
+ Poll::Ready(Ok(None)) => {
+ // backend.get() returned Ok(None), so the referenced node seems not to exist
+ //
+ // TODO: Decide whether we should return an error here.
+ cx.waker().wake_by_ref();
+ Poll::Pending
+ },
+ Poll::Pending => {
+ cx.waker().wake_by_ref();
+ Poll::Pending
+ }
+ }
+ } else {
+ Poll::Ready(None)
+ }
+ }
+}
+
+
+#[cfg(test)]
+mod tests {
+ use std::pin::Pin;
+
+ use anyhow::Result;
+ use anyhow::anyhow;
+ use async_trait::async_trait;
+ use tokio_test::block_on;
+
+ use crate::DagBackend;
+ use crate::AsyncDag;
+ use crate::test_impl as test;
+
+ #[test]
+ fn test_dag_two_nodes() {
+ let head = test::Node {
+ id: test::Id(1),
+ parents: vec![test::Id(0)],
+ data: 43,
+ };
+
+ let b = test::Backend(vec![
+ {
+ Some(test::Node {
+ id: test::Id(0),
+ parents: vec![],
+ data: 42,
+ })
+ },
+ {
+ Some(head.clone())
+ },
+ ]);
+
+ {
+ let node = tokio_test::block_on(b.get(test::Id(1))).unwrap().unwrap();
+ assert_eq!(node.data, 43);
+ assert_eq!(node.id, test::Id(1));
+ assert!(!node.parents.is_empty()); // to check whether the parent is set
+ }
+
+ let dag = tokio_test::block_on(AsyncDag::new(b, head));
+ assert!(dag.is_ok());
+ let dag = dag.unwrap();
+
+ {
+ let has_id = tokio_test::block_on(dag.has_id(&test::Id(0)));
+ assert!(has_id.is_ok());
+ let has_id = has_id.unwrap();
+ assert!(has_id);
+ }
+ {
+ let has_id = tokio_test::block_on(dag.has_id(&test::Id(1)));
+ assert!(has_id.is_ok());
+ let has_id = has_id.unwrap();
+ assert!(has_id);
+ }
+
+ {
+ let next = tokio_test::block_on(dag.get_next(test::Id(1)));
+ assert!(next.is_ok());
+ let mut next = next.unwrap();
+ assert_eq!(next.len(), 1);
+ let node = next.pop();
+ assert!(node.is_some());
+ let node = node.unwrap();
+ assert_eq!(node.id, test::Id(0));
+ assert_eq!(node.data, 42);
+ assert!(node.parents.is_empty());
+ }
+ }
+ #[test]
+ fn test_dag_two_nodes_stream() {
+ use futures::StreamExt;
+
+ let head = test::Node {
+ id: test::Id(1),
+ parents: vec![test::Id(0)],
+ data: 43,
+ };
+
+ let b = test::Backend(vec![
+ {
+ Some(test::Node {
+ id: test::Id(0),
+ parents: vec![],
+ data: 42,
+ })
+ },
+ {
+ Some(head.clone())
+ },
+ ]);
+
+ let dag = tokio_test::block_on(AsyncDag::new(b, head));
+ assert!(dag.is_ok());
+ let dag = dag.unwrap();
+
+ let v = tokio_test::block_on(dag.stream().collect::<Vec<_>>());
+
+ assert_eq!(v.len(), 2, "Expected two nodes: {:?}", v);
+ assert_eq!(v[0].as_ref().unwrap().id, test::Id(1));
+ assert_eq!(v[1].as_ref().unwrap().id, test::Id(0));
+ }
+
+}
+
diff --git a/src/dag_backend.rs b/src/dag_backend.rs
new file mode 100644
index 0000000..6a49146
--- /dev/null
+++ b/src/dag_backend.rs
@@ -0,0 +1,97 @@
+use anyhow::Result;
+use async_trait::async_trait;
+
+use crate::NodeId;
+use crate::Node;
+
+#[async_trait]
+pub trait DagBackend<Id, N>
+ where N: Node,
+ Id: NodeId + Send
+{
+ async fn get(&self, id: Id) -> Result<Option<N>>;
+ async fn put(&mut self, node: N) -> Result<Id>;
+}
+
+#[cfg(test)]
+mod tests {
+ use std::pin::Pin;
+
+ use anyhow::Result;
+ use anyhow::anyhow;
+ use async_trait::async_trait;
+ use tokio_test::block_on;
+
+ use crate::test_impl as test;
+ use crate::*;
+
+ #[test]
+ fn test_backend_get() {
+ let b = test::Backend(vec![Some(test::Node {
+ id: test::Id(0),
+ parents: vec![],
+ data: 42,
+ })]);
+
+ let node = tokio_test::block_on(b.get(test::Id(0)));
+ assert!(node.is_ok());
+ let node = node.unwrap();
+
+ assert!(node.is_some());
+ let node = node.unwrap();
+
+ assert_eq!(node.data, 42);
+ assert_eq!(node.id, test::Id(0));
+ assert!(node.parents.is_empty());
+ }
+
+ #[test]
+ fn test_backend_put() {
+ let mut b = test::Backend(vec![Some(test::Node {
+ id: test::Id(0),
+ parents: vec![],
+ data: 42,
+ })]);
+
+ let id = tokio_test::block_on(b.put({
+ test::Node {
+ id: test::Id(1),
+ parents: vec![],
+ data: 43,
+ }
+ }));
+
+ {
+ let node = tokio_test::block_on(b.get(test::Id(0)));
+ assert!(node.is_ok());
+ let node = node.unwrap();
+
+ assert!(node.is_some());
+ let node = node.unwrap();
+
+ assert_eq!(node.data, 42);
+ assert_eq!(node.id, test::Id(0));
+ assert!(node.parents.is_empty());
+ }
+ {
+ let node = tokio_test::block_on(b.get(test::Id(1)));
+ assert!(node.is_ok());
+ let node = node.unwrap();
+
+ assert!(node.is_some());
+ let node = node.unwrap();
+
+ assert_eq!(node.data, 43);
+ assert_eq!(node.id, test::Id(1));
+ assert!(node.parents.is_empty());
+ }
+ {
+ let node = tokio_test::block_on(b.get(test::Id(2)));
+ assert!(node.is_ok());
+ let node = node.unwrap();
+
+ assert!(node.is_none());
+ }
+ }
+
+}
diff --git a/src/id.rs b/src/id.rs
deleted file mode 100644
index 018ae60..0000000
--- a/src/id.rs
+++ /dev/null
@@ -1,18 +0,0 @@
-//
-// This Source Code Form is subject to the terms of the Mozilla Public
-// License, v. 2.0. If a copy of the MPL was not distributed with this
-// file, You can obtain one at http://mozilla.org/MPL/2.0/.
-//
-
-use std::fmt::Debug;
-
-/// An "ID" is an object that can be used to uniquely identify a node in the DAG.
-///
-/// In git-speak, this would be a SHA1 hash.
-///
-pub trait Id : Clone + Debug + PartialEq + Eq {
-
- /* empty */
-
-}
-
diff --git a/src/lib.rs b/src/lib.rs
index bda0996..655cef1 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -4,6 +4,18 @@
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//
-pub mod id;
-pub mod node;
-pub mod repository;
+mod async_dag;
+pub use async_dag::*;
+
+mod dag_backend;
+pub use dag_backend::*;
+
+mod node;
+pub use node::*;
+
+mod node_id;
+pub use node_id::*;
+
+#[cfg(test)]
+mod test_impl;
+
diff --git a/src/node.rs b/src/node.rs
index b6a088b..3759259 100644
--- a/src/node.rs
+++ b/src/node.rs
@@ -1,39 +1,9 @@
-//
-// This Source Code Form is subject to the terms of the Mozilla Public
-// License, v. 2.0. If a copy of the MPL was not distributed with this
-// file, You can obtain one at http://mozilla.org/MPL/2.0/.
-//
+use crate::NodeId;
-use std::fmt::Debug;
+pub trait Node {
+ type Id: NodeId;
-use crate::id::Id;
-
-use futures::future::Future;
-use futures::stream::Stream;
-
-/// A "Node" is an object of the (DA)Graph
-///
-/// In git-speak, this would be an "Object".
-///
-///
-/// # Equality
-///
-/// A node might be compared to another node. In git, for example, equality would be the equality
-/// of the nodes ids, because objects are non-mutable.
-///
-pub trait Node: Debug + PartialEq + Eq {
- type Id: Id;
- type NodePayload: Debug;
- type Error: Debug;
- type Payload: Future<Item = Self::NodePayload, Error = Self::Error>;
-
- /// It should be trivial to get the Id of a Node.
- fn id(&self) -> Self::Id;
-
- /// Fetch the payload of the node.
- fn payload(&self) -> Self::Payload;
-
- /// Fetch the Ids of the parents of this node
- fn parent_ids(&self) -> Stream<Item = Self::Id, Error = Self::Error>;
+ fn id(&self) -> &Self::Id;
+ fn parent_ids(&self) -> Vec<Self::Id>;
}
diff --git a/src/node_id.rs b/src/node_id.rs
new file mode 100644
index 0000000..e4fe6b4
--- /dev/null
+++ b/src/node_id.rs
@@ -0,0 +1,2 @@
+pub trait NodeId: Clone + Eq + PartialEq + std::hash::Hash {
+}
diff --git a/src/repository.rs b/src/repository.rs
deleted file mode 100644
index 245b2e4..0000000
--- a/src/repository.rs
+++ /dev/null
@@ -1,23 +0,0 @@
-//
-// This Source Code Form is subject to the terms of the Mozilla Public
-// License, v. 2.0. If a copy of the MPL was not distributed with this
-// file, You can obtain one at http://mozilla.org/MPL/2.0/.
-//
-
-use std::fmt::Debug;
-
-use crate::node;
-use crate::id;
-
-use futures::future::Future;
-
-///
-pub trait Repository: Debug + Sync + Send {
- type Id: id::Id;
- type Error: Debug;
- type Node: node::Node<Id = Self::Id, Error = Self::Error>;
-
- /// It should be trivial to get the Id of a Node.
- fn get(&self, id: Self::Id) -> Box<Future<Item = Self::Node, Error = Self::Error>>;
-}
-
diff --git a/src/test_impl.rs b/src/test_impl.rs
new file mode 100644
index 0000000..b79b90b
--- /dev/null
+++ b/src/test_impl.rs
@@ -0,0 +1,51 @@
+use anyhow::Result;
+use async_trait::async_trait;
+
+#[derive(Copy, Clone, Eq, PartialEq, std::hash::Hash, Debug)]
+pub struct Id(pub(crate) usize);
+
+impl crate::NodeId for Id {}
+
+#[derive(Clone, Debug)]
+pub struct Node {
+ pub(crate) id: Id,
+ pub(crate) parents: Vec<Id>,
+ pub(crate) data: u8,
+}
+
+impl crate::Node for Node {
+ type Id = Id;
+
+ fn id(&self) -> &Self::Id {
+ &self.id
+ }
+
+ fn parent_ids(&self) -> Vec<Self::Id> {
+ self.parents.clone()
+ }
+}
+
+#[derive(Debug)]
+pub struct Backend(pub(crate) Vec<Option<Node>>);
+
+#[async_trait]
+impl crate::DagBackend<Id, Node> for Backend {
+ async fn get(&self, id: Id) -> Result<Option<Node>> {
+ if self.0.len() < id.0 + 1 {
+ Ok(None)
+ } else {
+ Ok(self.0[id.0].clone())
+ }
+ }
+
+ async fn put(&mut self, node: Node) -> Result<Id> {
+ while self.0.len() < node.id.0 + 1 {
+ self.0.push(None)
+ }
+
+ let idx = node.id.0;
+ self.0[idx] = Some(node);
+ Ok(Id(idx))
+ }
+}
+