summaryrefslogtreecommitdiffstats
path: root/lib/src/reactor/gossip/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'lib/src/reactor/gossip/mod.rs')
-rw-r--r--lib/src/reactor/gossip/mod.rs329
1 files changed, 0 insertions, 329 deletions
diff --git a/lib/src/reactor/gossip/mod.rs b/lib/src/reactor/gossip/mod.rs
deleted file mode 100644
index 7658509..0000000
--- a/lib/src/reactor/gossip/mod.rs
+++ /dev/null
@@ -1,329 +0,0 @@
-//! Low-level module for gossip'ing code
-//!
-//! This module implements the low-level gossiping functionality that other modules use to
-//! implement actual behaviours on
-//!
-
-use std::sync::Arc;
-
-use anyhow::Result;
-use tokio::sync::RwLock;
-
-use crate::profile::Profile;
-use crate::reactor::Reactor;
-use crate::reactor::ReactorBuilder;
-use crate::reactor::ctrl::ReactorReceiver;
-use crate::reactor::ctrl::ReactorSender;
-use crate::reactor::ctrl::ReplySender;
-
-mod ctrl;
-pub use ctrl::GossipRequest;
-pub use ctrl::GossipReply;
-
-mod msg;
-pub use msg::GossipMessage;
-
-mod strategy;
-pub use strategy::GossipHandlingStrategy;
-pub use strategy::LogStrategy;
-
-#[derive(Debug)]
-pub struct GossipReactorBuilder {
- profile: Arc<RwLock<Profile>>,
- gossip_topic_name: String,
-}
-
-impl GossipReactorBuilder {
- pub fn new(profile: Arc<RwLock<Profile>>, gossip_topic_name: String) -> Self {
- Self { profile, gossip_topic_name }
- }
-}
-
-impl ReactorBuilder for GossipReactorBuilder {
- type Reactor = GossipReactor;
-
- fn build_with_receiver(self, rr: ReactorReceiver<GossipRequest, GossipReply>) -> Self::Reactor {
- GossipReactor {
- profile: self.profile,
- gossip_topic_name: self.gossip_topic_name,
- receiver: rr,
- strategy: std::marker::PhantomData,
- }
- }
-}
-
-pub struct GossipReactor<Strategy = LogStrategy>
- where Strategy: GossipHandlingStrategy + Sync + Send
-{
- profile: Arc<RwLock<Profile>>,
- gossip_topic_name: String,
- receiver: ReactorReceiver<GossipRequest, GossipReply>,
- strategy: std::marker::PhantomData<Strategy>,
-}
-
-impl<S> std::fmt::Debug for GossipReactor<S>
- where S: GossipHandlingStrategy + Sync + Send
-{
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(f, "GossipReactor {{ topic: '{}' }}", self.gossip_topic_name)
- }
-}
-
-impl<S> GossipReactor<S>
- where S: GossipHandlingStrategy + Sync + Send
-{
- fn send_gossip_reply(channel: ReplySender<GossipReply>, reply: GossipReply) -> Result<()> {
- if let Err(_) = channel.send(reply) {
- anyhow::bail!("Failed to send GossipReply::NoHead)")
- }
-
- Ok(())
- }
-
- async fn publish_me(&self, reply_channel: ReplySender<GossipReply>) -> Result<()> {
- let profile = self.profile.read().await;
-
- let head = profile.head();
- if head.is_none() {
- Self::send_gossip_reply(reply_channel, GossipReply::NoHead)?;
- return Ok(())
- }
- let head = head.unwrap().to_bytes();
-
- let own_id = match profile.client().own_id().await {
- Ok(id) => id,
- Err(e) => {
- Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Err(e)))?;
- return Ok(()) // TODO: abort operation here for now, maybe not the best idea
- }
- };
-
- let publish_res = profile
- .client()
- .ipfs
- .pubsub_publish(self.gossip_topic_name.clone(), GossipMessage::CurrentProfileState {
- peer_id: own_id.to_bytes(),
- cid: head
- }.into_bytes()?)
- .await;
-
- match publish_res {
- Ok(()) => Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Ok(()))),
- Err(e) => Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Err(e))),
- }
- }
-
- async fn connect(&self, addr: ipfs::MultiaddrWithPeerId) -> Result<()> {
- log::trace!("Connecting GossipReactor with {:?}", addr);
- self.profile.read().await.client().connect(addr).await
- }
-
- #[cfg(test)]
- async fn is_connected_to(&self, addr: ipfs::MultiaddrWithPeerId) -> Result<bool> {
- self.profile
- .read()
- .await
- .client()
- .ipfs
- .peers()
- .await
- .map(|connections| {
- connections.iter().any(|connection| connection.addr == addr)
- })
- }
-
-}
-
-#[async_trait::async_trait]
-impl<S> Reactor for GossipReactor<S>
- where S: GossipHandlingStrategy + Sync + Send
-{
- type Request = GossipRequest;
- type Reply = GossipReply;
-
- async fn run(mut self) -> Result<()> {
- use futures::stream::StreamExt;
-
- log::trace!("Booting {:?}", self);
- let mut subscription_stream = self.profile
- .read()
- .await
- .client()
- .ipfs
- .pubsub_subscribe(self.gossip_topic_name.clone())
- .await?;
-
- log::trace!("{:?} main loop", self);
- loop {
- tokio::select! {
- next_control_msg = self.receiver.recv() => {
- log::trace!("Received control message: {:?}", next_control_msg);
- match next_control_msg {
- None => break,
- Some((GossipRequest::Exit, reply_channel)) => {
- if let Err(_) = reply_channel.send(GossipReply::Exiting) {
- anyhow::bail!("Failed sending EXITING reply")
- }
- break
- },
-
- Some((GossipRequest::Ping, reply_channel)) => {
- log::trace!("Replying with Pong");
- if let Err(_) = reply_channel.send(GossipReply::Pong) {
- anyhow::bail!("Failed sending PONG reply")
- }
- },
-
- Some((GossipRequest::PublishMe, reply_channel)) => self.publish_me(reply_channel).await?,
-
- Some((GossipRequest::Connect(addr), reply_channel)) => {
- let reply = GossipReply::ConnectResult(self.connect(addr.clone()).await);
- if let Err(_) = Self::send_gossip_reply(reply_channel, reply) {
- anyhow::bail!("Failed sending Connect({}) reply", addr)
- }
- },
- }
- }
-
- next_gossip_message = subscription_stream.next() => {
- if let Some(msg) = next_gossip_message {
- log::trace!("Received gossip message");
- match serde_json::from_slice(&msg.data) {
- Ok(m) => S::handle_gossip_message(self.profile.clone(), msg.source, m).await?,
- Err(e) => log::trace!("Failed to deserialize gossip message from {}", msg.source),
- }
- } else {
- log::trace!("Gossip stream closed, breaking reactor loop");
- break;
- }
- }
- }
- }
- Ok(())
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- use std::convert::TryFrom;
- use std::sync::Arc;
- use tokio::sync::RwLock;
-
- #[tokio::test]
- async fn test_gossip_reactor_simple() {
- let _ = env_logger::try_init();
-
- let profile = Profile::new_inmemory("test-gossip-reactor-simple").await;
- assert!(profile.is_ok());
- let profile = Arc::new(RwLock::new(profile.unwrap()));
-
- let gossip_topic_name = String::from("test-gossip-reactor-simple-topic");
- let (rx, tx) = tokio::sync::mpsc::unbounded_channel();
- let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name).build_with_receiver(tx);
-
- let (reply_sender, mut reply_receiver) = tokio::sync::mpsc::unbounded_channel();
- rx.send((GossipRequest::Ping, reply_sender)).unwrap();
-
- let mut pong_received = false;
- let _ = tokio::spawn(async move {
- reactor.run().await
- });
-
- match reply_receiver.recv().await {
- Some(GossipReply::Pong) => {
- pong_received = true;
- log::trace!("Pong received!");
- let (reply_sender, mut reply_receiver) = tokio::sync::mpsc::unbounded_channel();
- rx.send((GossipRequest::Exit, reply_sender)).unwrap();
- }
- Some(r) => {
- assert!(false, "Expected ReactorReply::Pong, got: {:?}", r);
- }
- None => {
- // nothing
- }
- }
-
- assert!(pong_received, "No pong received");
- }
-
- #[tokio::test]
- async fn test_gossip_reactor_gossipping() {
- let _ = env_logger::try_init();
-
- let gossip_topic_name = String::from("test-gossip-reactor-gossipping-topic");
- let (left_profile, left_reactor, left_rx) = {
- let profile = Profile::new_inmemory("test-gossip-reactor-simple-left").await;
- assert!(profile.is_ok());
- let profile = Arc::new(RwLock::new(profile.unwrap()));
-
- let (rx, tx) = tokio::sync::mpsc::unbounded_channel();
- let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name.clone()).build_with_receiver(tx);
- (profile, reactor, rx)
- };
- log::trace!("Built left GossipReactor");
-
- let (right_profile, right_reactor, right_rx) = {
- let profile = Profile::new_inmemory("test-gossip-reactor-simple-right").await;
- assert!(profile.is_ok());
- let profile = Arc::new(RwLock::new(profile.unwrap()));
-
- let (rx, tx) = tokio::sync::mpsc::unbounded_channel();
- let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name.clone()).build_with_receiver(tx);
- (profile, reactor, rx)
- };
- log::trace!("Built right GossipReactor");
-
- async fn get_peer_id(profile: Arc<RwLock<Profile>>) -> Result<ipfs::MultiaddrWithPeerId> {
- profile.read()
- .await
- .client()
- .ipfs
- .identity()
- .await
- .map(|(pubkey, addr)| (pubkey.into_peer_id(), addr))
- .and_then(|(peerid, mut addr)| {
- ipfs::MultiaddrWithPeerId::try_from({
- addr.pop().expect("At least one address for client")
- })
- .map_err(anyhow::Error::from)
- })
- }
-
- let left_running_reactor = tokio::spawn(async move {
- left_reactor.run().await
- });
-
- let right_running_reactor = tokio::spawn(async move {
- right_reactor.run().await
- });
-
- let left_peer_id = get_peer_id(left_profile.clone()).await;
- log::trace!("Left GossipReactor = {:?}", left_peer_id);
- assert!(left_peer_id.is_ok(), "Not ok: {:?}", left_peer_id);
- let left_peer_id = left_peer_id.unwrap();
-
- let right_peer_id = get_peer_id(right_profile.clone()).await;
- log::trace!("Right GossipReactor = {:?}", right_peer_id);
- assert!(right_peer_id.is_ok(), "Not ok: {:?}", right_peer_id);
- let right_peer_id = right_peer_id.unwrap();
-
- let (right_reply_sender, mut right_reply_receiver) = tokio::sync::mpsc::unbounded_channel();
-
- log::trace!("Right GossipReactor should now connect to left GossipReactor");
- right_rx.send((GossipRequest::Connect(left_peer_id), right_reply_sender)).unwrap();
-
- log::trace!("Right GossipReactor should now connect to left GossipReactor... waiting for reply");
- match tokio::time::timeout(std::time::Duration::from_secs(5), right_reply_receiver.recv()).await {
- Err(_) => assert!(false, "Timeout elapsed when waiting for connection status"),
- Ok(Some(GossipReply::ConnectResult(Ok(())))) => {
- log::trace!("Right GossipReactor is connected");
- assert!(true)
- },
- Ok(Some(other)) => assert!(false, "Expected ConnectResult(Ok(())), recv: {:?}", other),
- Ok(None) => assert!(false, "No reply from right reactor received"),
- }
- }
-}