summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2021-12-10 16:48:18 +0100
committerMatthias Beyer <mail@beyermatthias.de>2021-12-10 16:49:00 +0100
commit6bc7a060fd42c06b76406eb4e8976f449afb59c6 (patch)
tree0a27d5edf1a15e46c6a00fa63f6bb8c596862956
parentec205af0d61ad834bd3033219c2590a7d83fdc9b (diff)
Add basis for gossip reactor implementation
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
-rw-r--r--lib/src/reactor/gossip.rs51
1 files changed, 48 insertions, 3 deletions
diff --git a/lib/src/reactor/gossip.rs b/lib/src/reactor/gossip.rs
index e695dac..998112f 100644
--- a/lib/src/reactor/gossip.rs
+++ b/lib/src/reactor/gossip.rs
@@ -10,15 +10,60 @@ use anyhow::Result;
use tokio::sync::RwLock;
use crate::profile::Profile;
+use crate::reactor::Reactor;
+use crate::reactor::ctrl::ReactorReceiver;
+use crate::reactor::ctrl::ReactorReply;
+use crate::reactor::ctrl::ReactorRequest;
+use crate::reactor::ctrl::ReactorSender;
+use crate::reactor::ctrl::ReplyChannel;
+
#[derive(Debug)]
pub struct GossipReactor {
- profile: Arc<RwLock<Profile>>,
+ inner: Reactor<GossipRequest, GossipReply>,
+}
+
+#[derive(Debug)]
+pub enum GossipRequest {
+ Ping,
+}
+
+#[derive(Debug)]
+pub enum GossipReply {
+ Pong,
}
impl GossipReactor {
- pub(super) fn new(profile: Arc<RwLock<Profile>>) -> Self {
- Self { profile }
+ pub fn new(profile: Arc<RwLock<Profile>>) -> (Self, ReactorSender<GossipRequest, GossipReply>) {
+ let (inner, sender) = Reactor::<GossipRequest, GossipReply>::new(profile);
+ (Self { inner }, sender)
+ }
+
+ pub async fn receive_next_message(&mut self) -> Option<(ReactorRequest<GossipRequest>, ReplyChannel<GossipReply>)> {
+ self.inner.receive_next_message().await
+ }
+
+ pub(super) async fn process_reactor_message(&mut self, request: (ReactorRequest<GossipRequest>, ReplyChannel<GossipReply>)) -> Result<()> {
+ match self.inner.process_reactor_message(request).await? {
+ None => Ok(()),
+ Some((GossipRequest::Ping, reply_channel)) => {
+ if let Err(_) = reply_channel.send(ReactorReply::Custom(GossipReply::Pong)) {
+ anyhow::bail!("Failed sening PONG reply")
+ }
+
+ Ok(())
+ }
+ }
+ }
+
+ pub async fn run(mut self) -> Result<()> {
+ loop {
+ match self.receive_next_message().await {
+ None => break,
+ Some(tpl) => self.process_reactor_message(tpl).await?,
+ }
+ }
+ Ok(())
}
}