From 013a4203cc141e2ed4484b14b88cc0687c8ac318 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Sat, 18 Dec 2021 12:00:25 +0100 Subject: Move strategy for handling of gossip message to own types Signed-off-by: Matthias Beyer --- lib/src/reactor/gossip/mod.rs | 46 +++++++++++++++++++------------------- lib/src/reactor/gossip/strategy.rs | 31 +++++++++++++++++++++++++ 2 files changed, 54 insertions(+), 23 deletions(-) create mode 100644 lib/src/reactor/gossip/strategy.rs (limited to 'lib') diff --git a/lib/src/reactor/gossip/mod.rs b/lib/src/reactor/gossip/mod.rs index 9567ccf..c8a7d28 100644 --- a/lib/src/reactor/gossip/mod.rs +++ b/lib/src/reactor/gossip/mod.rs @@ -23,6 +23,10 @@ pub use ctrl::GossipReply; mod msg; pub use msg::GossipMessage; +mod strategy; +pub use strategy::GossipHandlingStrategy; +pub use strategy::LogStrategy; + #[derive(Debug)] pub struct GossipReactorBuilder { profile: Arc>, @@ -43,23 +47,31 @@ impl ReactorBuilder for GossipReactorBuilder { profile: self.profile, gossip_topic_name: self.gossip_topic_name, receiver: rr, + strategy: std::marker::PhantomData, } } } -pub struct GossipReactor { +pub struct GossipReactor + where Strategy: GossipHandlingStrategy + Sync + Send +{ profile: Arc>, gossip_topic_name: String, receiver: ReactorReceiver, + strategy: std::marker::PhantomData, } -impl std::fmt::Debug for GossipReactor { +impl std::fmt::Debug for GossipReactor + where S: GossipHandlingStrategy + Sync + Send +{ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "GossipReactor {{ topic: '{}' }}", self.gossip_topic_name) } } -impl GossipReactor { +impl GossipReactor + where S: GossipHandlingStrategy + Sync + Send +{ fn send_gossip_reply(channel: ReplySender, reply: GossipReply) -> Result<()> { if let Err(_) = channel.send(reply) { anyhow::bail!("Failed to send GossipReply::NoHead)") @@ -120,27 +132,12 @@ impl GossipReactor { }) } - async fn handle_gossip_message(&self, msg: Arc) -> Result<()> { - use std::convert::TryFrom; - - match serde_json::from_slice(&msg.data) { - Err(e) => log::trace!("Failed to deserialize gossip message from {}", msg.source), - Ok(GossipMessage::CurrentProfileState { peer_id, cid }) => { - let peer_id = ipfs::PeerId::from_bytes(&peer_id); - let cid = cid::Cid::try_from(&*cid); - log::trace!("Peer {:?} is at {:?}", peer_id, cid); - - // TODO start dispatched node chain fetching - } - } - - Ok(()) - } - } #[async_trait::async_trait] -impl Reactor for GossipReactor { +impl Reactor for GossipReactor + where S: GossipHandlingStrategy + Sync + Send +{ type Request = GossipRequest; type Reply = GossipReply; @@ -189,9 +186,12 @@ impl Reactor for GossipReactor { } next_gossip_message = subscription_stream.next() => { - if let Some(next_gossip_message) = next_gossip_message { + if let Some(msg) = next_gossip_message { log::trace!("Received gossip message"); - self.handle_gossip_message(next_gossip_message).await?; + match serde_json::from_slice(&msg.data) { + Ok(m) => S::handle_gossip_message(self.profile.clone(), msg.source, m).await?, + Err(e) => log::trace!("Failed to deserialize gossip message from {}", msg.source), + } } else { log::trace!("Gossip stream closed, breaking reactor loop"); break; diff --git a/lib/src/reactor/gossip/strategy.rs b/lib/src/reactor/gossip/strategy.rs new file mode 100644 index 0000000..b54dddc --- /dev/null +++ b/lib/src/reactor/gossip/strategy.rs @@ -0,0 +1,31 @@ +use std::sync::Arc; + +use anyhow::Result; +use tokio::sync::RwLock; + +use crate::profile::Profile; +use crate::reactor::gossip::msg::GossipMessage; + +#[async_trait::async_trait] +pub trait GossipHandlingStrategy: Sync + Send { + async fn handle_gossip_message(profile: Arc>, source: ipfs::PeerId, msg: GossipMessage) -> Result<()>; +} + +pub struct LogStrategy; + +#[async_trait::async_trait] +impl GossipHandlingStrategy for LogStrategy { + async fn handle_gossip_message(profile: Arc>, source: ipfs::PeerId, msg: GossipMessage) -> Result<()> { + use std::convert::TryFrom; + match msg { + GossipMessage::CurrentProfileState { peer_id, cid } => { + let peer_id = ipfs::PeerId::from_bytes(&peer_id); + let cid = cid::Cid::try_from(&*cid); + + log::trace!("{:?} told me that {:?} is at {:?}", source, peer_id, cid); + } + } + + Ok(()) + } +} -- cgit v1.2.3