summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2021-12-18 17:32:56 +0100
committerMatthias Beyer <mail@beyermatthias.de>2021-12-18 17:32:56 +0100
commit0dab7cf79e7579ccdc8ba1b99f24476004a13de7 (patch)
tree9f51c34ea28f34cd88e6d4e88ee77d45f3ee1ad3
parent1c90ee16661006c8cbfd628a93608566a9bc6b76 (diff)
parent013a4203cc141e2ed4484b14b88cc0687c8ac318 (diff)
Merge branch 'multi-device-support'
-rw-r--r--lib/src/client.rs8
-rw-r--r--lib/src/lib.rs1
-rw-r--r--lib/src/profile/device.rs31
-rw-r--r--lib/src/profile/mod.rs6
-rw-r--r--lib/src/profile/state.rs30
-rw-r--r--lib/src/reactor/account.rs19
-rw-r--r--lib/src/reactor/ctrl.rs15
-rw-r--r--lib/src/reactor/device.rs19
-rw-r--r--lib/src/reactor/gossip/ctrl.rs19
-rw-r--r--lib/src/reactor/gossip/mod.rs331
-rw-r--r--lib/src/reactor/gossip/msg.rs17
-rw-r--r--lib/src/reactor/gossip/strategy.rs31
-rw-r--r--lib/src/reactor/mod.rs28
13 files changed, 553 insertions, 2 deletions
diff --git a/lib/src/client.rs b/lib/src/client.rs
index a6a51d4..d985769 100644
--- a/lib/src/client.rs
+++ b/lib/src/client.rs
@@ -29,6 +29,14 @@ impl Client {
Ok(())
}
+ pub async fn own_id(&self) -> Result<ipfs::PeerId> {
+ self.ipfs
+ .identity()
+ .await
+ .map(|id| id.0.into_peer_id())
+ .map_err(anyhow::Error::from)
+ }
+
pub async fn connect(&self, peer: ipfs::MultiaddrWithPeerId) -> Result<()> {
self.ipfs.connect(peer).await
}
diff --git a/lib/src/lib.rs b/lib/src/lib.rs
index 04fc959..b7b05e2 100644
--- a/lib/src/lib.rs
+++ b/lib/src/lib.rs
@@ -4,3 +4,4 @@ pub mod ipfs_client;
pub mod profile;
pub mod stream;
pub mod types;
+pub mod reactor;
diff --git a/lib/src/profile/device.rs b/lib/src/profile/device.rs
new file mode 100644
index 0000000..daeab21
--- /dev/null
+++ b/lib/src/profile/device.rs
@@ -0,0 +1,31 @@
+use std::convert::TryFrom;
+use std::convert::TryInto;
+use anyhow::Result;
+
+#[derive(Clone, Debug)]
+pub struct Device {
+ device_id: libp2p::identity::ed25519::PublicKey,
+}
+
+#[derive(Debug, serde::Serialize, serde::Deserialize)]
+pub struct DeviceSaveable {
+ device_id: Vec<u8>,
+}
+
+impl TryFrom<Device> for DeviceSaveable {
+ type Error = anyhow::Error;
+
+ fn try_from(device: Device) -> Result<Self> {
+ Ok(DeviceSaveable { device_id: device.device_id.encode().to_vec() })
+ }
+}
+
+impl TryInto<Device> for DeviceSaveable {
+ type Error = anyhow::Error;
+
+ fn try_into(self) -> Result<Device> {
+ libp2p::identity::ed25519::PublicKey::decode(&self.device_id)
+ .map(|device_id| Device { device_id })
+ .map_err(anyhow::Error::from)
+ }
+}
diff --git a/lib/src/profile/mod.rs b/lib/src/profile/mod.rs
index 39c4829..d1ab90f 100644
--- a/lib/src/profile/mod.rs
+++ b/lib/src/profile/mod.rs
@@ -7,6 +7,8 @@ use anyhow::Result;
use crate::client::Client;
use crate::ipfs_client::IpfsClient;
+mod device;
+use device::Device;
mod state;
use state::*;
@@ -163,6 +165,10 @@ impl Profile {
self.client.exit().await
}
+
+ pub fn add_device(&mut self, d: Device) -> Result<()> {
+ self.state.add_device(d)
+ }
}
diff --git a/lib/src/profile/state.rs b/lib/src/profile/state.rs
index 33f0bd6..f20135e 100644
--- a/lib/src/profile/state.rs
+++ b/lib/src/profile/state.rs
@@ -6,6 +6,9 @@ use anyhow::Context;
use anyhow::Result;
use tokio::io::AsyncWriteExt;
+use crate::profile::device::Device;
+use crate::profile::device::DeviceSaveable;
+
#[derive(Debug)]
pub struct StateDir(PathBuf);
@@ -39,6 +42,9 @@ pub struct ProfileState {
#[getset(get = "pub")]
keypair: libp2p::identity::Keypair,
+
+ #[getset(get = "pub")]
+ other_devices: Vec<Device>,
}
impl ProfileState {
@@ -46,7 +52,8 @@ impl ProfileState {
Self {
profile_head: None,
profile_name,
- keypair
+ keypair,
+ other_devices: vec![],
}
}
@@ -54,6 +61,11 @@ impl ProfileState {
self.profile_head = Some(cid);
Ok(()) // reserved for future use
}
+
+ pub(super) fn add_device(&mut self, d: Device) -> Result<()> {
+ self.other_devices.push(d);
+ Ok(()) // reserved for future use
+ }
}
impl std::fmt::Debug for ProfileState {
@@ -67,6 +79,7 @@ pub(super) struct ProfileStateSaveable {
profile_head: Option<Vec<u8>>,
profile_name: String,
keypair: Vec<u8>,
+ other_devices: Vec<DeviceSaveable>
}
impl ProfileStateSaveable {
@@ -77,7 +90,14 @@ impl ProfileStateSaveable {
keypair: match s.keypair {
libp2p::identity::Keypair::Ed25519(ref kp) => Vec::from(kp.encode()),
_ => anyhow::bail!("Only keypair type ed25519 supported"),
- }
+ },
+ other_devices: {
+ s.other_devices
+ .iter()
+ .cloned()
+ .map(DeviceSaveable::try_from)
+ .collect::<Result<Vec<_>>>()?
+ },
})
}
@@ -127,6 +147,12 @@ impl TryInto<ProfileState> for ProfileStateSaveable {
let kp = libp2p::identity::ed25519::Keypair::decode(&mut self.keypair)?;
libp2p::identity::Keypair::Ed25519(kp)
},
+ other_devices: {
+ self.other_devices
+ .into_iter()
+ .map(DeviceSaveable::try_into)
+ .collect::<Result<Vec<_>>>()?
+ },
})
}
}
diff --git a/lib/src/reactor/account.rs b/lib/src/reactor/account.rs
new file mode 100644
index 0000000..59913b5
--- /dev/null
+++ b/lib/src/reactor/account.rs
@@ -0,0 +1,19 @@
+//! Module for account handling (following accounts, caching account updates) using the gossip
+//! module for the lower-level handling
+
+use std::sync::Arc;
+
+use anyhow::Result;
+use tokio::sync::RwLock;
+
+use crate::profile::Profile;
+use crate::reactor::gossip::GossipReactor;
+
+#[derive(Debug)]
+pub struct AccountReactor(GossipReactor);
+
+impl AccountReactor {
+ pub(super) fn new(profile: Arc<RwLock<Profile>>) -> Self {
+ unimplemented!()
+ }
+}
diff --git a/lib/src/reactor/ctrl.rs b/lib/src/reactor/ctrl.rs
new file mode 100644
index 0000000..a32f1c3
--- /dev/null
+++ b/lib/src/reactor/ctrl.rs
@@ -0,0 +1,15 @@
+use tokio::sync::mpsc::UnboundedSender as Sender;
+use tokio::sync::mpsc::UnboundedReceiver as Receiver;
+
+/// Type for sending messages to a reactor
+pub type ReactorSender<Request, Reply> = Sender<(Request, ReplySender<Reply>)>;
+
+/// Type that is used by a reactor for receiving messages
+pub type ReactorReceiver<Request, Reply> = Receiver<(Request, ReplySender<Reply>)>;
+
+/// Type that represents the channel that has to be send with a request to a reactor for getting an
+/// answer back
+pub type ReplySender<Reply> = Sender<Reply>;
+
+pub type ReplyReceiver<Reply> = Receiver<Reply>;
+
diff --git a/lib/src/reactor/device.rs b/lib/src/reactor/device.rs
new file mode 100644
index 0000000..1014ca1
--- /dev/null
+++ b/lib/src/reactor/device.rs
@@ -0,0 +1,19 @@
+//! Module for multi-device support functionality,
+//! which uses the gossip module for the lower-level handling
+
+use std::sync::Arc;
+
+use anyhow::Result;
+use tokio::sync::RwLock;
+
+use crate::profile::Profile;
+use crate::reactor::gossip::GossipReactor;
+
+#[derive(Debug)]
+pub struct DeviceReactor(GossipReactor);
+
+impl DeviceReactor {
+ pub(super) fn new(profile: Arc<RwLock<Profile>>) -> Self {
+ unimplemented!()
+ }
+}
diff --git a/lib/src/reactor/gossip/ctrl.rs b/lib/src/reactor/gossip/ctrl.rs
new file mode 100644
index 0000000..68fbf06
--- /dev/null
+++ b/lib/src/reactor/gossip/ctrl.rs
@@ -0,0 +1,19 @@
+use anyhow::Result;
+
+#[derive(Debug)]
+pub enum GossipRequest {
+ Exit,
+ Ping,
+ PublishMe,
+ Connect(ipfs::MultiaddrWithPeerId),
+}
+
+#[derive(Debug)]
+pub enum GossipReply {
+ Exiting,
+ Pong,
+ NoHead,
+ PublishMeResult(Result<()>),
+ ConnectResult(Result<()>),
+}
+
diff --git a/lib/src/reactor/gossip/mod.rs b/lib/src/reactor/gossip/mod.rs
new file mode 100644
index 0000000..c8a7d28
--- /dev/null
+++ b/lib/src/reactor/gossip/mod.rs
@@ -0,0 +1,331 @@
+//! Low-level module for gossip'ing code
+//!
+//! This module implements the low-level gossiping functionality that other modules use to
+//! implement actual behaviours on
+//!
+
+use std::sync::Arc;
+
+use anyhow::Result;
+use tokio::sync::RwLock;
+
+use crate::profile::Profile;
+use crate::reactor::Reactor;
+use crate::reactor::ReactorBuilder;
+use crate::reactor::ctrl::ReactorReceiver;
+use crate::reactor::ctrl::ReactorSender;
+use crate::reactor::ctrl::ReplySender;
+
+mod ctrl;
+pub use ctrl::GossipRequest;
+pub use ctrl::GossipReply;
+
+mod msg;
+pub use msg::GossipMessage;
+
+mod strategy;
+pub use strategy::GossipHandlingStrategy;
+pub use strategy::LogStrategy;
+
+#[derive(Debug)]
+pub struct GossipReactorBuilder {
+ profile: Arc<RwLock<Profile>>,
+ gossip_topic_name: String,
+}
+
+impl GossipReactorBuilder {
+ pub fn new(profile: Arc<RwLock<Profile>>, gossip_topic_name: String) -> Self {
+ Self { profile, gossip_topic_name }
+ }
+}
+
+impl ReactorBuilder for GossipReactorBuilder {
+ type Reactor = GossipReactor;
+
+ fn build_with_receiver(self, rr: ReactorReceiver<GossipRequest, GossipReply>) -> Self::Reactor {
+ GossipReactor {
+ profile: self.profile,
+ gossip_topic_name: self.gossip_topic_name,
+ receiver: rr,
+ strategy: std::marker::PhantomData,
+ }
+ }
+}
+
+pub struct GossipReactor<Strategy = LogStrategy>
+ where Strategy: GossipHandlingStrategy + Sync + Send
+{
+ profile: Arc<RwLock<Profile>>,
+ gossip_topic_name: String,
+ receiver: ReactorReceiver<GossipRequest, GossipReply>,
+ strategy: std::marker::PhantomData<Strategy>,
+}
+
+impl<S> std::fmt::Debug for GossipReactor<S>
+ where S: GossipHandlingStrategy + Sync + Send
+{
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "GossipReactor {{ topic: '{}' }}", self.gossip_topic_name)
+ }
+}
+
+impl<S> GossipReactor<S>
+ where S: GossipHandlingStrategy + Sync + Send
+{
+ fn send_gossip_reply(channel: ReplySender<GossipReply>, reply: GossipReply) -> Result<()> {
+ if let Err(_) = channel.send(reply) {
+ anyhow::bail!("Failed to send GossipReply::NoHead)")
+ }
+
+ Ok(())
+ }
+
+ async fn publish_me(&self, reply_channel: ReplySender<GossipReply>) -> Result<()> {
+ let profile = self.profile.read().await;
+
+ let head = profile.head();
+ if head.is_none() {
+ Self::send_gossip_reply(reply_channel, GossipReply::NoHead)?;
+ return Ok(())
+ }
+ let head = head.unwrap().to_bytes();
+
+ let own_id = match profile.client().own_id().await {
+ Ok(id) => id,
+ Err(e) => {
+ Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Err(e)))?;
+ return Ok(()) // TODO: abort operation here for now, maybe not the best idea
+ }
+ };
+
+ let publish_res = profile
+ .client()
+ .ipfs
+ .pubsub_publish(self.gossip_topic_name.clone(), GossipMessage::CurrentProfileState {
+ peer_id: own_id.to_bytes(),
+ cid: head
+ }.into_bytes()?)
+ .await;
+
+ match publish_res {
+ Ok(()) => Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Ok(()))),
+ Err(e) => Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Err(e))),
+ }
+ }
+
+ async fn connect(&self, addr: ipfs::MultiaddrWithPeerId) -> Result<()> {
+ log::trace!("Connecting GossipReactor with {:?}", addr);
+ self.profile.read().await.client().connect(addr).await
+ }
+
+ #[cfg(test)]
+ async fn is_connected_to(&self, addr: ipfs::MultiaddrWithPeerId) -> Result<bool> {
+ self.profile
+ .read()
+ .await
+ .client()
+ .ipfs
+ .peers()
+ .await
+ .map(|connections| {
+ connections.iter().any(|connection| connection.addr == addr)
+ })
+ }
+
+}
+
+#[async_trait::async_trait]
+impl<S> Reactor for GossipReactor<S>
+ where S: GossipHandlingStrategy + Sync + Send
+{
+ type Request = GossipRequest;
+ type Reply = GossipReply;
+
+ async fn run(mut self) -> Result<()> {
+ use futures::stream::StreamExt;
+
+ log::trace!("Booting {:?}", self);
+ let mut subscription_stream = self.profile
+ .read()
+ .await
+ .client()
+ .ipfs
+ .pubsub_subscribe(self.gossip_topic_name.clone())
+ .await?;
+
+ log::trace!("{:?} main loop", self);
+ loop {
+ tokio::select! {
+ next_control_msg = self.receiver.recv() => {
+ log::trace!("Received control message: {:?}", next_control_msg);
+ match next_control_msg {
+ None => break,
+ Some((GossipRequest::Exit, reply_channel)) => {
+ if let Err(_) = reply_channel.send(GossipReply::Exiting) {
+ anyhow::bail!("Failed sending EXITING reply")
+ }
+ break
+ },
+
+ Some((GossipRequest::Ping, reply_channel)) => {
+ log::trace!("Replying with Pong");
+ if let Err(_) = reply_channel.send(GossipReply::Pong) {
+ anyhow::bail!("Failed sending PONG reply")
+ }
+ },
+
+ Some((GossipRequest::PublishMe, reply_channel)) => self.publish_me(reply_channel).await?,
+
+ Some((GossipRequest::Connect(addr), reply_channel)) => {
+ let reply = GossipReply::ConnectResult(self.connect(addr.clone()).await);
+ if let Err(_) = Self::send_gossip_reply(reply_channel, reply) {
+ anyhow::bail!("Failed sending Connect({}) reply", addr)
+ }
+ },
+ }
+ }
+
+ next_gossip_message = subscription_stream.next() => {
+ if let Some(msg) = next_gossip_message {
+ log::trace!("Received gossip message");
+ match serde_json::from_slice(&msg.data) {
+ Ok(m) => S::handle_gossip_message(self.profile.clone(), msg.source, m).await?,
+ Err(e) => log::trace!("Failed to deserialize gossip message from {}", msg.source),
+ }
+ } else {
+ log::trace!("Gossip stream closed, breaking reactor loop");
+ break;
+ }
+ }
+ }
+ }
+ Ok(())
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ use std::convert::TryFrom;
+ use std::sync::Arc;
+ use tokio::sync::RwLock;
+
+ use crate::config::Config;
+
+ #[tokio::test]
+ async fn test_gossip_reactor_simple() {
+ let _ = env_logger::try_init();
+
+ let profile = Profile::new_inmemory(Config::default(), "test-gossip-reactor-simple").await;
+ assert!(profile.is_ok());
+ let profile = Arc::new(RwLock::new(profile.unwrap()));
+
+ let gossip_topic_name = String::from("test-gossip-reactor-simple-topic");
+ let (rx, tx) = tokio::sync::mpsc::unbounded_channel();
+ let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name).build_with_receiver(tx);
+
+ let (reply_sender, mut reply_receiver) = tokio::sync::mpsc::unbounded_channel();
+ rx.send((GossipRequest::Ping, reply_sender)).unwrap();
+
+ let mut pong_received = false;
+ let _ = tokio::spawn(async move {
+ reactor.run().await
+ });
+
+ match reply_receiver.recv().await {
+ Some(GossipReply::Pong) => {
+ pong_received = true;
+ log::trace!("Pong received!");
+ let (reply_sender, mut reply_receiver) = tokio::sync::mpsc::unbounded_channel();
+ rx.send((GossipRequest::Exit, reply_sender)).unwrap();
+ }
+ Some(r) => {
+ assert!(false, "Expected ReactorReply::Pong, got: {:?}", r);
+ }
+ None => {
+ // nothing
+ }
+ }
+
+ assert!(pong_received, "No pong received");
+ }
+
+ #[tokio::test]
+ async fn test_gossip_reactor_gossipping() {
+ let _ = env_logger::try_init();
+
+ let gossip_topic_name = String::from("test-gossip-reactor-gossipping-topic");
+ let (left_profile, left_reactor, left_rx) = {
+ let profile = Profile::new_inmemory(Config::default(), "test-gossip-reactor-simple-left").await;
+ assert!(profile.is_ok());
+ let profile = Arc::new(RwLock::new(profile.unwrap()));
+
+ let (rx, tx) = tokio::sync::mpsc::unbounded_channel();
+ let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name.clone()).build_with_receiver(tx);
+ (profile, reactor, rx)
+ };
+ log::trace!("Built left GossipReactor");
+
+ let (right_profile, right_reactor, right_rx) = {
+ let profile = Profile::new_inmemory(Config::default(), "test-gossip-reactor-simple-right").await;
+ assert!(profile.is_ok());
+ let profile = Arc::new(RwLock::new(profile.unwrap()));
+
+ let (rx, tx) = tokio::sync::mpsc::unbounded_channel();
+ let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name.clone()).build_with_receiver(tx);
+ (profile, reactor, rx)
+ };
+ log::trace!("Built right GossipReactor");
+
+ async fn get_peer_id(profile: Arc<RwLock<Profile>>) -> Result<ipfs::MultiaddrWithPeerId> {
+ profile.read()
+ .await
+ .client()
+ .ipfs
+ .identity()
+ .await
+ .map(|(pubkey, addr)| (pubkey.into_peer_id(), addr))
+ .and_then(|(peerid, mut addr)| {
+ ipfs::MultiaddrWithPeerId::try_from({
+ addr.pop().expect("At least one address for client")
+ })
+ .map_err(anyhow::Error::from)
+ })
+ }
+
+ let left_running_reactor = tokio::spawn(async move {
+ left_reactor.run().await
+ });
+
+ let right_running_reactor = tokio::spawn(async move {
+ right_reactor.run().await
+ });
+
+ let left_peer_id = get_peer_id(left_profile.clone()).await;
+ log::trace!("Left GossipReactor = {:?}", left_peer_id);
+ assert!(left_peer_id.is_ok(), "Not ok: {:?}", left_peer_id);
+ let left_peer_id = left_peer_id.unwrap();
+
+ let right_peer_id = get_peer_id(right_profile.clone()).await;
+ log::trace!("Right GossipReactor = {:?}", right_peer_id);
+ assert!(right_peer_id.is_ok(), "Not ok: {:?}", right_peer_id);
+ let right_peer_id = right_peer_id.unwrap();
+
+ let (right_reply_sender, mut right_reply_receiver) = tokio::sync::mpsc::unbounded_channel();
+
+ log::trace!("Right GossipReactor should now connect to left GossipReactor");
+ right_rx.send((GossipRequest::Connect(left_peer_id), right_reply_sender)).unwrap();
+
+ log::trace!("Right GossipReactor should now connect to left GossipReactor... waiting for reply");
+ match tokio::time::timeout(std::time::Duration::from_secs(5), right_reply_receiver.recv()).await {
+ Err(_) => assert!(false, "Timeout elapsed when waiting for connection status"),
+ Ok(Some(GossipReply::ConnectResult(Ok(())))) => {
+ log::trace!("Right GossipReactor is connected");
+ assert!(true)
+ },
+ Ok(Some(other)) => assert!(false, "Expected ConnectResult(Ok(())), recv: {:?}", other),
+ Ok(None) => assert!(false, "No reply from right reactor received"),
+ }
+ }
+}
diff --git a/lib/src/reactor/gossip/msg.rs b/lib/src/reactor/gossip/msg.rs
new file mode 100644
index 0000000..049fc68
--- /dev/null
+++ b/lib/src/reactor/gossip/msg.rs
@@ -0,0 +1,17 @@
+use anyhow::Result;
+
+#[derive(Debug, serde::Serialize, serde::Deserialize)]
+pub enum GossipMessage {
+ CurrentProfileState {
+ peer_id: Vec<u8>,
+ cid: Vec<u8>,
+ },
+}
+
+impl GossipMessage {
+ pub(super) fn into_bytes(self) -> Result<Vec<u8>> {
+ serde_json::to_string(&self)
+ .map(String::into_bytes)
+ .map_err(anyhow::Error::from)
+ }
+}
diff --git a/lib/src/reactor/gossip/strategy.rs b/lib/src/reactor/gossip/strategy.rs
new file mode 100644
index 0000000..b54dddc
--- /dev/null
+++ b/lib/src/reactor/gossip/strategy.rs
@@ -0,0 +1,31 @@
+use std::sync::Arc;
+
+use anyhow::Result;
+use tokio::sync::RwLock;
+
+use crate::profile::Profile;
+use crate::reactor::gossip::msg::GossipMessage;
+
+#[async_trait::async_trait]
+pub trait GossipHandlingStrategy: Sync + Send {
+ async fn handle_gossip_message(profile: Arc<RwLock<Profile>>, source: ipfs::PeerId, msg: GossipMessage) -> Result<()>;
+}
+
+pub struct LogStrategy;
+
+#[async_trait::async_trait]
+impl GossipHandlingStrategy for LogStrategy {
+ async fn handle_gossip_message(profile: Arc<RwLock<Profile>>, source: ipfs::PeerId, msg: GossipMessage) -> Result<()> {
+ use std::convert::TryFrom;
+ match msg {
+ GossipMessage::CurrentProfileState { peer_id, cid } => {
+ let peer_id = ipfs::PeerId::from_bytes(&peer_id);
+ let cid = cid::Cid::try_from(&*cid);
+
+ log::trace!("{:?} told me that {:?} is at {:?}", source, peer_id, cid);
+ }
+ }
+
+ Ok(())
+ }
+}
diff --git a/lib/src/reactor/mod.rs b/lib/src/reactor/mod.rs
new file mode 100644
index 0000000..5299851
--- /dev/null
+++ b/lib/src/reactor/mod.rs
@@ -0,0 +1,28 @@
+use std::sync::Arc;
+use std::fmt::Debug;
+
+use anyhow::Result;
+use tokio::sync::RwLock;
+
+use crate::profile::Profile;
+
+mod gossip;
+mod device;
+mod account;
+mod ctrl;
+
+use ctrl::ReactorReceiver;
+
+#[async_trait::async_trait]
+pub trait Reactor {
+ type Request: Debug + Send + Sync;
+ type Reply: Debug + Send + Sync;
+
+ async fn run(self) -> Result<()>;
+}
+
+pub trait ReactorBuilder {
+ type Reactor: Reactor;
+
+ fn build_with_receiver(self, rr: ReactorReceiver<<Self::Reactor as Reactor>::Request, <Self::Reactor as Reactor>::Reply>) -> Self::Reactor;
+}