diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2021-12-10 16:48:18 +0100 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2021-12-10 16:49:00 +0100 |
commit | 6bc7a060fd42c06b76406eb4e8976f449afb59c6 (patch) | |
tree | 0a27d5edf1a15e46c6a00fa63f6bb8c596862956 | |
parent | ec205af0d61ad834bd3033219c2590a7d83fdc9b (diff) |
Add basis for gossip reactor implementation
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
-rw-r--r-- | lib/src/reactor/gossip.rs | 51 |
1 files changed, 48 insertions, 3 deletions
diff --git a/lib/src/reactor/gossip.rs b/lib/src/reactor/gossip.rs index e695dac..998112f 100644 --- a/lib/src/reactor/gossip.rs +++ b/lib/src/reactor/gossip.rs @@ -10,15 +10,60 @@ use anyhow::Result; use tokio::sync::RwLock; use crate::profile::Profile; +use crate::reactor::Reactor; +use crate::reactor::ctrl::ReactorReceiver; +use crate::reactor::ctrl::ReactorReply; +use crate::reactor::ctrl::ReactorRequest; +use crate::reactor::ctrl::ReactorSender; +use crate::reactor::ctrl::ReplyChannel; + #[derive(Debug)] pub struct GossipReactor { - profile: Arc<RwLock<Profile>>, + inner: Reactor<GossipRequest, GossipReply>, +} + +#[derive(Debug)] +pub enum GossipRequest { + Ping, +} + +#[derive(Debug)] +pub enum GossipReply { + Pong, } impl GossipReactor { - pub(super) fn new(profile: Arc<RwLock<Profile>>) -> Self { - Self { profile } + pub fn new(profile: Arc<RwLock<Profile>>) -> (Self, ReactorSender<GossipRequest, GossipReply>) { + let (inner, sender) = Reactor::<GossipRequest, GossipReply>::new(profile); + (Self { inner }, sender) + } + + pub async fn receive_next_message(&mut self) -> Option<(ReactorRequest<GossipRequest>, ReplyChannel<GossipReply>)> { + self.inner.receive_next_message().await + } + + pub(super) async fn process_reactor_message(&mut self, request: (ReactorRequest<GossipRequest>, ReplyChannel<GossipReply>)) -> Result<()> { + match self.inner.process_reactor_message(request).await? { + None => Ok(()), + Some((GossipRequest::Ping, reply_channel)) => { + if let Err(_) = reply_channel.send(ReactorReply::Custom(GossipReply::Pong)) { + anyhow::bail!("Failed sening PONG reply") + } + + Ok(()) + } + } + } + + pub async fn run(mut self) -> Result<()> { + loop { + match self.receive_next_message().await { + None => break, + Some(tpl) => self.process_reactor_message(tpl).await?, + } + } + Ok(()) } } |