summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2021-12-10 17:56:57 +0100
committerMatthias Beyer <mail@beyermatthias.de>2021-12-10 17:56:57 +0100
commit5856b8b59044c5c3dc007a35191931fb13b6aef1 (patch)
tree75dfc8ee81fc38827ceb6a9c149bfde729b54884
parent20f376f70c107e686af11d6f09284c4fafc48d1a (diff)
Add implementation for first GossipMessage
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
-rw-r--r--lib/src/reactor/gossip.rs114
1 files changed, 108 insertions, 6 deletions
diff --git a/lib/src/reactor/gossip.rs b/lib/src/reactor/gossip.rs
index ba950db..0da0c97 100644
--- a/lib/src/reactor/gossip.rs
+++ b/lib/src/reactor/gossip.rs
@@ -21,22 +21,48 @@ use crate::reactor::ctrl::ReplyChannel;
#[derive(Debug)]
pub struct GossipReactor {
inner: Reactor<GossipRequest, GossipReply>,
+ gossip_topic_name: String,
}
#[derive(Debug)]
pub enum GossipRequest {
Ping,
+ PublishMe,
}
#[derive(Debug)]
pub enum GossipReply {
Pong,
+ NoHead,
+ PublishMeResult(Result<()>),
}
+#[derive(Debug, serde::Serialize, serde::Deserialize)]
+pub enum GossipMessage {
+ CurrentProfileState {
+ peer_id: Vec<u8>,
+ cid: Vec<u8>,
+ },
+}
+
+impl GossipMessage {
+ fn into_bytes(self) -> Result<Vec<u8>> {
+ serde_json::to_string(&self)
+ .map(String::into_bytes)
+ .map_err(anyhow::Error::from)
+ }
+}
+
+
impl GossipReactor {
- pub fn new(profile: Arc<RwLock<Profile>>) -> (Self, ReactorSender<GossipRequest, GossipReply>) {
+ pub fn new(profile: Arc<RwLock<Profile>>, gossip_topic_name: String) -> (Self, ReactorSender<GossipRequest, GossipReply>) {
let (inner, sender) = Reactor::<GossipRequest, GossipReply>::new(profile);
- (Self { inner }, sender)
+ let reactor = Self {
+ inner,
+ gossip_topic_name,
+ };
+
+ (reactor, sender)
}
pub async fn receive_next_message(&mut self) -> Option<(ReactorRequest<GossipRequest>, ReplyChannel<GossipReply>)> {
@@ -52,15 +78,91 @@ impl GossipReactor {
}
Ok(())
- }
+ },
+
+ Some((GossipRequest::PublishMe, reply_channel)) => {
+ let profile = self.inner.profile();
+ let profile = profile.read().await;
+
+ let head = profile.head();
+ if head.is_none() {
+ if let Err(_) = reply_channel.send(ReactorReply::Custom(GossipReply::NoHead)) {
+ anyhow::bail!("Failed to send GossipReply::NoHead)")
+ }
+ }
+ let head = head.unwrap().to_bytes();
+
+ let own_id = match profile.client().own_id().await {
+ Ok(id) => id,
+ Err(e) => if let Err(_) = reply_channel.send(ReactorReply::Custom(GossipReply::PublishMeResult(Err(e)))) {
+ anyhow::bail!("Failed to send GossipReply::PublishMeResult(Err(_))")
+ } else {
+ return Ok(()) // TODO: abort operation here for now, maybe not the best idea
+ }
+ };
+
+ let publish_res = profile
+ .client()
+ .ipfs
+ .pubsub_publish(self.gossip_topic_name.clone(), GossipMessage::CurrentProfileState {
+ peer_id: own_id.to_bytes(),
+ cid: head
+ }.into_bytes()?)
+ .await;
+
+ match publish_res {
+ Ok(()) => if let Err(_) = reply_channel.send(ReactorReply::Custom(GossipReply::PublishMeResult(Ok(())))) {
+ anyhow::bail!("Failed to send GossipReply::PublishMeResult(Ok(()))")
+ } else {
+ Ok(())
+ },
+
+ Err(e) => if let Err(_) = reply_channel.send(ReactorReply::Custom(GossipReply::PublishMeResult(Err(e)))) {
+ anyhow::bail!("Failed to send GossipReply::PublishMeResult(Err(_))")
+ } else {
+ Ok(())
+ }
+
+ }
+ },
}
}
pub async fn run(mut self) -> Result<()> {
+ use std::convert::TryFrom;
+ use futures::stream::StreamExt;
+
+ let mut subscription_stream = self.inner.profile()
+ .read()
+ .await
+ .client()
+ .ipfs
+ .pubsub_subscribe(self.gossip_topic_name.clone())
+ .await?;
+
loop {
- match self.receive_next_message().await {
- None => break,
- Some(tpl) => self.process_reactor_message(tpl).await?,
+ tokio::select! {
+ next_control_msg = self.receive_next_message() => {
+ match next_control_msg {
+ None => break,
+ Some(tpl) => self.process_reactor_message(tpl).await?,
+ }
+ }
+
+ next_gossip_message = subscription_stream.next() => {
+ if let Some(next_gossip_message) = next_gossip_message {
+ match serde_json::from_slice(&next_gossip_message.data) {
+ Err(e) => log::trace!("Failed to deserialize gossip message from {}", next_gossip_message.source),
+ Ok(GossipMessage::CurrentProfileState { peer_id, cid }) => {
+ let peer_id = ipfs::PeerId::from_bytes(&peer_id);
+ let cid = cid::Cid::try_from(&*cid);
+ log::trace!("Peer {:?} is at {:?}", peer_id, cid)
+ }
+ }
+ } else {
+ break;
+ }
+ }
}
}
Ok(())