summaryrefslogtreecommitdiffstats
path: root/lib/src/gossip
diff options
context:
space:
mode:
Diffstat (limited to 'lib/src/gossip')
-rw-r--r--lib/src/gossip/deserializer.rs8
-rw-r--r--lib/src/gossip/handler.rs8
-rw-r--r--lib/src/gossip/mod.rs7
-rw-r--r--lib/src/gossip/msg.rs2
4 files changed, 12 insertions, 13 deletions
diff --git a/lib/src/gossip/deserializer.rs b/lib/src/gossip/deserializer.rs
index a50644f..dcd5082 100644
--- a/lib/src/gossip/deserializer.rs
+++ b/lib/src/gossip/deserializer.rs
@@ -1,3 +1,5 @@
+use std::sync::Arc;
+
use anyhow::Result;
use futures::Stream;
use futures::StreamExt;
@@ -19,14 +21,14 @@ impl<ErrStrategy> GossipDeserializer<ErrStrategy>
}
}
- pub fn run<S>(mut self, input: S) -> impl Stream<Item = GossipMessage>
- where S: Stream<Item = ipfs::PubsubMessage>
+ pub fn run<S>(self, input: S) -> impl Stream<Item = (ipfs::PeerId, GossipMessage)>
+ where S: Stream<Item = Arc<ipfs::PubsubMessage>>
{
input.filter_map(|message| async move {
log::trace!("Received gossip message");
match serde_json::from_slice(&message.data).map_err(anyhow::Error::from) {
- Ok(m) => Some(m),
+ Ok(m) => Some((message.source, m)),
Err(e) => {
ErrStrategy::handle_error(e);
None
diff --git a/lib/src/gossip/handler.rs b/lib/src/gossip/handler.rs
index e524da8..7c9ffa6 100644
--- a/lib/src/gossip/handler.rs
+++ b/lib/src/gossip/handler.rs
@@ -18,14 +18,14 @@ use crate::gossip::GossipMessage;
pub struct GossipHandler<Strategy = LogStrategy>
where Strategy: GossipHandlingStrategy + Sync + Send
{
- profile: Arc<RwLock<Profile>>,
+ profile: Arc<Profile>,
strategy: std::marker::PhantomData<Strategy>,
}
impl<Strat> GossipHandler<Strat>
where Strat: GossipHandlingStrategy + Sync + Send
{
- pub fn new(profile: Arc<RwLock<Profile>>) -> Self {
+ pub fn new(profile: Arc<Profile>) -> Self {
Self {
profile,
strategy: std::marker::PhantomData,
@@ -48,14 +48,14 @@ impl<Strat> GossipHandler<Strat>
#[async_trait::async_trait]
pub trait GossipHandlingStrategy: Sync + Send {
- async fn handle_gossip_message(profile: Arc<RwLock<Profile>>, source: &ipfs::PeerId, msg: &GossipMessage) -> Result<()>;
+ async fn handle_gossip_message(profile: Arc<Profile>, source: &ipfs::PeerId, msg: &GossipMessage) -> Result<()>;
}
pub struct LogStrategy;
#[async_trait::async_trait]
impl GossipHandlingStrategy for LogStrategy {
- async fn handle_gossip_message(_profile: Arc<RwLock<Profile>>, source: &ipfs::PeerId, msg: &GossipMessage) -> Result<()> {
+ async fn handle_gossip_message(_profile: Arc<Profile>, source: &ipfs::PeerId, msg: &GossipMessage) -> Result<()> {
use std::convert::TryFrom;
use std::ops::Deref;
diff --git a/lib/src/gossip/mod.rs b/lib/src/gossip/mod.rs
index d6a6963..4f4d143 100644
--- a/lib/src/gossip/mod.rs
+++ b/lib/src/gossip/mod.rs
@@ -1,9 +1,6 @@
mod msg;
pub use msg::GossipMessage;
-mod handler;
-pub use handler::*;
-
-mod deserializer;
-pub use deserializer::*;
+pub mod deserializer;
+pub mod handler;
diff --git a/lib/src/gossip/msg.rs b/lib/src/gossip/msg.rs
index 049fc68..f364762 100644
--- a/lib/src/gossip/msg.rs
+++ b/lib/src/gossip/msg.rs
@@ -1,6 +1,6 @@
use anyhow::Result;
-#[derive(Debug, serde::Serialize, serde::Deserialize)]
+#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub enum GossipMessage {
CurrentProfileState {
peer_id: Vec<u8>,