summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2021-12-18 22:04:31 +0100
committerMatthias Beyer <mail@beyermatthias.de>2021-12-18 22:14:37 +0100
commit542e1e9dc50a96a36ab9d4236293cd0a4f5d22c3 (patch)
tree89905044c44e1af21ab79b150b85011f96efa8fe
parent7675eba9244475c3847fd48eb13b5d8b54cf271c (diff)
Shrink idea of "Reactors"
A Reactor can be waaay less complex if we simply use it as "map"-helper for mapping over `Stream`s. If we map over a stream of Vec<u8> and deserialize them to GossipMessages in one step, and handle them appropriately in the next step, it is way less complex to implement these things and we do not have to care about this whole "how do I shut down the thing" because we can simply drop() everything and let the destructors do their job. This patch removes the Reactor nonsense. Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
-rw-r--r--lib/src/gossip/deserializer.rs55
-rw-r--r--lib/src/gossip/handler.rs73
-rw-r--r--lib/src/gossip/mod.rs9
-rw-r--r--lib/src/gossip/msg.rs (renamed from lib/src/reactor/gossip/msg.rs)0
-rw-r--r--lib/src/lib.rs2
-rw-r--r--lib/src/reactor/account.rs19
-rw-r--r--lib/src/reactor/ctrl.rs15
-rw-r--r--lib/src/reactor/device.rs19
-rw-r--r--lib/src/reactor/gossip/ctrl.rs19
-rw-r--r--lib/src/reactor/gossip/mod.rs329
-rw-r--r--lib/src/reactor/gossip/strategy.rs31
-rw-r--r--lib/src/reactor/mod.rs28
12 files changed, 138 insertions, 461 deletions
diff --git a/lib/src/gossip/deserializer.rs b/lib/src/gossip/deserializer.rs
new file mode 100644
index 0000000..a50644f
--- /dev/null
+++ b/lib/src/gossip/deserializer.rs
@@ -0,0 +1,55 @@
+use anyhow::Result;
+use futures::Stream;
+use futures::StreamExt;
+
+use crate::gossip::GossipMessage;
+
+pub struct GossipDeserializer<ErrStrategy = LogStrategy>
+ where ErrStrategy: GossipDeserializerErrorStrategy
+{
+ strategy: std::marker::PhantomData<ErrStrategy>,
+}
+
+impl<ErrStrategy> GossipDeserializer<ErrStrategy>
+ where ErrStrategy: GossipDeserializerErrorStrategy
+{
+ pub fn new() -> Self {
+ Self {
+ strategy: std::marker::PhantomData,
+ }
+ }
+
+ pub fn run<S>(mut self, input: S) -> impl Stream<Item = GossipMessage>
+ where S: Stream<Item = ipfs::PubsubMessage>
+ {
+ input.filter_map(|message| async move {
+ log::trace!("Received gossip message");
+
+ match serde_json::from_slice(&message.data).map_err(anyhow::Error::from) {
+ Ok(m) => Some(m),
+ Err(e) => {
+ ErrStrategy::handle_error(e);
+ None
+ }
+ }
+ })
+ }
+}
+
+pub trait GossipDeserializerErrorStrategy {
+ fn handle_error(err: anyhow::Error);
+}
+
+pub struct LogStrategy;
+impl GossipDeserializerErrorStrategy for LogStrategy {
+ fn handle_error(err: anyhow::Error) {
+ log::trace!("Error: {}", err);
+ }
+}
+
+pub struct IgnoreStrategy;
+impl GossipDeserializerErrorStrategy for IgnoreStrategy {
+ fn handle_error(_: anyhow::Error) {
+ ()
+ }
+}
diff --git a/lib/src/gossip/handler.rs b/lib/src/gossip/handler.rs
new file mode 100644
index 0000000..e524da8
--- /dev/null
+++ b/lib/src/gossip/handler.rs
@@ -0,0 +1,73 @@
+//! Low-level module for gossip'ing code
+//!
+//! This module implements the low-level gossiping functionality that other modules use to
+//! implement actual behaviours on
+//!
+
+use std::sync::Arc;
+
+use anyhow::Result;
+use futures::Stream;
+use futures::StreamExt;
+use tokio::sync::RwLock;
+
+use crate::profile::Profile;
+use crate::gossip::GossipMessage;
+
+#[derive(Debug)]
+pub struct GossipHandler<Strategy = LogStrategy>
+ where Strategy: GossipHandlingStrategy + Sync + Send
+{
+ profile: Arc<RwLock<Profile>>,
+ strategy: std::marker::PhantomData<Strategy>,
+}
+
+impl<Strat> GossipHandler<Strat>
+ where Strat: GossipHandlingStrategy + Sync + Send
+{
+ pub fn new(profile: Arc<RwLock<Profile>>) -> Self {
+ Self {
+ profile,
+ strategy: std::marker::PhantomData,
+ }
+ }
+
+ pub fn run<S>(self, input: S) -> impl Stream<Item = (GossipMessage, Result<()>)>
+ where S: Stream<Item = (ipfs::PeerId, GossipMessage)>
+ {
+ input.then(move |(source, msg)| {
+ let pr = self.profile.clone();
+ async move {
+ log::trace!("Received gossip message from {}: {:?}", source, msg);
+ let res = Strat::handle_gossip_message(pr.clone(), &source, &msg).await;
+ (msg, res)
+ }
+ })
+ }
+}
+
+#[async_trait::async_trait]
+pub trait GossipHandlingStrategy: Sync + Send {
+ async fn handle_gossip_message(profile: Arc<RwLock<Profile>>, source: &ipfs::PeerId, msg: &GossipMessage) -> Result<()>;
+}
+
+pub struct LogStrategy;
+
+#[async_trait::async_trait]
+impl GossipHandlingStrategy for LogStrategy {
+ async fn handle_gossip_message(_profile: Arc<RwLock<Profile>>, source: &ipfs::PeerId, msg: &GossipMessage) -> Result<()> {
+ use std::convert::TryFrom;
+ use std::ops::Deref;
+
+ match msg {
+ GossipMessage::CurrentProfileState { peer_id, cid } => {
+ let peer_id = ipfs::PeerId::from_bytes(peer_id);
+ let cid = cid::Cid::try_from(cid.deref());
+
+ log::trace!("{:?} told me that {:?} is at {:?}", source, peer_id, cid);
+ }
+ }
+
+ Ok(())
+ }
+}
diff --git a/lib/src/gossip/mod.rs b/lib/src/gossip/mod.rs
new file mode 100644
index 0000000..d6a6963
--- /dev/null
+++ b/lib/src/gossip/mod.rs
@@ -0,0 +1,9 @@
+mod msg;
+pub use msg::GossipMessage;
+
+mod handler;
+pub use handler::*;
+
+mod deserializer;
+pub use deserializer::*;
+
diff --git a/lib/src/reactor/gossip/msg.rs b/lib/src/gossip/msg.rs
index 049fc68..049fc68 100644
--- a/lib/src/reactor/gossip/msg.rs
+++ b/lib/src/gossip/msg.rs
diff --git a/lib/src/lib.rs b/lib/src/lib.rs
index b7b05e2..e836c37 100644
--- a/lib/src/lib.rs
+++ b/lib/src/lib.rs
@@ -4,4 +4,4 @@ pub mod ipfs_client;
pub mod profile;
pub mod stream;
pub mod types;
-pub mod reactor;
+pub mod gossip;
diff --git a/lib/src/reactor/account.rs b/lib/src/reactor/account.rs
deleted file mode 100644
index 59913b5..0000000
--- a/lib/src/reactor/account.rs
+++ /dev/null
@@ -1,19 +0,0 @@
-//! Module for account handling (following accounts, caching account updates) using the gossip
-//! module for the lower-level handling
-
-use std::sync::Arc;
-
-use anyhow::Result;
-use tokio::sync::RwLock;
-
-use crate::profile::Profile;
-use crate::reactor::gossip::GossipReactor;
-
-#[derive(Debug)]
-pub struct AccountReactor(GossipReactor);
-
-impl AccountReactor {
- pub(super) fn new(profile: Arc<RwLock<Profile>>) -> Self {
- unimplemented!()
- }
-}
diff --git a/lib/src/reactor/ctrl.rs b/lib/src/reactor/ctrl.rs
deleted file mode 100644
index a32f1c3..0000000
--- a/lib/src/reactor/ctrl.rs
+++ /dev/null
@@ -1,15 +0,0 @@
-use tokio::sync::mpsc::UnboundedSender as Sender;
-use tokio::sync::mpsc::UnboundedReceiver as Receiver;
-
-/// Type for sending messages to a reactor
-pub type ReactorSender<Request, Reply> = Sender<(Request, ReplySender<Reply>)>;
-
-/// Type that is used by a reactor for receiving messages
-pub type ReactorReceiver<Request, Reply> = Receiver<(Request, ReplySender<Reply>)>;
-
-/// Type that represents the channel that has to be send with a request to a reactor for getting an
-/// answer back
-pub type ReplySender<Reply> = Sender<Reply>;
-
-pub type ReplyReceiver<Reply> = Receiver<Reply>;
-
diff --git a/lib/src/reactor/device.rs b/lib/src/reactor/device.rs
deleted file mode 100644
index 1014ca1..0000000
--- a/lib/src/reactor/device.rs
+++ /dev/null
@@ -1,19 +0,0 @@
-//! Module for multi-device support functionality,
-//! which uses the gossip module for the lower-level handling
-
-use std::sync::Arc;
-
-use anyhow::Result;
-use tokio::sync::RwLock;
-
-use crate::profile::Profile;
-use crate::reactor::gossip::GossipReactor;
-
-#[derive(Debug)]
-pub struct DeviceReactor(GossipReactor);
-
-impl DeviceReactor {
- pub(super) fn new(profile: Arc<RwLock<Profile>>) -> Self {
- unimplemented!()
- }
-}
diff --git a/lib/src/reactor/gossip/ctrl.rs b/lib/src/reactor/gossip/ctrl.rs
deleted file mode 100644
index 68fbf06..0000000
--- a/lib/src/reactor/gossip/ctrl.rs
+++ /dev/null
@@ -1,19 +0,0 @@
-use anyhow::Result;
-
-#[derive(Debug)]
-pub enum GossipRequest {
- Exit,
- Ping,
- PublishMe,
- Connect(ipfs::MultiaddrWithPeerId),
-}
-
-#[derive(Debug)]
-pub enum GossipReply {
- Exiting,
- Pong,
- NoHead,
- PublishMeResult(Result<()>),
- ConnectResult(Result<()>),
-}
-
diff --git a/lib/src/reactor/gossip/mod.rs b/lib/src/reactor/gossip/mod.rs
deleted file mode 100644
index 7658509..0000000
--- a/lib/src/reactor/gossip/mod.rs
+++ /dev/null
@@ -1,329 +0,0 @@
-//! Low-level module for gossip'ing code
-//!
-//! This module implements the low-level gossiping functionality that other modules use to
-//! implement actual behaviours on
-//!
-
-use std::sync::Arc;
-
-use anyhow::Result;
-use tokio::sync::RwLock;
-
-use crate::profile::Profile;
-use crate::reactor::Reactor;
-use crate::reactor::ReactorBuilder;
-use crate::reactor::ctrl::ReactorReceiver;
-use crate::reactor::ctrl::ReactorSender;
-use crate::reactor::ctrl::ReplySender;
-
-mod ctrl;
-pub use ctrl::GossipRequest;
-pub use ctrl::GossipReply;
-
-mod msg;
-pub use msg::GossipMessage;
-
-mod strategy;
-pub use strategy::GossipHandlingStrategy;
-pub use strategy::LogStrategy;
-
-#[derive(Debug)]
-pub struct GossipReactorBuilder {
- profile: Arc<RwLock<Profile>>,
- gossip_topic_name: String,
-}
-
-impl GossipReactorBuilder {
- pub fn new(profile: Arc<RwLock<Profile>>, gossip_topic_name: String) -> Self {
- Self { profile, gossip_topic_name }
- }
-}
-
-impl ReactorBuilder for GossipReactorBuilder {
- type Reactor = GossipReactor;
-
- fn build_with_receiver(self, rr: ReactorReceiver<GossipRequest, GossipReply>) -> Self::Reactor {
- GossipReactor {
- profile: self.profile,
- gossip_topic_name: self.gossip_topic_name,
- receiver: rr,
- strategy: std::marker::PhantomData,
- }
- }
-}
-
-pub struct GossipReactor<Strategy = LogStrategy>
- where Strategy: GossipHandlingStrategy + Sync + Send
-{
- profile: Arc<RwLock<Profile>>,
- gossip_topic_name: String,
- receiver: ReactorReceiver<GossipRequest, GossipReply>,
- strategy: std::marker::PhantomData<Strategy>,
-}
-
-impl<S> std::fmt::Debug for GossipReactor<S>
- where S: GossipHandlingStrategy + Sync + Send
-{
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(f, "GossipReactor {{ topic: '{}' }}", self.gossip_topic_name)
- }
-}
-
-impl<S> GossipReactor<S>
- where S: GossipHandlingStrategy + Sync + Send
-{
- fn send_gossip_reply(channel: ReplySender<GossipReply>, reply: GossipReply) -> Result<()> {
- if let Err(_) = channel.send(reply) {
- anyhow::bail!("Failed to send GossipReply::NoHead)")
- }
-
- Ok(())
- }
-
- async fn publish_me(&self, reply_channel: ReplySender<GossipReply>) -> Result<()> {
- let profile = self.profile.read().await;
-
- let head = profile.head();
- if head.is_none() {
- Self::send_gossip_reply(reply_channel, GossipReply::NoHead)?;
- return Ok(())
- }
- let head = head.unwrap().to_bytes();
-
- let own_id = match profile.client().own_id().await {
- Ok(id) => id,
- Err(e) => {
- Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Err(e)))?;
- return Ok(()) // TODO: abort operation here for now, maybe not the best idea
- }
- };
-
- let publish_res = profile
- .client()
- .ipfs
- .pubsub_publish(self.gossip_topic_name.clone(), GossipMessage::CurrentProfileState {
- peer_id: own_id.to_bytes(),
- cid: head
- }.into_bytes()?)
- .await;
-
- match publish_res {
- Ok(()) => Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Ok(()))),
- Err(e) => Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Err(e))),
- }
- }
-
- async fn connect(&self, addr: ipfs::MultiaddrWithPeerId) -> Result<()> {
- log::trace!("Connecting GossipReactor with {:?}", addr);
- self.profile.read().await.client().connect(addr).await
- }
-
- #[cfg(test)]
- async fn is_connected_to(&self, addr: ipfs::MultiaddrWithPeerId) -> Result<bool> {
- self.profile
- .read()
- .await
- .client()
- .ipfs
- .peers()
- .await
- .map(|connections| {
- connections.iter().any(|connection| connection.addr == addr)
- })
- }
-
-}
-
-#[async_trait::async_trait]
-impl<S> Reactor for GossipReactor<S>
- where S: GossipHandlingStrategy + Sync + Send
-{
- type Request = GossipRequest;
- type Reply = GossipReply;
-
- async fn run(mut self) -> Result<()> {
- use futures::stream::StreamExt;
-
- log::trace!("Booting {:?}", self);
- let mut subscription_stream = self.profile
- .read()
- .await
- .client()
- .ipfs
- .pubsub_subscribe(self.gossip_topic_name.clone())
- .await?;
-
- log::trace!("{:?} main loop", self);
- loop {
- tokio::select! {
- next_control_msg = self.receiver.recv() => {
- log::trace!("Received control message: {:?}", next_control_msg);
- match next_control_msg {
- None => break,
- Some((GossipRequest::Exit, reply_channel)) => {
- if let Err(_) = reply_channel.send(GossipReply::Exiting) {
- anyhow::bail!("Failed sending EXITING reply")
- }
- break
- },
-
- Some((GossipRequest::Ping, reply_channel)) => {
- log::trace!("Replying with Pong");
- if let Err(_) = reply_channel.send(GossipReply::Pong) {
- anyhow::bail!("Failed sending PONG reply")
- }
- },
-
- Some((GossipRequest::PublishMe, reply_channel)) => self.publish_me(reply_channel).await?,
-
- Some((GossipRequest::Connect(addr), reply_channel)) => {
- let reply = GossipReply::ConnectResult(self.connect(addr.clone()).await);
- if let Err(_) = Self::send_gossip_reply(reply_channel, reply) {
- anyhow::bail!("Failed sending Connect({}) reply", addr)
- }
- },
- }
- }
-
- next_gossip_message = subscription_stream.next() => {
- if let Some(msg) = next_gossip_message {
- log::trace!("Received gossip message");
- match serde_json::from_slice(&msg.data) {
- Ok(m) => S::handle_gossip_message(self.profile.clone(), msg.source, m).await?,
- Err(e) => log::trace!("Failed to deserialize gossip message from {}", msg.source),
- }
- } else {
- log::trace!("Gossip stream closed, breaking reactor loop");
- break;
- }
- }
- }
- }
- Ok(())
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- use std::convert::TryFrom;
- use std::sync::Arc;
- use tokio::sync::RwLock;
-
- #[tokio::test]
- async fn test_gossip_reactor_simple() {
- let _ = env_logger::try_init();
-
- let profile = Profile::new_inmemory("test-gossip-reactor-simple").await;
- assert!(profile.is_ok());
- let profile = Arc::new(RwLock::new(profile.unwrap()));
-
- let gossip_topic_name = String::from("test-gossip-reactor-simple-topic");
- let (rx, tx) = tokio::sync::mpsc::unbounded_channel();
- let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name).build_with_receiver(tx);
-
- let (reply_sender, mut reply_receiver) = tokio::sync::mpsc::unbounded_channel();
- rx.send((GossipRequest::Ping, reply_sender)).unwrap();
-
- let mut pong_received = false;
- let _ = tokio::spawn(async move {
- reactor.run().await
- });
-
- match reply_receiver.recv().await {
- Some(GossipReply::Pong) => {
- pong_received = true;
- log::trace!("Pong received!");
- let (reply_sender, mut reply_receiver) = tokio::sync::mpsc::unbounded_channel();
- rx.send((GossipRequest::Exit, reply_sender)).unwrap();
- }
- Some(r) => {
- assert!(false, "Expected ReactorReply::Pong, got: {:?}", r);
- }
- None => {
- // nothing
- }
- }
-
- assert!(pong_received, "No pong received");
- }
-
- #[tokio::test]
- async fn test_gossip_reactor_gossipping() {
- let _ = env_logger::try_init();
-
- let gossip_topic_name = String::from("test-gossip-reactor-gossipping-topic");
- let (left_profile, left_reactor, left_rx) = {
- let profile = Profile::new_inmemory("test-gossip-reactor-simple-left").await;
- assert!(profile.is_ok());
- let profile = Arc::new(RwLock::new(profile.unwrap()));
-
- let (rx, tx) = tokio::sync::mpsc::unbounded_channel();
- let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name.clone()).build_with_receiver(tx);
- (profile, reactor, rx)
- };
- log::trace!("Built left GossipReactor");
-
- let (right_profile, right_reactor, right_rx) = {
- let profile = Profile::new_inmemory("test-gossip-reactor-simple-right").await;
- assert!(profile.is_ok());
- let profile = Arc::new(RwLock::new(profile.unwrap()));
-
- let (rx, tx) = tokio::sync::mpsc::unbounded_channel();
- let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name.clone()).build_with_receiver(tx);
- (profile, reactor, rx)
- };
- log::trace!("Built right GossipReactor");
-
- async fn get_peer_id(profile: Arc<RwLock<Profile>>) -> Result<ipfs::MultiaddrWithPeerId> {
- profile.read()
- .await
- .client()
- .ipfs
- .identity()
- .await
- .map(|(pubkey, addr)| (pubkey.into_peer_id(), addr))
- .and_then(|(peerid, mut addr)| {
- ipfs::MultiaddrWithPeerId::try_from({
- addr.pop().expect("At least one address for client")
- })
- .map_err(anyhow::Error::from)
- })
- }
-
- let left_running_reactor = tokio::spawn(async move {
- left_reactor.run().await
- });
-
- let right_running_reactor = tokio::spawn(async move {
- right_reactor.run().await
- });
-
- let left_peer_id = get_peer_id(left_profile.clone()).await;
- log::trace!("Left GossipReactor = {:?}", left_peer_id);
- assert!(left_peer_id.is_ok(), "Not ok: {:?}", left_peer_id);
- let left_peer_id = left_peer_id.unwrap();
-
- let right_peer_id = get_peer_id(right_profile.clone()).await;
- log::trace!("Right GossipReactor = {:?}", right_peer_id);
- assert!(right_peer_id.is_ok(), "Not ok: {:?}", right_peer_id);
- let right_peer_id = right_peer_id.unwrap();
-
- let (right_reply_sender, mut right_reply_receiver) = tokio::sync::mpsc::unbounded_channel();
-
- log::trace!("Right GossipReactor should now connect to left GossipReactor");
- right_rx.send((GossipRequest::Connect(left_peer_id), right_reply_sender)).unwrap();
-
- log::trace!("Right GossipReactor should now connect to left GossipReactor... waiting for reply");
- match tokio::time::timeout(std::time::Duration::from_secs(5), right_reply_receiver.recv()).await {
- Err(_) => assert!(false, "Timeout elapsed when waiting for connection status"),
- Ok(Some(GossipReply::ConnectResult(Ok(())))) => {
- log::trace!("Right GossipReactor is connected");
- assert!(true)
- },
- Ok(Some(other)) => assert!(false, "Expected ConnectResult(Ok(())), recv: {:?}", other),
- Ok(None) => assert!(false, "No reply from right reactor received"),
- }
- }
-}
diff --git a/lib/src/reactor/gossip/strategy.rs b/lib/src/reactor/gossip/strategy.rs
deleted file mode 100644
index 6fe9d1a..0000000
--- a/lib/src/reactor/gossip/strategy.rs
+++ /dev/null
@@ -1,31 +0,0 @@
-use std::sync::Arc;
-
-use anyhow::Result;
-use tokio::sync::RwLock;
-
-use crate::profile::Profile;
-use crate::reactor::gossip::msg::GossipMessage;
-
-#[async_trait::async_trait]
-pub trait GossipHandlingStrategy: Sync + Send {
- async fn handle_gossip_message(profile: Arc<RwLock<Profile>>, source: ipfs::PeerId, msg: GossipMessage) -> Result<()>;
-}
-
-pub struct LogStrategy;
-
-#[async_trait::async_trait]
-impl GossipHandlingStrategy for LogStrategy {
- async fn handle_gossip_message(_profile: Arc<RwLock<Profile>>, source: ipfs::PeerId, msg: GossipMessage) -> Result<()> {
- use std::convert::TryFrom;
- match msg {
- GossipMessage::CurrentProfileState { peer_id, cid } => {
- let peer_id = ipfs::PeerId::from_bytes(&peer_id);
- let cid = cid::Cid::try_from(&*cid);
-
- log::trace!("{:?} told me that {:?} is at {:?}", source, peer_id, cid);
- }
- }
-
- Ok(())
- }
-}
diff --git a/lib/src/reactor/mod.rs b/lib/src/reactor/mod.rs
deleted file mode 100644
index 5299851..0000000
--- a/lib/src/reactor/mod.rs
+++ /dev/null
@@ -1,28 +0,0 @@
-use std::sync::Arc;
-use std::fmt::Debug;
-
-use anyhow::Result;
-use tokio::sync::RwLock;
-
-use crate::profile::Profile;
-
-mod gossip;
-mod device;
-mod account;
-mod ctrl;
-
-use ctrl::ReactorReceiver;
-
-#[async_trait::async_trait]
-pub trait Reactor {
- type Request: Debug + Send + Sync;
- type Reply: Debug + Send + Sync;
-
- async fn run(self) -> Result<()>;
-}
-
-pub trait ReactorBuilder {
- type Reactor: Reactor;
-
- fn build_with_receiver(self, rr: ReactorReceiver<<Self::Reactor as Reactor>::Request, <Self::Reactor as Reactor>::Reply>) -> Self::Reactor;
-}