diff options
Diffstat (limited to 'lib/src/gossip')
-rw-r--r-- | lib/src/gossip/deserializer.rs | 8 | ||||
-rw-r--r-- | lib/src/gossip/handler.rs | 8 | ||||
-rw-r--r-- | lib/src/gossip/mod.rs | 7 | ||||
-rw-r--r-- | lib/src/gossip/msg.rs | 2 |
4 files changed, 12 insertions, 13 deletions
diff --git a/lib/src/gossip/deserializer.rs b/lib/src/gossip/deserializer.rs index a50644f..dcd5082 100644 --- a/lib/src/gossip/deserializer.rs +++ b/lib/src/gossip/deserializer.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use anyhow::Result; use futures::Stream; use futures::StreamExt; @@ -19,14 +21,14 @@ impl<ErrStrategy> GossipDeserializer<ErrStrategy> } } - pub fn run<S>(mut self, input: S) -> impl Stream<Item = GossipMessage> - where S: Stream<Item = ipfs::PubsubMessage> + pub fn run<S>(self, input: S) -> impl Stream<Item = (ipfs::PeerId, GossipMessage)> + where S: Stream<Item = Arc<ipfs::PubsubMessage>> { input.filter_map(|message| async move { log::trace!("Received gossip message"); match serde_json::from_slice(&message.data).map_err(anyhow::Error::from) { - Ok(m) => Some(m), + Ok(m) => Some((message.source, m)), Err(e) => { ErrStrategy::handle_error(e); None diff --git a/lib/src/gossip/handler.rs b/lib/src/gossip/handler.rs index e524da8..7c9ffa6 100644 --- a/lib/src/gossip/handler.rs +++ b/lib/src/gossip/handler.rs @@ -18,14 +18,14 @@ use crate::gossip::GossipMessage; pub struct GossipHandler<Strategy = LogStrategy> where Strategy: GossipHandlingStrategy + Sync + Send { - profile: Arc<RwLock<Profile>>, + profile: Arc<Profile>, strategy: std::marker::PhantomData<Strategy>, } impl<Strat> GossipHandler<Strat> where Strat: GossipHandlingStrategy + Sync + Send { - pub fn new(profile: Arc<RwLock<Profile>>) -> Self { + pub fn new(profile: Arc<Profile>) -> Self { Self { profile, strategy: std::marker::PhantomData, @@ -48,14 +48,14 @@ impl<Strat> GossipHandler<Strat> #[async_trait::async_trait] pub trait GossipHandlingStrategy: Sync + Send { - async fn handle_gossip_message(profile: Arc<RwLock<Profile>>, source: &ipfs::PeerId, msg: &GossipMessage) -> Result<()>; + async fn handle_gossip_message(profile: Arc<Profile>, source: &ipfs::PeerId, msg: &GossipMessage) -> Result<()>; } pub struct LogStrategy; #[async_trait::async_trait] impl GossipHandlingStrategy for LogStrategy { - async fn handle_gossip_message(_profile: Arc<RwLock<Profile>>, source: &ipfs::PeerId, msg: &GossipMessage) -> Result<()> { + async fn handle_gossip_message(_profile: Arc<Profile>, source: &ipfs::PeerId, msg: &GossipMessage) -> Result<()> { use std::convert::TryFrom; use std::ops::Deref; diff --git a/lib/src/gossip/mod.rs b/lib/src/gossip/mod.rs index d6a6963..4f4d143 100644 --- a/lib/src/gossip/mod.rs +++ b/lib/src/gossip/mod.rs @@ -1,9 +1,6 @@ mod msg; pub use msg::GossipMessage; -mod handler; -pub use handler::*; - -mod deserializer; -pub use deserializer::*; +pub mod deserializer; +pub mod handler; diff --git a/lib/src/gossip/msg.rs b/lib/src/gossip/msg.rs index 049fc68..f364762 100644 --- a/lib/src/gossip/msg.rs +++ b/lib/src/gossip/msg.rs @@ -1,6 +1,6 @@ use anyhow::Result; -#[derive(Debug, serde::Serialize, serde::Deserialize)] +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub enum GossipMessage { CurrentProfileState { peer_id: Vec<u8>, |