summaryrefslogtreecommitdiffstats
path: root/lib/src/reactor/gossip/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'lib/src/reactor/gossip/mod.rs')
-rw-r--r--lib/src/reactor/gossip/mod.rs148
1 files changed, 148 insertions, 0 deletions
diff --git a/lib/src/reactor/gossip/mod.rs b/lib/src/reactor/gossip/mod.rs
new file mode 100644
index 0000000..884f42a
--- /dev/null
+++ b/lib/src/reactor/gossip/mod.rs
@@ -0,0 +1,148 @@
+//! Low-level module for gossip'ing code
+//!
+//! This module implements the low-level gossiping functionality that other modules use to
+//! implement actual behaviours on
+//!
+
+use std::sync::Arc;
+
+use anyhow::Result;
+use tokio::sync::RwLock;
+
+use crate::profile::Profile;
+use crate::reactor::Reactor;
+use crate::reactor::ReactorReply;
+use crate::reactor::ReactorRequest;
+use crate::reactor::ctrl::ReactorReceiver;
+use crate::reactor::ctrl::ReactorSender;
+use crate::reactor::ctrl::ReplyChannel;
+
+mod ctrl;
+pub use ctrl::GossipRequest;
+pub use ctrl::GossipReply;
+
+mod msg;
+pub use msg::GossipMessage;
+
+#[derive(Debug)]
+pub struct GossipReactor {
+ inner: Reactor<GossipRequest, GossipReply>,
+ gossip_topic_name: String,
+}
+
+
+impl GossipReactor {
+ pub fn new(profile: Arc<RwLock<Profile>>, gossip_topic_name: String) -> (Self, ReactorSender<GossipRequest, GossipReply>) {
+ let (inner, sender) = Reactor::<GossipRequest, GossipReply>::new(profile);
+ let reactor = Self {
+ inner,
+ gossip_topic_name,
+ };
+
+ (reactor, sender)
+ }
+
+ pub async fn receive_next_message(&mut self) -> Option<(ReactorRequest<GossipRequest>, ReplyChannel<GossipReply>)> {
+ self.inner.receive_next_message().await
+ }
+
+ pub(super) async fn process_reactor_message(&mut self, request: (ReactorRequest<GossipRequest>, ReplyChannel<GossipReply>)) -> Result<()> {
+ match self.inner.process_reactor_message(request).await? {
+ None => Ok(()),
+ Some((GossipRequest::Ping, reply_channel)) => {
+ if let Err(_) = reply_channel.send(ReactorReply::Custom(GossipReply::Pong)) {
+ anyhow::bail!("Failed sening PONG reply")
+ }
+
+ Ok(())
+ },
+
+ Some((GossipRequest::PublishMe, reply_channel)) => {
+ let profile = self.inner.profile();
+ let profile = profile.read().await;
+
+ let head = profile.head();
+ if head.is_none() {
+ if let Err(_) = reply_channel.send(ReactorReply::Custom(GossipReply::NoHead)) {
+ anyhow::bail!("Failed to send GossipReply::NoHead)")
+ }
+ }
+ let head = head.unwrap().to_bytes();
+
+ let own_id = match profile.client().own_id().await {
+ Ok(id) => id,
+ Err(e) => if let Err(_) = reply_channel.send(ReactorReply::Custom(GossipReply::PublishMeResult(Err(e)))) {
+ anyhow::bail!("Failed to send GossipReply::PublishMeResult(Err(_))")
+ } else {
+ return Ok(()) // TODO: abort operation here for now, maybe not the best idea
+ }
+ };
+
+ let publish_res = profile
+ .client()
+ .ipfs
+ .pubsub_publish(self.gossip_topic_name.clone(), GossipMessage::CurrentProfileState {
+ peer_id: own_id.to_bytes(),
+ cid: head
+ }.into_bytes()?)
+ .await;
+
+ match publish_res {
+ Ok(()) => if let Err(_) = reply_channel.send(ReactorReply::Custom(GossipReply::PublishMeResult(Ok(())))) {
+ anyhow::bail!("Failed to send GossipReply::PublishMeResult(Ok(()))")
+ } else {
+ Ok(())
+ },
+
+ Err(e) => if let Err(_) = reply_channel.send(ReactorReply::Custom(GossipReply::PublishMeResult(Err(e)))) {
+ anyhow::bail!("Failed to send GossipReply::PublishMeResult(Err(_))")
+ } else {
+ Ok(())
+ }
+
+ }
+ },
+ }
+ }
+
+ pub async fn run(mut self) -> Result<()> {
+ use std::convert::TryFrom;
+ use futures::stream::StreamExt;
+
+ let mut subscription_stream = self.inner.profile()
+ .read()
+ .await
+ .client()
+ .ipfs
+ .pubsub_subscribe(self.gossip_topic_name.clone())
+ .await?;
+
+ loop {
+ tokio::select! {
+ next_control_msg = self.receive_next_message() => {
+ match next_control_msg {
+ None => break,
+ Some(tpl) => self.process_reactor_message(tpl).await?,
+ }
+ }
+
+ next_gossip_message = subscription_stream.next() => {
+ if let Some(next_gossip_message) = next_gossip_message {
+ match serde_json::from_slice(&next_gossip_message.data) {
+ Err(e) => log::trace!("Failed to deserialize gossip message from {}", next_gossip_message.source),
+ Ok(GossipMessage::CurrentProfileState { peer_id, cid }) => {
+ let peer_id = ipfs::PeerId::from_bytes(&peer_id);
+ let cid = cid::Cid::try_from(&*cid);
+ log::trace!("Peer {:?} is at {:?}", peer_id, cid)
+ }
+ }
+ } else {
+ break;
+ }
+ }
+ }
+ }
+ Ok(())
+ }
+}
+