summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2021-12-18 12:00:25 +0100
committerMatthias Beyer <mail@beyermatthias.de>2021-12-18 16:35:48 +0100
commit013a4203cc141e2ed4484b14b88cc0687c8ac318 (patch)
treeb75619d2a74728cda1aa7a0cc45dbb0844d3dc33
parent1ff0d68c37eaa20af77e15cd64d5625b1c4ef830 (diff)
Move strategy for handling of gossip message to own types
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
-rw-r--r--lib/src/reactor/gossip/mod.rs46
-rw-r--r--lib/src/reactor/gossip/strategy.rs31
2 files changed, 54 insertions, 23 deletions
diff --git a/lib/src/reactor/gossip/mod.rs b/lib/src/reactor/gossip/mod.rs
index 9567ccf..c8a7d28 100644
--- a/lib/src/reactor/gossip/mod.rs
+++ b/lib/src/reactor/gossip/mod.rs
@@ -23,6 +23,10 @@ pub use ctrl::GossipReply;
mod msg;
pub use msg::GossipMessage;
+mod strategy;
+pub use strategy::GossipHandlingStrategy;
+pub use strategy::LogStrategy;
+
#[derive(Debug)]
pub struct GossipReactorBuilder {
profile: Arc<RwLock<Profile>>,
@@ -43,23 +47,31 @@ impl ReactorBuilder for GossipReactorBuilder {
profile: self.profile,
gossip_topic_name: self.gossip_topic_name,
receiver: rr,
+ strategy: std::marker::PhantomData,
}
}
}
-pub struct GossipReactor {
+pub struct GossipReactor<Strategy = LogStrategy>
+ where Strategy: GossipHandlingStrategy + Sync + Send
+{
profile: Arc<RwLock<Profile>>,
gossip_topic_name: String,
receiver: ReactorReceiver<GossipRequest, GossipReply>,
+ strategy: std::marker::PhantomData<Strategy>,
}
-impl std::fmt::Debug for GossipReactor {
+impl<S> std::fmt::Debug for GossipReactor<S>
+ where S: GossipHandlingStrategy + Sync + Send
+{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "GossipReactor {{ topic: '{}' }}", self.gossip_topic_name)
}
}
-impl GossipReactor {
+impl<S> GossipReactor<S>
+ where S: GossipHandlingStrategy + Sync + Send
+{
fn send_gossip_reply(channel: ReplySender<GossipReply>, reply: GossipReply) -> Result<()> {
if let Err(_) = channel.send(reply) {
anyhow::bail!("Failed to send GossipReply::NoHead)")
@@ -120,27 +132,12 @@ impl GossipReactor {
})
}
- async fn handle_gossip_message(&self, msg: Arc<ipfs::PubsubMessage>) -> Result<()> {
- use std::convert::TryFrom;
-
- match serde_json::from_slice(&msg.data) {
- Err(e) => log::trace!("Failed to deserialize gossip message from {}", msg.source),
- Ok(GossipMessage::CurrentProfileState { peer_id, cid }) => {
- let peer_id = ipfs::PeerId::from_bytes(&peer_id);
- let cid = cid::Cid::try_from(&*cid);
- log::trace!("Peer {:?} is at {:?}", peer_id, cid);
-
- // TODO start dispatched node chain fetching
- }
- }
-
- Ok(())
- }
-
}
#[async_trait::async_trait]
-impl Reactor for GossipReactor {
+impl<S> Reactor for GossipReactor<S>
+ where S: GossipHandlingStrategy + Sync + Send
+{
type Request = GossipRequest;
type Reply = GossipReply;
@@ -189,9 +186,12 @@ impl Reactor for GossipReactor {
}
next_gossip_message = subscription_stream.next() => {
- if let Some(next_gossip_message) = next_gossip_message {
+ if let Some(msg) = next_gossip_message {
log::trace!("Received gossip message");
- self.handle_gossip_message(next_gossip_message).await?;
+ match serde_json::from_slice(&msg.data) {
+ Ok(m) => S::handle_gossip_message(self.profile.clone(), msg.source, m).await?,
+ Err(e) => log::trace!("Failed to deserialize gossip message from {}", msg.source),
+ }
} else {
log::trace!("Gossip stream closed, breaking reactor loop");
break;
diff --git a/lib/src/reactor/gossip/strategy.rs b/lib/src/reactor/gossip/strategy.rs
new file mode 100644
index 0000000..b54dddc
--- /dev/null
+++ b/lib/src/reactor/gossip/strategy.rs
@@ -0,0 +1,31 @@
+use std::sync::Arc;
+
+use anyhow::Result;
+use tokio::sync::RwLock;
+
+use crate::profile::Profile;
+use crate::reactor::gossip::msg::GossipMessage;
+
+#[async_trait::async_trait]
+pub trait GossipHandlingStrategy: Sync + Send {
+ async fn handle_gossip_message(profile: Arc<RwLock<Profile>>, source: ipfs::PeerId, msg: GossipMessage) -> Result<()>;
+}
+
+pub struct LogStrategy;
+
+#[async_trait::async_trait]
+impl GossipHandlingStrategy for LogStrategy {
+ async fn handle_gossip_message(profile: Arc<RwLock<Profile>>, source: ipfs::PeerId, msg: GossipMessage) -> Result<()> {
+ use std::convert::TryFrom;
+ match msg {
+ GossipMessage::CurrentProfileState { peer_id, cid } => {
+ let peer_id = ipfs::PeerId::from_bytes(&peer_id);
+ let cid = cid::Cid::try_from(&*cid);
+
+ log::trace!("{:?} told me that {:?} is at {:?}", source, peer_id, cid);
+ }
+ }
+
+ Ok(())
+ }
+}