summaryrefslogtreecommitdiffstats
path: root/lib/src/gossip/handler.rs
diff options
context:
space:
mode:
Diffstat (limited to 'lib/src/gossip/handler.rs')
-rw-r--r--lib/src/gossip/handler.rs73
1 files changed, 73 insertions, 0 deletions
diff --git a/lib/src/gossip/handler.rs b/lib/src/gossip/handler.rs
new file mode 100644
index 0000000..7c9ffa6
--- /dev/null
+++ b/lib/src/gossip/handler.rs
@@ -0,0 +1,73 @@
+//! Low-level module for gossip'ing code
+//!
+//! This module implements the low-level gossiping functionality that other modules use to
+//! implement actual behaviours on
+//!
+
+use std::sync::Arc;
+
+use anyhow::Result;
+use futures::Stream;
+use futures::StreamExt;
+use tokio::sync::RwLock;
+
+use crate::profile::Profile;
+use crate::gossip::GossipMessage;
+
+#[derive(Debug)]
+pub struct GossipHandler<Strategy = LogStrategy>
+ where Strategy: GossipHandlingStrategy + Sync + Send
+{
+ profile: Arc<Profile>,
+ strategy: std::marker::PhantomData<Strategy>,
+}
+
+impl<Strat> GossipHandler<Strat>
+ where Strat: GossipHandlingStrategy + Sync + Send
+{
+ pub fn new(profile: Arc<Profile>) -> Self {
+ Self {
+ profile,
+ strategy: std::marker::PhantomData,
+ }
+ }
+
+ pub fn run<S>(self, input: S) -> impl Stream<Item = (GossipMessage, Result<()>)>
+ where S: Stream<Item = (ipfs::PeerId, GossipMessage)>
+ {
+ input.then(move |(source, msg)| {
+ let pr = self.profile.clone();
+ async move {
+ log::trace!("Received gossip message from {}: {:?}", source, msg);
+ let res = Strat::handle_gossip_message(pr.clone(), &source, &msg).await;
+ (msg, res)
+ }
+ })
+ }
+}
+
+#[async_trait::async_trait]
+pub trait GossipHandlingStrategy: Sync + Send {
+ async fn handle_gossip_message(profile: Arc<Profile>, source: &ipfs::PeerId, msg: &GossipMessage) -> Result<()>;
+}
+
+pub struct LogStrategy;
+
+#[async_trait::async_trait]
+impl GossipHandlingStrategy for LogStrategy {
+ async fn handle_gossip_message(_profile: Arc<Profile>, source: &ipfs::PeerId, msg: &GossipMessage) -> Result<()> {
+ use std::convert::TryFrom;
+ use std::ops::Deref;
+
+ match msg {
+ GossipMessage::CurrentProfileState { peer_id, cid } => {
+ let peer_id = ipfs::PeerId::from_bytes(peer_id);
+ let cid = cid::Cid::try_from(cid.deref());
+
+ log::trace!("{:?} told me that {:?} is at {:?}", source, peer_id, cid);
+ }
+ }
+
+ Ok(())
+ }
+}