From dbf9acf5f3cdf9c2c84040f9408872b3ee1e69a0 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Fri, 17 Dec 2021 21:41:03 +0100 Subject: Rewrite Reactor abstraction to be a trait Signed-off-by: Matthias Beyer --- lib/src/reactor/ctrl.rs | 13 ++-- lib/src/reactor/gossip/ctrl.rs | 2 + lib/src/reactor/gossip/mod.rs | 144 ++++++++++++++++++++++------------------- lib/src/reactor/mod.rs | 130 +++---------------------------------- 4 files changed, 94 insertions(+), 195 deletions(-) diff --git a/lib/src/reactor/ctrl.rs b/lib/src/reactor/ctrl.rs index a745394..a32f1c3 100644 --- a/lib/src/reactor/ctrl.rs +++ b/lib/src/reactor/ctrl.rs @@ -1,20 +1,15 @@ -use std::fmt::Debug; - use tokio::sync::mpsc::UnboundedSender as Sender; use tokio::sync::mpsc::UnboundedReceiver as Receiver; -use crate::reactor::ReactorReply; -use crate::reactor::ReactorRequest; - /// Type for sending messages to a reactor -pub type ReactorSender = Sender<(ReactorRequest, ReplyChannel)>; +pub type ReactorSender = Sender<(Request, ReplySender)>; /// Type that is used by a reactor for receiving messages -pub type ReactorReceiver = Receiver<(ReactorRequest, ReplyChannel)>; +pub type ReactorReceiver = Receiver<(Request, ReplySender)>; /// Type that represents the channel that has to be send with a request to a reactor for getting an /// answer back -pub type ReplyChannel = Sender>; +pub type ReplySender = Sender; -pub type ReplyReceiver = Receiver>; +pub type ReplyReceiver = Receiver; diff --git a/lib/src/reactor/gossip/ctrl.rs b/lib/src/reactor/gossip/ctrl.rs index d65af9c..68fbf06 100644 --- a/lib/src/reactor/gossip/ctrl.rs +++ b/lib/src/reactor/gossip/ctrl.rs @@ -2,6 +2,7 @@ use anyhow::Result; #[derive(Debug)] pub enum GossipRequest { + Exit, Ping, PublishMe, Connect(ipfs::MultiaddrWithPeerId), @@ -9,6 +10,7 @@ pub enum GossipRequest { #[derive(Debug)] pub enum GossipReply { + Exiting, Pong, NoHead, PublishMeResult(Result<()>), diff --git a/lib/src/reactor/gossip/mod.rs b/lib/src/reactor/gossip/mod.rs index 58e8828..689bf37 100644 --- a/lib/src/reactor/gossip/mod.rs +++ b/lib/src/reactor/gossip/mod.rs @@ -11,11 +11,10 @@ use tokio::sync::RwLock; use crate::profile::Profile; use crate::reactor::Reactor; -use crate::reactor::ReactorReply; -use crate::reactor::ReactorRequest; +use crate::reactor::ReactorBuilder; use crate::reactor::ctrl::ReactorReceiver; use crate::reactor::ctrl::ReactorSender; -use crate::reactor::ctrl::ReplyChannel; +use crate::reactor::ctrl::ReplySender; mod ctrl; pub use ctrl::GossipRequest; @@ -25,63 +24,49 @@ mod msg; pub use msg::GossipMessage; #[derive(Debug)] -pub struct GossipReactor { - inner: Reactor, +pub struct GossipReactorBuilder { + profile: Arc>, gossip_topic_name: String, } +impl GossipReactorBuilder { + pub fn new(profile: Arc>, gossip_topic_name: String) -> Self { + Self { profile, gossip_topic_name } + } +} -impl GossipReactor { - pub fn new(profile: Arc>, gossip_topic_name: String) -> (Self, ReactorSender) { - let (inner, sender) = Reactor::::new(profile); - let reactor = Self { - inner, - gossip_topic_name, - }; +impl ReactorBuilder for GossipReactorBuilder { + type Reactor = GossipReactor; - (reactor, sender) + fn build_with_receiver(self, rr: ReactorReceiver) -> Self::Reactor { + GossipReactor { + running: false, + profile: self.profile, + gossip_topic_name: self.gossip_topic_name, + receiver: rr, + } } +} - pub async fn receive_next_message(&mut self) -> Option<(ReactorRequest, ReplyChannel)> { - self.inner.receive_next_message().await - } +#[derive(Debug)] +pub struct GossipReactor { + running: bool, + profile: Arc>, + gossip_topic_name: String, + receiver: ReactorReceiver, +} - fn send_gossip_reply(channel: ReplyChannel, reply: GossipReply) -> Result<()> { - if let Err(_) = channel.send(ReactorReply::Custom(reply)) { +impl GossipReactor { + fn send_gossip_reply(channel: ReplySender, reply: GossipReply) -> Result<()> { + if let Err(_) = channel.send(reply) { anyhow::bail!("Failed to send GossipReply::NoHead)") } Ok(()) } - pub(super) async fn process_reactor_message(&mut self, request: (ReactorRequest, ReplyChannel)) -> Result<()> { - match self.inner.process_reactor_message(request).await? { - None => Ok(()), - Some((GossipRequest::Ping, reply_channel)) => { - if let Err(_) = reply_channel.send(ReactorReply::Custom(GossipReply::Pong)) { - anyhow::bail!("Failed sening PONG reply") - } - - Ok(()) - }, - - Some((GossipRequest::PublishMe, reply_channel)) => self.publish_me(reply_channel).await, - - Some((GossipRequest::Connect(addr), reply_channel)) => { - let reply = GossipReply::ConnectResult(self.connect(addr.clone()).await); - if let Err(_) = Self::send_gossip_reply(reply_channel, reply) { - anyhow::bail!("Failed sending Connect({}) reply", addr) - } - - Ok(()) - }, - - } - } - - async fn publish_me(&self, reply_channel: ReplyChannel) -> Result<()> { - let profile = self.inner.profile(); - let profile = profile.read().await; + async fn publish_me(&self, reply_channel: ReplySender) -> Result<()> { + let profile = self.profile.read().await; let head = profile.head(); if head.is_none() { @@ -114,13 +99,12 @@ impl GossipReactor { } async fn connect(&self, addr: ipfs::MultiaddrWithPeerId) -> Result<()> { - self.inner.profile().read().await.client().connect(addr).await + self.profile.read().await.client().connect(addr).await } #[cfg(test)] async fn is_connected_to(&self, addr: ipfs::MultiaddrWithPeerId) -> Result { - self.inner - .profile() + self.profile .read() .await .client() @@ -149,11 +133,18 @@ impl GossipReactor { Ok(()) } - pub async fn run(mut self) -> Result<()> { +} + +#[async_trait::async_trait] +impl Reactor for GossipReactor { + type Request = GossipRequest; + type Reply = GossipReply; + + async fn run(mut self) -> Result<()> { use futures::stream::StreamExt; - self.inner.set_running(true); - let mut subscription_stream = self.inner.profile() + self.running = true; + let mut subscription_stream = self.profile .read() .await .client() @@ -163,10 +154,30 @@ impl GossipReactor { loop { tokio::select! { - next_control_msg = self.receive_next_message() => { + next_control_msg = self.receiver.recv() => { match next_control_msg { None => break, - Some(tpl) => self.process_reactor_message(tpl).await?, + Some((GossipRequest::Exit, reply_channel)) => { + if let Err(_) = reply_channel.send(GossipReply::Exiting) { + anyhow::bail!("Failed sending EXITING reply") + } + break + }, + + Some((GossipRequest::Ping, reply_channel)) => { + if let Err(_) = reply_channel.send(GossipReply::Pong) { + anyhow::bail!("Failed sending PONG reply") + } + }, + + Some((GossipRequest::PublishMe, reply_channel)) => self.publish_me(reply_channel).await?, + + Some((GossipRequest::Connect(addr), reply_channel)) => { + let reply = GossipReply::ConnectResult(self.connect(addr.clone()).await); + if let Err(_) = Self::send_gossip_reply(reply_channel, reply) { + anyhow::bail!("Failed sending Connect({}) reply", addr) + } + }, } } @@ -179,7 +190,7 @@ impl GossipReactor { } } - if !self.inner.running() { + if !self.running { break; } } @@ -206,19 +217,20 @@ mod tests { let profile = Arc::new(RwLock::new(profile.unwrap())); let gossip_topic_name = String::from("test-gossip-reactor-simple-topic"); - let (reactor, tx) = GossipReactor::new(profile.clone(), gossip_topic_name); + let (rx, tx) = tokio::sync::mpsc::unbounded_channel(); + let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name).build_with_receiver(tx); let (reply_sender, mut reply_receiver) = tokio::sync::mpsc::unbounded_channel(); - tx.send((ReactorRequest::Ping, reply_sender)); + rx.send((GossipRequest::Ping, reply_sender)); let mut pong_received = false; tokio::select! { reply = reply_receiver.recv() => { match reply { - Some(ReactorReply::Pong) => { + Some(GossipReply::Pong) => { pong_received = true; let (reply_sender, mut reply_receiver) = tokio::sync::mpsc::unbounded_channel(); - tx.send((ReactorRequest::Exit, reply_sender)); + rx.send((GossipRequest::Exit, reply_sender)); } Some(r) => { assert!(false, "Expected ReactorReply::Pong, got: {:?}", r); @@ -253,8 +265,9 @@ mod tests { assert!(profile.is_ok()); let profile = Arc::new(RwLock::new(profile.unwrap())); - let (reactor, tx) = GossipReactor::new(profile.clone(), gossip_topic_name.clone()); - (profile, reactor, tx) + let (rx, tx) = tokio::sync::mpsc::unbounded_channel(); + let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name.clone()).build_with_receiver(tx); + (profile, reactor, rx) }; let (right_profile, right_reactor, right_tx) = { @@ -262,8 +275,9 @@ mod tests { assert!(profile.is_ok()); let profile = Arc::new(RwLock::new(profile.unwrap())); - let (reactor, tx) = GossipReactor::new(profile.clone(), gossip_topic_name.clone()); - (profile, reactor, tx) + let (rx, tx) = tokio::sync::mpsc::unbounded_channel(); + let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name.clone()).build_with_receiver(tx); + (profile, reactor, rx) }; async fn get_peer_id(profile: Arc>) -> Result { @@ -286,11 +300,11 @@ mod tests { assert!(left_peer_id.is_ok(), "Not ok: {:?}", left_peer_id); let left_peer_id = left_peer_id.unwrap(); let (right_reply_sender, mut right_reply_receiver) = tokio::sync::mpsc::unbounded_channel(); - right_tx.send((ReactorRequest::Connect(left_peer_id), right_reply_sender)); + right_tx.send((GossipRequest::Connect(left_peer_id), right_reply_sender)); if let Some(reply) = right_reply_receiver.recv().await { match reply { - ReactorReply::Custom(GossipReply::ConnectResult(Ok(()))) => assert!(true), + GossipReply::ConnectResult(Ok(())) => assert!(true), other => assert!(false, "Expected ConnectResult(Ok(())), recv: {:?}", other), } } else { diff --git a/lib/src/reactor/mod.rs b/lib/src/reactor/mod.rs index 1f8159d..5299851 100644 --- a/lib/src/reactor/mod.rs +++ b/lib/src/reactor/mod.rs @@ -11,130 +11,18 @@ mod device; mod account; mod ctrl; -pub use ctrl::ReactorReceiver; -pub use ctrl::ReactorSender; -pub use ctrl::ReplyChannel; +use ctrl::ReactorReceiver; -/// Send control messages to the reactor -#[derive(Debug)] -pub enum ReactorRequest { - /// check if the reactor still responds - Ping, +#[async_trait::async_trait] +pub trait Reactor { + type Request: Debug + Send + Sync; + type Reply: Debug + Send + Sync; - /// Quit the reactor - Exit, - - Connect(ipfs::MultiaddrWithPeerId), - - Custom(CustomRequest), -} - -#[derive(Debug)] -pub enum ReactorReply { - Pong, - Exiting, - - ConnectResult((Result<()>, ipfs::MultiaddrWithPeerId)), - - Custom(CustomReply), -} - -/// Reactor type, for running the application logic -/// -/// The Reactor runs the whole application logic, that is syncing with other devices, fetching and -/// keeping profile updates of other accounts, communication on the gossipsub topics... etc -#[derive(Debug, getset::Getters, getset::Setters)] -pub(super) struct Reactor - where CustomReactorRequest: Debug + Send + Sync, - CustomReactorReply: Debug + Send + Sync -{ - #[getset(get = "pub", set = "pub")] - running: bool, - profile: Arc>, - rx: ReactorReceiver, + async fn run(self) -> Result<()>; } -impl Reactor - where CustomReactorRequest: Debug + Send + Sync, - CustomReactorReply: Debug + Send + Sync -{ - pub(super) fn new(profile: Arc>) -> (Self, ReactorSender) { - let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); - let reactor = Reactor { - running: true, - profile, - rx, - }; - - (reactor, tx) - } - - pub async fn head(&self) -> Option { - self.profile.read().await.head().map(cid::Cid::clone) - } - - pub async fn connect(&self, peer: ipfs::MultiaddrWithPeerId) -> Result<()> { - self.profile.read().await.connect(peer).await - } - - pub fn profile(&self) -> Arc> { - self.profile.clone() - } - - pub async fn exit(self) -> Result<()> { - let mut inner = self.profile; - loop { - match Arc::try_unwrap(inner) { - Err(arc) => inner = arc, - Ok(inner) => return inner.into_inner().exit().await, - } - } - } - - pub(super) async fn receive_next_message(&mut self) -> Option<(ReactorRequest, ReplyChannel)> { - self.rx.recv().await - } - - /// Process the request if it is not a specialized request, - /// return the specialized request if it is one and cannot be processed by this reactor - /// implementation - pub(super) async fn process_reactor_message(&mut self, request: (ReactorRequest, ReplyChannel)) -> Result)>> { - match request { - (ReactorRequest::Ping, reply_channel) => { - if let Err(_) = reply_channel.send(ReactorReply::Pong) { - anyhow::bail!("Failed sending PONG reply") - } - Ok(None) - }, - - (ReactorRequest::Exit, reply_channel) => { - self.running = false; - if let Err(_) = reply_channel.send(ReactorReply::Exiting) { - anyhow::bail!("Failed sending EXITING reply") - } - Ok(None) - }, - - (ReactorRequest::Connect(addr), reply_channel) => { - match self.profile.read().await.client().connect(addr.clone()).await { - Ok(()) => if let Err(_) = reply_channel.send(ReactorReply::ConnectResult((Ok(()), addr.clone()))) { - anyhow::bail!("Failed sending ConnectResult({}, Ok(()))", addr) - } else { - Ok(None) - } - - Err(e) => if let Err(_) = reply_channel.send(ReactorReply::ConnectResult((Err(e), addr.clone()))) { - anyhow::bail!("Failed sending ConnectResult({}, Err(_))", addr) - } else { - Ok(None) - } - } - } - - (ReactorRequest::Custom(c), reply_channel) => { - Ok(Some((c, reply_channel))) - } - } - } +pub trait ReactorBuilder { + type Reactor: Reactor; + fn build_with_receiver(self, rr: ReactorReceiver<::Request, ::Reply>) -> Self::Reactor; } -- cgit v1.2.3