diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2021-12-10 18:11:22 +0100 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2021-12-10 18:12:00 +0100 |
commit | 88450e0d8c2ae0046998180469345c0ce7f3a5aa (patch) | |
tree | 3f29bd12a87be97a2513ba2ff5d01cc6fd9fbcf1 | |
parent | 80caa9e67cdb64b79328acc74d0131789fc47c6d (diff) |
Refactor: Move handling of pubsub message to own fn
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
-rw-r--r-- | lib/src/reactor/gossip/mod.rs | 27 |
1 files changed, 18 insertions, 9 deletions
diff --git a/lib/src/reactor/gossip/mod.rs b/lib/src/reactor/gossip/mod.rs index 9ba1d05..7056b26 100644 --- a/lib/src/reactor/gossip/mod.rs +++ b/lib/src/reactor/gossip/mod.rs @@ -103,8 +103,24 @@ impl GossipReactor { } } - pub async fn run(mut self) -> Result<()> { + async fn handle_gossip_message(&self, msg: Arc<ipfs::PubsubMessage>) -> Result<()> { use std::convert::TryFrom; + + match serde_json::from_slice(&msg.data) { + Err(e) => log::trace!("Failed to deserialize gossip message from {}", msg.source), + Ok(GossipMessage::CurrentProfileState { peer_id, cid }) => { + let peer_id = ipfs::PeerId::from_bytes(&peer_id); + let cid = cid::Cid::try_from(&*cid); + log::trace!("Peer {:?} is at {:?}", peer_id, cid); + + // TODO start dispatched node chain fetching + } + } + + Ok(()) + } + + pub async fn run(mut self) -> Result<()> { use futures::stream::StreamExt; let mut subscription_stream = self.inner.profile() @@ -126,14 +142,7 @@ impl GossipReactor { next_gossip_message = subscription_stream.next() => { if let Some(next_gossip_message) = next_gossip_message { - match serde_json::from_slice(&next_gossip_message.data) { - Err(e) => log::trace!("Failed to deserialize gossip message from {}", next_gossip_message.source), - Ok(GossipMessage::CurrentProfileState { peer_id, cid }) => { - let peer_id = ipfs::PeerId::from_bytes(&peer_id); - let cid = cid::Cid::try_from(&*cid); - log::trace!("Peer {:?} is at {:?}", peer_id, cid) - } - } + self.handle_gossip_message(next_gossip_message).await?; } else { break; } |