From 88450e0d8c2ae0046998180469345c0ce7f3a5aa Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Fri, 10 Dec 2021 18:11:22 +0100 Subject: Refactor: Move handling of pubsub message to own fn Signed-off-by: Matthias Beyer --- lib/src/reactor/gossip/mod.rs | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/lib/src/reactor/gossip/mod.rs b/lib/src/reactor/gossip/mod.rs index 9ba1d05..7056b26 100644 --- a/lib/src/reactor/gossip/mod.rs +++ b/lib/src/reactor/gossip/mod.rs @@ -103,8 +103,24 @@ impl GossipReactor { } } - pub async fn run(mut self) -> Result<()> { + async fn handle_gossip_message(&self, msg: Arc) -> Result<()> { use std::convert::TryFrom; + + match serde_json::from_slice(&msg.data) { + Err(e) => log::trace!("Failed to deserialize gossip message from {}", msg.source), + Ok(GossipMessage::CurrentProfileState { peer_id, cid }) => { + let peer_id = ipfs::PeerId::from_bytes(&peer_id); + let cid = cid::Cid::try_from(&*cid); + log::trace!("Peer {:?} is at {:?}", peer_id, cid); + + // TODO start dispatched node chain fetching + } + } + + Ok(()) + } + + pub async fn run(mut self) -> Result<()> { use futures::stream::StreamExt; let mut subscription_stream = self.inner.profile() @@ -126,14 +142,7 @@ impl GossipReactor { next_gossip_message = subscription_stream.next() => { if let Some(next_gossip_message) = next_gossip_message { - match serde_json::from_slice(&next_gossip_message.data) { - Err(e) => log::trace!("Failed to deserialize gossip message from {}", next_gossip_message.source), - Ok(GossipMessage::CurrentProfileState { peer_id, cid }) => { - let peer_id = ipfs::PeerId::from_bytes(&peer_id); - let cid = cid::Cid::try_from(&*cid); - log::trace!("Peer {:?} is at {:?}", peer_id, cid) - } - } + self.handle_gossip_message(next_gossip_message).await?; } else { break; } -- cgit v1.2.3