summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2021-12-17 21:41:03 +0100
committerMatthias Beyer <mail@beyermatthias.de>2021-12-17 22:17:18 +0100
commitdbf9acf5f3cdf9c2c84040f9408872b3ee1e69a0 (patch)
tree7e82ed9021feaff7e870302bb0b35f2c18077778
parent5afe7e6a6b71ad0851f08c56834f9f38412d0f22 (diff)
Rewrite Reactor abstraction to be a trait
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
-rw-r--r--lib/src/reactor/ctrl.rs13
-rw-r--r--lib/src/reactor/gossip/ctrl.rs2
-rw-r--r--lib/src/reactor/gossip/mod.rs144
-rw-r--r--lib/src/reactor/mod.rs130
4 files changed, 94 insertions, 195 deletions
diff --git a/lib/src/reactor/ctrl.rs b/lib/src/reactor/ctrl.rs
index a745394..a32f1c3 100644
--- a/lib/src/reactor/ctrl.rs
+++ b/lib/src/reactor/ctrl.rs
@@ -1,20 +1,15 @@
-use std::fmt::Debug;
-
use tokio::sync::mpsc::UnboundedSender as Sender;
use tokio::sync::mpsc::UnboundedReceiver as Receiver;
-use crate::reactor::ReactorReply;
-use crate::reactor::ReactorRequest;
-
/// Type for sending messages to a reactor
-pub type ReactorSender<CustomRequest, CustomReply> = Sender<(ReactorRequest<CustomRequest>, ReplyChannel<CustomReply>)>;
+pub type ReactorSender<Request, Reply> = Sender<(Request, ReplySender<Reply>)>;
/// Type that is used by a reactor for receiving messages
-pub type ReactorReceiver<CustomRequest, CustomReply> = Receiver<(ReactorRequest<CustomRequest>, ReplyChannel<CustomReply>)>;
+pub type ReactorReceiver<Request, Reply> = Receiver<(Request, ReplySender<Reply>)>;
/// Type that represents the channel that has to be send with a request to a reactor for getting an
/// answer back
-pub type ReplyChannel<CustomReply> = Sender<ReactorReply<CustomReply>>;
+pub type ReplySender<Reply> = Sender<Reply>;
-pub type ReplyReceiver<CustomReply> = Receiver<ReactorReply<CustomReply>>;
+pub type ReplyReceiver<Reply> = Receiver<Reply>;
diff --git a/lib/src/reactor/gossip/ctrl.rs b/lib/src/reactor/gossip/ctrl.rs
index d65af9c..68fbf06 100644
--- a/lib/src/reactor/gossip/ctrl.rs
+++ b/lib/src/reactor/gossip/ctrl.rs
@@ -2,6 +2,7 @@ use anyhow::Result;
#[derive(Debug)]
pub enum GossipRequest {
+ Exit,
Ping,
PublishMe,
Connect(ipfs::MultiaddrWithPeerId),
@@ -9,6 +10,7 @@ pub enum GossipRequest {
#[derive(Debug)]
pub enum GossipReply {
+ Exiting,
Pong,
NoHead,
PublishMeResult(Result<()>),
diff --git a/lib/src/reactor/gossip/mod.rs b/lib/src/reactor/gossip/mod.rs
index 58e8828..689bf37 100644
--- a/lib/src/reactor/gossip/mod.rs
+++ b/lib/src/reactor/gossip/mod.rs
@@ -11,11 +11,10 @@ use tokio::sync::RwLock;
use crate::profile::Profile;
use crate::reactor::Reactor;
-use crate::reactor::ReactorReply;
-use crate::reactor::ReactorRequest;
+use crate::reactor::ReactorBuilder;
use crate::reactor::ctrl::ReactorReceiver;
use crate::reactor::ctrl::ReactorSender;
-use crate::reactor::ctrl::ReplyChannel;
+use crate::reactor::ctrl::ReplySender;
mod ctrl;
pub use ctrl::GossipRequest;
@@ -25,63 +24,49 @@ mod msg;
pub use msg::GossipMessage;
#[derive(Debug)]
-pub struct GossipReactor {
- inner: Reactor<GossipRequest, GossipReply>,
+pub struct GossipReactorBuilder {
+ profile: Arc<RwLock<Profile>>,
gossip_topic_name: String,
}
+impl GossipReactorBuilder {
+ pub fn new(profile: Arc<RwLock<Profile>>, gossip_topic_name: String) -> Self {
+ Self { profile, gossip_topic_name }
+ }
+}
-impl GossipReactor {
- pub fn new(profile: Arc<RwLock<Profile>>, gossip_topic_name: String) -> (Self, ReactorSender<GossipRequest, GossipReply>) {
- let (inner, sender) = Reactor::<GossipRequest, GossipReply>::new(profile);
- let reactor = Self {
- inner,
- gossip_topic_name,
- };
+impl ReactorBuilder for GossipReactorBuilder {
+ type Reactor = GossipReactor;
- (reactor, sender)
+ fn build_with_receiver(self, rr: ReactorReceiver<GossipRequest, GossipReply>) -> Self::Reactor {
+ GossipReactor {
+ running: false,
+ profile: self.profile,
+ gossip_topic_name: self.gossip_topic_name,
+ receiver: rr,
+ }
}
+}
- pub async fn receive_next_message(&mut self) -> Option<(ReactorRequest<GossipRequest>, ReplyChannel<GossipReply>)> {
- self.inner.receive_next_message().await
- }
+#[derive(Debug)]
+pub struct GossipReactor {
+ running: bool,
+ profile: Arc<RwLock<Profile>>,
+ gossip_topic_name: String,
+ receiver: ReactorReceiver<GossipRequest, GossipReply>,
+}
- fn send_gossip_reply(channel: ReplyChannel<GossipReply>, reply: GossipReply) -> Result<()> {
- if let Err(_) = channel.send(ReactorReply::Custom(reply)) {
+impl GossipReactor {
+ fn send_gossip_reply(channel: ReplySender<GossipReply>, reply: GossipReply) -> Result<()> {
+ if let Err(_) = channel.send(reply) {
anyhow::bail!("Failed to send GossipReply::NoHead)")
}
Ok(())
}
- pub(super) async fn process_reactor_message(&mut self, request: (ReactorRequest<GossipRequest>, ReplyChannel<GossipReply>)) -> Result<()> {
- match self.inner.process_reactor_message(request).await? {
- None => Ok(()),
- Some((GossipRequest::Ping, reply_channel)) => {
- if let Err(_) = reply_channel.send(ReactorReply::Custom(GossipReply::Pong)) {
- anyhow::bail!("Failed sening PONG reply")
- }
-
- Ok(())
- },
-
- Some((GossipRequest::PublishMe, reply_channel)) => self.publish_me(reply_channel).await,
-
- Some((GossipRequest::Connect(addr), reply_channel)) => {
- let reply = GossipReply::ConnectResult(self.connect(addr.clone()).await);
- if let Err(_) = Self::send_gossip_reply(reply_channel, reply) {
- anyhow::bail!("Failed sending Connect({}) reply", addr)
- }
-
- Ok(())
- },
-
- }
- }
-
- async fn publish_me(&self, reply_channel: ReplyChannel<GossipReply>) -> Result<()> {
- let profile = self.inner.profile();
- let profile = profile.read().await;
+ async fn publish_me(&self, reply_channel: ReplySender<GossipReply>) -> Result<()> {
+ let profile = self.profile.read().await;
let head = profile.head();
if head.is_none() {
@@ -114,13 +99,12 @@ impl GossipReactor {
}
async fn connect(&self, addr: ipfs::MultiaddrWithPeerId) -> Result<()> {
- self.inner.profile().read().await.client().connect(addr).await
+ self.profile.read().await.client().connect(addr).await
}
#[cfg(test)]
async fn is_connected_to(&self, addr: ipfs::MultiaddrWithPeerId) -> Result<bool> {
- self.inner
- .profile()
+ self.profile
.read()
.await
.client()
@@ -149,11 +133,18 @@ impl GossipReactor {
Ok(())
}
- pub async fn run(mut self) -> Result<()> {
+}
+
+#[async_trait::async_trait]
+impl Reactor for GossipReactor {
+ type Request = GossipRequest;
+ type Reply = GossipReply;
+
+ async fn run(mut self) -> Result<()> {
use futures::stream::StreamExt;
- self.inner.set_running(true);
- let mut subscription_stream = self.inner.profile()
+ self.running = true;
+ let mut subscription_stream = self.profile
.read()
.await
.client()
@@ -163,10 +154,30 @@ impl GossipReactor {
loop {
tokio::select! {
- next_control_msg = self.receive_next_message() => {
+ next_control_msg = self.receiver.recv() => {
match next_control_msg {
None => break,
- Some(tpl) => self.process_reactor_message(tpl).await?,
+ Some((GossipRequest::Exit, reply_channel)) => {
+ if let Err(_) = reply_channel.send(GossipReply::Exiting) {
+ anyhow::bail!("Failed sending EXITING reply")
+ }
+ break
+ },
+
+ Some((GossipRequest::Ping, reply_channel)) => {
+ if let Err(_) = reply_channel.send(GossipReply::Pong) {
+ anyhow::bail!("Failed sending PONG reply")
+ }
+ },
+
+ Some((GossipRequest::PublishMe, reply_channel)) => self.publish_me(reply_channel).await?,
+
+ Some((GossipRequest::Connect(addr), reply_channel)) => {
+ let reply = GossipReply::ConnectResult(self.connect(addr.clone()).await);
+ if let Err(_) = Self::send_gossip_reply(reply_channel, reply) {
+ anyhow::bail!("Failed sending Connect({}) reply", addr)
+ }
+ },
}
}
@@ -179,7 +190,7 @@ impl GossipReactor {
}
}
- if !self.inner.running() {
+ if !self.running {
break;
}
}
@@ -206,19 +217,20 @@ mod tests {
let profile = Arc::new(RwLock::new(profile.unwrap()));
let gossip_topic_name = String::from("test-gossip-reactor-simple-topic");
- let (reactor, tx) = GossipReactor::new(profile.clone(), gossip_topic_name);
+ let (rx, tx) = tokio::sync::mpsc::unbounded_channel();
+ let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name).build_with_receiver(tx);
let (reply_sender, mut reply_receiver) = tokio::sync::mpsc::unbounded_channel();
- tx.send((ReactorRequest::Ping, reply_sender));
+ rx.send((GossipRequest::Ping, reply_sender));
let mut pong_received = false;
tokio::select! {
reply = reply_receiver.recv() => {
match reply {
- Some(ReactorReply::Pong) => {
+ Some(GossipReply::Pong) => {
pong_received = true;
let (reply_sender, mut reply_receiver) = tokio::sync::mpsc::unbounded_channel();
- tx.send((ReactorRequest::Exit, reply_sender));
+ rx.send((GossipRequest::Exit, reply_sender));
}
Some(r) => {
assert!(false, "Expected ReactorReply::Pong, got: {:?}", r);
@@ -253,8 +265,9 @@ mod tests {
assert!(profile.is_ok());
let profile = Arc::new(RwLock::new(profile.unwrap()));
- let (reactor, tx) = GossipReactor::new(profile.clone(), gossip_topic_name.clone());
- (profile, reactor, tx)
+ let (rx, tx) = tokio::sync::mpsc::unbounded_channel();
+ let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name.clone()).build_with_receiver(tx);
+ (profile, reactor, rx)
};
let (right_profile, right_reactor, right_tx) = {
@@ -262,8 +275,9 @@ mod tests {
assert!(profile.is_ok());
let profile = Arc::new(RwLock::new(profile.unwrap()));
- let (reactor, tx) = GossipReactor::new(profile.clone(), gossip_topic_name.clone());
- (profile, reactor, tx)
+ let (rx, tx) = tokio::sync::mpsc::unbounded_channel();
+ let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name.clone()).build_with_receiver(tx);
+ (profile, reactor, rx)
};
async fn get_peer_id(profile: Arc<RwLock<Profile>>) -> Result<ipfs::MultiaddrWithPeerId> {
@@ -286,11 +300,11 @@ mod tests {
assert!(left_peer_id.is_ok(), "Not ok: {:?}", left_peer_id);
let left_peer_id = left_peer_id.unwrap();
let (right_reply_sender, mut right_reply_receiver) = tokio::sync::mpsc::unbounded_channel();
- right_tx.send((ReactorRequest::Connect(left_peer_id), right_reply_sender));
+ right_tx.send((GossipRequest::Connect(left_peer_id), right_reply_sender));
if let Some(reply) = right_reply_receiver.recv().await {
match reply {
- ReactorReply::Custom(GossipReply::ConnectResult(Ok(()))) => assert!(true),
+ GossipReply::ConnectResult(Ok(())) => assert!(true),
other => assert!(false, "Expected ConnectResult(Ok(())), recv: {:?}", other),
}
} else {
diff --git a/lib/src/reactor/mod.rs b/lib/src/reactor/mod.rs
index 1f8159d..5299851 100644
--- a/lib/src/reactor/mod.rs
+++ b/lib/src/reactor/mod.rs
@@ -11,130 +11,18 @@ mod device;
mod account;
mod ctrl;
-pub use ctrl::ReactorReceiver;
-pub use ctrl::ReactorSender;
-pub use ctrl::ReplyChannel;
+use ctrl::ReactorReceiver;
-/// Send control messages to the reactor
-#[derive(Debug)]
-pub enum ReactorRequest<CustomRequest: Debug + Send + Sync> {
- /// check if the reactor still responds
- Ping,
+#[async_trait::async_trait]
+pub trait Reactor {
+ type Request: Debug + Send + Sync;
+ type Reply: Debug + Send + Sync;
- /// Quit the reactor
- Exit,
-
- Connect(ipfs::MultiaddrWithPeerId),
-
- Custom(CustomRequest),
-}
-
-#[derive(Debug)]
-pub enum ReactorReply<CustomReply: Debug + Send + Sync> {
- Pong,
- Exiting,
-
- ConnectResult((Result<()>, ipfs::MultiaddrWithPeerId)),
-
- Custom(CustomReply),
-}
-
-/// Reactor type, for running the application logic
-///
-/// The Reactor runs the whole application logic, that is syncing with other devices, fetching and
-/// keeping profile updates of other accounts, communication on the gossipsub topics... etc
-#[derive(Debug, getset::Getters, getset::Setters)]
-pub(super) struct Reactor<CustomReactorRequest, CustomReactorReply>
- where CustomReactorRequest: Debug + Send + Sync,
- CustomReactorReply: Debug + Send + Sync
-{
- #[getset(get = "pub", set = "pub")]
- running: bool,
- profile: Arc<RwLock<Profile>>,
- rx: ReactorReceiver<CustomReactorRequest, CustomReactorReply>,
+ async fn run(self) -> Result<()>;
}
-impl<CustomReactorRequest, CustomReactorReply> Reactor<CustomReactorRequest, CustomReactorReply>
- where CustomReactorRequest: Debug + Send + Sync,
- CustomReactorReply: Debug + Send + Sync
-{
- pub(super) fn new(profile: Arc<RwLock<Profile>>) -> (Self, ReactorSender<CustomReactorRequest, CustomReactorReply>) {
- let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
- let reactor = Reactor {
- running: true,
- profile,
- rx,
- };
-
- (reactor, tx)
- }
-
- pub async fn head(&self) -> Option<cid::Cid> {
- self.profile.read().await.head().map(cid::Cid::clone)
- }
-
- pub async fn connect(&self, peer: ipfs::MultiaddrWithPeerId) -> Result<()> {
- self.profile.read().await.connect(peer).await
- }
-
- pub fn profile(&self) -> Arc<RwLock<Profile>> {
- self.profile.clone()
- }
-
- pub async fn exit(self) -> Result<()> {
- let mut inner = self.profile;
- loop {
- match Arc::try_unwrap(inner) {
- Err(arc) => inner = arc,
- Ok(inner) => return inner.into_inner().exit().await,
- }
- }
- }
-
- pub(super) async fn receive_next_message(&mut self) -> Option<(ReactorRequest<CustomReactorRequest>, ReplyChannel<CustomReactorReply>)> {
- self.rx.recv().await
- }
-
- /// Process the request if it is not a specialized request,
- /// return the specialized request if it is one and cannot be processed by this reactor
- /// implementation
- pub(super) async fn process_reactor_message(&mut self, request: (ReactorRequest<CustomReactorRequest>, ReplyChannel<CustomReactorReply>)) -> Result<Option<(CustomReactorRequest, ReplyChannel<CustomReactorReply>)>> {
- match request {
- (ReactorRequest::Ping, reply_channel) => {
- if let Err(_) = reply_channel.send(ReactorReply::Pong) {
- anyhow::bail!("Failed sending PONG reply")
- }
- Ok(None)
- },
-
- (ReactorRequest::Exit, reply_channel) => {
- self.running = false;
- if let Err(_) = reply_channel.send(ReactorReply::Exiting) {
- anyhow::bail!("Failed sending EXITING reply")
- }
- Ok(None)
- },
-
- (ReactorRequest::Connect(addr), reply_channel) => {
- match self.profile.read().await.client().connect(addr.clone()).await {
- Ok(()) => if let Err(_) = reply_channel.send(ReactorReply::ConnectResult((Ok(()), addr.clone()))) {
- anyhow::bail!("Failed sending ConnectResult({}, Ok(()))", addr)
- } else {
- Ok(None)
- }
-
- Err(e) => if let Err(_) = reply_channel.send(ReactorReply::ConnectResult((Err(e), addr.clone()))) {
- anyhow::bail!("Failed sending ConnectResult({}, Err(_))", addr)
- } else {
- Ok(None)
- }
- }
- }
-
- (ReactorRequest::Custom(c), reply_channel) => {
- Ok(Some((c, reply_channel)))
- }
- }
- }
+pub trait ReactorBuilder {
+ type Reactor: Reactor;
+ fn build_with_receiver(self, rr: ReactorReceiver<<Self::Reactor as Reactor>::Request, <Self::Reactor as Reactor>::Reply>) -> Self::Reactor;
}