From 5856b8b59044c5c3dc007a35191931fb13b6aef1 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Fri, 10 Dec 2021 17:56:57 +0100 Subject: Add implementation for first GossipMessage Signed-off-by: Matthias Beyer --- lib/src/reactor/gossip.rs | 114 +++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 108 insertions(+), 6 deletions(-) diff --git a/lib/src/reactor/gossip.rs b/lib/src/reactor/gossip.rs index ba950db..0da0c97 100644 --- a/lib/src/reactor/gossip.rs +++ b/lib/src/reactor/gossip.rs @@ -21,22 +21,48 @@ use crate::reactor::ctrl::ReplyChannel; #[derive(Debug)] pub struct GossipReactor { inner: Reactor, + gossip_topic_name: String, } #[derive(Debug)] pub enum GossipRequest { Ping, + PublishMe, } #[derive(Debug)] pub enum GossipReply { Pong, + NoHead, + PublishMeResult(Result<()>), } +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub enum GossipMessage { + CurrentProfileState { + peer_id: Vec, + cid: Vec, + }, +} + +impl GossipMessage { + fn into_bytes(self) -> Result> { + serde_json::to_string(&self) + .map(String::into_bytes) + .map_err(anyhow::Error::from) + } +} + + impl GossipReactor { - pub fn new(profile: Arc>) -> (Self, ReactorSender) { + pub fn new(profile: Arc>, gossip_topic_name: String) -> (Self, ReactorSender) { let (inner, sender) = Reactor::::new(profile); - (Self { inner }, sender) + let reactor = Self { + inner, + gossip_topic_name, + }; + + (reactor, sender) } pub async fn receive_next_message(&mut self) -> Option<(ReactorRequest, ReplyChannel)> { @@ -52,15 +78,91 @@ impl GossipReactor { } Ok(()) - } + }, + + Some((GossipRequest::PublishMe, reply_channel)) => { + let profile = self.inner.profile(); + let profile = profile.read().await; + + let head = profile.head(); + if head.is_none() { + if let Err(_) = reply_channel.send(ReactorReply::Custom(GossipReply::NoHead)) { + anyhow::bail!("Failed to send GossipReply::NoHead)") + } + } + let head = head.unwrap().to_bytes(); + + let own_id = match profile.client().own_id().await { + Ok(id) => id, + Err(e) => if let Err(_) = reply_channel.send(ReactorReply::Custom(GossipReply::PublishMeResult(Err(e)))) { + anyhow::bail!("Failed to send GossipReply::PublishMeResult(Err(_))") + } else { + return Ok(()) // TODO: abort operation here for now, maybe not the best idea + } + }; + + let publish_res = profile + .client() + .ipfs + .pubsub_publish(self.gossip_topic_name.clone(), GossipMessage::CurrentProfileState { + peer_id: own_id.to_bytes(), + cid: head + }.into_bytes()?) + .await; + + match publish_res { + Ok(()) => if let Err(_) = reply_channel.send(ReactorReply::Custom(GossipReply::PublishMeResult(Ok(())))) { + anyhow::bail!("Failed to send GossipReply::PublishMeResult(Ok(()))") + } else { + Ok(()) + }, + + Err(e) => if let Err(_) = reply_channel.send(ReactorReply::Custom(GossipReply::PublishMeResult(Err(e)))) { + anyhow::bail!("Failed to send GossipReply::PublishMeResult(Err(_))") + } else { + Ok(()) + } + + } + }, } } pub async fn run(mut self) -> Result<()> { + use std::convert::TryFrom; + use futures::stream::StreamExt; + + let mut subscription_stream = self.inner.profile() + .read() + .await + .client() + .ipfs + .pubsub_subscribe(self.gossip_topic_name.clone()) + .await?; + loop { - match self.receive_next_message().await { - None => break, - Some(tpl) => self.process_reactor_message(tpl).await?, + tokio::select! { + next_control_msg = self.receive_next_message() => { + match next_control_msg { + None => break, + Some(tpl) => self.process_reactor_message(tpl).await?, + } + } + + next_gossip_message = subscription_stream.next() => { + if let Some(next_gossip_message) = next_gossip_message { + match serde_json::from_slice(&next_gossip_message.data) { + Err(e) => log::trace!("Failed to deserialize gossip message from {}", next_gossip_message.source), + Ok(GossipMessage::CurrentProfileState { peer_id, cid }) => { + let peer_id = ipfs::PeerId::from_bytes(&peer_id); + let cid = cid::Cid::try_from(&*cid); + log::trace!("Peer {:?} is at {:?}", peer_id, cid) + } + } + } else { + break; + } + } } } Ok(()) -- cgit v1.2.3