From a38a0babea989e979cb2bf7bdcb05c6c7aa0dfe9 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Tue, 7 Dec 2021 15:10:54 +0100 Subject: Add support for storing other known devices in Profile Signed-off-by: Matthias Beyer --- lib/src/profile/device.rs | 31 +++++++++++++++++++++++++++++++ lib/src/profile/mod.rs | 1 + lib/src/profile/state.rs | 25 +++++++++++++++++++++++-- 3 files changed, 55 insertions(+), 2 deletions(-) create mode 100644 lib/src/profile/device.rs diff --git a/lib/src/profile/device.rs b/lib/src/profile/device.rs new file mode 100644 index 0000000..daeab21 --- /dev/null +++ b/lib/src/profile/device.rs @@ -0,0 +1,31 @@ +use std::convert::TryFrom; +use std::convert::TryInto; +use anyhow::Result; + +#[derive(Clone, Debug)] +pub struct Device { + device_id: libp2p::identity::ed25519::PublicKey, +} + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub struct DeviceSaveable { + device_id: Vec, +} + +impl TryFrom for DeviceSaveable { + type Error = anyhow::Error; + + fn try_from(device: Device) -> Result { + Ok(DeviceSaveable { device_id: device.device_id.encode().to_vec() }) + } +} + +impl TryInto for DeviceSaveable { + type Error = anyhow::Error; + + fn try_into(self) -> Result { + libp2p::identity::ed25519::PublicKey::decode(&self.device_id) + .map(|device_id| Device { device_id }) + .map_err(anyhow::Error::from) + } +} diff --git a/lib/src/profile/mod.rs b/lib/src/profile/mod.rs index 050d53d..d3e0712 100644 --- a/lib/src/profile/mod.rs +++ b/lib/src/profile/mod.rs @@ -8,6 +8,7 @@ use crate::client::Client; use crate::config::Config; use crate::ipfs_client::IpfsClient; +mod device; mod state; use state::*; diff --git a/lib/src/profile/state.rs b/lib/src/profile/state.rs index 33f0bd6..921a2bf 100644 --- a/lib/src/profile/state.rs +++ b/lib/src/profile/state.rs @@ -6,6 +6,9 @@ use anyhow::Context; use anyhow::Result; use tokio::io::AsyncWriteExt; +use crate::profile::device::Device; +use crate::profile::device::DeviceSaveable; + #[derive(Debug)] pub struct StateDir(PathBuf); @@ -39,6 +42,9 @@ pub struct ProfileState { #[getset(get = "pub")] keypair: libp2p::identity::Keypair, + + #[getset(get = "pub")] + other_devices: Vec, } impl ProfileState { @@ -46,7 +52,8 @@ impl ProfileState { Self { profile_head: None, profile_name, - keypair + keypair, + other_devices: vec![], } } @@ -67,6 +74,7 @@ pub(super) struct ProfileStateSaveable { profile_head: Option>, profile_name: String, keypair: Vec, + other_devices: Vec } impl ProfileStateSaveable { @@ -77,7 +85,14 @@ impl ProfileStateSaveable { keypair: match s.keypair { libp2p::identity::Keypair::Ed25519(ref kp) => Vec::from(kp.encode()), _ => anyhow::bail!("Only keypair type ed25519 supported"), - } + }, + other_devices: { + s.other_devices + .iter() + .cloned() + .map(DeviceSaveable::try_from) + .collect::>>()? + }, }) } @@ -127,6 +142,12 @@ impl TryInto for ProfileStateSaveable { let kp = libp2p::identity::ed25519::Keypair::decode(&mut self.keypair)?; libp2p::identity::Keypair::Ed25519(kp) }, + other_devices: { + self.other_devices + .into_iter() + .map(DeviceSaveable::try_into) + .collect::>>()? + }, }) } } -- cgit v1.2.3 From 928b82a6a9d3e06dbdbc7a2ea713fb5555134731 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Tue, 7 Dec 2021 15:12:33 +0100 Subject: Add interfaces for adding devices Signed-off-by: Matthias Beyer --- lib/src/profile/mod.rs | 5 +++++ lib/src/profile/state.rs | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/lib/src/profile/mod.rs b/lib/src/profile/mod.rs index d3e0712..44f53d4 100644 --- a/lib/src/profile/mod.rs +++ b/lib/src/profile/mod.rs @@ -9,6 +9,7 @@ use crate::config::Config; use crate::ipfs_client::IpfsClient; mod device; +use device::Device; mod state; use state::*; @@ -165,6 +166,10 @@ impl Profile { self.client.exit().await } + + pub fn add_device(&mut self, d: Device) -> Result<()> { + self.state.add_device(d) + } } diff --git a/lib/src/profile/state.rs b/lib/src/profile/state.rs index 921a2bf..f20135e 100644 --- a/lib/src/profile/state.rs +++ b/lib/src/profile/state.rs @@ -61,6 +61,11 @@ impl ProfileState { self.profile_head = Some(cid); Ok(()) // reserved for future use } + + pub(super) fn add_device(&mut self, d: Device) -> Result<()> { + self.other_devices.push(d); + Ok(()) // reserved for future use + } } impl std::fmt::Debug for ProfileState { -- cgit v1.2.3 From 81ac4baec0746fd5df328e6af25d88c80c0fef4f Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Tue, 7 Dec 2021 15:49:13 +0100 Subject: Add Reactor type Signed-off-by: Matthias Beyer --- lib/src/lib.rs | 1 + lib/src/reactor/mod.rs | 46 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+) create mode 100644 lib/src/reactor/mod.rs diff --git a/lib/src/lib.rs b/lib/src/lib.rs index 0cccef9..b86077a 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -5,3 +5,4 @@ pub mod ipfs_client; pub mod profile; pub mod stream; pub mod types; +pub mod reactor; diff --git a/lib/src/reactor/mod.rs b/lib/src/reactor/mod.rs new file mode 100644 index 0000000..70088a3 --- /dev/null +++ b/lib/src/reactor/mod.rs @@ -0,0 +1,46 @@ +use std::sync::Arc; + +use anyhow::Result; +use tokio::sync::RwLock; + +use crate::profile::Profile; + +/// Reactor type, for running the application logic +/// +/// The Reactor runs the whole application logic, that is syncing with other devices, fetching and +/// keeping profile updates of other accounts, communication on the gossipsub topics... etc +#[derive(Debug)] +pub struct Reactor { + profile: Arc>, +} + +impl Reactor { + pub fn new(profile: Profile) -> Self { + Reactor { + profile: Arc::new(RwLock::new(profile)), + } + } + + pub async fn head(&self) -> Option { + self.profile.read().await.head().map(cid::Cid::clone) + } + + pub async fn connect(&self, peer: ipfs::MultiaddrWithPeerId) -> Result<()> { + self.profile.read().await.connect(peer).await + } + + pub fn profile(&self) -> Arc> { + self.profile.clone() + } + + pub async fn exit(self) -> Result<()> { + let mut inner = self.profile; + loop { + match Arc::try_unwrap(inner) { + Err(arc) => inner = arc, + Ok(inner) => return inner.into_inner().exit().await, + } + } + } + +} -- cgit v1.2.3 From cecb6a4f0ffc28143ac9d437048484c83a37d277 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Tue, 7 Dec 2021 15:58:00 +0100 Subject: Add submodules for behaviour-specific reactors Signed-off-by: Matthias Beyer --- lib/src/reactor/account.rs | 19 +++++++++++++++++++ lib/src/reactor/device.rs | 19 +++++++++++++++++++ lib/src/reactor/gossip.rs | 24 ++++++++++++++++++++++++ lib/src/reactor/mod.rs | 15 +++++++++++++++ 4 files changed, 77 insertions(+) create mode 100644 lib/src/reactor/account.rs create mode 100644 lib/src/reactor/device.rs create mode 100644 lib/src/reactor/gossip.rs diff --git a/lib/src/reactor/account.rs b/lib/src/reactor/account.rs new file mode 100644 index 0000000..c6be1ce --- /dev/null +++ b/lib/src/reactor/account.rs @@ -0,0 +1,19 @@ +//! Module for account handling (following accounts, caching account updates) using the gossip +//! module for the lower-level handling + +use std::sync::Arc; + +use anyhow::Result; +use tokio::sync::RwLock; + +use crate::profile::Profile; +use crate::reactor::gossip::GossipReactor; + +#[derive(Debug)] +pub struct AccountReactor(GossipReactor); + +impl AccountReactor { + pub(super) fn new(profile: Arc>) -> Self { + Self(GossipReactor::new(profile)) + } +} diff --git a/lib/src/reactor/device.rs b/lib/src/reactor/device.rs new file mode 100644 index 0000000..c013db6 --- /dev/null +++ b/lib/src/reactor/device.rs @@ -0,0 +1,19 @@ +//! Module for multi-device support functionality, +//! which uses the gossip module for the lower-level handling + +use std::sync::Arc; + +use anyhow::Result; +use tokio::sync::RwLock; + +use crate::profile::Profile; +use crate::reactor::gossip::GossipReactor; + +#[derive(Debug)] +pub struct DeviceReactor(GossipReactor); + +impl DeviceReactor { + pub(super) fn new(profile: Arc>) -> Self { + Self(GossipReactor::new(profile)) + } +} diff --git a/lib/src/reactor/gossip.rs b/lib/src/reactor/gossip.rs new file mode 100644 index 0000000..e695dac --- /dev/null +++ b/lib/src/reactor/gossip.rs @@ -0,0 +1,24 @@ +//! Low-level module for gossip'ing code +//! +//! This module implements the low-level gossiping functionality that other modules use to +//! implement actual behaviours on +//! + +use std::sync::Arc; + +use anyhow::Result; +use tokio::sync::RwLock; + +use crate::profile::Profile; + +#[derive(Debug)] +pub struct GossipReactor { + profile: Arc>, +} + +impl GossipReactor { + pub(super) fn new(profile: Arc>) -> Self { + Self { profile } + } +} + diff --git a/lib/src/reactor/mod.rs b/lib/src/reactor/mod.rs index 70088a3..d55f5c6 100644 --- a/lib/src/reactor/mod.rs +++ b/lib/src/reactor/mod.rs @@ -5,6 +5,10 @@ use tokio::sync::RwLock; use crate::profile::Profile; +mod gossip; +mod device; +mod account; + /// Reactor type, for running the application logic /// /// The Reactor runs the whole application logic, that is syncing with other devices, fetching and @@ -43,4 +47,15 @@ impl Reactor { } } + /// Run the reactor + /// + /// Starts all inner functionality and exposes things + /// + /// # Return + /// + /// Return types are WIP, as this must return "running" objects that can be communicated with + pub async fn run(self) -> Result<()> { + unimplemented!() + } + } -- cgit v1.2.3 From 57a595b521a43da2c61c44fd6257477a1a141eb8 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Fri, 10 Dec 2021 16:47:42 +0100 Subject: Remove example new() impls Signed-off-by: Matthias Beyer --- lib/src/reactor/account.rs | 2 +- lib/src/reactor/device.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/src/reactor/account.rs b/lib/src/reactor/account.rs index c6be1ce..59913b5 100644 --- a/lib/src/reactor/account.rs +++ b/lib/src/reactor/account.rs @@ -14,6 +14,6 @@ pub struct AccountReactor(GossipReactor); impl AccountReactor { pub(super) fn new(profile: Arc>) -> Self { - Self(GossipReactor::new(profile)) + unimplemented!() } } diff --git a/lib/src/reactor/device.rs b/lib/src/reactor/device.rs index c013db6..1014ca1 100644 --- a/lib/src/reactor/device.rs +++ b/lib/src/reactor/device.rs @@ -14,6 +14,6 @@ pub struct DeviceReactor(GossipReactor); impl DeviceReactor { pub(super) fn new(profile: Arc>) -> Self { - Self(GossipReactor::new(profile)) + unimplemented!() } } -- cgit v1.2.3 From ea0f2c399320b7bf0f51ada34451bc21100dbafa Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Fri, 10 Dec 2021 16:47:56 +0100 Subject: Add types for controlling a reactor implementation Signed-off-by: Matthias Beyer --- lib/src/reactor/ctrl.rs | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 lib/src/reactor/ctrl.rs diff --git a/lib/src/reactor/ctrl.rs b/lib/src/reactor/ctrl.rs new file mode 100644 index 0000000..92246a9 --- /dev/null +++ b/lib/src/reactor/ctrl.rs @@ -0,0 +1,36 @@ +use std::fmt::Debug; + +use tokio::sync::mpsc::UnboundedSender as Sender; +use tokio::sync::mpsc::UnboundedReceiver as Receiver; + +/// Type for sending messages to a reactor +pub type ReactorSender = Sender<(ReactorRequest, ReplyChannel)>; + +/// Type that is used by a reactor for receiving messages +pub type ReactorReceiver = Receiver<(ReactorRequest, ReplyChannel)>; + +/// Type that represents the channel that has to be send with a request to a reactor for getting an +/// answer back +pub type ReplyChannel = Sender>; + +pub type ReplyReceiver = Receiver>; + +/// Send control messages to the reactor +#[derive(Debug)] +pub enum ReactorRequest { + /// check if the reactor still responds + Ping, + + /// Quit the reactor + Exit, + + Custom(CustomRequest), +} + +#[derive(Debug)] +pub enum ReactorReply { + Pong, + Exiting, + + Custom(CustomReply), +} -- cgit v1.2.3 From ec205af0d61ad834bd3033219c2590a7d83fdc9b Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Fri, 10 Dec 2021 16:48:08 +0100 Subject: Add abstract reactor implementation Signed-off-by: Matthias Beyer --- lib/src/reactor/mod.rs | 67 +++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 52 insertions(+), 15 deletions(-) diff --git a/lib/src/reactor/mod.rs b/lib/src/reactor/mod.rs index d55f5c6..b4aa597 100644 --- a/lib/src/reactor/mod.rs +++ b/lib/src/reactor/mod.rs @@ -1,4 +1,5 @@ use std::sync::Arc; +use std::fmt::Debug; use anyhow::Result; use tokio::sync::RwLock; @@ -8,21 +9,39 @@ use crate::profile::Profile; mod gossip; mod device; mod account; +mod ctrl; + +pub use ctrl::ReactorReceiver; +pub use ctrl::ReactorReply; +pub use ctrl::ReactorRequest; +pub use ctrl::ReactorSender; +pub use ctrl::ReplyChannel; /// Reactor type, for running the application logic /// /// The Reactor runs the whole application logic, that is syncing with other devices, fetching and /// keeping profile updates of other accounts, communication on the gossipsub topics... etc #[derive(Debug)] -pub struct Reactor { +pub(super) struct Reactor + where CustomReactorRequest: Debug + Send + Sync, + CustomReactorReply: Debug + Send + Sync +{ profile: Arc>, + rx: ReactorReceiver, } -impl Reactor { - pub fn new(profile: Profile) -> Self { - Reactor { - profile: Arc::new(RwLock::new(profile)), - } +impl Reactor + where CustomReactorRequest: Debug + Send + Sync, + CustomReactorReply: Debug + Send + Sync +{ + pub(super) fn new(profile: Arc>) -> (Self, ReactorSender) { + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + let reactor = Reactor { + profile, + rx, + }; + + (reactor, tx) } pub async fn head(&self) -> Option { @@ -47,15 +66,33 @@ impl Reactor { } } - /// Run the reactor - /// - /// Starts all inner functionality and exposes things - /// - /// # Return - /// - /// Return types are WIP, as this must return "running" objects that can be communicated with - pub async fn run(self) -> Result<()> { - unimplemented!() + pub(super) async fn receive_next_message(&mut self) -> Option<(ReactorRequest, ReplyChannel)> { + self.rx.recv().await + } + + /// Process the request if it is not a specialized request, + /// return the specialized request if it is one and cannot be processed by this reactor + /// implementation + pub(super) async fn process_reactor_message(&mut self, request: (ReactorRequest, ReplyChannel)) -> Result)>> { + match request { + (ReactorRequest::Ping, reply_channel) => { + if let Err(_) = reply_channel.send(ReactorReply::Pong) { + anyhow::bail!("Failed sending PONG reply") + } + Ok(None) + }, + + (ReactorRequest::Exit, reply_channel) => { + if let Err(_) = reply_channel.send(ReactorReply::Exiting) { + anyhow::bail!("Failed sending EXITING reply") + } + Ok(None) + }, + + (ReactorRequest::Custom(c), reply_channel) => { + Ok(Some((c, reply_channel))) + } + } } } -- cgit v1.2.3 From 6bc7a060fd42c06b76406eb4e8976f449afb59c6 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Fri, 10 Dec 2021 16:48:18 +0100 Subject: Add basis for gossip reactor implementation Signed-off-by: Matthias Beyer --- lib/src/reactor/gossip.rs | 51 ++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 48 insertions(+), 3 deletions(-) diff --git a/lib/src/reactor/gossip.rs b/lib/src/reactor/gossip.rs index e695dac..998112f 100644 --- a/lib/src/reactor/gossip.rs +++ b/lib/src/reactor/gossip.rs @@ -10,15 +10,60 @@ use anyhow::Result; use tokio::sync::RwLock; use crate::profile::Profile; +use crate::reactor::Reactor; +use crate::reactor::ctrl::ReactorReceiver; +use crate::reactor::ctrl::ReactorReply; +use crate::reactor::ctrl::ReactorRequest; +use crate::reactor::ctrl::ReactorSender; +use crate::reactor::ctrl::ReplyChannel; + #[derive(Debug)] pub struct GossipReactor { - profile: Arc>, + inner: Reactor, +} + +#[derive(Debug)] +pub enum GossipRequest { + Ping, +} + +#[derive(Debug)] +pub enum GossipReply { + Pong, } impl GossipReactor { - pub(super) fn new(profile: Arc>) -> Self { - Self { profile } + pub fn new(profile: Arc>) -> (Self, ReactorSender) { + let (inner, sender) = Reactor::::new(profile); + (Self { inner }, sender) + } + + pub async fn receive_next_message(&mut self) -> Option<(ReactorRequest, ReplyChannel)> { + self.inner.receive_next_message().await + } + + pub(super) async fn process_reactor_message(&mut self, request: (ReactorRequest, ReplyChannel)) -> Result<()> { + match self.inner.process_reactor_message(request).await? { + None => Ok(()), + Some((GossipRequest::Ping, reply_channel)) => { + if let Err(_) = reply_channel.send(ReactorReply::Custom(GossipReply::Pong)) { + anyhow::bail!("Failed sening PONG reply") + } + + Ok(()) + } + } + } + + pub async fn run(mut self) -> Result<()> { + loop { + match self.receive_next_message().await { + None => break, + Some(tpl) => self.process_reactor_message(tpl).await?, + } + } + Ok(()) } } -- cgit v1.2.3 From 3767ecd55bbfea76afc1ae80a0dc7911832a9aa1 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Fri, 10 Dec 2021 16:51:20 +0100 Subject: Move ReactorRequest/ReactorReply to Reactor module Signed-off-by: Matthias Beyer --- lib/src/reactor/ctrl.rs | 22 +++------------------- lib/src/reactor/gossip.rs | 4 ++-- lib/src/reactor/mod.rs | 22 ++++++++++++++++++++-- 3 files changed, 25 insertions(+), 23 deletions(-) diff --git a/lib/src/reactor/ctrl.rs b/lib/src/reactor/ctrl.rs index 92246a9..a745394 100644 --- a/lib/src/reactor/ctrl.rs +++ b/lib/src/reactor/ctrl.rs @@ -3,6 +3,9 @@ use std::fmt::Debug; use tokio::sync::mpsc::UnboundedSender as Sender; use tokio::sync::mpsc::UnboundedReceiver as Receiver; +use crate::reactor::ReactorReply; +use crate::reactor::ReactorRequest; + /// Type for sending messages to a reactor pub type ReactorSender = Sender<(ReactorRequest, ReplyChannel)>; @@ -15,22 +18,3 @@ pub type ReplyChannel = Sender>; pub type ReplyReceiver = Receiver>; -/// Send control messages to the reactor -#[derive(Debug)] -pub enum ReactorRequest { - /// check if the reactor still responds - Ping, - - /// Quit the reactor - Exit, - - Custom(CustomRequest), -} - -#[derive(Debug)] -pub enum ReactorReply { - Pong, - Exiting, - - Custom(CustomReply), -} diff --git a/lib/src/reactor/gossip.rs b/lib/src/reactor/gossip.rs index 998112f..ba950db 100644 --- a/lib/src/reactor/gossip.rs +++ b/lib/src/reactor/gossip.rs @@ -11,9 +11,9 @@ use tokio::sync::RwLock; use crate::profile::Profile; use crate::reactor::Reactor; +use crate::reactor::ReactorReply; +use crate::reactor::ReactorRequest; use crate::reactor::ctrl::ReactorReceiver; -use crate::reactor::ctrl::ReactorReply; -use crate::reactor::ctrl::ReactorRequest; use crate::reactor::ctrl::ReactorSender; use crate::reactor::ctrl::ReplyChannel; diff --git a/lib/src/reactor/mod.rs b/lib/src/reactor/mod.rs index b4aa597..9e22b1e 100644 --- a/lib/src/reactor/mod.rs +++ b/lib/src/reactor/mod.rs @@ -12,11 +12,29 @@ mod account; mod ctrl; pub use ctrl::ReactorReceiver; -pub use ctrl::ReactorReply; -pub use ctrl::ReactorRequest; pub use ctrl::ReactorSender; pub use ctrl::ReplyChannel; +/// Send control messages to the reactor +#[derive(Debug)] +pub enum ReactorRequest { + /// check if the reactor still responds + Ping, + + /// Quit the reactor + Exit, + + Custom(CustomRequest), +} + +#[derive(Debug)] +pub enum ReactorReply { + Pong, + Exiting, + + Custom(CustomReply), +} + /// Reactor type, for running the application logic /// /// The Reactor runs the whole application logic, that is syncing with other devices, fetching and -- cgit v1.2.3 From e04c9d592567292f1701ff37e17ccc9181e96dc6 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Fri, 10 Dec 2021 16:58:59 +0100 Subject: Implement connecting to other peer Signed-off-by: Matthias Beyer --- lib/src/reactor/mod.rs | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/lib/src/reactor/mod.rs b/lib/src/reactor/mod.rs index 9e22b1e..c35cecc 100644 --- a/lib/src/reactor/mod.rs +++ b/lib/src/reactor/mod.rs @@ -24,6 +24,8 @@ pub enum ReactorRequest { /// Quit the reactor Exit, + Connect(ipfs::MultiaddrWithPeerId), + Custom(CustomRequest), } @@ -32,6 +34,8 @@ pub enum ReactorReply { Pong, Exiting, + ConnectResult((Result<()>, ipfs::MultiaddrWithPeerId)), + Custom(CustomReply), } @@ -107,6 +111,22 @@ impl Reactor { + match self.profile.read().await.client().connect(addr.clone()).await { + Ok(()) => if let Err(_) = reply_channel.send(ReactorReply::ConnectResult((Ok(()), addr.clone()))) { + anyhow::bail!("Failed sending ConnectResult({}, Ok(()))", addr) + } else { + Ok(None) + } + + Err(e) => if let Err(_) = reply_channel.send(ReactorReply::ConnectResult((Err(e), addr.clone()))) { + anyhow::bail!("Failed sending ConnectResult({}, Err(_))", addr) + } else { + Ok(None) + } + } + } + (ReactorRequest::Custom(c), reply_channel) => { Ok(Some((c, reply_channel))) } -- cgit v1.2.3 From 20f376f70c107e686af11d6f09284c4fafc48d1a Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Fri, 10 Dec 2021 17:56:41 +0100 Subject: Add Client::own_id() for getting own PeerId Signed-off-by: Matthias Beyer --- lib/src/client.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/lib/src/client.rs b/lib/src/client.rs index da8ad94..4d4a971 100644 --- a/lib/src/client.rs +++ b/lib/src/client.rs @@ -33,6 +33,14 @@ impl Client { Ok(()) } + pub async fn own_id(&self) -> Result { + self.ipfs + .identity() + .await + .map(|id| id.0.into_peer_id()) + .map_err(anyhow::Error::from) + } + pub async fn connect(&self, peer: ipfs::MultiaddrWithPeerId) -> Result<()> { self.ipfs.connect(peer).await } -- cgit v1.2.3 From 5856b8b59044c5c3dc007a35191931fb13b6aef1 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Fri, 10 Dec 2021 17:56:57 +0100 Subject: Add implementation for first GossipMessage Signed-off-by: Matthias Beyer --- lib/src/reactor/gossip.rs | 114 +++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 108 insertions(+), 6 deletions(-) diff --git a/lib/src/reactor/gossip.rs b/lib/src/reactor/gossip.rs index ba950db..0da0c97 100644 --- a/lib/src/reactor/gossip.rs +++ b/lib/src/reactor/gossip.rs @@ -21,22 +21,48 @@ use crate::reactor::ctrl::ReplyChannel; #[derive(Debug)] pub struct GossipReactor { inner: Reactor, + gossip_topic_name: String, } #[derive(Debug)] pub enum GossipRequest { Ping, + PublishMe, } #[derive(Debug)] pub enum GossipReply { Pong, + NoHead, + PublishMeResult(Result<()>), } +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub enum GossipMessage { + CurrentProfileState { + peer_id: Vec, + cid: Vec, + }, +} + +impl GossipMessage { + fn into_bytes(self) -> Result> { + serde_json::to_string(&self) + .map(String::into_bytes) + .map_err(anyhow::Error::from) + } +} + + impl GossipReactor { - pub fn new(profile: Arc>) -> (Self, ReactorSender) { + pub fn new(profile: Arc>, gossip_topic_name: String) -> (Self, ReactorSender) { let (inner, sender) = Reactor::::new(profile); - (Self { inner }, sender) + let reactor = Self { + inner, + gossip_topic_name, + }; + + (reactor, sender) } pub async fn receive_next_message(&mut self) -> Option<(ReactorRequest, ReplyChannel)> { @@ -52,15 +78,91 @@ impl GossipReactor { } Ok(()) - } + }, + + Some((GossipRequest::PublishMe, reply_channel)) => { + let profile = self.inner.profile(); + let profile = profile.read().await; + + let head = profile.head(); + if head.is_none() { + if let Err(_) = reply_channel.send(ReactorReply::Custom(GossipReply::NoHead)) { + anyhow::bail!("Failed to send GossipReply::NoHead)") + } + } + let head = head.unwrap().to_bytes(); + + let own_id = match profile.client().own_id().await { + Ok(id) => id, + Err(e) => if let Err(_) = reply_channel.send(ReactorReply::Custom(GossipReply::PublishMeResult(Err(e)))) { + anyhow::bail!("Failed to send GossipReply::PublishMeResult(Err(_))") + } else { + return Ok(()) // TODO: abort operation here for now, maybe not the best idea + } + }; + + let publish_res = profile + .client() + .ipfs + .pubsub_publish(self.gossip_topic_name.clone(), GossipMessage::CurrentProfileState { + peer_id: own_id.to_bytes(), + cid: head + }.into_bytes()?) + .await; + + match publish_res { + Ok(()) => if let Err(_) = reply_channel.send(ReactorReply::Custom(GossipReply::PublishMeResult(Ok(())))) { + anyhow::bail!("Failed to send GossipReply::PublishMeResult(Ok(()))") + } else { + Ok(()) + }, + + Err(e) => if let Err(_) = reply_channel.send(ReactorReply::Custom(GossipReply::PublishMeResult(Err(e)))) { + anyhow::bail!("Failed to send GossipReply::PublishMeResult(Err(_))") + } else { + Ok(()) + } + + } + }, } } pub async fn run(mut self) -> Result<()> { + use std::convert::TryFrom; + use futures::stream::StreamExt; + + let mut subscription_stream = self.inner.profile() + .read() + .await + .client() + .ipfs + .pubsub_subscribe(self.gossip_topic_name.clone()) + .await?; + loop { - match self.receive_next_message().await { - None => break, - Some(tpl) => self.process_reactor_message(tpl).await?, + tokio::select! { + next_control_msg = self.receive_next_message() => { + match next_control_msg { + None => break, + Some(tpl) => self.process_reactor_message(tpl).await?, + } + } + + next_gossip_message = subscription_stream.next() => { + if let Some(next_gossip_message) = next_gossip_message { + match serde_json::from_slice(&next_gossip_message.data) { + Err(e) => log::trace!("Failed to deserialize gossip message from {}", next_gossip_message.source), + Ok(GossipMessage::CurrentProfileState { peer_id, cid }) => { + let peer_id = ipfs::PeerId::from_bytes(&peer_id); + let cid = cid::Cid::try_from(&*cid); + log::trace!("Peer {:?} is at {:?}", peer_id, cid) + } + } + } else { + break; + } + } } } Ok(()) -- cgit v1.2.3 From da6f7fc5e837c96e5e975e41388324a0c387eff1 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Fri, 10 Dec 2021 17:59:29 +0100 Subject: Refactor: Split gossip module into submodules Signed-off-by: Matthias Beyer --- lib/src/reactor/gossip.rs | 171 ----------------------------------------- lib/src/reactor/gossip/ctrl.rs | 15 ++++ lib/src/reactor/gossip/mod.rs | 148 +++++++++++++++++++++++++++++++++++ lib/src/reactor/gossip/msg.rs | 17 ++++ 4 files changed, 180 insertions(+), 171 deletions(-) delete mode 100644 lib/src/reactor/gossip.rs create mode 100644 lib/src/reactor/gossip/ctrl.rs create mode 100644 lib/src/reactor/gossip/mod.rs create mode 100644 lib/src/reactor/gossip/msg.rs diff --git a/lib/src/reactor/gossip.rs b/lib/src/reactor/gossip.rs deleted file mode 100644 index 0da0c97..0000000 --- a/lib/src/reactor/gossip.rs +++ /dev/null @@ -1,171 +0,0 @@ -//! Low-level module for gossip'ing code -//! -//! This module implements the low-level gossiping functionality that other modules use to -//! implement actual behaviours on -//! - -use std::sync::Arc; - -use anyhow::Result; -use tokio::sync::RwLock; - -use crate::profile::Profile; -use crate::reactor::Reactor; -use crate::reactor::ReactorReply; -use crate::reactor::ReactorRequest; -use crate::reactor::ctrl::ReactorReceiver; -use crate::reactor::ctrl::ReactorSender; -use crate::reactor::ctrl::ReplyChannel; - - -#[derive(Debug)] -pub struct GossipReactor { - inner: Reactor, - gossip_topic_name: String, -} - -#[derive(Debug)] -pub enum GossipRequest { - Ping, - PublishMe, -} - -#[derive(Debug)] -pub enum GossipReply { - Pong, - NoHead, - PublishMeResult(Result<()>), -} - -#[derive(Debug, serde::Serialize, serde::Deserialize)] -pub enum GossipMessage { - CurrentProfileState { - peer_id: Vec, - cid: Vec, - }, -} - -impl GossipMessage { - fn into_bytes(self) -> Result> { - serde_json::to_string(&self) - .map(String::into_bytes) - .map_err(anyhow::Error::from) - } -} - - -impl GossipReactor { - pub fn new(profile: Arc>, gossip_topic_name: String) -> (Self, ReactorSender) { - let (inner, sender) = Reactor::::new(profile); - let reactor = Self { - inner, - gossip_topic_name, - }; - - (reactor, sender) - } - - pub async fn receive_next_message(&mut self) -> Option<(ReactorRequest, ReplyChannel)> { - self.inner.receive_next_message().await - } - - pub(super) async fn process_reactor_message(&mut self, request: (ReactorRequest, ReplyChannel)) -> Result<()> { - match self.inner.process_reactor_message(request).await? { - None => Ok(()), - Some((GossipRequest::Ping, reply_channel)) => { - if let Err(_) = reply_channel.send(ReactorReply::Custom(GossipReply::Pong)) { - anyhow::bail!("Failed sening PONG reply") - } - - Ok(()) - }, - - Some((GossipRequest::PublishMe, reply_channel)) => { - let profile = self.inner.profile(); - let profile = profile.read().await; - - let head = profile.head(); - if head.is_none() { - if let Err(_) = reply_channel.send(ReactorReply::Custom(GossipReply::NoHead)) { - anyhow::bail!("Failed to send GossipReply::NoHead)") - } - } - let head = head.unwrap().to_bytes(); - - let own_id = match profile.client().own_id().await { - Ok(id) => id, - Err(e) => if let Err(_) = reply_channel.send(ReactorReply::Custom(GossipReply::PublishMeResult(Err(e)))) { - anyhow::bail!("Failed to send GossipReply::PublishMeResult(Err(_))") - } else { - return Ok(()) // TODO: abort operation here for now, maybe not the best idea - } - }; - - let publish_res = profile - .client() - .ipfs - .pubsub_publish(self.gossip_topic_name.clone(), GossipMessage::CurrentProfileState { - peer_id: own_id.to_bytes(), - cid: head - }.into_bytes()?) - .await; - - match publish_res { - Ok(()) => if let Err(_) = reply_channel.send(ReactorReply::Custom(GossipReply::PublishMeResult(Ok(())))) { - anyhow::bail!("Failed to send GossipReply::PublishMeResult(Ok(()))") - } else { - Ok(()) - }, - - Err(e) => if let Err(_) = reply_channel.send(ReactorReply::Custom(GossipReply::PublishMeResult(Err(e)))) { - anyhow::bail!("Failed to send GossipReply::PublishMeResult(Err(_))") - } else { - Ok(()) - } - - } - }, - } - } - - pub async fn run(mut self) -> Result<()> { - use std::convert::TryFrom; - use futures::stream::StreamExt; - - let mut subscription_stream = self.inner.profile() - .read() - .await - .client() - .ipfs - .pubsub_subscribe(self.gossip_topic_name.clone()) - .await?; - - loop { - tokio::select! { - next_control_msg = self.receive_next_message() => { - match next_control_msg { - None => break, - Some(tpl) => self.process_reactor_message(tpl).await?, - } - } - - next_gossip_message = subscription_stream.next() => { - if let Some(next_gossip_message) = next_gossip_message { - match serde_json::from_slice(&next_gossip_message.data) { - Err(e) => log::trace!("Failed to deserialize gossip message from {}", next_gossip_message.source), - Ok(GossipMessage::CurrentProfileState { peer_id, cid }) => { - let peer_id = ipfs::PeerId::from_bytes(&peer_id); - let cid = cid::Cid::try_from(&*cid); - log::trace!("Peer {:?} is at {:?}", peer_id, cid) - } - } - } else { - break; - } - } - } - } - Ok(()) - } -} - diff --git a/lib/src/reactor/gossip/ctrl.rs b/lib/src/reactor/gossip/ctrl.rs new file mode 100644 index 0000000..5e2e805 --- /dev/null +++ b/lib/src/reactor/gossip/ctrl.rs @@ -0,0 +1,15 @@ +use anyhow::Result; + +#[derive(Debug)] +pub enum GossipRequest { + Ping, + PublishMe, +} + +#[derive(Debug)] +pub enum GossipReply { + Pong, + NoHead, + PublishMeResult(Result<()>), +} + diff --git a/lib/src/reactor/gossip/mod.rs b/lib/src/reactor/gossip/mod.rs new file mode 100644 index 0000000..884f42a --- /dev/null +++ b/lib/src/reactor/gossip/mod.rs @@ -0,0 +1,148 @@ +//! Low-level module for gossip'ing code +//! +//! This module implements the low-level gossiping functionality that other modules use to +//! implement actual behaviours on +//! + +use std::sync::Arc; + +use anyhow::Result; +use tokio::sync::RwLock; + +use crate::profile::Profile; +use crate::reactor::Reactor; +use crate::reactor::ReactorReply; +use crate::reactor::ReactorRequest; +use crate::reactor::ctrl::ReactorReceiver; +use crate::reactor::ctrl::ReactorSender; +use crate::reactor::ctrl::ReplyChannel; + +mod ctrl; +pub use ctrl::GossipRequest; +pub use ctrl::GossipReply; + +mod msg; +pub use msg::GossipMessage; + +#[derive(Debug)] +pub struct GossipReactor { + inner: Reactor, + gossip_topic_name: String, +} + + +impl GossipReactor { + pub fn new(profile: Arc>, gossip_topic_name: String) -> (Self, ReactorSender) { + let (inner, sender) = Reactor::::new(profile); + let reactor = Self { + inner, + gossip_topic_name, + }; + + (reactor, sender) + } + + pub async fn receive_next_message(&mut self) -> Option<(ReactorRequest, ReplyChannel)> { + self.inner.receive_next_message().await + } + + pub(super) async fn process_reactor_message(&mut self, request: (ReactorRequest, ReplyChannel)) -> Result<()> { + match self.inner.process_reactor_message(request).await? { + None => Ok(()), + Some((GossipRequest::Ping, reply_channel)) => { + if let Err(_) = reply_channel.send(ReactorReply::Custom(GossipReply::Pong)) { + anyhow::bail!("Failed sening PONG reply") + } + + Ok(()) + }, + + Some((GossipRequest::PublishMe, reply_channel)) => { + let profile = self.inner.profile(); + let profile = profile.read().await; + + let head = profile.head(); + if head.is_none() { + if let Err(_) = reply_channel.send(ReactorReply::Custom(GossipReply::NoHead)) { + anyhow::bail!("Failed to send GossipReply::NoHead)") + } + } + let head = head.unwrap().to_bytes(); + + let own_id = match profile.client().own_id().await { + Ok(id) => id, + Err(e) => if let Err(_) = reply_channel.send(ReactorReply::Custom(GossipReply::PublishMeResult(Err(e)))) { + anyhow::bail!("Failed to send GossipReply::PublishMeResult(Err(_))") + } else { + return Ok(()) // TODO: abort operation here for now, maybe not the best idea + } + }; + + let publish_res = profile + .client() + .ipfs + .pubsub_publish(self.gossip_topic_name.clone(), GossipMessage::CurrentProfileState { + peer_id: own_id.to_bytes(), + cid: head + }.into_bytes()?) + .await; + + match publish_res { + Ok(()) => if let Err(_) = reply_channel.send(ReactorReply::Custom(GossipReply::PublishMeResult(Ok(())))) { + anyhow::bail!("Failed to send GossipReply::PublishMeResult(Ok(()))") + } else { + Ok(()) + }, + + Err(e) => if let Err(_) = reply_channel.send(ReactorReply::Custom(GossipReply::PublishMeResult(Err(e)))) { + anyhow::bail!("Failed to send GossipReply::PublishMeResult(Err(_))") + } else { + Ok(()) + } + + } + }, + } + } + + pub async fn run(mut self) -> Result<()> { + use std::convert::TryFrom; + use futures::stream::StreamExt; + + let mut subscription_stream = self.inner.profile() + .read() + .await + .client() + .ipfs + .pubsub_subscribe(self.gossip_topic_name.clone()) + .await?; + + loop { + tokio::select! { + next_control_msg = self.receive_next_message() => { + match next_control_msg { + None => break, + Some(tpl) => self.process_reactor_message(tpl).await?, + } + } + + next_gossip_message = subscription_stream.next() => { + if let Some(next_gossip_message) = next_gossip_message { + match serde_json::from_slice(&next_gossip_message.data) { + Err(e) => log::trace!("Failed to deserialize gossip message from {}", next_gossip_message.source), + Ok(GossipMessage::CurrentProfileState { peer_id, cid }) => { + let peer_id = ipfs::PeerId::from_bytes(&peer_id); + let cid = cid::Cid::try_from(&*cid); + log::trace!("Peer {:?} is at {:?}", peer_id, cid) + } + } + } else { + break; + } + } + } + } + Ok(()) + } +} + diff --git a/lib/src/reactor/gossip/msg.rs b/lib/src/reactor/gossip/msg.rs new file mode 100644 index 0000000..049fc68 --- /dev/null +++ b/lib/src/reactor/gossip/msg.rs @@ -0,0 +1,17 @@ +use anyhow::Result; + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub enum GossipMessage { + CurrentProfileState { + peer_id: Vec, + cid: Vec, + }, +} + +impl GossipMessage { + pub(super) fn into_bytes(self) -> Result> { + serde_json::to_string(&self) + .map(String::into_bytes) + .map_err(anyhow::Error::from) + } +} -- cgit v1.2.3 From 3a4fabc292a9c6c1405e78312b24ea67a945ecc7 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Fri, 10 Dec 2021 18:03:21 +0100 Subject: Refactor: Add helper fn for sending gossip reply object Signed-off-by: Matthias Beyer --- lib/src/reactor/gossip/mod.rs | 32 ++++++++++++++------------------ 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/lib/src/reactor/gossip/mod.rs b/lib/src/reactor/gossip/mod.rs index 884f42a..52bff51 100644 --- a/lib/src/reactor/gossip/mod.rs +++ b/lib/src/reactor/gossip/mod.rs @@ -46,6 +46,14 @@ impl GossipReactor { self.inner.receive_next_message().await } + fn send_gossip_reply(channel: ReplyChannel, reply: GossipReply) -> Result<()> { + if let Err(_) = channel.send(ReactorReply::Custom(reply)) { + anyhow::bail!("Failed to send GossipReply::NoHead)") + } + + Ok(()) + } + pub(super) async fn process_reactor_message(&mut self, request: (ReactorRequest, ReplyChannel)) -> Result<()> { match self.inner.process_reactor_message(request).await? { None => Ok(()), @@ -63,17 +71,15 @@ impl GossipReactor { let head = profile.head(); if head.is_none() { - if let Err(_) = reply_channel.send(ReactorReply::Custom(GossipReply::NoHead)) { - anyhow::bail!("Failed to send GossipReply::NoHead)") - } + Self::send_gossip_reply(reply_channel, GossipReply::NoHead)?; + return Ok(()) } let head = head.unwrap().to_bytes(); let own_id = match profile.client().own_id().await { Ok(id) => id, - Err(e) => if let Err(_) = reply_channel.send(ReactorReply::Custom(GossipReply::PublishMeResult(Err(e)))) { - anyhow::bail!("Failed to send GossipReply::PublishMeResult(Err(_))") - } else { + Err(e) => { + Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Err(e)))?; return Ok(()) // TODO: abort operation here for now, maybe not the best idea } }; @@ -88,18 +94,8 @@ impl GossipReactor { .await; match publish_res { - Ok(()) => if let Err(_) = reply_channel.send(ReactorReply::Custom(GossipReply::PublishMeResult(Ok(())))) { - anyhow::bail!("Failed to send GossipReply::PublishMeResult(Ok(()))") - } else { - Ok(()) - }, - - Err(e) => if let Err(_) = reply_channel.send(ReactorReply::Custom(GossipReply::PublishMeResult(Err(e)))) { - anyhow::bail!("Failed to send GossipReply::PublishMeResult(Err(_))") - } else { - Ok(()) - } - + Ok(()) => Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Ok(()))), + Err(e) => Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Err(e))), } }, } -- cgit v1.2.3 From 80caa9e67cdb64b79328acc74d0131789fc47c6d Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Fri, 10 Dec 2021 18:04:55 +0100 Subject: Refactor: Move ReplyMe request handling into helper fn Signed-off-by: Matthias Beyer --- lib/src/reactor/gossip/mod.rs | 66 ++++++++++++++++++++++--------------------- 1 file changed, 34 insertions(+), 32 deletions(-) diff --git a/lib/src/reactor/gossip/mod.rs b/lib/src/reactor/gossip/mod.rs index 52bff51..9ba1d05 100644 --- a/lib/src/reactor/gossip/mod.rs +++ b/lib/src/reactor/gossip/mod.rs @@ -65,39 +65,41 @@ impl GossipReactor { Ok(()) }, - Some((GossipRequest::PublishMe, reply_channel)) => { - let profile = self.inner.profile(); - let profile = profile.read().await; - - let head = profile.head(); - if head.is_none() { - Self::send_gossip_reply(reply_channel, GossipReply::NoHead)?; - return Ok(()) - } - let head = head.unwrap().to_bytes(); + Some((GossipRequest::PublishMe, reply_channel)) => self.publish_me(reply_channel).await + } + } - let own_id = match profile.client().own_id().await { - Ok(id) => id, - Err(e) => { - Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Err(e)))?; - return Ok(()) // TODO: abort operation here for now, maybe not the best idea - } - }; - - let publish_res = profile - .client() - .ipfs - .pubsub_publish(self.gossip_topic_name.clone(), GossipMessage::CurrentProfileState { - peer_id: own_id.to_bytes(), - cid: head - }.into_bytes()?) - .await; - - match publish_res { - Ok(()) => Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Ok(()))), - Err(e) => Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Err(e))), - } - }, + async fn publish_me(&self, reply_channel: ReplyChannel) -> Result<()> { + let profile = self.inner.profile(); + let profile = profile.read().await; + + let head = profile.head(); + if head.is_none() { + Self::send_gossip_reply(reply_channel, GossipReply::NoHead)?; + return Ok(()) + } + let head = head.unwrap().to_bytes(); + + let own_id = match profile.client().own_id().await { + Ok(id) => id, + Err(e) => { + Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Err(e)))?; + return Ok(()) // TODO: abort operation here for now, maybe not the best idea + } + }; + + let publish_res = profile + .client() + .ipfs + .pubsub_publish(self.gossip_topic_name.clone(), GossipMessage::CurrentProfileState { + peer_id: own_id.to_bytes(), + cid: head + }.into_bytes()?) + .await; + + match publish_res { + Ok(()) => Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Ok(()))), + Err(e) => Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Err(e))), } } -- cgit v1.2.3 From 88450e0d8c2ae0046998180469345c0ce7f3a5aa Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Fri, 10 Dec 2021 18:11:22 +0100 Subject: Refactor: Move handling of pubsub message to own fn Signed-off-by: Matthias Beyer --- lib/src/reactor/gossip/mod.rs | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/lib/src/reactor/gossip/mod.rs b/lib/src/reactor/gossip/mod.rs index 9ba1d05..7056b26 100644 --- a/lib/src/reactor/gossip/mod.rs +++ b/lib/src/reactor/gossip/mod.rs @@ -103,8 +103,24 @@ impl GossipReactor { } } - pub async fn run(mut self) -> Result<()> { + async fn handle_gossip_message(&self, msg: Arc) -> Result<()> { use std::convert::TryFrom; + + match serde_json::from_slice(&msg.data) { + Err(e) => log::trace!("Failed to deserialize gossip message from {}", msg.source), + Ok(GossipMessage::CurrentProfileState { peer_id, cid }) => { + let peer_id = ipfs::PeerId::from_bytes(&peer_id); + let cid = cid::Cid::try_from(&*cid); + log::trace!("Peer {:?} is at {:?}", peer_id, cid); + + // TODO start dispatched node chain fetching + } + } + + Ok(()) + } + + pub async fn run(mut self) -> Result<()> { use futures::stream::StreamExt; let mut subscription_stream = self.inner.profile() @@ -126,14 +142,7 @@ impl GossipReactor { next_gossip_message = subscription_stream.next() => { if let Some(next_gossip_message) = next_gossip_message { - match serde_json::from_slice(&next_gossip_message.data) { - Err(e) => log::trace!("Failed to deserialize gossip message from {}", next_gossip_message.source), - Ok(GossipMessage::CurrentProfileState { peer_id, cid }) => { - let peer_id = ipfs::PeerId::from_bytes(&peer_id); - let cid = cid::Cid::try_from(&*cid); - log::trace!("Peer {:?} is at {:?}", peer_id, cid) - } - } + self.handle_gossip_message(next_gossip_message).await?; } else { break; } -- cgit v1.2.3 From 4ed7c5aa6279e74d21829003f0a901e5ff3945f7 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Fri, 10 Dec 2021 18:35:02 +0100 Subject: Add simple reactor-stopping mechanism Signed-off-by: Matthias Beyer --- lib/src/reactor/gossip/mod.rs | 5 +++++ lib/src/reactor/mod.rs | 6 +++++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/lib/src/reactor/gossip/mod.rs b/lib/src/reactor/gossip/mod.rs index 7056b26..9ee19e5 100644 --- a/lib/src/reactor/gossip/mod.rs +++ b/lib/src/reactor/gossip/mod.rs @@ -123,6 +123,7 @@ impl GossipReactor { pub async fn run(mut self) -> Result<()> { use futures::stream::StreamExt; + self.inner.set_running(true); let mut subscription_stream = self.inner.profile() .read() .await @@ -148,6 +149,10 @@ impl GossipReactor { } } } + + if !self.inner.running() { + break; + } } Ok(()) } diff --git a/lib/src/reactor/mod.rs b/lib/src/reactor/mod.rs index c35cecc..1f8159d 100644 --- a/lib/src/reactor/mod.rs +++ b/lib/src/reactor/mod.rs @@ -43,11 +43,13 @@ pub enum ReactorReply { /// /// The Reactor runs the whole application logic, that is syncing with other devices, fetching and /// keeping profile updates of other accounts, communication on the gossipsub topics... etc -#[derive(Debug)] +#[derive(Debug, getset::Getters, getset::Setters)] pub(super) struct Reactor where CustomReactorRequest: Debug + Send + Sync, CustomReactorReply: Debug + Send + Sync { + #[getset(get = "pub", set = "pub")] + running: bool, profile: Arc>, rx: ReactorReceiver, } @@ -59,6 +61,7 @@ impl Reactor>) -> (Self, ReactorSender) { let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); let reactor = Reactor { + running: true, profile, rx, }; @@ -105,6 +108,7 @@ impl Reactor { + self.running = false; if let Err(_) = reply_channel.send(ReactorReply::Exiting) { anyhow::bail!("Failed sending EXITING reply") } -- cgit v1.2.3 From 39e820aed41d62d1c709d02406b1a47729ccf34f Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Fri, 10 Dec 2021 18:35:15 +0100 Subject: Add simple reactor test Signed-off-by: Matthias Beyer --- lib/src/reactor/gossip/mod.rs | 56 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/lib/src/reactor/gossip/mod.rs b/lib/src/reactor/gossip/mod.rs index 9ee19e5..d84f985 100644 --- a/lib/src/reactor/gossip/mod.rs +++ b/lib/src/reactor/gossip/mod.rs @@ -158,3 +158,59 @@ impl GossipReactor { } } +#[cfg(test)] +mod tests { + use super::*; + + use std::convert::TryFrom; + use std::sync::Arc; + use tokio::sync::RwLock; + + use crate::config::Config; + + #[tokio::test] + async fn test_gossip_reactor_simple() { + let _ = env_logger::try_init(); + + let profile = Profile::new_inmemory(Config::default(), "test-gossip-reactor-simple").await; + assert!(profile.is_ok()); + let profile = Arc::new(RwLock::new(profile.unwrap())); + + let gossip_topic_name = String::from("test-gossip-reactor-simple-topic"); + let (reactor, tx) = GossipReactor::new(profile.clone(), gossip_topic_name); + + let (reply_sender, mut reply_receiver) = tokio::sync::mpsc::unbounded_channel(); + tx.send((ReactorRequest::Ping, reply_sender)); + + let mut pong_received = false; + tokio::select! { + reply = reply_receiver.recv() => { + match reply { + Some(ReactorReply::Pong) => { + pong_received = true; + let (reply_sender, mut reply_receiver) = tokio::sync::mpsc::unbounded_channel(); + tx.send((ReactorRequest::Exit, reply_sender)); + } + Some(r) => { + assert!(false, "Expected ReactorReply::Pong, got: {:?}", r); + } + None => { + // nothing + } + } + }, + + reactor_res = reactor.run() => { + match reactor_res { + Ok(()) => assert!(false, "Reactor finished before pong was received"), + + Err(e) => { + assert!(false, "Reactor errored: {:?}", e); + } + } + } + } + + assert!(pong_received, "No pong received"); + } +} -- cgit v1.2.3 From 395598e9c1360bd2f3182d927727934676e1c669 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Sat, 11 Dec 2021 10:38:58 +0100 Subject: Add message types for requesting connection to other peer Signed-off-by: Matthias Beyer --- lib/src/reactor/gossip/ctrl.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/src/reactor/gossip/ctrl.rs b/lib/src/reactor/gossip/ctrl.rs index 5e2e805..d65af9c 100644 --- a/lib/src/reactor/gossip/ctrl.rs +++ b/lib/src/reactor/gossip/ctrl.rs @@ -4,6 +4,7 @@ use anyhow::Result; pub enum GossipRequest { Ping, PublishMe, + Connect(ipfs::MultiaddrWithPeerId), } #[derive(Debug)] @@ -11,5 +12,6 @@ pub enum GossipReply { Pong, NoHead, PublishMeResult(Result<()>), + ConnectResult(Result<()>), } -- cgit v1.2.3 From 45cd97c4923b287715737ee6d3595e0c5177bf44 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Sat, 11 Dec 2021 10:39:13 +0100 Subject: Add impl for handling connect request Signed-off-by: Matthias Beyer --- lib/src/reactor/gossip/mod.rs | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/lib/src/reactor/gossip/mod.rs b/lib/src/reactor/gossip/mod.rs index d84f985..809ccb0 100644 --- a/lib/src/reactor/gossip/mod.rs +++ b/lib/src/reactor/gossip/mod.rs @@ -65,7 +65,17 @@ impl GossipReactor { Ok(()) }, - Some((GossipRequest::PublishMe, reply_channel)) => self.publish_me(reply_channel).await + Some((GossipRequest::PublishMe, reply_channel)) => self.publish_me(reply_channel).await, + + Some((GossipRequest::Connect(addr), reply_channel)) => { + let reply = GossipReply::ConnectResult(self.connect(addr.clone()).await); + if let Err(_) = Self::send_gossip_reply(reply_channel, reply) { + anyhow::bail!("Failed sending Connect({}) reply", addr) + } + + Ok(()) + }, + } } @@ -103,6 +113,10 @@ impl GossipReactor { } } + async fn connect(&self, addr: ipfs::MultiaddrWithPeerId) -> Result<()> { + self.inner.profile().read().await.client().connect(addr).await + } + async fn handle_gossip_message(&self, msg: Arc) -> Result<()> { use std::convert::TryFrom; -- cgit v1.2.3 From a31b67035996a7fe050f7920e39e4ea016fb1842 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Sat, 11 Dec 2021 10:49:55 +0100 Subject: Add GossipReactor::is_connected_to() helper fn for checking whether a client is connected to another client Signed-off-by: Matthias Beyer --- lib/src/reactor/gossip/mod.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/lib/src/reactor/gossip/mod.rs b/lib/src/reactor/gossip/mod.rs index 809ccb0..bf8b4eb 100644 --- a/lib/src/reactor/gossip/mod.rs +++ b/lib/src/reactor/gossip/mod.rs @@ -117,6 +117,21 @@ impl GossipReactor { self.inner.profile().read().await.client().connect(addr).await } + #[cfg(test)] + async fn is_connected_to(&self, addr: ipfs::MultiaddrWithPeerId) -> Result { + self.inner + .profile() + .read() + .await + .client() + .ipfs + .peers() + .await + .map(|connections| { + connections.iter().any(|connection| connection.addr == addr) + }) + } + async fn handle_gossip_message(&self, msg: Arc) -> Result<()> { use std::convert::TryFrom; -- cgit v1.2.3 From 5afe7e6a6b71ad0851f08c56834f9f38412d0f22 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Sat, 11 Dec 2021 10:50:08 +0100 Subject: Add test for gossipping Signed-off-by: Matthias Beyer --- lib/src/reactor/gossip/mod.rs | 62 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/lib/src/reactor/gossip/mod.rs b/lib/src/reactor/gossip/mod.rs index bf8b4eb..58e8828 100644 --- a/lib/src/reactor/gossip/mod.rs +++ b/lib/src/reactor/gossip/mod.rs @@ -242,4 +242,66 @@ mod tests { assert!(pong_received, "No pong received"); } + + #[tokio::test] + async fn test_gossip_reactor_gossipping() { + let _ = env_logger::try_init(); + + let gossip_topic_name = String::from("test-gossip-reactor-gossipping-topic"); + let (left_profile, left_reactor, left_tx) = { + let profile = Profile::new_inmemory(Config::default(), "test-gossip-reactor-simple-left").await; + assert!(profile.is_ok()); + let profile = Arc::new(RwLock::new(profile.unwrap())); + + let (reactor, tx) = GossipReactor::new(profile.clone(), gossip_topic_name.clone()); + (profile, reactor, tx) + }; + + let (right_profile, right_reactor, right_tx) = { + let profile = Profile::new_inmemory(Config::default(), "test-gossip-reactor-simple-right").await; + assert!(profile.is_ok()); + let