path: root/lib/src/reactor
diff options
Diffstat (limited to 'lib/src/reactor')
8 files changed, 0 insertions, 477 deletions
diff --git a/lib/src/reactor/ b/lib/src/reactor/
deleted file mode 100644
index 59913b5..0000000
--- a/lib/src/reactor/
+++ /dev/null
@@ -1,19 +0,0 @@
-//! Module for account handling (following accounts, caching account updates) using the gossip
-//! module for the lower-level handling
-use std::sync::Arc;
-use anyhow::Result;
-use tokio::sync::RwLock;
-use crate::profile::Profile;
-use crate::reactor::gossip::GossipReactor;
-pub struct AccountReactor(GossipReactor);
-impl AccountReactor {
- pub(super) fn new(profile: Arc<RwLock<Profile>>) -> Self {
- unimplemented!()
- }
diff --git a/lib/src/reactor/ b/lib/src/reactor/
deleted file mode 100644
index a32f1c3..0000000
--- a/lib/src/reactor/
+++ /dev/null
@@ -1,15 +0,0 @@
-use tokio::sync::mpsc::UnboundedSender as Sender;
-use tokio::sync::mpsc::UnboundedReceiver as Receiver;
-/// Type for sending messages to a reactor
-pub type ReactorSender<Request, Reply> = Sender<(Request, ReplySender<Reply>)>;
-/// Type that is used by a reactor for receiving messages
-pub type ReactorReceiver<Request, Reply> = Receiver<(Request, ReplySender<Reply>)>;
-/// Type that represents the channel that has to be send with a request to a reactor for getting an
-/// answer back
-pub type ReplySender<Reply> = Sender<Reply>;
-pub type ReplyReceiver<Reply> = Receiver<Reply>;
diff --git a/lib/src/reactor/ b/lib/src/reactor/
deleted file mode 100644
index 1014ca1..0000000
--- a/lib/src/reactor/
+++ /dev/null
@@ -1,19 +0,0 @@
-//! Module for multi-device support functionality,
-//! which uses the gossip module for the lower-level handling
-use std::sync::Arc;
-use anyhow::Result;
-use tokio::sync::RwLock;
-use crate::profile::Profile;
-use crate::reactor::gossip::GossipReactor;
-pub struct DeviceReactor(GossipReactor);
-impl DeviceReactor {
- pub(super) fn new(profile: Arc<RwLock<Profile>>) -> Self {
- unimplemented!()
- }
diff --git a/lib/src/reactor/gossip/ b/lib/src/reactor/gossip/
deleted file mode 100644
index 68fbf06..0000000
--- a/lib/src/reactor/gossip/
+++ /dev/null
@@ -1,19 +0,0 @@
-use anyhow::Result;
-pub enum GossipRequest {
- Exit,
- Ping,
- PublishMe,
- Connect(ipfs::MultiaddrWithPeerId),
-pub enum GossipReply {
- Exiting,
- Pong,
- NoHead,
- PublishMeResult(Result<()>),
- ConnectResult(Result<()>),
diff --git a/lib/src/reactor/gossip/ b/lib/src/reactor/gossip/
deleted file mode 100644
index 7658509..0000000
--- a/lib/src/reactor/gossip/
+++ /dev/null
@@ -1,329 +0,0 @@
-//! Low-level module for gossip'ing code
-//! This module implements the low-level gossiping functionality that other modules use to
-//! implement actual behaviours on
-use std::sync::Arc;
-use anyhow::Result;
-use tokio::sync::RwLock;
-use crate::profile::Profile;
-use crate::reactor::Reactor;
-use crate::reactor::ReactorBuilder;
-use crate::reactor::ctrl::ReactorReceiver;
-use crate::reactor::ctrl::ReactorSender;
-use crate::reactor::ctrl::ReplySender;
-mod ctrl;
-pub use ctrl::GossipRequest;
-pub use ctrl::GossipReply;
-mod msg;
-pub use msg::GossipMessage;
-mod strategy;
-pub use strategy::GossipHandlingStrategy;
-pub use strategy::LogStrategy;
-pub struct GossipReactorBuilder {
- profile: Arc<RwLock<Profile>>,
- gossip_topic_name: String,
-impl GossipReactorBuilder {
- pub fn new(profile: Arc<RwLock<Profile>>, gossip_topic_name: String) -> Self {
- Self { profile, gossip_topic_name }
- }
-impl ReactorBuilder for GossipReactorBuilder {
- type Reactor = GossipReactor;
- fn build_with_receiver(self, rr: ReactorReceiver<GossipRequest, GossipReply>) -> Self::Reactor {
- GossipReactor {
- profile: self.profile,
- gossip_topic_name: self.gossip_topic_name,
- receiver: rr,
- strategy: std::marker::PhantomData,
- }
- }
-pub struct GossipReactor<Strategy = LogStrategy>
- where Strategy: GossipHandlingStrategy + Sync + Send
- profile: Arc<RwLock<Profile>>,
- gossip_topic_name: String,
- receiver: ReactorReceiver<GossipRequest, GossipReply>,
- strategy: std::marker::PhantomData<Strategy>,
-impl<S> std::fmt::Debug for GossipReactor<S>
- where S: GossipHandlingStrategy + Sync + Send
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(f, "GossipReactor {{ topic: '{}' }}", self.gossip_topic_name)
- }
-impl<S> GossipReactor<S>
- where S: GossipHandlingStrategy + Sync + Send
- fn send_gossip_reply(channel: ReplySender<GossipReply>, reply: GossipReply) -> Result<()> {
- if let Err(_) = channel.send(reply) {
- anyhow::bail!("Failed to send GossipReply::NoHead)")
- }
- Ok(())
- }
- async fn publish_me(&self, reply_channel: ReplySender<GossipReply>) -> Result<()> {
- let profile =;
- let head = profile.head();
- if head.is_none() {
- Self::send_gossip_reply(reply_channel, GossipReply::NoHead)?;
- return Ok(())
- }
- let head = head.unwrap().to_bytes();
- let own_id = match profile.client().own_id().await {
- Ok(id) => id,
- Err(e) => {
- Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Err(e)))?;
- return Ok(()) // TODO: abort operation here for now, maybe not the best idea
- }
- };
- let publish_res = profile
- .client()
- .ipfs
- .pubsub_publish(self.gossip_topic_name.clone(), GossipMessage::CurrentProfileState {
- peer_id: own_id.to_bytes(),
- cid: head
- }.into_bytes()?)
- .await;
- match publish_res {
- Ok(()) => Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Ok(()))),
- Err(e) => Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Err(e))),
- }
- }
- async fn connect(&self, addr: ipfs::MultiaddrWithPeerId) -> Result<()> {
- log::trace!("Connecting GossipReactor with {:?}", addr);
- }
- #[cfg(test)]
- async fn is_connected_to(&self, addr: ipfs::MultiaddrWithPeerId) -> Result<bool> {
- self.profile
- .read()
- .await
- .client()
- .ipfs
- .peers()
- .await
- .map(|connections| {
- connections.iter().any(|connection| connection.addr == addr)
- })
- }
-impl<S> Reactor for GossipReactor<S>
- where S: GossipHandlingStrategy + Sync + Send
- type Request = GossipRequest;
- type Reply = GossipReply;
- async fn run(mut self) -> Result<()> {
- use futures::stream::StreamExt;
- log::trace!("Booting {:?}", self);
- let mut subscription_stream = self.profile
- .read()
- .await
- .client()
- .ipfs
- .pubsub_subscribe(self.gossip_topic_name.clone())
- .await?;
- log::trace!("{:?} main loop", self);
- loop {
- tokio::select! {
- next_control_msg = self.receiver.recv() => {
- log::trace!("Received control message: {:?}", next_control_msg);
- match next_control_msg {
- None => break,
- Some((GossipRequest::Exit, reply_channel)) => {
- if let Err(_) = reply_channel.send(GossipReply::Exiting) {
- anyhow::bail!("Failed sending EXITING reply")
- }
- break
- },
- Some((GossipRequest::Ping, reply_channel)) => {
- log::trace!("Replying with Pong");
- if let Err(_) = reply_channel.send(GossipReply::Pong) {
- anyhow::bail!("Failed sending PONG reply")
- }
- },
- Some((GossipRequest::PublishMe, reply_channel)) => self.publish_me(reply_channel).await?,
- Some((GossipRequest::Connect(addr), reply_channel)) => {
- let reply = GossipReply::ConnectResult(self.connect(addr.clone()).await);
- if let Err(_) = Self::send_gossip_reply(reply_channel, reply) {
- anyhow::bail!("Failed sending Connect({}) reply", addr)
- }
- },
- }
- }
- next_gossip_message = => {
- if let Some(msg) = next_gossip_message {
- log::trace!("Received gossip message");
- match serde_json::from_slice(& {
- Ok(m) => S::handle_gossip_message(self.profile.clone(), msg.source, m).await?,
- Err(e) => log::trace!("Failed to deserialize gossip message from {}", msg.source),
- }
- } else {
- log::trace!("Gossip stream closed, breaking reactor loop");
- break;
- }
- }
- }
- }
- Ok(())
- }
-mod tests {
- use super::*;
- use std::convert::TryFrom;
- use std::sync::Arc;
- use tokio::sync::RwLock;
- #[tokio::test]
- async fn test_gossip_reactor_simple() {
- let _ = env_logger::try_init();
- let profile = Profile::new_inmemory("test-gossip-reactor-simple").await;
- assert!(profile.is_ok());
- let profile = Arc::new(RwLock::new(profile.unwrap()));
- let gossip_topic_name = String::from("test-gossip-reactor-simple-topic");
- let (rx, tx) = tokio::sync::mpsc::unbounded_channel();
- let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name).build_with_receiver(tx);
- let (reply_sender, mut reply_receiver) = tokio::sync::mpsc::unbounded_channel();
- rx.send((GossipRequest::Ping, reply_sender)).unwrap();
- let mut pong_received = false;
- let _ = tokio::spawn(async move {
- });
- match reply_receiver.recv().await {
- Some(GossipReply::Pong) => {
- pong_received = true;
- log::trace!("Pong received!");
- let (reply_sender, mut reply_receiver) = tokio::sync::mpsc::unbounded_channel();
- rx.send((GossipRequest::Exit, reply_sender)).unwrap();
- }
- Some(r) => {
- assert!(false, "Expected ReactorReply::Pong, got: {:?}", r);
- }
- None => {
- // nothing
- }
- }
- assert!(pong_received, "No pong received");
- }
- #[tokio::test]
- async fn test_gossip_reactor_gossipping() {
- let _ = env_logger::try_init();
- let gossip_topic_name = String::from("test-gossip-reactor-gossipping-topic");
- let (left_profile, left_reactor, left_rx) = {
- let profile = Profile::new_inmemory("test-gossip-reactor-simple-left").await;
- assert!(profile.is_ok());
- let profile = Arc::new(RwLock::new(profile.unwrap()));
- let (rx, tx) = tokio::sync::mpsc::unbounded_channel();
- let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name.clone()).build_with_receiver(tx);
- (profile, reactor, rx)
- };
- log::trace!("Built left GossipReactor");
- let (right_profile, right_reactor, right_rx) = {
- let profile = Profile::new_inmemory("test-gossip-reactor-simple-right").await;
- assert!(profile.is_ok());
- let profile = Arc::new(RwLock::new(profile.unwrap()));
- let (rx, tx) = tokio::sync::mpsc::unbounded_channel();
- let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name.clone()).build_with_receiver(tx);
- (profile, reactor, rx)
- };
- log::trace!("Built right GossipReactor");
- async fn get_peer_id(profile: Arc<RwLock<Profile>>) -> Result<ipfs::MultiaddrWithPeerId> {
- .await
- .client()
- .ipfs
- .identity()
- .await
- .map(|(pubkey, addr)| (pubkey.into_peer_id(), addr))
- .and_then(|(peerid, mut addr)| {
- ipfs::MultiaddrWithPeerId::try_from({
- addr.pop().expect("At least one address for client")
- })
- .map_err(anyhow::Error::from)
- })
- }
- let left_running_reactor = tokio::spawn(async move {
- });
- let right_running_reactor = tokio::spawn(async move {
- });
- let left_peer_id = get_peer_id(left_profile.clone()).await;
- log::trace!("Left GossipReactor = {:?}", left_peer_id);
- assert!(left_peer_id.is_ok(), "Not ok: {:?}", left_peer_id);
- let left_peer_id = left_peer_id.unwrap();
- let right_peer_id = get_peer_id(right_profile.clone()).await;
- log::trace!("Right GossipReactor = {:?}", right_peer_id);
- assert!(right_peer_id.is_ok(), "Not ok: {:?}", right_peer_id);
- let right_peer_id = right_peer_id.unwrap();
- let (right_reply_sender, mut right_reply_receiver) = tokio::sync::mpsc::unbounded_channel();
- log::trace!("Right GossipReactor should now connect to left GossipReactor");
- right_rx.send((GossipRequest::Connect(left_peer_id), right_reply_sender)).unwrap();
- log::trace!("Right GossipReactor should now connect to left GossipReactor... waiting for reply");
- match tokio::time::timeout(std::time::Duration::from_secs(5), right_reply_receiver.recv()).await {
- Err(_) => assert!(false, "Timeout elapsed when waiting for connection status"),
- Ok(Some(GossipReply::ConnectResult(Ok(())))) => {
- log::trace!("Right GossipReactor is connected");
- assert!(true)
- },
- Ok(Some(other)) => assert!(false, "Expected ConnectResult(Ok(())), recv: {:?}", other),
- Ok(None) => assert!(false, "No reply from right reactor received"),
- }
- }
diff --git a/lib/src/reactor/gossip/ b/lib/src/reactor/gossip/
deleted file mode 100644
index 049fc68..0000000
--- a/lib/src/reactor/gossip/
+++ /dev/null
@@ -1,17 +0,0 @@
-use anyhow::Result;
-#[derive(Debug, serde::Serialize, serde::Deserialize)]
-pub enum GossipMessage {
- CurrentProfileState {
- peer_id: Vec<u8>,
- cid: Vec<u8>,
- },
-impl GossipMessage {
- pub(super) fn into_bytes(self) -> Result<Vec<u8>> {
- serde_json::to_string(&self)
- .map(String::into_bytes)
- .map_err(anyhow::Error::from)
- }
diff --git a/lib/src/reactor/gossip/ b/lib/src/reactor/gossip/
deleted file mode 100644
index 6fe9d1a..0000000
--- a/lib/src/reactor/gossip/
+++ /dev/null
@@ -1,31 +0,0 @@
-use std::sync::Arc;
-use anyhow::Result;
-use tokio::sync::RwLock;
-use crate::profile::Profile;
-use crate::reactor::gossip::msg::GossipMessage;
-pub trait GossipHandlingStrategy: Sync + Send {
- async fn handle_gossip_message(profile: Arc<RwLock<Profile>>, source: ipfs::PeerId, msg: GossipMessage) -> Result<()>;
-pub struct LogStrategy;
-impl GossipHandlingStrategy for LogStrategy {
- async fn handle_gossip_message(_profile: Arc<RwLock<Profile>>, source: ipfs::PeerId, msg: GossipMessage) -> Result<()> {
- use std::convert::TryFrom;
- match msg {
- GossipMessage::CurrentProfileState { peer_id, cid } => {
- let peer_id = ipfs::PeerId::from_bytes(&peer_id);
- let cid = cid::Cid::try_from(&*cid);
- log::trace!("{:?} told me that {:?} is at {:?}", source, peer_id, cid);
- }
- }
- Ok(())
- }
diff --git a/lib/src/reactor/ b/lib/src/reactor/
deleted file mode 100644
index 5299851..0000000
--- a/lib/src/reactor/
+++ /dev/null
@@ -1,28 +0,0 @@
-use std::sync::Arc;
-use std::fmt::Debug;
-use anyhow::Result;
-use tokio::sync::RwLock;
-use crate::profile::Profile;
-mod gossip;
-mod device;
-mod account;
-mod ctrl;
-use ctrl::ReactorReceiver;
-pub trait Reactor {
- type Request: Debug + Send + Sync;
- type Reply: Debug + Send + Sync;
- async fn run(self) -> Result<()>;
-pub trait ReactorBuilder {
- type Reactor: Reactor;
- fn build_with_receiver(self, rr: ReactorReceiver<<Self::Reactor as Reactor>::Request, <Self::Reactor as Reactor>::Reply>) -> Self::Reactor;