summaryrefslogtreecommitdiffstats
path: root/lib/src/reactor/gossip/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'lib/src/reactor/gossip/mod.rs')
-rw-r--r--lib/src/reactor/gossip/mod.rs331
1 files changed, 331 insertions, 0 deletions
diff --git a/lib/src/reactor/gossip/mod.rs b/lib/src/reactor/gossip/mod.rs
new file mode 100644
index 0000000..c8a7d28
--- /dev/null
+++ b/lib/src/reactor/gossip/mod.rs
@@ -0,0 +1,331 @@
+//! Low-level module for gossip'ing code
+//!
+//! This module implements the low-level gossiping functionality that other modules use to
+//! implement actual behaviours on
+//!
+
+use std::sync::Arc;
+
+use anyhow::Result;
+use tokio::sync::RwLock;
+
+use crate::profile::Profile;
+use crate::reactor::Reactor;
+use crate::reactor::ReactorBuilder;
+use crate::reactor::ctrl::ReactorReceiver;
+use crate::reactor::ctrl::ReactorSender;
+use crate::reactor::ctrl::ReplySender;
+
+mod ctrl;
+pub use ctrl::GossipRequest;
+pub use ctrl::GossipReply;
+
+mod msg;
+pub use msg::GossipMessage;
+
+mod strategy;
+pub use strategy::GossipHandlingStrategy;
+pub use strategy::LogStrategy;
+
+#[derive(Debug)]
+pub struct GossipReactorBuilder {
+ profile: Arc<RwLock<Profile>>,
+ gossip_topic_name: String,
+}
+
+impl GossipReactorBuilder {
+ pub fn new(profile: Arc<RwLock<Profile>>, gossip_topic_name: String) -> Self {
+ Self { profile, gossip_topic_name }
+ }
+}
+
+impl ReactorBuilder for GossipReactorBuilder {
+ type Reactor = GossipReactor;
+
+ fn build_with_receiver(self, rr: ReactorReceiver<GossipRequest, GossipReply>) -> Self::Reactor {
+ GossipReactor {
+ profile: self.profile,
+ gossip_topic_name: self.gossip_topic_name,
+ receiver: rr,
+ strategy: std::marker::PhantomData,
+ }
+ }
+}
+
+pub struct GossipReactor<Strategy = LogStrategy>
+ where Strategy: GossipHandlingStrategy + Sync + Send
+{
+ profile: Arc<RwLock<Profile>>,
+ gossip_topic_name: String,
+ receiver: ReactorReceiver<GossipRequest, GossipReply>,
+ strategy: std::marker::PhantomData<Strategy>,
+}
+
+impl<S> std::fmt::Debug for GossipReactor<S>
+ where S: GossipHandlingStrategy + Sync + Send
+{
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "GossipReactor {{ topic: '{}' }}", self.gossip_topic_name)
+ }
+}
+
+impl<S> GossipReactor<S>
+ where S: GossipHandlingStrategy + Sync + Send
+{
+ fn send_gossip_reply(channel: ReplySender<GossipReply>, reply: GossipReply) -> Result<()> {
+ if let Err(_) = channel.send(reply) {
+ anyhow::bail!("Failed to send GossipReply::NoHead)")
+ }
+
+ Ok(())
+ }
+
+ async fn publish_me(&self, reply_channel: ReplySender<GossipReply>) -> Result<()> {
+ let profile = self.profile.read().await;
+
+ let head = profile.head();
+ if head.is_none() {
+ Self::send_gossip_reply(reply_channel, GossipReply::NoHead)?;
+ return Ok(())
+ }
+ let head = head.unwrap().to_bytes();
+
+ let own_id = match profile.client().own_id().await {
+ Ok(id) => id,
+ Err(e) => {
+ Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Err(e)))?;
+ return Ok(()) // TODO: abort operation here for now, maybe not the best idea
+ }
+ };
+
+ let publish_res = profile
+ .client()
+ .ipfs
+ .pubsub_publish(self.gossip_topic_name.clone(), GossipMessage::CurrentProfileState {
+ peer_id: own_id.to_bytes(),
+ cid: head
+ }.into_bytes()?)
+ .await;
+
+ match publish_res {
+ Ok(()) => Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Ok(()))),
+ Err(e) => Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Err(e))),
+ }
+ }
+
+ async fn connect(&self, addr: ipfs::MultiaddrWithPeerId) -> Result<()> {
+ log::trace!("Connecting GossipReactor with {:?}", addr);
+ self.profile.read().await.client().connect(addr).await
+ }
+
+ #[cfg(test)]
+ async fn is_connected_to(&self, addr: ipfs::MultiaddrWithPeerId) -> Result<bool> {
+ self.profile
+ .read()
+ .await
+ .client()
+ .ipfs
+ .peers()
+ .await
+ .map(|connections| {
+ connections.iter().any(|connection| connection.addr == addr)
+ })
+ }
+
+}
+
+#[async_trait::async_trait]
+impl<S> Reactor for GossipReactor<S>
+ where S: GossipHandlingStrategy + Sync + Send
+{
+ type Request = GossipRequest;
+ type Reply = GossipReply;
+
+ async fn run(mut self) -> Result<()> {
+ use futures::stream::StreamExt;
+
+ log::trace!("Booting {:?}", self);
+ let mut subscription_stream = self.profile
+ .read()
+ .await
+ .client()
+ .ipfs
+ .pubsub_subscribe(self.gossip_topic_name.clone())
+ .await?;
+
+ log::trace!("{:?} main loop", self);
+ loop {
+ tokio::select! {
+ next_control_msg = self.receiver.recv() => {
+ log::trace!("Received control message: {:?}", next_control_msg);
+ match next_control_msg {
+ None => break,
+ Some((GossipRequest::Exit, reply_channel)) => {
+ if let Err(_) = reply_channel.send(GossipReply::Exiting) {
+ anyhow::bail!("Failed sending EXITING reply")
+ }
+ break
+ },
+
+ Some((GossipRequest::Ping, reply_channel)) => {
+ log::trace!("Replying with Pong");
+ if let Err(_) = reply_channel.send(GossipReply::Pong) {
+ anyhow::bail!("Failed sending PONG reply")
+ }
+ },
+
+ Some((GossipRequest::PublishMe, reply_channel)) => self.publish_me(reply_channel).await?,
+
+ Some((GossipRequest::Connect(addr), reply_channel)) => {
+ let reply = GossipReply::ConnectResult(self.connect(addr.clone()).await);
+ if let Err(_) = Self::send_gossip_reply(reply_channel, reply) {
+ anyhow::bail!("Failed sending Connect({}) reply", addr)
+ }
+ },
+ }
+ }
+
+ next_gossip_message = subscription_stream.next() => {
+ if let Some(msg) = next_gossip_message {
+ log::trace!("Received gossip message");
+ match serde_json::from_slice(&msg.data) {
+ Ok(m) => S::handle_gossip_message(self.profile.clone(), msg.source, m).await?,
+ Err(e) => log::trace!("Failed to deserialize gossip message from {}", msg.source),
+ }
+ } else {
+ log::trace!("Gossip stream closed, breaking reactor loop");
+ break;
+ }
+ }
+ }
+ }
+ Ok(())
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ use std::convert::TryFrom;
+ use std::sync::Arc;
+ use tokio::sync::RwLock;
+
+ use crate::config::Config;
+
+ #[tokio::test]
+ async fn test_gossip_reactor_simple() {
+ let _ = env_logger::try_init();
+
+ let profile = Profile::new_inmemory(Config::default(), "test-gossip-reactor-simple").await;
+ assert!(profile.is_ok());
+ let profile = Arc::new(RwLock::new(profile.unwrap()));
+
+ let gossip_topic_name = String::from("test-gossip-reactor-simple-topic");
+ let (rx, tx) = tokio::sync::mpsc::unbounded_channel();
+ let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name).build_with_receiver(tx);
+
+ let (reply_sender, mut reply_receiver) = tokio::sync::mpsc::unbounded_channel();
+ rx.send((GossipRequest::Ping, reply_sender)).unwrap();
+
+ let mut pong_received = false;
+ let _ = tokio::spawn(async move {
+ reactor.run().await
+ });
+
+ match reply_receiver.recv().await {
+ Some(GossipReply::Pong) => {
+ pong_received = true;
+ log::trace!("Pong received!");
+ let (reply_sender, mut reply_receiver) = tokio::sync::mpsc::unbounded_channel();
+ rx.send((GossipRequest::Exit, reply_sender)).unwrap();
+ }
+ Some(r) => {
+ assert!(false, "Expected ReactorReply::Pong, got: {:?}", r);
+ }
+ None => {
+ // nothing
+ }
+ }
+
+ assert!(pong_received, "No pong received");
+ }
+
+ #[tokio::test]
+ async fn test_gossip_reactor_gossipping() {
+ let _ = env_logger::try_init();
+
+ let gossip_topic_name = String::from("test-gossip-reactor-gossipping-topic");
+ let (left_profile, left_reactor, left_rx) = {
+ let profile = Profile::new_inmemory(Config::default(), "test-gossip-reactor-simple-left").await;
+ assert!(profile.is_ok());
+ let profile = Arc::new(RwLock::new(profile.unwrap()));
+
+ let (rx, tx) = tokio::sync::mpsc::unbounded_channel();
+ let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name.clone()).build_with_receiver(tx);
+ (profile, reactor, rx)
+ };
+ log::trace!("Built left GossipReactor");
+
+ let (right_profile, right_reactor, right_rx) = {
+ let profile = Profile::new_inmemory(Config::default(), "test-gossip-reactor-simple-right").await;
+ assert!(profile.is_ok());
+ let profile = Arc::new(RwLock::new(profile.unwrap()));
+
+ let (rx, tx) = tokio::sync::mpsc::unbounded_channel();
+ let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name.clone()).build_with_receiver(tx);
+ (profile, reactor, rx)
+ };
+ log::trace!("Built right GossipReactor");
+
+ async fn get_peer_id(profile: Arc<RwLock<Profile>>) -> Result<ipfs::MultiaddrWithPeerId> {
+ profile.read()
+ .await
+ .client()
+ .ipfs
+ .identity()
+ .await
+ .map(|(pubkey, addr)| (pubkey.into_peer_id(), addr))
+ .and_then(|(peerid, mut addr)| {
+ ipfs::MultiaddrWithPeerId::try_from({
+ addr.pop().expect("At least one address for client")
+ })
+ .map_err(anyhow::Error::from)
+ })
+ }
+
+ let left_running_reactor = tokio::spawn(async move {
+ left_reactor.run().await
+ });
+
+ let right_running_reactor = tokio::spawn(async move {
+ right_reactor.run().await
+ });
+
+ let left_peer_id = get_peer_id(left_profile.clone()).await;
+ log::trace!("Left GossipReactor = {:?}", left_peer_id);
+ assert!(left_peer_id.is_ok(), "Not ok: {:?}", left_peer_id);
+ let left_peer_id = left_peer_id.unwrap();
+
+ let right_peer_id = get_peer_id(right_profile.clone()).await;
+ log::trace!("Right GossipReactor = {:?}", right_peer_id);
+ assert!(right_peer_id.is_ok(), "Not ok: {:?}", right_peer_id);
+ let right_peer_id = right_peer_id.unwrap();
+
+ let (right_reply_sender, mut right_reply_receiver) = tokio::sync::mpsc::unbounded_channel();
+
+ log::trace!("Right GossipReactor should now connect to left GossipReactor");
+ right_rx.send((GossipRequest::Connect(left_peer_id), right_reply_sender)).unwrap();
+
+ log::trace!("Right GossipReactor should now connect to left GossipReactor... waiting for reply");
+ match tokio::time::timeout(std::time::Duration::from_secs(5), right_reply_receiver.recv()).await {
+ Err(_) => assert!(false, "Timeout elapsed when waiting for connection status"),
+ Ok(Some(GossipReply::ConnectResult(Ok(())))) => {
+ log::trace!("Right GossipReactor is connected");
+ assert!(true)
+ },
+ Ok(Some(other)) => assert!(false, "Expected ConnectResult(Ok(())), recv: {:?}", other),
+ Ok(None) => assert!(false, "No reply from right reactor received"),
+ }
+ }
+}