summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2021-12-10 18:11:22 +0100
committerMatthias Beyer <mail@beyermatthias.de>2021-12-10 18:12:00 +0100
commit88450e0d8c2ae0046998180469345c0ce7f3a5aa (patch)
tree3f29bd12a87be97a2513ba2ff5d01cc6fd9fbcf1
parent80caa9e67cdb64b79328acc74d0131789fc47c6d (diff)
Refactor: Move handling of pubsub message to own fn
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
-rw-r--r--lib/src/reactor/gossip/mod.rs27
1 files changed, 18 insertions, 9 deletions
diff --git a/lib/src/reactor/gossip/mod.rs b/lib/src/reactor/gossip/mod.rs
index 9ba1d05..7056b26 100644
--- a/lib/src/reactor/gossip/mod.rs
+++ b/lib/src/reactor/gossip/mod.rs
@@ -103,8 +103,24 @@ impl GossipReactor {
}
}
- pub async fn run(mut self) -> Result<()> {
+ async fn handle_gossip_message(&self, msg: Arc<ipfs::PubsubMessage>) -> Result<()> {
use std::convert::TryFrom;
+
+ match serde_json::from_slice(&msg.data) {
+ Err(e) => log::trace!("Failed to deserialize gossip message from {}", msg.source),
+ Ok(GossipMessage::CurrentProfileState { peer_id, cid }) => {
+ let peer_id = ipfs::PeerId::from_bytes(&peer_id);
+ let cid = cid::Cid::try_from(&*cid);
+ log::trace!("Peer {:?} is at {:?}", peer_id, cid);
+
+ // TODO start dispatched node chain fetching
+ }
+ }
+
+ Ok(())
+ }
+
+ pub async fn run(mut self) -> Result<()> {
use futures::stream::StreamExt;
let mut subscription_stream = self.inner.profile()
@@ -126,14 +142,7 @@ impl GossipReactor {
next_gossip_message = subscription_stream.next() => {
if let Some(next_gossip_message) = next_gossip_message {
- match serde_json::from_slice(&next_gossip_message.data) {
- Err(e) => log::trace!("Failed to deserialize gossip message from {}", next_gossip_message.source),
- Ok(GossipMessage::CurrentProfileState { peer_id, cid }) => {
- let peer_id = ipfs::PeerId::from_bytes(&peer_id);
- let cid = cid::Cid::try_from(&*cid);
- log::trace!("Peer {:?} is at {:?}", peer_id, cid)
- }
- }
+ self.handle_gossip_message(next_gossip_message).await?;
} else {
break;
}