diff options
-rw-r--r-- | lib/src/reactor/gossip.rs | 51 |
1 files changed, 48 insertions, 3 deletions
diff --git a/lib/src/reactor/gossip.rs b/lib/src/reactor/gossip.rs index e695dac..998112f 100644 --- a/lib/src/reactor/gossip.rs +++ b/lib/src/reactor/gossip.rs @@ -10,15 +10,60 @@ use anyhow::Result; use tokio::sync::RwLock; use crate::profile::Profile; +use crate::reactor::Reactor; +use crate::reactor::ctrl::ReactorReceiver; +use crate::reactor::ctrl::ReactorReply; +use crate::reactor::ctrl::ReactorRequest; +use crate::reactor::ctrl::ReactorSender; +use crate::reactor::ctrl::ReplyChannel; + #[derive(Debug)] pub struct GossipReactor { - profile: Arc<RwLock<Profile>>, + inner: Reactor<GossipRequest, GossipReply>, +} + +#[derive(Debug)] +pub enum GossipRequest { + Ping, +} + +#[derive(Debug)] +pub enum GossipReply { + Pong, } impl GossipReactor { - pub(super) fn new(profile: Arc<RwLock<Profile>>) -> Self { - Self { profile } + pub fn new(profile: Arc<RwLock<Profile>>) -> (Self, ReactorSender<GossipRequest, GossipReply>) { + let (inner, sender) = Reactor::<GossipRequest, GossipReply>::new(profile); + (Self { inner }, sender) + } + + pub async fn receive_next_message(&mut self) -> Option<(ReactorRequest<GossipRequest>, ReplyChannel<GossipReply>)> { + self.inner.receive_next_message().await + } + + pub(super) async fn process_reactor_message(&mut self, request: (ReactorRequest<GossipRequest>, ReplyChannel<GossipReply>)) -> Result<()> { + match self.inner.process_reactor_message(request).await? { + None => Ok(()), + Some((GossipRequest::Ping, reply_channel)) => { + if let Err(_) = reply_channel.send(ReactorReply::Custom(GossipReply::Pong)) { + anyhow::bail!("Failed sening PONG reply") + } + + Ok(()) + } + } + } + + pub async fn run(mut self) -> Result<()> { + loop { + match self.receive_next_message().await { + None => break, + Some(tpl) => self.process_reactor_message(tpl).await?, + } + } + Ok(()) } } |