summaryrefslogtreecommitdiffstats
path: root/lib/src/reactor/gossip/mod.rs
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2021-12-18 22:04:31 +0100
committerMatthias Beyer <mail@beyermatthias.de>2021-12-18 22:14:37 +0100
commit542e1e9dc50a96a36ab9d4236293cd0a4f5d22c3 (patch)
tree89905044c44e1af21ab79b150b85011f96efa8fe /lib/src/reactor/gossip/mod.rs
parent7675eba9244475c3847fd48eb13b5d8b54cf271c (diff)
Shrink idea of "Reactors"
A Reactor can be waaay less complex if we simply use it as "map"-helper for mapping over `Stream`s. If we map over a stream of Vec<u8> and deserialize them to GossipMessages in one step, and handle them appropriately in the next step, it is way less complex to implement these things and we do not have to care about this whole "how do I shut down the thing" because we can simply drop() everything and let the destructors do their job. This patch removes the Reactor nonsense. Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
Diffstat (limited to 'lib/src/reactor/gossip/mod.rs')
-rw-r--r--lib/src/reactor/gossip/mod.rs329
1 files changed, 0 insertions, 329 deletions
diff --git a/lib/src/reactor/gossip/mod.rs b/lib/src/reactor/gossip/mod.rs
deleted file mode 100644
index 7658509..0000000
--- a/lib/src/reactor/gossip/mod.rs
+++ /dev/null
@@ -1,329 +0,0 @@
-//! Low-level module for gossip'ing code
-//!
-//! This module implements the low-level gossiping functionality that other modules use to
-//! implement actual behaviours on
-//!
-
-use std::sync::Arc;
-
-use anyhow::Result;
-use tokio::sync::RwLock;
-
-use crate::profile::Profile;
-use crate::reactor::Reactor;
-use crate::reactor::ReactorBuilder;
-use crate::reactor::ctrl::ReactorReceiver;
-use crate::reactor::ctrl::ReactorSender;
-use crate::reactor::ctrl::ReplySender;
-
-mod ctrl;
-pub use ctrl::GossipRequest;
-pub use ctrl::GossipReply;
-
-mod msg;
-pub use msg::GossipMessage;
-
-mod strategy;
-pub use strategy::GossipHandlingStrategy;
-pub use strategy::LogStrategy;
-
-#[derive(Debug)]
-pub struct GossipReactorBuilder {
- profile: Arc<RwLock<Profile>>,
- gossip_topic_name: String,
-}
-
-impl GossipReactorBuilder {
- pub fn new(profile: Arc<RwLock<Profile>>, gossip_topic_name: String) -> Self {
- Self { profile, gossip_topic_name }
- }
-}
-
-impl ReactorBuilder for GossipReactorBuilder {
- type Reactor = GossipReactor;
-
- fn build_with_receiver(self, rr: ReactorReceiver<GossipRequest, GossipReply>) -> Self::Reactor {
- GossipReactor {
- profile: self.profile,
- gossip_topic_name: self.gossip_topic_name,
- receiver: rr,
- strategy: std::marker::PhantomData,
- }
- }
-}
-
-pub struct GossipReactor<Strategy = LogStrategy>
- where Strategy: GossipHandlingStrategy + Sync + Send
-{
- profile: Arc<RwLock<Profile>>,
- gossip_topic_name: String,
- receiver: ReactorReceiver<GossipRequest, GossipReply>,
- strategy: std::marker::PhantomData<Strategy>,
-}
-
-impl<S> std::fmt::Debug for GossipReactor<S>
- where S: GossipHandlingStrategy + Sync + Send
-{
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(f, "GossipReactor {{ topic: '{}' }}", self.gossip_topic_name)
- }
-}
-
-impl<S> GossipReactor<S>
- where S: GossipHandlingStrategy + Sync + Send
-{
- fn send_gossip_reply(channel: ReplySender<GossipReply>, reply: GossipReply) -> Result<()> {
- if let Err(_) = channel.send(reply) {
- anyhow::bail!("Failed to send GossipReply::NoHead)")
- }
-
- Ok(())
- }
-
- async fn publish_me(&self, reply_channel: ReplySender<GossipReply>) -> Result<()> {
- let profile = self.profile.read().await;
-
- let head = profile.head();
- if head.is_none() {
- Self::send_gossip_reply(reply_channel, GossipReply::NoHead)?;
- return Ok(())
- }
- let head = head.unwrap().to_bytes();
-
- let own_id = match profile.client().own_id().await {
- Ok(id) => id,
- Err(e) => {
- Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Err(e)))?;
- return Ok(()) // TODO: abort operation here for now, maybe not the best idea
- }
- };
-
- let publish_res = profile
- .client()
- .ipfs
- .pubsub_publish(self.gossip_topic_name.clone(), GossipMessage::CurrentProfileState {
- peer_id: own_id.to_bytes(),
- cid: head
- }.into_bytes()?)
- .await;
-
- match publish_res {
- Ok(()) => Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Ok(()))),
- Err(e) => Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Err(e))),
- }
- }
-
- async fn connect(&self, addr: ipfs::MultiaddrWithPeerId) -> Result<()> {
- log::trace!("Connecting GossipReactor with {:?}", addr);
- self.profile.read().await.client().connect(addr).await
- }
-
- #[cfg(test)]
- async fn is_connected_to(&self, addr: ipfs::MultiaddrWithPeerId) -> Result<bool> {
- self.profile
- .read()
- .await
- .client()
- .ipfs
- .peers()
- .await
- .map(|connections| {
- connections.iter().any(|connection| connection.addr == addr)
- })
- }
-
-}
-
-#[async_trait::async_trait]
-impl<S> Reactor for GossipReactor<S>
- where S: GossipHandlingStrategy + Sync + Send
-{
- type Request = GossipRequest;
- type Reply = GossipReply;
-
- async fn run(mut self) -> Result<()> {
- use futures::stream::StreamExt;
-
- log::trace!("Booting {:?}", self);
- let mut subscription_stream = self.profile
- .read()
- .await
- .client()
- .ipfs
- .pubsub_subscribe(self.gossip_topic_name.clone())
- .await?;
-
- log::trace!("{:?} main loop", self);
- loop {
- tokio::select! {
- next_control_msg = self.receiver.recv() => {
- log::trace!("Received control message: {:?}", next_control_msg);
- match next_control_msg {
- None => break,
- Some((GossipRequest::Exit, reply_channel)) => {
- if let Err(_) = reply_channel.send(GossipReply::Exiting) {
- anyhow::bail!("Failed sending EXITING reply")
- }
- break
- },
-
- Some((GossipRequest::Ping, reply_channel)) => {
- log::trace!("Replying with Pong");
- if let Err(_) = reply_channel.send(GossipReply::Pong) {
- anyhow::bail!("Failed sending PONG reply")
- }
- },
-
- Some((GossipRequest::PublishMe, reply_channel)) => self.publish_me(reply_channel).await?,
-
- Some((GossipRequest::Connect(addr), reply_channel)) => {
- let reply = GossipReply::ConnectResult(self.connect(addr.clone()).await);
- if let Err(_) = Self::send_gossip_reply(reply_channel, reply) {
- anyhow::bail!("Failed sending Connect({}) reply", addr)
- }
- },
- }
- }
-
- next_gossip_message = subscription_stream.next() => {
- if let Some(msg) = next_gossip_message {
- log::trace!("Received gossip message");
- match serde_json::from_slice(&msg.data) {
- Ok(m) => S::handle_gossip_message(self.profile.clone(), msg.source, m).await?,
- Err(e) => log::trace!("Failed to deserialize gossip message from {}", msg.source),
- }
- } else {
- log::trace!("Gossip stream closed, breaking reactor loop");
- break;
- }
- }
- }
- }
- Ok(())
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- use std::convert::TryFrom;
- use std::sync::Arc;
- use tokio::sync::RwLock;
-
- #[tokio::test]
- async fn test_gossip_reactor_simple() {
- let _ = env_logger::try_init();
-
- let profile = Profile::new_inmemory("test-gossip-reactor-simple").await;
- assert!(profile.is_ok());
- let profile = Arc::new(RwLock::new(profile.unwrap()));
-
- let gossip_topic_name = String::from("test-gossip-reactor-simple-topic");
- let (rx, tx) = tokio::sync::mpsc::unbounded_channel();
- let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name).build_with_receiver(tx);
-
- let (reply_sender, mut reply_receiver) = tokio::sync::mpsc::unbounded_channel();
- rx.send((GossipRequest::Ping, reply_sender)).unwrap();
-
- let mut pong_received = false;
- let _ = tokio::spawn(async move {
- reactor.run().await
- });
-
- match reply_receiver.recv().await {
- Some(GossipReply::Pong) => {
- pong_received = true;
- log::trace!("Pong received!");
- let (reply_sender, mut reply_receiver) = tokio::sync::mpsc::unbounded_channel();
- rx.send((GossipRequest::Exit, reply_sender)).unwrap();
- }
- Some(r) => {
- assert!(false, "Expected ReactorReply::Pong, got: {:?}", r);
- }
- None => {
- // nothing
- }
- }
-
- assert!(pong_received, "No pong received");
- }
-
- #[tokio::test]
- async fn test_gossip_reactor_gossipping() {
- let _ = env_logger::try_init();
-
- let gossip_topic_name = String::from("test-gossip-reactor-gossipping-topic");
- let (left_profile, left_reactor, left_rx) = {
- let profile = Profile::new_inmemory("test-gossip-reactor-simple-left").await;
- assert!(profile.is_ok());
- let profile = Arc::new(RwLock::new(profile.unwrap()));
-
- let (rx, tx) = tokio::sync::mpsc::unbounded_channel();
- let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name.clone()).build_with_receiver(tx);
- (profile, reactor, rx)
- };
- log::trace!("Built left GossipReactor");
-
- let (right_profile, right_reactor, right_rx) = {
- let profile = Profile::new_inmemory("test-gossip-reactor-simple-right").await;
- assert!(profile.is_ok());
- let profile = Arc::new(RwLock::new(profile.unwrap()));
-
- let (rx, tx) = tokio::sync::mpsc::unbounded_channel();
- let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name.clone()).build_with_receiver(tx);
- (profile, reactor, rx)
- };
- log::trace!("Built right GossipReactor");
-
- async fn get_peer_id(profile: Arc<RwLock<Profile>>) -> Result<ipfs::MultiaddrWithPeerId> {
- profile.read()
- .await
- .client()
- .ipfs
- .identity()
- .await
- .map(|(pubkey, addr)| (pubkey.into_peer_id(), addr))
- .and_then(|(peerid, mut addr)| {
- ipfs::MultiaddrWithPeerId::try_from({
- addr.pop().expect("At least one address for client")
- })
- .map_err(anyhow::Error::from)
- })
- }
-
- let left_running_reactor = tokio::spawn(async move {
- left_reactor.run().await
- });
-
- let right_running_reactor = tokio::spawn(async move {
- right_reactor.run().await
- });
-
- let left_peer_id = get_peer_id(left_profile.clone()).await;
- log::trace!("Left GossipReactor = {:?}", left_peer_id);
- assert!(left_peer_id.is_ok(), "Not ok: {:?}", left_peer_id);
- let left_peer_id = left_peer_id.unwrap();
-
- let right_peer_id = get_peer_id(right_profile.clone()).await;
- log::trace!("Right GossipReactor = {:?}", right_peer_id);
- assert!(right_peer_id.is_ok(), "Not ok: {:?}", right_peer_id);
- let right_peer_id = right_peer_id.unwrap();
-
- let (right_reply_sender, mut right_reply_receiver) = tokio::sync::mpsc::unbounded_channel();
-
- log::trace!("Right GossipReactor should now connect to left GossipReactor");
- right_rx.send((GossipRequest::Connect(left_peer_id), right_reply_sender)).unwrap();
-
- log::trace!("Right GossipReactor should now connect to left GossipReactor... waiting for reply");
- match tokio::time::timeout(std::time::Duration::from_secs(5), right_reply_receiver.recv()).await {
- Err(_) => assert!(false, "Timeout elapsed when waiting for connection status"),
- Ok(Some(GossipReply::ConnectResult(Ok(())))) => {
- log::trace!("Right GossipReactor is connected");
- assert!(true)
- },
- Ok(Some(other)) => assert!(false, "Expected ConnectResult(Ok(())), recv: {:?}", other),
- Ok(None) => assert!(false, "No reply from right reactor received"),
- }
- }
-}