diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2021-12-18 12:00:25 +0100 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2021-12-18 16:35:48 +0100 |
commit | 013a4203cc141e2ed4484b14b88cc0687c8ac318 (patch) | |
tree | b75619d2a74728cda1aa7a0cc45dbb0844d3dc33 | |
parent | 1ff0d68c37eaa20af77e15cd64d5625b1c4ef830 (diff) |
Move strategy for handling of gossip message to own types
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
-rw-r--r-- | lib/src/reactor/gossip/mod.rs | 46 | ||||
-rw-r--r-- | lib/src/reactor/gossip/strategy.rs | 31 |
2 files changed, 54 insertions, 23 deletions
diff --git a/lib/src/reactor/gossip/mod.rs b/lib/src/reactor/gossip/mod.rs index 9567ccf..c8a7d28 100644 --- a/lib/src/reactor/gossip/mod.rs +++ b/lib/src/reactor/gossip/mod.rs @@ -23,6 +23,10 @@ pub use ctrl::GossipReply; mod msg; pub use msg::GossipMessage; +mod strategy; +pub use strategy::GossipHandlingStrategy; +pub use strategy::LogStrategy; + #[derive(Debug)] pub struct GossipReactorBuilder { profile: Arc<RwLock<Profile>>, @@ -43,23 +47,31 @@ impl ReactorBuilder for GossipReactorBuilder { profile: self.profile, gossip_topic_name: self.gossip_topic_name, receiver: rr, + strategy: std::marker::PhantomData, } } } -pub struct GossipReactor { +pub struct GossipReactor<Strategy = LogStrategy> + where Strategy: GossipHandlingStrategy + Sync + Send +{ profile: Arc<RwLock<Profile>>, gossip_topic_name: String, receiver: ReactorReceiver<GossipRequest, GossipReply>, + strategy: std::marker::PhantomData<Strategy>, } -impl std::fmt::Debug for GossipReactor { +impl<S> std::fmt::Debug for GossipReactor<S> + where S: GossipHandlingStrategy + Sync + Send +{ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "GossipReactor {{ topic: '{}' }}", self.gossip_topic_name) } } -impl GossipReactor { +impl<S> GossipReactor<S> + where S: GossipHandlingStrategy + Sync + Send +{ fn send_gossip_reply(channel: ReplySender<GossipReply>, reply: GossipReply) -> Result<()> { if let Err(_) = channel.send(reply) { anyhow::bail!("Failed to send GossipReply::NoHead)") @@ -120,27 +132,12 @@ impl GossipReactor { }) } - async fn handle_gossip_message(&self, msg: Arc<ipfs::PubsubMessage>) -> Result<()> { - use std::convert::TryFrom; - - match serde_json::from_slice(&msg.data) { - Err(e) => log::trace!("Failed to deserialize gossip message from {}", msg.source), - Ok(GossipMessage::CurrentProfileState { peer_id, cid }) => { - let peer_id = ipfs::PeerId::from_bytes(&peer_id); - let cid = cid::Cid::try_from(&*cid); - log::trace!("Peer {:?} is at {:?}", peer_id, cid); - - // TODO start dispatched node chain fetching - } - } - - Ok(()) - } - } #[async_trait::async_trait] -impl Reactor for GossipReactor { +impl<S> Reactor for GossipReactor<S> + where S: GossipHandlingStrategy + Sync + Send +{ type Request = GossipRequest; type Reply = GossipReply; @@ -189,9 +186,12 @@ impl Reactor for GossipReactor { } next_gossip_message = subscription_stream.next() => { - if let Some(next_gossip_message) = next_gossip_message { + if let Some(msg) = next_gossip_message { log::trace!("Received gossip message"); - self.handle_gossip_message(next_gossip_message).await?; + match serde_json::from_slice(&msg.data) { + Ok(m) => S::handle_gossip_message(self.profile.clone(), msg.source, m).await?, + Err(e) => log::trace!("Failed to deserialize gossip message from {}", msg.source), + } } else { log::trace!("Gossip stream closed, breaking reactor loop"); break; diff --git a/lib/src/reactor/gossip/strategy.rs b/lib/src/reactor/gossip/strategy.rs new file mode 100644 index 0000000..b54dddc --- /dev/null +++ b/lib/src/reactor/gossip/strategy.rs @@ -0,0 +1,31 @@ +use std::sync::Arc; + +use anyhow::Result; +use tokio::sync::RwLock; + +use crate::profile::Profile; +use crate::reactor::gossip::msg::GossipMessage; + +#[async_trait::async_trait] +pub trait GossipHandlingStrategy: Sync + Send { + async fn handle_gossip_message(profile: Arc<RwLock<Profile>>, source: ipfs::PeerId, msg: GossipMessage) -> Result<()>; +} + +pub struct LogStrategy; + +#[async_trait::async_trait] +impl GossipHandlingStrategy for LogStrategy { + async fn handle_gossip_message(profile: Arc<RwLock<Profile>>, source: ipfs::PeerId, msg: GossipMessage) -> Result<()> { + use std::convert::TryFrom; + match msg { + GossipMessage::CurrentProfileState { peer_id, cid } => { + let peer_id = ipfs::PeerId::from_bytes(&peer_id); + let cid = cid::Cid::try_from(&*cid); + + log::trace!("{:?} told me that {:?} is at {:?}", source, peer_id, cid); + } + } + + Ok(()) + } +} |