diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/src/gossip/deserializer.rs | 55 | ||||
-rw-r--r-- | lib/src/gossip/handler.rs | 73 | ||||
-rw-r--r-- | lib/src/gossip/mod.rs | 9 | ||||
-rw-r--r-- | lib/src/gossip/msg.rs (renamed from lib/src/reactor/gossip/msg.rs) | 0 | ||||
-rw-r--r-- | lib/src/lib.rs | 2 | ||||
-rw-r--r-- | lib/src/reactor/account.rs | 19 | ||||
-rw-r--r-- | lib/src/reactor/ctrl.rs | 15 | ||||
-rw-r--r-- | lib/src/reactor/device.rs | 19 | ||||
-rw-r--r-- | lib/src/reactor/gossip/ctrl.rs | 19 | ||||
-rw-r--r-- | lib/src/reactor/gossip/mod.rs | 329 | ||||
-rw-r--r-- | lib/src/reactor/gossip/strategy.rs | 31 | ||||
-rw-r--r-- | lib/src/reactor/mod.rs | 28 |
12 files changed, 138 insertions, 461 deletions
diff --git a/lib/src/gossip/deserializer.rs b/lib/src/gossip/deserializer.rs new file mode 100644 index 0000000..a50644f --- /dev/null +++ b/lib/src/gossip/deserializer.rs @@ -0,0 +1,55 @@ +use anyhow::Result; +use futures::Stream; +use futures::StreamExt; + +use crate::gossip::GossipMessage; + +pub struct GossipDeserializer<ErrStrategy = LogStrategy> + where ErrStrategy: GossipDeserializerErrorStrategy +{ + strategy: std::marker::PhantomData<ErrStrategy>, +} + +impl<ErrStrategy> GossipDeserializer<ErrStrategy> + where ErrStrategy: GossipDeserializerErrorStrategy +{ + pub fn new() -> Self { + Self { + strategy: std::marker::PhantomData, + } + } + + pub fn run<S>(mut self, input: S) -> impl Stream<Item = GossipMessage> + where S: Stream<Item = ipfs::PubsubMessage> + { + input.filter_map(|message| async move { + log::trace!("Received gossip message"); + + match serde_json::from_slice(&message.data).map_err(anyhow::Error::from) { + Ok(m) => Some(m), + Err(e) => { + ErrStrategy::handle_error(e); + None + } + } + }) + } +} + +pub trait GossipDeserializerErrorStrategy { + fn handle_error(err: anyhow::Error); +} + +pub struct LogStrategy; +impl GossipDeserializerErrorStrategy for LogStrategy { + fn handle_error(err: anyhow::Error) { + log::trace!("Error: {}", err); + } +} + +pub struct IgnoreStrategy; +impl GossipDeserializerErrorStrategy for IgnoreStrategy { + fn handle_error(_: anyhow::Error) { + () + } +} diff --git a/lib/src/gossip/handler.rs b/lib/src/gossip/handler.rs new file mode 100644 index 0000000..e524da8 --- /dev/null +++ b/lib/src/gossip/handler.rs @@ -0,0 +1,73 @@ +//! Low-level module for gossip'ing code +//! +//! This module implements the low-level gossiping functionality that other modules use to +//! implement actual behaviours on +//! + +use std::sync::Arc; + +use anyhow::Result; +use futures::Stream; +use futures::StreamExt; +use tokio::sync::RwLock; + +use crate::profile::Profile; +use crate::gossip::GossipMessage; + +#[derive(Debug)] +pub struct GossipHandler<Strategy = LogStrategy> + where Strategy: GossipHandlingStrategy + Sync + Send +{ + profile: Arc<RwLock<Profile>>, + strategy: std::marker::PhantomData<Strategy>, +} + +impl<Strat> GossipHandler<Strat> + where Strat: GossipHandlingStrategy + Sync + Send +{ + pub fn new(profile: Arc<RwLock<Profile>>) -> Self { + Self { + profile, + strategy: std::marker::PhantomData, + } + } + + pub fn run<S>(self, input: S) -> impl Stream<Item = (GossipMessage, Result<()>)> + where S: Stream<Item = (ipfs::PeerId, GossipMessage)> + { + input.then(move |(source, msg)| { + let pr = self.profile.clone(); + async move { + log::trace!("Received gossip message from {}: {:?}", source, msg); + let res = Strat::handle_gossip_message(pr.clone(), &source, &msg).await; + (msg, res) + } + }) + } +} + +#[async_trait::async_trait] +pub trait GossipHandlingStrategy: Sync + Send { + async fn handle_gossip_message(profile: Arc<RwLock<Profile>>, source: &ipfs::PeerId, msg: &GossipMessage) -> Result<()>; +} + +pub struct LogStrategy; + +#[async_trait::async_trait] +impl GossipHandlingStrategy for LogStrategy { + async fn handle_gossip_message(_profile: Arc<RwLock<Profile>>, source: &ipfs::PeerId, msg: &GossipMessage) -> Result<()> { + use std::convert::TryFrom; + use std::ops::Deref; + + match msg { + GossipMessage::CurrentProfileState { peer_id, cid } => { + let peer_id = ipfs::PeerId::from_bytes(peer_id); + let cid = cid::Cid::try_from(cid.deref()); + + log::trace!("{:?} told me that {:?} is at {:?}", source, peer_id, cid); + } + } + + Ok(()) + } +} diff --git a/lib/src/gossip/mod.rs b/lib/src/gossip/mod.rs new file mode 100644 index 0000000..d6a6963 --- /dev/null +++ b/lib/src/gossip/mod.rs @@ -0,0 +1,9 @@ +mod msg; +pub use msg::GossipMessage; + +mod handler; +pub use handler::*; + +mod deserializer; +pub use deserializer::*; + diff --git a/lib/src/reactor/gossip/msg.rs b/lib/src/gossip/msg.rs index 049fc68..049fc68 100644 --- a/lib/src/reactor/gossip/msg.rs +++ b/lib/src/gossip/msg.rs diff --git a/lib/src/lib.rs b/lib/src/lib.rs index b7b05e2..e836c37 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -4,4 +4,4 @@ pub mod ipfs_client; pub mod profile; pub mod stream; pub mod types; -pub mod reactor; +pub mod gossip; diff --git a/lib/src/reactor/account.rs b/lib/src/reactor/account.rs deleted file mode 100644 index 59913b5..0000000 --- a/lib/src/reactor/account.rs +++ /dev/null @@ -1,19 +0,0 @@ -//! Module for account handling (following accounts, caching account updates) using the gossip -//! module for the lower-level handling - -use std::sync::Arc; - -use anyhow::Result; -use tokio::sync::RwLock; - -use crate::profile::Profile; -use crate::reactor::gossip::GossipReactor; - -#[derive(Debug)] -pub struct AccountReactor(GossipReactor); - -impl AccountReactor { - pub(super) fn new(profile: Arc<RwLock<Profile>>) -> Self { - unimplemented!() - } -} diff --git a/lib/src/reactor/ctrl.rs b/lib/src/reactor/ctrl.rs deleted file mode 100644 index a32f1c3..0000000 --- a/lib/src/reactor/ctrl.rs +++ /dev/null @@ -1,15 +0,0 @@ -use tokio::sync::mpsc::UnboundedSender as Sender; -use tokio::sync::mpsc::UnboundedReceiver as Receiver; - -/// Type for sending messages to a reactor -pub type ReactorSender<Request, Reply> = Sender<(Request, ReplySender<Reply>)>; - -/// Type that is used by a reactor for receiving messages -pub type ReactorReceiver<Request, Reply> = Receiver<(Request, ReplySender<Reply>)>; - -/// Type that represents the channel that has to be send with a request to a reactor for getting an -/// answer back -pub type ReplySender<Reply> = Sender<Reply>; - -pub type ReplyReceiver<Reply> = Receiver<Reply>; - diff --git a/lib/src/reactor/device.rs b/lib/src/reactor/device.rs deleted file mode 100644 index 1014ca1..0000000 --- a/lib/src/reactor/device.rs +++ /dev/null @@ -1,19 +0,0 @@ -//! Module for multi-device support functionality, -//! which uses the gossip module for the lower-level handling - -use std::sync::Arc; - -use anyhow::Result; -use tokio::sync::RwLock; - -use crate::profile::Profile; -use crate::reactor::gossip::GossipReactor; - -#[derive(Debug)] -pub struct DeviceReactor(GossipReactor); - -impl DeviceReactor { - pub(super) fn new(profile: Arc<RwLock<Profile>>) -> Self { - unimplemented!() - } -} diff --git a/lib/src/reactor/gossip/ctrl.rs b/lib/src/reactor/gossip/ctrl.rs deleted file mode 100644 index 68fbf06..0000000 --- a/lib/src/reactor/gossip/ctrl.rs +++ /dev/null @@ -1,19 +0,0 @@ -use anyhow::Result; - -#[derive(Debug)] -pub enum GossipRequest { - Exit, - Ping, - PublishMe, - Connect(ipfs::MultiaddrWithPeerId), -} - -#[derive(Debug)] -pub enum GossipReply { - Exiting, - Pong, - NoHead, - PublishMeResult(Result<()>), - ConnectResult(Result<()>), -} - diff --git a/lib/src/reactor/gossip/mod.rs b/lib/src/reactor/gossip/mod.rs deleted file mode 100644 index 7658509..0000000 --- a/lib/src/reactor/gossip/mod.rs +++ /dev/null @@ -1,329 +0,0 @@ -//! Low-level module for gossip'ing code -//! -//! This module implements the low-level gossiping functionality that other modules use to -//! implement actual behaviours on -//! - -use std::sync::Arc; - -use anyhow::Result; -use tokio::sync::RwLock; - -use crate::profile::Profile; -use crate::reactor::Reactor; -use crate::reactor::ReactorBuilder; -use crate::reactor::ctrl::ReactorReceiver; -use crate::reactor::ctrl::ReactorSender; -use crate::reactor::ctrl::ReplySender; - -mod ctrl; -pub use ctrl::GossipRequest; -pub use ctrl::GossipReply; - -mod msg; -pub use msg::GossipMessage; - -mod strategy; -pub use strategy::GossipHandlingStrategy; -pub use strategy::LogStrategy; - -#[derive(Debug)] -pub struct GossipReactorBuilder { - profile: Arc<RwLock<Profile>>, - gossip_topic_name: String, -} - -impl GossipReactorBuilder { - pub fn new(profile: Arc<RwLock<Profile>>, gossip_topic_name: String) -> Self { - Self { profile, gossip_topic_name } - } -} - -impl ReactorBuilder for GossipReactorBuilder { - type Reactor = GossipReactor; - - fn build_with_receiver(self, rr: ReactorReceiver<GossipRequest, GossipReply>) -> Self::Reactor { - GossipReactor { - profile: self.profile, - gossip_topic_name: self.gossip_topic_name, - receiver: rr, - strategy: std::marker::PhantomData, - } - } -} - -pub struct GossipReactor<Strategy = LogStrategy> - where Strategy: GossipHandlingStrategy + Sync + Send -{ - profile: Arc<RwLock<Profile>>, - gossip_topic_name: String, - receiver: ReactorReceiver<GossipRequest, GossipReply>, - strategy: std::marker::PhantomData<Strategy>, -} - -impl<S> std::fmt::Debug for GossipReactor<S> - where S: GossipHandlingStrategy + Sync + Send -{ - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "GossipReactor {{ topic: '{}' }}", self.gossip_topic_name) - } -} - -impl<S> GossipReactor<S> - where S: GossipHandlingStrategy + Sync + Send -{ - fn send_gossip_reply(channel: ReplySender<GossipReply>, reply: GossipReply) -> Result<()> { - if let Err(_) = channel.send(reply) { - anyhow::bail!("Failed to send GossipReply::NoHead)") - } - - Ok(()) - } - - async fn publish_me(&self, reply_channel: ReplySender<GossipReply>) -> Result<()> { - let profile = self.profile.read().await; - - let head = profile.head(); - if head.is_none() { - Self::send_gossip_reply(reply_channel, GossipReply::NoHead)?; - return Ok(()) - } - let head = head.unwrap().to_bytes(); - - let own_id = match profile.client().own_id().await { - Ok(id) => id, - Err(e) => { - Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Err(e)))?; - return Ok(()) // TODO: abort operation here for now, maybe not the best idea - } - }; - - let publish_res = profile - .client() - .ipfs - .pubsub_publish(self.gossip_topic_name.clone(), GossipMessage::CurrentProfileState { - peer_id: own_id.to_bytes(), - cid: head - }.into_bytes()?) - .await; - - match publish_res { - Ok(()) => Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Ok(()))), - Err(e) => Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Err(e))), - } - } - - async fn connect(&self, addr: ipfs::MultiaddrWithPeerId) -> Result<()> { - log::trace!("Connecting GossipReactor with {:?}", addr); - self.profile.read().await.client().connect(addr).await - } - - #[cfg(test)] - async fn is_connected_to(&self, addr: ipfs::MultiaddrWithPeerId) -> Result<bool> { - self.profile - .read() - .await - .client() - .ipfs - .peers() - .await - .map(|connections| { - connections.iter().any(|connection| connection.addr == addr) - }) - } - -} - -#[async_trait::async_trait] -impl<S> Reactor for GossipReactor<S> - where S: GossipHandlingStrategy + Sync + Send -{ - type Request = GossipRequest; - type Reply = GossipReply; - - async fn run(mut self) -> Result<()> { - use futures::stream::StreamExt; - - log::trace!("Booting {:?}", self); - let mut subscription_stream = self.profile - .read() - .await - .client() - .ipfs - .pubsub_subscribe(self.gossip_topic_name.clone()) - .await?; - - log::trace!("{:?} main loop", self); - loop { - tokio::select! { - next_control_msg = self.receiver.recv() => { - log::trace!("Received control message: {:?}", next_control_msg); - match next_control_msg { - None => break, - Some((GossipRequest::Exit, reply_channel)) => { - if let Err(_) = reply_channel.send(GossipReply::Exiting) { - anyhow::bail!("Failed sending EXITING reply") - } - break - }, - - Some((GossipRequest::Ping, reply_channel)) => { - log::trace!("Replying with Pong"); - if let Err(_) = reply_channel.send(GossipReply::Pong) { - anyhow::bail!("Failed sending PONG reply") - } - }, - - Some((GossipRequest::PublishMe, reply_channel)) => self.publish_me(reply_channel).await?, - - Some((GossipRequest::Connect(addr), reply_channel)) => { - let reply = GossipReply::ConnectResult(self.connect(addr.clone()).await); - if let Err(_) = Self::send_gossip_reply(reply_channel, reply) { - anyhow::bail!("Failed sending Connect({}) reply", addr) - } - }, - } - } - - next_gossip_message = subscription_stream.next() => { - if let Some(msg) = next_gossip_message { - log::trace!("Received gossip message"); - match serde_json::from_slice(&msg.data) { - Ok(m) => S::handle_gossip_message(self.profile.clone(), msg.source, m).await?, - Err(e) => log::trace!("Failed to deserialize gossip message from {}", msg.source), - } - } else { - log::trace!("Gossip stream closed, breaking reactor loop"); - break; - } - } - } - } - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - use std::convert::TryFrom; - use std::sync::Arc; - use tokio::sync::RwLock; - - #[tokio::test] - async fn test_gossip_reactor_simple() { - let _ = env_logger::try_init(); - - let profile = Profile::new_inmemory("test-gossip-reactor-simple").await; - assert!(profile.is_ok()); - let profile = Arc::new(RwLock::new(profile.unwrap())); - - let gossip_topic_name = String::from("test-gossip-reactor-simple-topic"); - let (rx, tx) = tokio::sync::mpsc::unbounded_channel(); - let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name).build_with_receiver(tx); - - let (reply_sender, mut reply_receiver) = tokio::sync::mpsc::unbounded_channel(); - rx.send((GossipRequest::Ping, reply_sender)).unwrap(); - - let mut pong_received = false; - let _ = tokio::spawn(async move { - reactor.run().await - }); - - match reply_receiver.recv().await { - Some(GossipReply::Pong) => { - pong_received = true; - log::trace!("Pong received!"); - let (reply_sender, mut reply_receiver) = tokio::sync::mpsc::unbounded_channel(); - rx.send((GossipRequest::Exit, reply_sender)).unwrap(); - } - Some(r) => { - assert!(false, "Expected ReactorReply::Pong, got: {:?}", r); - } - None => { - // nothing - } - } - - assert!(pong_received, "No pong received"); - } - - #[tokio::test] - async fn test_gossip_reactor_gossipping() { - let _ = env_logger::try_init(); - - let gossip_topic_name = String::from("test-gossip-reactor-gossipping-topic"); - let (left_profile, left_reactor, left_rx) = { - let profile = Profile::new_inmemory("test-gossip-reactor-simple-left").await; - assert!(profile.is_ok()); - let profile = Arc::new(RwLock::new(profile.unwrap())); - - let (rx, tx) = tokio::sync::mpsc::unbounded_channel(); - let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name.clone()).build_with_receiver(tx); - (profile, reactor, rx) - }; - log::trace!("Built left GossipReactor"); - - let (right_profile, right_reactor, right_rx) = { - let profile = Profile::new_inmemory("test-gossip-reactor-simple-right").await; - assert!(profile.is_ok()); - let profile = Arc::new(RwLock::new(profile.unwrap())); - - let (rx, tx) = tokio::sync::mpsc::unbounded_channel(); - let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name.clone()).build_with_receiver(tx); - (profile, reactor, rx) - }; - log::trace!("Built right GossipReactor"); - - async fn get_peer_id(profile: Arc<RwLock<Profile>>) -> Result<ipfs::MultiaddrWithPeerId> { - profile.read() - .await - .client() - .ipfs - .identity() - .await - .map(|(pubkey, addr)| (pubkey.into_peer_id(), addr)) - .and_then(|(peerid, mut addr)| { - ipfs::MultiaddrWithPeerId::try_from({ - addr.pop().expect("At least one address for client") - }) - .map_err(anyhow::Error::from) - }) - } - - let left_running_reactor = tokio::spawn(async move { - left_reactor.run().await - }); - - let right_running_reactor = tokio::spawn(async move { - right_reactor.run().await - }); - - let left_peer_id = get_peer_id(left_profile.clone()).await; - log::trace!("Left GossipReactor = {:?}", left_peer_id); - assert!(left_peer_id.is_ok(), "Not ok: {:?}", left_peer_id); - let left_peer_id = left_peer_id.unwrap(); - - let right_peer_id = get_peer_id(right_profile.clone()).await; - log::trace!("Right GossipReactor = {:?}", right_peer_id); - assert!(right_peer_id.is_ok(), "Not ok: {:?}", right_peer_id); - let right_peer_id = right_peer_id.unwrap(); - - let (right_reply_sender, mut right_reply_receiver) = tokio::sync::mpsc::unbounded_channel(); - - log::trace!("Right GossipReactor should now connect to left GossipReactor"); - right_rx.send((GossipRequest::Connect(left_peer_id), right_reply_sender)).unwrap(); - - log::trace!("Right GossipReactor should now connect to left GossipReactor... waiting for reply"); - match tokio::time::timeout(std::time::Duration::from_secs(5), right_reply_receiver.recv()).await { - Err(_) => assert!(false, "Timeout elapsed when waiting for connection status"), - Ok(Some(GossipReply::ConnectResult(Ok(())))) => { - log::trace!("Right GossipReactor is connected"); - assert!(true) - }, - Ok(Some(other)) => assert!(false, "Expected ConnectResult(Ok(())), recv: {:?}", other), - Ok(None) => assert!(false, "No reply from right reactor received"), - } - } -} diff --git a/lib/src/reactor/gossip/strategy.rs b/lib/src/reactor/gossip/strategy.rs deleted file mode 100644 index 6fe9d1a..0000000 --- a/lib/src/reactor/gossip/strategy.rs +++ /dev/null @@ -1,31 +0,0 @@ -use std::sync::Arc; - -use anyhow::Result; -use tokio::sync::RwLock; - -use crate::profile::Profile; -use crate::reactor::gossip::msg::GossipMessage; - -#[async_trait::async_trait] -pub trait GossipHandlingStrategy: Sync + Send { - async fn handle_gossip_message(profile: Arc<RwLock<Profile>>, source: ipfs::PeerId, msg: GossipMessage) -> Result<()>; -} - -pub struct LogStrategy; - -#[async_trait::async_trait] -impl GossipHandlingStrategy for LogStrategy { - async fn handle_gossip_message(_profile: Arc<RwLock<Profile>>, source: ipfs::PeerId, msg: GossipMessage) -> Result<()> { - use std::convert::TryFrom; - match msg { - GossipMessage::CurrentProfileState { peer_id, cid } => { - let peer_id = ipfs::PeerId::from_bytes(&peer_id); - let cid = cid::Cid::try_from(&*cid); - - log::trace!("{:?} told me that {:?} is at {:?}", source, peer_id, cid); - } - } - - Ok(()) - } -} diff --git a/lib/src/reactor/mod.rs b/lib/src/reactor/mod.rs deleted file mode 100644 index 5299851..0000000 --- a/lib/src/reactor/mod.rs +++ /dev/null @@ -1,28 +0,0 @@ -use std::sync::Arc; -use std::fmt::Debug; - -use anyhow::Result; -use tokio::sync::RwLock; - -use crate::profile::Profile; - -mod gossip; -mod device; -mod account; -mod ctrl; - -use ctrl::ReactorReceiver; - -#[async_trait::async_trait] -pub trait Reactor { - type Request: Debug + Send + Sync; - type Reply: Debug + Send + Sync; - - async fn run(self) -> Result<()>; -} - -pub trait ReactorBuilder { - type Reactor: Reactor; - - fn build_with_receiver(self, rr: ReactorReceiver<<Self::Reactor as Reactor>::Request, <Self::Reactor as Reactor>::Reply>) -> Self::Reactor; -} |