diff options
Diffstat (limited to 'lib/src/reactor/gossip/mod.rs')
-rw-r--r-- | lib/src/reactor/gossip/mod.rs | 331 |
1 files changed, 331 insertions, 0 deletions
diff --git a/lib/src/reactor/gossip/mod.rs b/lib/src/reactor/gossip/mod.rs new file mode 100644 index 0000000..c8a7d28 --- /dev/null +++ b/lib/src/reactor/gossip/mod.rs @@ -0,0 +1,331 @@ +//! Low-level module for gossip'ing code +//! +//! This module implements the low-level gossiping functionality that other modules use to +//! implement actual behaviours on +//! + +use std::sync::Arc; + +use anyhow::Result; +use tokio::sync::RwLock; + +use crate::profile::Profile; +use crate::reactor::Reactor; +use crate::reactor::ReactorBuilder; +use crate::reactor::ctrl::ReactorReceiver; +use crate::reactor::ctrl::ReactorSender; +use crate::reactor::ctrl::ReplySender; + +mod ctrl; +pub use ctrl::GossipRequest; +pub use ctrl::GossipReply; + +mod msg; +pub use msg::GossipMessage; + +mod strategy; +pub use strategy::GossipHandlingStrategy; +pub use strategy::LogStrategy; + +#[derive(Debug)] +pub struct GossipReactorBuilder { + profile: Arc<RwLock<Profile>>, + gossip_topic_name: String, +} + +impl GossipReactorBuilder { + pub fn new(profile: Arc<RwLock<Profile>>, gossip_topic_name: String) -> Self { + Self { profile, gossip_topic_name } + } +} + +impl ReactorBuilder for GossipReactorBuilder { + type Reactor = GossipReactor; + + fn build_with_receiver(self, rr: ReactorReceiver<GossipRequest, GossipReply>) -> Self::Reactor { + GossipReactor { + profile: self.profile, + gossip_topic_name: self.gossip_topic_name, + receiver: rr, + strategy: std::marker::PhantomData, + } + } +} + +pub struct GossipReactor<Strategy = LogStrategy> + where Strategy: GossipHandlingStrategy + Sync + Send +{ + profile: Arc<RwLock<Profile>>, + gossip_topic_name: String, + receiver: ReactorReceiver<GossipRequest, GossipReply>, + strategy: std::marker::PhantomData<Strategy>, +} + +impl<S> std::fmt::Debug for GossipReactor<S> + where S: GossipHandlingStrategy + Sync + Send +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "GossipReactor {{ topic: '{}' }}", self.gossip_topic_name) + } +} + +impl<S> GossipReactor<S> + where S: GossipHandlingStrategy + Sync + Send +{ + fn send_gossip_reply(channel: ReplySender<GossipReply>, reply: GossipReply) -> Result<()> { + if let Err(_) = channel.send(reply) { + anyhow::bail!("Failed to send GossipReply::NoHead)") + } + + Ok(()) + } + + async fn publish_me(&self, reply_channel: ReplySender<GossipReply>) -> Result<()> { + let profile = self.profile.read().await; + + let head = profile.head(); + if head.is_none() { + Self::send_gossip_reply(reply_channel, GossipReply::NoHead)?; + return Ok(()) + } + let head = head.unwrap().to_bytes(); + + let own_id = match profile.client().own_id().await { + Ok(id) => id, + Err(e) => { + Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Err(e)))?; + return Ok(()) // TODO: abort operation here for now, maybe not the best idea + } + }; + + let publish_res = profile + .client() + .ipfs + .pubsub_publish(self.gossip_topic_name.clone(), GossipMessage::CurrentProfileState { + peer_id: own_id.to_bytes(), + cid: head + }.into_bytes()?) + .await; + + match publish_res { + Ok(()) => Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Ok(()))), + Err(e) => Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Err(e))), + } + } + + async fn connect(&self, addr: ipfs::MultiaddrWithPeerId) -> Result<()> { + log::trace!("Connecting GossipReactor with {:?}", addr); + self.profile.read().await.client().connect(addr).await + } + + #[cfg(test)] + async fn is_connected_to(&self, addr: ipfs::MultiaddrWithPeerId) -> Result<bool> { + self.profile + .read() + .await + .client() + .ipfs + .peers() + .await + .map(|connections| { + connections.iter().any(|connection| connection.addr == addr) + }) + } + +} + +#[async_trait::async_trait] +impl<S> Reactor for GossipReactor<S> + where S: GossipHandlingStrategy + Sync + Send +{ + type Request = GossipRequest; + type Reply = GossipReply; + + async fn run(mut self) -> Result<()> { + use futures::stream::StreamExt; + + log::trace!("Booting {:?}", self); + let mut subscription_stream = self.profile + .read() + .await + .client() + .ipfs + .pubsub_subscribe(self.gossip_topic_name.clone()) + .await?; + + log::trace!("{:?} main loop", self); + loop { + tokio::select! { + next_control_msg = self.receiver.recv() => { + log::trace!("Received control message: {:?}", next_control_msg); + match next_control_msg { + None => break, + Some((GossipRequest::Exit, reply_channel)) => { + if let Err(_) = reply_channel.send(GossipReply::Exiting) { + anyhow::bail!("Failed sending EXITING reply") + } + break + }, + + Some((GossipRequest::Ping, reply_channel)) => { + log::trace!("Replying with Pong"); + if let Err(_) = reply_channel.send(GossipReply::Pong) { + anyhow::bail!("Failed sending PONG reply") + } + }, + + Some((GossipRequest::PublishMe, reply_channel)) => self.publish_me(reply_channel).await?, + + Some((GossipRequest::Connect(addr), reply_channel)) => { + let reply = GossipReply::ConnectResult(self.connect(addr.clone()).await); + if let Err(_) = Self::send_gossip_reply(reply_channel, reply) { + anyhow::bail!("Failed sending Connect({}) reply", addr) + } + }, + } + } + + next_gossip_message = subscription_stream.next() => { + if let Some(msg) = next_gossip_message { + log::trace!("Received gossip message"); + match serde_json::from_slice(&msg.data) { + Ok(m) => S::handle_gossip_message(self.profile.clone(), msg.source, m).await?, + Err(e) => log::trace!("Failed to deserialize gossip message from {}", msg.source), + } + } else { + log::trace!("Gossip stream closed, breaking reactor loop"); + break; + } + } + } + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use std::convert::TryFrom; + use std::sync::Arc; + use tokio::sync::RwLock; + + use crate::config::Config; + + #[tokio::test] + async fn test_gossip_reactor_simple() { + let _ = env_logger::try_init(); + + let profile = Profile::new_inmemory(Config::default(), "test-gossip-reactor-simple").await; + assert!(profile.is_ok()); + let profile = Arc::new(RwLock::new(profile.unwrap())); + + let gossip_topic_name = String::from("test-gossip-reactor-simple-topic"); + let (rx, tx) = tokio::sync::mpsc::unbounded_channel(); + let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name).build_with_receiver(tx); + + let (reply_sender, mut reply_receiver) = tokio::sync::mpsc::unbounded_channel(); + rx.send((GossipRequest::Ping, reply_sender)).unwrap(); + + let mut pong_received = false; + let _ = tokio::spawn(async move { + reactor.run().await + }); + + match reply_receiver.recv().await { + Some(GossipReply::Pong) => { + pong_received = true; + log::trace!("Pong received!"); + let (reply_sender, mut reply_receiver) = tokio::sync::mpsc::unbounded_channel(); + rx.send((GossipRequest::Exit, reply_sender)).unwrap(); + } + Some(r) => { + assert!(false, "Expected ReactorReply::Pong, got: {:?}", r); + } + None => { + // nothing + } + } + + assert!(pong_received, "No pong received"); + } + + #[tokio::test] + async fn test_gossip_reactor_gossipping() { + let _ = env_logger::try_init(); + + let gossip_topic_name = String::from("test-gossip-reactor-gossipping-topic"); + let (left_profile, left_reactor, left_rx) = { + let profile = Profile::new_inmemory(Config::default(), "test-gossip-reactor-simple-left").await; + assert!(profile.is_ok()); + let profile = Arc::new(RwLock::new(profile.unwrap())); + + let (rx, tx) = tokio::sync::mpsc::unbounded_channel(); + let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name.clone()).build_with_receiver(tx); + (profile, reactor, rx) + }; + log::trace!("Built left GossipReactor"); + + let (right_profile, right_reactor, right_rx) = { + let profile = Profile::new_inmemory(Config::default(), "test-gossip-reactor-simple-right").await; + assert!(profile.is_ok()); + let profile = Arc::new(RwLock::new(profile.unwrap())); + + let (rx, tx) = tokio::sync::mpsc::unbounded_channel(); + let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name.clone()).build_with_receiver(tx); + (profile, reactor, rx) + }; + log::trace!("Built right GossipReactor"); + + async fn get_peer_id(profile: Arc<RwLock<Profile>>) -> Result<ipfs::MultiaddrWithPeerId> { + profile.read() + .await + .client() + .ipfs + .identity() + .await + .map(|(pubkey, addr)| (pubkey.into_peer_id(), addr)) + .and_then(|(peerid, mut addr)| { + ipfs::MultiaddrWithPeerId::try_from({ + addr.pop().expect("At least one address for client") + }) + .map_err(anyhow::Error::from) + }) + } + + let left_running_reactor = tokio::spawn(async move { + left_reactor.run().await + }); + + let right_running_reactor = tokio::spawn(async move { + right_reactor.run().await + }); + + let left_peer_id = get_peer_id(left_profile.clone()).await; + log::trace!("Left GossipReactor = {:?}", left_peer_id); + assert!(left_peer_id.is_ok(), "Not ok: {:?}", left_peer_id); + let left_peer_id = left_peer_id.unwrap(); + + let right_peer_id = get_peer_id(right_profile.clone()).await; + log::trace!("Right GossipReactor = {:?}", right_peer_id); + assert!(right_peer_id.is_ok(), "Not ok: {:?}", right_peer_id); + let right_peer_id = right_peer_id.unwrap(); + + let (right_reply_sender, mut right_reply_receiver) = tokio::sync::mpsc::unbounded_channel(); + + log::trace!("Right GossipReactor should now connect to left GossipReactor"); + right_rx.send((GossipRequest::Connect(left_peer_id), right_reply_sender)).unwrap(); + + log::trace!("Right GossipReactor should now connect to left GossipReactor... waiting for reply"); + match tokio::time::timeout(std::time::Duration::from_secs(5), right_reply_receiver.recv()).await { + Err(_) => assert!(false, "Timeout elapsed when waiting for connection status"), + Ok(Some(GossipReply::ConnectResult(Ok(())))) => { + log::trace!("Right GossipReactor is connected"); + assert!(true) + }, + Ok(Some(other)) => assert!(false, "Expected ConnectResult(Ok(())), recv: {:?}", other), + Ok(None) => assert!(false, "No reply from right reactor received"), + } + } +} |