summaryrefslogtreecommitdiffstats
path: root/lib/src/reactor
diff options
context:
space:
mode:
Diffstat (limited to 'lib/src/reactor')
-rw-r--r--lib/src/reactor/account.rs19
-rw-r--r--lib/src/reactor/ctrl.rs15
-rw-r--r--lib/src/reactor/device.rs19
-rw-r--r--lib/src/reactor/gossip/ctrl.rs19
-rw-r--r--lib/src/reactor/gossip/mod.rs331
-rw-r--r--lib/src/reactor/gossip/msg.rs17
-rw-r--r--lib/src/reactor/gossip/strategy.rs31
-rw-r--r--lib/src/reactor/mod.rs28
8 files changed, 479 insertions, 0 deletions
diff --git a/lib/src/reactor/account.rs b/lib/src/reactor/account.rs
new file mode 100644
index 0000000..59913b5
--- /dev/null
+++ b/lib/src/reactor/account.rs
@@ -0,0 +1,19 @@
+//! Module for account handling (following accounts, caching account updates) using the gossip
+//! module for the lower-level handling
+
+use std::sync::Arc;
+
+use anyhow::Result;
+use tokio::sync::RwLock;
+
+use crate::profile::Profile;
+use crate::reactor::gossip::GossipReactor;
+
+#[derive(Debug)]
+pub struct AccountReactor(GossipReactor);
+
+impl AccountReactor {
+ pub(super) fn new(profile: Arc<RwLock<Profile>>) -> Self {
+ unimplemented!()
+ }
+}
diff --git a/lib/src/reactor/ctrl.rs b/lib/src/reactor/ctrl.rs
new file mode 100644
index 0000000..a32f1c3
--- /dev/null
+++ b/lib/src/reactor/ctrl.rs
@@ -0,0 +1,15 @@
+use tokio::sync::mpsc::UnboundedSender as Sender;
+use tokio::sync::mpsc::UnboundedReceiver as Receiver;
+
+/// Type for sending messages to a reactor
+pub type ReactorSender<Request, Reply> = Sender<(Request, ReplySender<Reply>)>;
+
+/// Type that is used by a reactor for receiving messages
+pub type ReactorReceiver<Request, Reply> = Receiver<(Request, ReplySender<Reply>)>;
+
+/// Type that represents the channel that has to be send with a request to a reactor for getting an
+/// answer back
+pub type ReplySender<Reply> = Sender<Reply>;
+
+pub type ReplyReceiver<Reply> = Receiver<Reply>;
+
diff --git a/lib/src/reactor/device.rs b/lib/src/reactor/device.rs
new file mode 100644
index 0000000..1014ca1
--- /dev/null
+++ b/lib/src/reactor/device.rs
@@ -0,0 +1,19 @@
+//! Module for multi-device support functionality,
+//! which uses the gossip module for the lower-level handling
+
+use std::sync::Arc;
+
+use anyhow::Result;
+use tokio::sync::RwLock;
+
+use crate::profile::Profile;
+use crate::reactor::gossip::GossipReactor;
+
+#[derive(Debug)]
+pub struct DeviceReactor(GossipReactor);
+
+impl DeviceReactor {
+ pub(super) fn new(profile: Arc<RwLock<Profile>>) -> Self {
+ unimplemented!()
+ }
+}
diff --git a/lib/src/reactor/gossip/ctrl.rs b/lib/src/reactor/gossip/ctrl.rs
new file mode 100644
index 0000000..68fbf06
--- /dev/null
+++ b/lib/src/reactor/gossip/ctrl.rs
@@ -0,0 +1,19 @@
+use anyhow::Result;
+
+#[derive(Debug)]
+pub enum GossipRequest {
+ Exit,
+ Ping,
+ PublishMe,
+ Connect(ipfs::MultiaddrWithPeerId),
+}
+
+#[derive(Debug)]
+pub enum GossipReply {
+ Exiting,
+ Pong,
+ NoHead,
+ PublishMeResult(Result<()>),
+ ConnectResult(Result<()>),
+}
+
diff --git a/lib/src/reactor/gossip/mod.rs b/lib/src/reactor/gossip/mod.rs
new file mode 100644
index 0000000..c8a7d28
--- /dev/null
+++ b/lib/src/reactor/gossip/mod.rs
@@ -0,0 +1,331 @@
+//! Low-level module for gossip'ing code
+//!
+//! This module implements the low-level gossiping functionality that other modules use to
+//! implement actual behaviours on
+//!
+
+use std::sync::Arc;
+
+use anyhow::Result;
+use tokio::sync::RwLock;
+
+use crate::profile::Profile;
+use crate::reactor::Reactor;
+use crate::reactor::ReactorBuilder;
+use crate::reactor::ctrl::ReactorReceiver;
+use crate::reactor::ctrl::ReactorSender;
+use crate::reactor::ctrl::ReplySender;
+
+mod ctrl;
+pub use ctrl::GossipRequest;
+pub use ctrl::GossipReply;
+
+mod msg;
+pub use msg::GossipMessage;
+
+mod strategy;
+pub use strategy::GossipHandlingStrategy;
+pub use strategy::LogStrategy;
+
+#[derive(Debug)]
+pub struct GossipReactorBuilder {
+ profile: Arc<RwLock<Profile>>,
+ gossip_topic_name: String,
+}
+
+impl GossipReactorBuilder {
+ pub fn new(profile: Arc<RwLock<Profile>>, gossip_topic_name: String) -> Self {
+ Self { profile, gossip_topic_name }
+ }
+}
+
+impl ReactorBuilder for GossipReactorBuilder {
+ type Reactor = GossipReactor;
+
+ fn build_with_receiver(self, rr: ReactorReceiver<GossipRequest, GossipReply>) -> Self::Reactor {
+ GossipReactor {
+ profile: self.profile,
+ gossip_topic_name: self.gossip_topic_name,
+ receiver: rr,
+ strategy: std::marker::PhantomData,
+ }
+ }
+}
+
+pub struct GossipReactor<Strategy = LogStrategy>
+ where Strategy: GossipHandlingStrategy + Sync + Send
+{
+ profile: Arc<RwLock<Profile>>,
+ gossip_topic_name: String,
+ receiver: ReactorReceiver<GossipRequest, GossipReply>,
+ strategy: std::marker::PhantomData<Strategy>,
+}
+
+impl<S> std::fmt::Debug for GossipReactor<S>
+ where S: GossipHandlingStrategy + Sync + Send
+{
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "GossipReactor {{ topic: '{}' }}", self.gossip_topic_name)
+ }
+}
+
+impl<S> GossipReactor<S>
+ where S: GossipHandlingStrategy + Sync + Send
+{
+ fn send_gossip_reply(channel: ReplySender<GossipReply>, reply: GossipReply) -> Result<()> {
+ if let Err(_) = channel.send(reply) {
+ anyhow::bail!("Failed to send GossipReply::NoHead)")
+ }
+
+ Ok(())
+ }
+
+ async fn publish_me(&self, reply_channel: ReplySender<GossipReply>) -> Result<()> {
+ let profile = self.profile.read().await;
+
+ let head = profile.head();
+ if head.is_none() {
+ Self::send_gossip_reply(reply_channel, GossipReply::NoHead)?;
+ return Ok(())
+ }
+ let head = head.unwrap().to_bytes();
+
+ let own_id = match profile.client().own_id().await {
+ Ok(id) => id,
+ Err(e) => {
+ Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Err(e)))?;
+ return Ok(()) // TODO: abort operation here for now, maybe not the best idea
+ }
+ };
+
+ let publish_res = profile
+ .client()
+ .ipfs
+ .pubsub_publish(self.gossip_topic_name.clone(), GossipMessage::CurrentProfileState {
+ peer_id: own_id.to_bytes(),
+ cid: head
+ }.into_bytes()?)
+ .await;
+
+ match publish_res {
+ Ok(()) => Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Ok(()))),
+ Err(e) => Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Err(e))),
+ }
+ }
+
+ async fn connect(&self, addr: ipfs::MultiaddrWithPeerId) -> Result<()> {
+ log::trace!("Connecting GossipReactor with {:?}", addr);
+ self.profile.read().await.client().connect(addr).await
+ }
+
+ #[cfg(test)]
+ async fn is_connected_to(&self, addr: ipfs::MultiaddrWithPeerId) -> Result<bool> {
+ self.profile
+ .read()
+ .await
+ .client()
+ .ipfs
+ .peers()
+ .await
+ .map(|connections| {
+ connections.iter().any(|connection| connection.addr == addr)
+ })
+ }
+
+}
+
+#[async_trait::async_trait]
+impl<S> Reactor for GossipReactor<S>
+ where S: GossipHandlingStrategy + Sync + Send
+{
+ type Request = GossipRequest;
+ type Reply = GossipReply;
+
+ async fn run(mut self) -> Result<()> {
+ use futures::stream::StreamExt;
+
+ log::trace!("Booting {:?}", self);
+ let mut subscription_stream = self.profile
+ .read()
+ .await
+ .client()
+ .ipfs
+ .pubsub_subscribe(self.gossip_topic_name.clone())
+ .await?;
+
+ log::trace!("{:?} main loop", self);
+ loop {
+ tokio::select! {
+ next_control_msg = self.receiver.recv() => {
+ log::trace!("Received control message: {:?}", next_control_msg);
+ match next_control_msg {
+ None => break,
+ Some((GossipRequest::Exit, reply_channel)) => {
+ if let Err(_) = reply_channel.send(GossipReply::Exiting) {
+ anyhow::bail!("Failed sending EXITING reply")
+ }
+ break
+ },
+
+ Some((GossipRequest::Ping, reply_channel)) => {
+ log::trace!("Replying with Pong");
+ if let Err(_) = reply_channel.send(GossipReply::Pong) {
+ anyhow::bail!("Failed sending PONG reply")
+ }
+ },
+
+ Some((GossipRequest::PublishMe, reply_channel)) => self.publish_me(reply_channel).await?,
+
+ Some((GossipRequest::Connect(addr), reply_channel)) => {
+ let reply = GossipReply::ConnectResult(self.connect(addr.clone()).await);
+ if let Err(_) = Self::send_gossip_reply(reply_channel, reply) {
+ anyhow::bail!("Failed sending Connect({}) reply", addr)
+ }
+ },
+ }
+ }
+
+ next_gossip_message = subscription_stream.next() => {
+ if let Some(msg) = next_gossip_message {
+ log::trace!("Received gossip message");
+ match serde_json::from_slice(&msg.data) {
+ Ok(m) => S::handle_gossip_message(self.profile.clone(), msg.source, m).await?,
+ Err(e) => log::trace!("Failed to deserialize gossip message from {}", msg.source),
+ }
+ } else {
+ log::trace!("Gossip stream closed, breaking reactor loop");
+ break;
+ }
+ }
+ }
+ }
+ Ok(())
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ use std::convert::TryFrom;
+ use std::sync::Arc;
+ use tokio::sync::RwLock;
+
+ use crate::config::Config;
+
+ #[tokio::test]
+ async fn test_gossip_reactor_simple() {
+ let _ = env_logger::try_init();
+
+ let profile = Profile::new_inmemory(Config::default(), "test-gossip-reactor-simple").await;
+ assert!(profile.is_ok());
+ let profile = Arc::new(RwLock::new(profile.unwrap()));
+
+ let gossip_topic_name = String::from("test-gossip-reactor-simple-topic");
+ let (rx, tx) = tokio::sync::mpsc::unbounded_channel();
+ let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name).build_with_receiver(tx);
+
+ let (reply_sender, mut reply_receiver) = tokio::sync::mpsc::unbounded_channel();
+ rx.send((GossipRequest::Ping, reply_sender)).unwrap();
+
+ let mut pong_received = false;
+ let _ = tokio::spawn(async move {
+ reactor.run().await
+ });
+
+ match reply_receiver.recv().await {
+ Some(GossipReply::Pong) => {
+ pong_received = true;
+ log::trace!("Pong received!");
+ let (reply_sender, mut reply_receiver) = tokio::sync::mpsc::unbounded_channel();
+ rx.send((GossipRequest::Exit, reply_sender)).unwrap();
+ }
+ Some(r) => {
+ assert!(false, "Expected ReactorReply::Pong, got: {:?}", r);
+ }
+ None => {
+ // nothing
+ }
+ }
+
+ assert!(pong_received, "No pong received");
+ }
+
+ #[tokio::test]
+ async fn test_gossip_reactor_gossipping() {
+ let _ = env_logger::try_init();
+
+ let gossip_topic_name = String::from("test-gossip-reactor-gossipping-topic");
+ let (left_profile, left_reactor, left_rx) = {
+ let profile = Profile::new_inmemory(Config::default(), "test-gossip-reactor-simple-left").await;
+ assert!(profile.is_ok());
+ let profile = Arc::new(RwLock::new(profile.unwrap()));
+
+ let (rx, tx) = tokio::sync::mpsc::unbounded_channel();
+ let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name.clone()).build_with_receiver(tx);
+ (profile, reactor, rx)
+ };
+ log::trace!("Built left GossipReactor");
+
+ let (right_profile, right_reactor, right_rx) = {
+ let profile = Profile::new_inmemory(Config::default(), "test-gossip-reactor-simple-right").await;
+ assert!(profile.is_ok());
+ let profile = Arc::new(RwLock::new(profile.unwrap()));
+
+ let (rx, tx) = tokio::sync::mpsc::unbounded_channel();
+ let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name.clone()).build_with_receiver(tx);
+ (profile, reactor, rx)
+ };
+ log::trace!("Built right GossipReactor");
+
+ async fn get_peer_id(profile: Arc<RwLock<Profile>>) -> Result<ipfs::MultiaddrWithPeerId> {
+ profile.read()
+ .await
+ .client()
+ .ipfs
+ .identity()
+ .await
+ .map(|(pubkey, addr)| (pubkey.into_peer_id(), addr))
+ .and_then(|(peerid, mut addr)| {
+ ipfs::MultiaddrWithPeerId::try_from({
+ addr.pop().expect("At least one address for client")
+ })
+ .map_err(anyhow::Error::from)
+ })
+ }
+
+ let left_running_reactor = tokio::spawn(async move {
+ left_reactor.run().await
+ });
+
+ let right_running_reactor = tokio::spawn(async move {
+ right_reactor.run().await
+ });
+
+ let left_peer_id = get_peer_id(left_profile.clone()).await;
+ log::trace!("Left GossipReactor = {:?}", left_peer_id);
+ assert!(left_peer_id.is_ok(), "Not ok: {:?}", left_peer_id);
+ let left_peer_id = left_peer_id.unwrap();
+
+ let right_peer_id = get_peer_id(right_profile.clone()).await;
+ log::trace!("Right GossipReactor = {:?}", right_peer_id);
+ assert!(right_peer_id.is_ok(), "Not ok: {:?}", right_peer_id);
+ let right_peer_id = right_peer_id.unwrap();
+
+ let (right_reply_sender, mut right_reply_receiver) = tokio::sync::mpsc::unbounded_channel();
+
+ log::trace!("Right GossipReactor should now connect to left GossipReactor");
+ right_rx.send((GossipRequest::Connect(left_peer_id), right_reply_sender)).unwrap();
+
+ log::trace!("Right GossipReactor should now connect to left GossipReactor... waiting for reply");
+ match tokio::time::timeout(std::time::Duration::from_secs(5), right_reply_receiver.recv()).await {
+ Err(_) => assert!(false, "Timeout elapsed when waiting for connection status"),
+ Ok(Some(GossipReply::ConnectResult(Ok(())))) => {
+ log::trace!("Right GossipReactor is connected");
+ assert!(true)
+ },
+ Ok(Some(other)) => assert!(false, "Expected ConnectResult(Ok(())), recv: {:?}", other),
+ Ok(None) => assert!(false, "No reply from right reactor received"),
+ }
+ }
+}
diff --git a/lib/src/reactor/gossip/msg.rs b/lib/src/reactor/gossip/msg.rs
new file mode 100644
index 0000000..049fc68
--- /dev/null
+++ b/lib/src/reactor/gossip/msg.rs
@@ -0,0 +1,17 @@
+use anyhow::Result;
+
+#[derive(Debug, serde::Serialize, serde::Deserialize)]
+pub enum GossipMessage {
+ CurrentProfileState {
+ peer_id: Vec<u8>,
+ cid: Vec<u8>,
+ },
+}
+
+impl GossipMessage {
+ pub(super) fn into_bytes(self) -> Result<Vec<u8>> {
+ serde_json::to_string(&self)
+ .map(String::into_bytes)
+ .map_err(anyhow::Error::from)
+ }
+}
diff --git a/lib/src/reactor/gossip/strategy.rs b/lib/src/reactor/gossip/strategy.rs
new file mode 100644
index 0000000..b54dddc
--- /dev/null
+++ b/lib/src/reactor/gossip/strategy.rs
@@ -0,0 +1,31 @@
+use std::sync::Arc;
+
+use anyhow::Result;
+use tokio::sync::RwLock;
+
+use crate::profile::Profile;
+use crate::reactor::gossip::msg::GossipMessage;
+
+#[async_trait::async_trait]
+pub trait GossipHandlingStrategy: Sync + Send {
+ async fn handle_gossip_message(profile: Arc<RwLock<Profile>>, source: ipfs::PeerId, msg: GossipMessage) -> Result<()>;
+}
+
+pub struct LogStrategy;
+
+#[async_trait::async_trait]
+impl GossipHandlingStrategy for LogStrategy {
+ async fn handle_gossip_message(profile: Arc<RwLock<Profile>>, source: ipfs::PeerId, msg: GossipMessage) -> Result<()> {
+ use std::convert::TryFrom;
+ match msg {
+ GossipMessage::CurrentProfileState { peer_id, cid } => {
+ let peer_id = ipfs::PeerId::from_bytes(&peer_id);
+ let cid = cid::Cid::try_from(&*cid);
+
+ log::trace!("{:?} told me that {:?} is at {:?}", source, peer_id, cid);
+ }
+ }
+
+ Ok(())
+ }
+}
diff --git a/lib/src/reactor/mod.rs b/lib/src/reactor/mod.rs
new file mode 100644
index 0000000..5299851
--- /dev/null
+++ b/lib/src/reactor/mod.rs
@@ -0,0 +1,28 @@
+use std::sync::Arc;
+use std::fmt::Debug;
+
+use anyhow::Result;
+use tokio::sync::RwLock;
+
+use crate::profile::Profile;
+
+mod gossip;
+mod device;
+mod account;
+mod ctrl;
+
+use ctrl::ReactorReceiver;
+
+#[async_trait::async_trait]
+pub trait Reactor {
+ type Request: Debug + Send + Sync;
+ type Reply: Debug + Send + Sync;
+
+ async fn run(self) -> Result<()>;
+}
+
+pub trait ReactorBuilder {
+ type Reactor: Reactor;
+
+ fn build_with_receiver(self, rr: ReactorReceiver<<Self::Reactor as Reactor>::Request, <Self::Reactor as Reactor>::Reply>) -> Self::Reactor;
+}