summaryrefslogtreecommitdiffstats
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/src/gossip/deserializer.rs55
-rw-r--r--lib/src/gossip/handler.rs73
-rw-r--r--lib/src/gossip/mod.rs9
-rw-r--r--lib/src/gossip/msg.rs (renamed from lib/src/reactor/gossip/msg.rs)0
-rw-r--r--lib/src/lib.rs2
-rw-r--r--lib/src/reactor/account.rs19
-rw-r--r--lib/src/reactor/ctrl.rs15
-rw-r--r--lib/src/reactor/device.rs19
-rw-r--r--lib/src/reactor/gossip/ctrl.rs19
-rw-r--r--lib/src/reactor/gossip/mod.rs329
-rw-r--r--lib/src/reactor/gossip/strategy.rs31
-rw-r--r--lib/src/reactor/mod.rs28
12 files changed, 138 insertions, 461 deletions
diff --git a/lib/src/gossip/deserializer.rs b/lib/src/gossip/deserializer.rs
new file mode 100644
index 0000000..a50644f
--- /dev/null
+++ b/lib/src/gossip/deserializer.rs
@@ -0,0 +1,55 @@
+use anyhow::Result;
+use futures::Stream;
+use futures::StreamExt;
+
+use crate::gossip::GossipMessage;
+
+pub struct GossipDeserializer<ErrStrategy = LogStrategy>
+ where ErrStrategy: GossipDeserializerErrorStrategy
+{
+ strategy: std::marker::PhantomData<ErrStrategy>,
+}
+
+impl<ErrStrategy> GossipDeserializer<ErrStrategy>
+ where ErrStrategy: GossipDeserializerErrorStrategy
+{
+ pub fn new() -> Self {
+ Self {
+ strategy: std::marker::PhantomData,
+ }
+ }
+
+ pub fn run<S>(mut self, input: S) -> impl Stream<Item = GossipMessage>
+ where S: Stream<Item = ipfs::PubsubMessage>
+ {
+ input.filter_map(|message| async move {
+ log::trace!("Received gossip message");
+
+ match serde_json::from_slice(&message.data).map_err(anyhow::Error::from) {
+ Ok(m) => Some(m),
+ Err(e) => {
+ ErrStrategy::handle_error(e);
+ None
+ }
+ }
+ })
+ }
+}
+
+pub trait GossipDeserializerErrorStrategy {
+ fn handle_error(err: anyhow::Error);
+}
+
+pub struct LogStrategy;
+impl GossipDeserializerErrorStrategy for LogStrategy {
+ fn handle_error(err: anyhow::Error) {
+ log::trace!("Error: {}", err);
+ }
+}
+
+pub struct IgnoreStrategy;
+impl GossipDeserializerErrorStrategy for IgnoreStrategy {
+ fn handle_error(_: anyhow::Error) {
+ ()
+ }
+}
diff --git a/lib/src/gossip/handler.rs b/lib/src/gossip/handler.rs
new file mode 100644
index 0000000..e524da8
--- /dev/null
+++ b/lib/src/gossip/handler.rs
@@ -0,0 +1,73 @@
+//! Low-level module for gossip'ing code
+//!
+//! This module implements the low-level gossiping functionality that other modules use to
+//! implement actual behaviours on
+//!
+
+use std::sync::Arc;
+
+use anyhow::Result;
+use futures::Stream;
+use futures::StreamExt;
+use tokio::sync::RwLock;
+
+use crate::profile::Profile;
+use crate::gossip::GossipMessage;
+
+#[derive(Debug)]
+pub struct GossipHandler<Strategy = LogStrategy>
+ where Strategy: GossipHandlingStrategy + Sync + Send
+{
+ profile: Arc<RwLock<Profile>>,
+ strategy: std::marker::PhantomData<Strategy>,
+}
+
+impl<Strat> GossipHandler<Strat>
+ where Strat: GossipHandlingStrategy + Sync + Send
+{
+ pub fn new(profile: Arc<RwLock<Profile>>) -> Self {
+ Self {
+ profile,
+ strategy: std::marker::PhantomData,
+ }
+ }
+
+ pub fn run<S>(self, input: S) -> impl Stream<Item = (GossipMessage, Result<()>)>
+ where S: Stream<Item = (ipfs::PeerId, GossipMessage)>
+ {
+ input.then(move |(source, msg)| {
+ let pr = self.profile.clone();
+ async move {
+ log::trace!("Received gossip message from {}: {:?}", source, msg);
+ let res = Strat::handle_gossip_message(pr.clone(), &source, &msg).await;
+ (msg, res)
+ }
+ })
+ }
+}
+
+#[async_trait::async_trait]
+pub trait GossipHandlingStrategy: Sync + Send {
+ async fn handle_gossip_message(profile: Arc<RwLock<Profile>>, source: &ipfs::PeerId, msg: &GossipMessage) -> Result<()>;
+}
+
+pub struct LogStrategy;
+
+#[async_trait::async_trait]
+impl GossipHandlingStrategy for LogStrategy {
+ async fn handle_gossip_message(_profile: Arc<RwLock<Profile>>, source: &ipfs::PeerId, msg: &GossipMessage) -> Result<()> {
+ use std::convert::TryFrom;
+ use std::ops::Deref;
+
+ match msg {
+ GossipMessage::CurrentProfileState { peer_id, cid } => {
+ let peer_id = ipfs::PeerId::from_bytes(peer_id);
+ let cid = cid::Cid::try_from(cid.deref());
+
+ log::trace!("{:?} told me that {:?} is at {:?}", source, peer_id, cid);
+ }
+ }
+
+ Ok(())
+ }
+}
diff --git a/lib/src/gossip/mod.rs b/lib/src/gossip/mod.rs
new file mode 100644
index 0000000..d6a6963
--- /dev/null
+++ b/lib/src/gossip/mod.rs
@@ -0,0 +1,9 @@
+mod msg;
+pub use msg::GossipMessage;
+
+mod handler;
+pub use handler::*;
+
+mod deserializer;
+pub use deserializer::*;
+
diff --git a/lib/src/reactor/gossip/msg.rs b/lib/src/gossip/msg.rs
index 049fc68..049fc68 100644
--- a/lib/src/reactor/gossip/msg.rs
+++ b/lib/src/gossip/msg.rs
diff --git a/lib/src/lib.rs b/lib/src/lib.rs
index b7b05e2..e836c37 100644
--- a/lib/src/lib.rs
+++ b/lib/src/lib.rs
@@ -4,4 +4,4 @@ pub mod ipfs_client;
pub mod profile;
pub mod stream;
pub mod types;
-pub mod reactor;
+pub mod gossip;
diff --git a/lib/src/reactor/account.rs b/lib/src/reactor/account.rs
deleted file mode 100644
index 59913b5..0000000
--- a/lib/src/reactor/account.rs
+++ /dev/null
@@ -1,19 +0,0 @@
-//! Module for account handling (following accounts, caching account updates) using the gossip
-//! module for the lower-level handling
-
-use std::sync::Arc;
-
-use anyhow::Result;
-use tokio::sync::RwLock;
-
-use crate::profile::Profile;
-use crate::reactor::gossip::GossipReactor;
-
-#[derive(Debug)]
-pub struct AccountReactor(GossipReactor);
-
-impl AccountReactor {
- pub(super) fn new(profile: Arc<RwLock<Profile>>) -> Self {
- unimplemented!()
- }
-}
diff --git a/lib/src/reactor/ctrl.rs b/lib/src/reactor/ctrl.rs
deleted file mode 100644
index a32f1c3..0000000
--- a/lib/src/reactor/ctrl.rs
+++ /dev/null
@@ -1,15 +0,0 @@
-use tokio::sync::mpsc::UnboundedSender as Sender;
-use tokio::sync::mpsc::UnboundedReceiver as Receiver;
-
-/// Type for sending messages to a reactor
-pub type ReactorSender<Request, Reply> = Sender<(Request, ReplySender<Reply>)>;
-
-/// Type that is used by a reactor for receiving messages
-pub type ReactorReceiver<Request, Reply> = Receiver<(Request, ReplySender<Reply>)>;
-
-/// Type that represents the channel that has to be send with a request to a reactor for getting an
-/// answer back
-pub type ReplySender<Reply> = Sender<Reply>;
-
-pub type ReplyReceiver<Reply> = Receiver<Reply>;
-
diff --git a/lib/src/reactor/device.rs b/lib/src/reactor/device.rs
deleted file mode 100644
index 1014ca1..0000000
--- a/lib/src/reactor/device.rs
+++ /dev/null
@@ -1,19 +0,0 @@
-//! Module for multi-device support functionality,
-//! which uses the gossip module for the lower-level handling
-
-use std::sync::Arc;
-
-use anyhow::Result;
-use tokio::sync::RwLock;
-
-use crate::profile::Profile;
-use crate::reactor::gossip::GossipReactor;
-
-#[derive(Debug)]
-pub struct DeviceReactor(GossipReactor);
-
-impl DeviceReactor {
- pub(super) fn new(profile: Arc<RwLock<Profile>>) -> Self {
- unimplemented!()
- }
-}
diff --git a/lib/src/reactor/gossip/ctrl.rs b/lib/src/reactor/gossip/ctrl.rs
deleted file mode 100644
index 68fbf06..0000000
--- a/lib/src/reactor/gossip/ctrl.rs
+++ /dev/null
@@ -1,19 +0,0 @@
-use anyhow::Result;
-
-#[derive(Debug)]
-pub enum GossipRequest {
- Exit,
- Ping,
- PublishMe,
- Connect(ipfs::MultiaddrWithPeerId),
-}
-
-#[derive(Debug)]
-pub enum GossipReply {
- Exiting,
- Pong,
- NoHead,
- PublishMeResult(Result<()>),
- ConnectResult(Result<()>),
-}
-
diff --git a/lib/src/reactor/gossip/mod.rs b/lib/src/reactor/gossip/mod.rs
deleted file mode 100644
index 7658509..0000000
--- a/lib/src/reactor/gossip/mod.rs
+++ /dev/null
@@ -1,329 +0,0 @@
-//! Low-level module for gossip'ing code
-//!
-//! This module implements the low-level gossiping functionality that other modules use to
-//! implement actual behaviours on
-//!
-
-use std::sync::Arc;
-
-use anyhow::Result;
-use tokio::sync::RwLock;
-
-use crate::profile::Profile;
-use crate::reactor::Reactor;
-use crate::reactor::ReactorBuilder;
-use crate::reactor::ctrl::ReactorReceiver;
-use crate::reactor::ctrl::ReactorSender;
-use crate::reactor::ctrl::ReplySender;
-
-mod ctrl;
-pub use ctrl::GossipRequest;
-pub use ctrl::GossipReply;
-
-mod msg;
-pub use msg::GossipMessage;
-
-mod strategy;
-pub use strategy::GossipHandlingStrategy;
-pub use strategy::LogStrategy;
-
-#[derive(Debug)]
-pub struct GossipReactorBuilder {
- profile: Arc<RwLock<Profile>>,
- gossip_topic_name: String,
-}
-
-impl GossipReactorBuilder {
- pub fn new(profile: Arc<RwLock<Profile>>, gossip_topic_name: String) -> Self {
- Self { profile, gossip_topic_name }
- }
-}
-
-impl ReactorBuilder for GossipReactorBuilder {
- type Reactor = GossipReactor;
-
- fn build_with_receiver(self, rr: ReactorReceiver<GossipRequest, GossipReply>) -> Self::Reactor {
- GossipReactor {
- profile: self.profile,
- gossip_topic_name: self.gossip_topic_name,
- receiver: rr,
- strategy: std::marker::PhantomData,
- }
- }
-}
-
-pub struct GossipReactor<Strategy = LogStrategy>
- where Strategy: GossipHandlingStrategy + Sync + Send
-{
- profile: Arc<RwLock<Profile>>,
- gossip_topic_name: String,
- receiver: ReactorReceiver<GossipRequest, GossipReply>,
- strategy: std::marker::PhantomData<Strategy>,
-}
-
-impl<S> std::fmt::Debug for GossipReactor<S>
- where S: GossipHandlingStrategy + Sync + Send
-{
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(f, "GossipReactor {{ topic: '{}' }}", self.gossip_topic_name)
- }
-}
-
-impl<S> GossipReactor<S>
- where S: GossipHandlingStrategy + Sync + Send
-{
- fn send_gossip_reply(channel: ReplySender<GossipReply>, reply: GossipReply) -> Result<()> {
- if let Err(_) = channel.send(reply) {
- anyhow::bail!("Failed to send GossipReply::NoHead)")
- }
-
- Ok(())
- }
-
- async fn publish_me(&self, reply_channel: ReplySender<GossipReply>) -> Result<()> {
- let profile = self.profile.read().await;
-
- let head = profile.head();
- if head.is_none() {
- Self::send_gossip_reply(reply_channel, GossipReply::NoHead)?;
- return Ok(())
- }
- let head = head.unwrap().to_bytes();
-
- let own_id = match profile.client().own_id().await {
- Ok(id) => id,
- Err(e) => {
- Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Err(e)))?;
- return Ok(()) // TODO: abort operation here for now, maybe not the best idea
- }
- };
-
- let publish_res = profile
- .client()
- .ipfs
- .pubsub_publish(self.gossip_topic_name.clone(), GossipMessage::CurrentProfileState {
- peer_id: own_id.to_bytes(),
- cid: head
- }.into_bytes()?)
- .await;
-
- match publish_res {
- Ok(()) => Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Ok(()))),
- Err(e) => Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Err(e))),
- }
- }
-
- async fn connect(&self, addr: ipfs::MultiaddrWithPeerId) -> Result<()> {
- log::trace!("Connecting GossipReactor with {:?}", addr);
- self.profile.read().await.client().connect(addr).await
- }
-
- #[cfg(test)]
- async fn is_connected_to(&self, addr: ipfs::MultiaddrWithPeerId) -> Result<bool> {
- self.profile
- .read()
- .await
- .client()
- .ipfs
- .peers()
- .await
- .map(|connections| {
- connections.iter().any(|connection| connection.addr == addr)
- })
- }
-
-}
-
-#[async_trait::async_trait]
-impl<S> Reactor for GossipReactor<S>
- where S: GossipHandlingStrategy + Sync + Send
-{
- type Request = GossipRequest;
- type Reply = GossipReply;
-
- async fn run(mut self) -> Result<()> {
- use futures::stream::StreamExt;
-
- log::trace!("Booting {:?}", self);
- let mut subscription_stream = self.profile
- .read()
- .await
- .client()
- .ipfs
- .pubsub_subscribe(self.gossip_topic_name.clone())
- .await?;
-
- log::trace!("{:?} main loop", self);
- loop {
- tokio::select! {
- next_control_msg = self.receiver.recv() => {
- log::trace!("Received control message: {:?}", next_control_msg);
- match next_control_msg {
- None => break,
- Some((GossipRequest::Exit, reply_channel)) => {
- if let Err(_) = reply_channel.send(GossipReply::Exiting) {
- anyhow::bail!("Failed sending EXITING reply")
- }
- break
- },
-
- Some((GossipRequest::Ping, reply_channel)) => {
- log::trace!("Replying with Pong");
- if let Err(_) = reply_channel.send(GossipReply::Pong) {
- anyhow::bail!("Failed sending PONG reply")
- }
- },
-
- Some((GossipRequest::PublishMe, reply_channel)) => self.publish_me(reply_channel).await?,
-
- Some((GossipRequest::Connect(addr), reply_channel)) => {
- let reply = GossipReply::ConnectResult(self.connect(addr.clone()).await);
- if let Err(_) = Self::send_gossip_reply(reply_channel, reply) {
- anyhow::bail!("Failed sending Connect({}) reply", addr)
- }
- },
- }
- }
-
- next_gossip_message = subscription_stream.next() => {
- if let Some(msg) = next_gossip_message {
- log::trace!("Received gossip message");
- match serde_json::from_slice(&msg.data) {
- Ok(m) => S::handle_gossip_message(self.profile.clone(), msg.source, m).await?,
- Err(e) => log::trace!("Failed to deserialize gossip message from {}", msg.source),
- }
- } else {
- log::trace!("Gossip stream closed, breaking reactor loop");
- break;
- }
- }
- }
- }
- Ok(())
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- use std::convert::TryFrom;
- use std::sync::Arc;
- use tokio::sync::RwLock;
-
- #[tokio::test]
- async fn test_gossip_reactor_simple() {
- let _ = env_logger::try_init();
-
- let profile = Profile::new_inmemory("test-gossip-reactor-simple").await;
- assert!(profile.is_ok());
- let profile = Arc::new(RwLock::new(profile.unwrap()));
-
- let gossip_topic_name = String::from("test-gossip-reactor-simple-topic");
- let (rx, tx) = tokio::sync::mpsc::unbounded_channel();
- let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name).build_with_receiver(tx);
-
- let (reply_sender, mut reply_receiver) = tokio::sync::mpsc::unbounded_channel();
- rx.send((GossipRequest::Ping, reply_sender)).unwrap();
-
- let mut pong_received = false;
- let _ = tokio::spawn(async move {
- reactor.run().await
- });
-
- match reply_receiver.recv().await {
- Some(GossipReply::Pong) => {
- pong_received = true;
- log::trace!("Pong received!");
- let (reply_sender, mut reply_receiver) = tokio::sync::mpsc::unbounded_channel();
- rx.send((GossipRequest::Exit, reply_sender)).unwrap();
- }
- Some(r) => {
- assert!(false, "Expected ReactorReply::Pong, got: {:?}", r);
- }
- None => {
- // nothing
- }
- }
-
- assert!(pong_received, "No pong received");
- }
-
- #[tokio::test]
- async fn test_gossip_reactor_gossipping() {
- let _ = env_logger::try_init();
-
- let gossip_topic_name = String::from("test-gossip-reactor-gossipping-topic");
- let (left_profile, left_reactor, left_rx) = {
- let profile = Profile::new_inmemory("test-gossip-reactor-simple-left").await;
- assert!(profile.is_ok());
- let profile = Arc::new(RwLock::new(profile.unwrap()));
-
- let (rx, tx) = tokio::sync::mpsc::unbounded_channel();
- let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name.clone()).build_with_receiver(tx);
- (profile, reactor, rx)
- };
- log::trace!("Built left GossipReactor");
-
- let (right_profile, right_reactor, right_rx) = {
- let profile = Profile::new_inmemory("test-gossip-reactor-simple-right").await;
- assert!(profile.is_ok());
- let profile = Arc::new(RwLock::new(profile.unwrap()));
-
- let (rx, tx) = tokio::sync::mpsc::unbounded_channel();
- let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name.clone()).build_with_receiver(tx);
- (profile, reactor, rx)
- };
- log::trace!("Built right GossipReactor");
-
- async fn get_peer_id(profile: Arc<RwLock<Profile>>) -> Result<ipfs::MultiaddrWithPeerId> {
- profile.read()
- .await
- .client()
- .ipfs
- .identity()
- .await
- .map(|(pubkey, addr)| (pubkey.into_peer_id(), addr))
- .and_then(|(peerid, mut addr)| {
- ipfs::MultiaddrWithPeerId::try_from({
- addr.pop().expect("At least one address for client")
- })
- .map_err(anyhow::Error::from)
- })
- }
-
- let left_running_reactor = tokio::spawn(async move {
- left_reactor.run().await
- });
-
- let right_running_reactor = tokio::spawn(async move {
- right_reactor.run().await
- });
-
- let left_peer_id = get_peer_id(left_profile.clone()).await;
- log::trace!("Left GossipReactor = {:?}", left_peer_id);
- assert!(left_peer_id.is_ok(), "Not ok: {:?}", left_peer_id);
- let left_peer_id = left_peer_id.unwrap();
-
- let right_peer_id = get_peer_id(right_profile.clone()).await;
- log::trace!("Right GossipReactor = {:?}", right_peer_id);
- assert!(right_peer_id.is_ok(), "Not ok: {:?}", right_peer_id);
- let right_peer_id = right_peer_id.unwrap();
-
- let (right_reply_sender, mut right_reply_receiver) = tokio::sync::mpsc::unbounded_channel();
-
- log::trace!("Right GossipReactor should now connect to left GossipReactor");
- right_rx.send((GossipRequest::Connect(left_peer_id), right_reply_sender)).unwrap();
-
- log::trace!("Right GossipReactor should now connect to left GossipReactor... waiting for reply");
- match tokio::time::timeout(std::time::Duration::from_secs(5), right_reply_receiver.recv()).await {
- Err(_) => assert!(false, "Timeout elapsed when waiting for connection status"),
- Ok(Some(GossipReply::ConnectResult(Ok(())))) => {
- log::trace!("Right GossipReactor is connected");
- assert!(true)
- },
- Ok(Some(other)) => assert!(false, "Expected ConnectResult(Ok(())), recv: {:?}", other),
- Ok(None) => assert!(false, "No reply from right reactor received"),
- }
- }
-}
diff --git a/lib/src/reactor/gossip/strategy.rs b/lib/src/reactor/gossip/strategy.rs
deleted file mode 100644
index 6fe9d1a..0000000
--- a/lib/src/reactor/gossip/strategy.rs
+++ /dev/null
@@ -1,31 +0,0 @@
-use std::sync::Arc;
-
-use anyhow::Result;
-use tokio::sync::RwLock;
-
-use crate::profile::Profile;
-use crate::reactor::gossip::msg::GossipMessage;
-
-#[async_trait::async_trait]
-pub trait GossipHandlingStrategy: Sync + Send {
- async fn handle_gossip_message(profile: Arc<RwLock<Profile>>, source: ipfs::PeerId, msg: GossipMessage) -> Result<()>;
-}
-
-pub struct LogStrategy;
-
-#[async_trait::async_trait]
-impl GossipHandlingStrategy for LogStrategy {
- async fn handle_gossip_message(_profile: Arc<RwLock<Profile>>, source: ipfs::PeerId, msg: GossipMessage) -> Result<()> {
- use std::convert::TryFrom;
- match msg {
- GossipMessage::CurrentProfileState { peer_id, cid } => {
- let peer_id = ipfs::PeerId::from_bytes(&peer_id);
- let cid = cid::Cid::try_from(&*cid);
-
- log::trace!("{:?} told me that {:?} is at {:?}", source, peer_id, cid);
- }
- }
-
- Ok(())
- }
-}
diff --git a/lib/src/reactor/mod.rs b/lib/src/reactor/mod.rs
deleted file mode 100644
index 5299851..0000000
--- a/lib/src/reactor/mod.rs
+++ /dev/null
@@ -1,28 +0,0 @@
-use std::sync::Arc;
-use std::fmt::Debug;
-
-use anyhow::Result;
-use tokio::sync::RwLock;
-
-use crate::profile::Profile;
-
-mod gossip;
-mod device;
-mod account;
-mod ctrl;
-
-use ctrl::ReactorReceiver;
-
-#[async_trait::async_trait]
-pub trait Reactor {
- type Request: Debug + Send + Sync;
- type Reply: Debug + Send + Sync;
-
- async fn run(self) -> Result<()>;
-}
-
-pub trait ReactorBuilder {
- type Reactor: Reactor;
-
- fn build_with_receiver(self, rr: ReactorReceiver<<Self::Reactor as Reactor>::Request, <Self::Reactor as Reactor>::Reply>) -> Self::Reactor;
-}