diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2021-12-18 17:32:56 +0100 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2021-12-18 17:32:56 +0100 |
commit | 0dab7cf79e7579ccdc8ba1b99f24476004a13de7 (patch) | |
tree | 9f51c34ea28f34cd88e6d4e88ee77d45f3ee1ad3 | |
parent | 1c90ee16661006c8cbfd628a93608566a9bc6b76 (diff) | |
parent | 013a4203cc141e2ed4484b14b88cc0687c8ac318 (diff) |
Merge branch 'multi-device-support'
-rw-r--r-- | lib/src/client.rs | 8 | ||||
-rw-r--r-- | lib/src/lib.rs | 1 | ||||
-rw-r--r-- | lib/src/profile/device.rs | 31 | ||||
-rw-r--r-- | lib/src/profile/mod.rs | 6 | ||||
-rw-r--r-- | lib/src/profile/state.rs | 30 | ||||
-rw-r--r-- | lib/src/reactor/account.rs | 19 | ||||
-rw-r--r-- | lib/src/reactor/ctrl.rs | 15 | ||||
-rw-r--r-- | lib/src/reactor/device.rs | 19 | ||||
-rw-r--r-- | lib/src/reactor/gossip/ctrl.rs | 19 | ||||
-rw-r--r-- | lib/src/reactor/gossip/mod.rs | 331 | ||||
-rw-r--r-- | lib/src/reactor/gossip/msg.rs | 17 | ||||
-rw-r--r-- | lib/src/reactor/gossip/strategy.rs | 31 | ||||
-rw-r--r-- | lib/src/reactor/mod.rs | 28 |
13 files changed, 553 insertions, 2 deletions
diff --git a/lib/src/client.rs b/lib/src/client.rs index a6a51d4..d985769 100644 --- a/lib/src/client.rs +++ b/lib/src/client.rs @@ -29,6 +29,14 @@ impl Client { Ok(()) } + pub async fn own_id(&self) -> Result<ipfs::PeerId> { + self.ipfs + .identity() + .await + .map(|id| id.0.into_peer_id()) + .map_err(anyhow::Error::from) + } + pub async fn connect(&self, peer: ipfs::MultiaddrWithPeerId) -> Result<()> { self.ipfs.connect(peer).await } diff --git a/lib/src/lib.rs b/lib/src/lib.rs index 04fc959..b7b05e2 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -4,3 +4,4 @@ pub mod ipfs_client; pub mod profile; pub mod stream; pub mod types; +pub mod reactor; diff --git a/lib/src/profile/device.rs b/lib/src/profile/device.rs new file mode 100644 index 0000000..daeab21 --- /dev/null +++ b/lib/src/profile/device.rs @@ -0,0 +1,31 @@ +use std::convert::TryFrom; +use std::convert::TryInto; +use anyhow::Result; + +#[derive(Clone, Debug)] +pub struct Device { + device_id: libp2p::identity::ed25519::PublicKey, +} + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub struct DeviceSaveable { + device_id: Vec<u8>, +} + +impl TryFrom<Device> for DeviceSaveable { + type Error = anyhow::Error; + + fn try_from(device: Device) -> Result<Self> { + Ok(DeviceSaveable { device_id: device.device_id.encode().to_vec() }) + } +} + +impl TryInto<Device> for DeviceSaveable { + type Error = anyhow::Error; + + fn try_into(self) -> Result<Device> { + libp2p::identity::ed25519::PublicKey::decode(&self.device_id) + .map(|device_id| Device { device_id }) + .map_err(anyhow::Error::from) + } +} diff --git a/lib/src/profile/mod.rs b/lib/src/profile/mod.rs index 39c4829..d1ab90f 100644 --- a/lib/src/profile/mod.rs +++ b/lib/src/profile/mod.rs @@ -7,6 +7,8 @@ use anyhow::Result; use crate::client::Client; use crate::ipfs_client::IpfsClient; +mod device; +use device::Device; mod state; use state::*; @@ -163,6 +165,10 @@ impl Profile { self.client.exit().await } + + pub fn add_device(&mut self, d: Device) -> Result<()> { + self.state.add_device(d) + } } diff --git a/lib/src/profile/state.rs b/lib/src/profile/state.rs index 33f0bd6..f20135e 100644 --- a/lib/src/profile/state.rs +++ b/lib/src/profile/state.rs @@ -6,6 +6,9 @@ use anyhow::Context; use anyhow::Result; use tokio::io::AsyncWriteExt; +use crate::profile::device::Device; +use crate::profile::device::DeviceSaveable; + #[derive(Debug)] pub struct StateDir(PathBuf); @@ -39,6 +42,9 @@ pub struct ProfileState { #[getset(get = "pub")] keypair: libp2p::identity::Keypair, + + #[getset(get = "pub")] + other_devices: Vec<Device>, } impl ProfileState { @@ -46,7 +52,8 @@ impl ProfileState { Self { profile_head: None, profile_name, - keypair + keypair, + other_devices: vec![], } } @@ -54,6 +61,11 @@ impl ProfileState { self.profile_head = Some(cid); Ok(()) // reserved for future use } + + pub(super) fn add_device(&mut self, d: Device) -> Result<()> { + self.other_devices.push(d); + Ok(()) // reserved for future use + } } impl std::fmt::Debug for ProfileState { @@ -67,6 +79,7 @@ pub(super) struct ProfileStateSaveable { profile_head: Option<Vec<u8>>, profile_name: String, keypair: Vec<u8>, + other_devices: Vec<DeviceSaveable> } impl ProfileStateSaveable { @@ -77,7 +90,14 @@ impl ProfileStateSaveable { keypair: match s.keypair { libp2p::identity::Keypair::Ed25519(ref kp) => Vec::from(kp.encode()), _ => anyhow::bail!("Only keypair type ed25519 supported"), - } + }, + other_devices: { + s.other_devices + .iter() + .cloned() + .map(DeviceSaveable::try_from) + .collect::<Result<Vec<_>>>()? + }, }) } @@ -127,6 +147,12 @@ impl TryInto<ProfileState> for ProfileStateSaveable { let kp = libp2p::identity::ed25519::Keypair::decode(&mut self.keypair)?; libp2p::identity::Keypair::Ed25519(kp) }, + other_devices: { + self.other_devices + .into_iter() + .map(DeviceSaveable::try_into) + .collect::<Result<Vec<_>>>()? + }, }) } } diff --git a/lib/src/reactor/account.rs b/lib/src/reactor/account.rs new file mode 100644 index 0000000..59913b5 --- /dev/null +++ b/lib/src/reactor/account.rs @@ -0,0 +1,19 @@ +//! Module for account handling (following accounts, caching account updates) using the gossip +//! module for the lower-level handling + +use std::sync::Arc; + +use anyhow::Result; +use tokio::sync::RwLock; + +use crate::profile::Profile; +use crate::reactor::gossip::GossipReactor; + +#[derive(Debug)] +pub struct AccountReactor(GossipReactor); + +impl AccountReactor { + pub(super) fn new(profile: Arc<RwLock<Profile>>) -> Self { + unimplemented!() + } +} diff --git a/lib/src/reactor/ctrl.rs b/lib/src/reactor/ctrl.rs new file mode 100644 index 0000000..a32f1c3 --- /dev/null +++ b/lib/src/reactor/ctrl.rs @@ -0,0 +1,15 @@ +use tokio::sync::mpsc::UnboundedSender as Sender; +use tokio::sync::mpsc::UnboundedReceiver as Receiver; + +/// Type for sending messages to a reactor +pub type ReactorSender<Request, Reply> = Sender<(Request, ReplySender<Reply>)>; + +/// Type that is used by a reactor for receiving messages +pub type ReactorReceiver<Request, Reply> = Receiver<(Request, ReplySender<Reply>)>; + +/// Type that represents the channel that has to be send with a request to a reactor for getting an +/// answer back +pub type ReplySender<Reply> = Sender<Reply>; + +pub type ReplyReceiver<Reply> = Receiver<Reply>; + diff --git a/lib/src/reactor/device.rs b/lib/src/reactor/device.rs new file mode 100644 index 0000000..1014ca1 --- /dev/null +++ b/lib/src/reactor/device.rs @@ -0,0 +1,19 @@ +//! Module for multi-device support functionality, +//! which uses the gossip module for the lower-level handling + +use std::sync::Arc; + +use anyhow::Result; +use tokio::sync::RwLock; + +use crate::profile::Profile; +use crate::reactor::gossip::GossipReactor; + +#[derive(Debug)] +pub struct DeviceReactor(GossipReactor); + +impl DeviceReactor { + pub(super) fn new(profile: Arc<RwLock<Profile>>) -> Self { + unimplemented!() + } +} diff --git a/lib/src/reactor/gossip/ctrl.rs b/lib/src/reactor/gossip/ctrl.rs new file mode 100644 index 0000000..68fbf06 --- /dev/null +++ b/lib/src/reactor/gossip/ctrl.rs @@ -0,0 +1,19 @@ +use anyhow::Result; + +#[derive(Debug)] +pub enum GossipRequest { + Exit, + Ping, + PublishMe, + Connect(ipfs::MultiaddrWithPeerId), +} + +#[derive(Debug)] +pub enum GossipReply { + Exiting, + Pong, + NoHead, + PublishMeResult(Result<()>), + ConnectResult(Result<()>), +} + diff --git a/lib/src/reactor/gossip/mod.rs b/lib/src/reactor/gossip/mod.rs new file mode 100644 index 0000000..c8a7d28 --- /dev/null +++ b/lib/src/reactor/gossip/mod.rs @@ -0,0 +1,331 @@ +//! Low-level module for gossip'ing code +//! +//! This module implements the low-level gossiping functionality that other modules use to +//! implement actual behaviours on +//! + +use std::sync::Arc; + +use anyhow::Result; +use tokio::sync::RwLock; + +use crate::profile::Profile; +use crate::reactor::Reactor; +use crate::reactor::ReactorBuilder; +use crate::reactor::ctrl::ReactorReceiver; +use crate::reactor::ctrl::ReactorSender; +use crate::reactor::ctrl::ReplySender; + +mod ctrl; +pub use ctrl::GossipRequest; +pub use ctrl::GossipReply; + +mod msg; +pub use msg::GossipMessage; + +mod strategy; +pub use strategy::GossipHandlingStrategy; +pub use strategy::LogStrategy; + +#[derive(Debug)] +pub struct GossipReactorBuilder { + profile: Arc<RwLock<Profile>>, + gossip_topic_name: String, +} + +impl GossipReactorBuilder { + pub fn new(profile: Arc<RwLock<Profile>>, gossip_topic_name: String) -> Self { + Self { profile, gossip_topic_name } + } +} + +impl ReactorBuilder for GossipReactorBuilder { + type Reactor = GossipReactor; + + fn build_with_receiver(self, rr: ReactorReceiver<GossipRequest, GossipReply>) -> Self::Reactor { + GossipReactor { + profile: self.profile, + gossip_topic_name: self.gossip_topic_name, + receiver: rr, + strategy: std::marker::PhantomData, + } + } +} + +pub struct GossipReactor<Strategy = LogStrategy> + where Strategy: GossipHandlingStrategy + Sync + Send +{ + profile: Arc<RwLock<Profile>>, + gossip_topic_name: String, + receiver: ReactorReceiver<GossipRequest, GossipReply>, + strategy: std::marker::PhantomData<Strategy>, +} + +impl<S> std::fmt::Debug for GossipReactor<S> + where S: GossipHandlingStrategy + Sync + Send +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "GossipReactor {{ topic: '{}' }}", self.gossip_topic_name) + } +} + +impl<S> GossipReactor<S> + where S: GossipHandlingStrategy + Sync + Send +{ + fn send_gossip_reply(channel: ReplySender<GossipReply>, reply: GossipReply) -> Result<()> { + if let Err(_) = channel.send(reply) { + anyhow::bail!("Failed to send GossipReply::NoHead)") + } + + Ok(()) + } + + async fn publish_me(&self, reply_channel: ReplySender<GossipReply>) -> Result<()> { + let profile = self.profile.read().await; + + let head = profile.head(); + if head.is_none() { + Self::send_gossip_reply(reply_channel, GossipReply::NoHead)?; + return Ok(()) + } + let head = head.unwrap().to_bytes(); + + let own_id = match profile.client().own_id().await { + Ok(id) => id, + Err(e) => { + Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Err(e)))?; + return Ok(()) // TODO: abort operation here for now, maybe not the best idea + } + }; + + let publish_res = profile + .client() + .ipfs + .pubsub_publish(self.gossip_topic_name.clone(), GossipMessage::CurrentProfileState { + peer_id: own_id.to_bytes(), + cid: head + }.into_bytes()?) + .await; + + match publish_res { + Ok(()) => Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Ok(()))), + Err(e) => Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Err(e))), + } + } + + async fn connect(&self, addr: ipfs::MultiaddrWithPeerId) -> Result<()> { + log::trace!("Connecting GossipReactor with {:?}", addr); + self.profile.read().await.client().connect(addr).await + } + + #[cfg(test)] + async fn is_connected_to(&self, addr: ipfs::MultiaddrWithPeerId) -> Result<bool> { + self.profile + .read() + .await + .client() + .ipfs + .peers() + .await + .map(|connections| { + connections.iter().any(|connection| connection.addr == addr) + }) + } + +} + +#[async_trait::async_trait] +impl<S> Reactor for GossipReactor<S> + where S: GossipHandlingStrategy + Sync + Send +{ + type Request = GossipRequest; + type Reply = GossipReply; + + async fn run(mut self) -> Result<()> { + use futures::stream::StreamExt; + + log::trace!("Booting {:?}", self); + let mut subscription_stream = self.profile + .read() + .await + .client() + .ipfs + .pubsub_subscribe(self.gossip_topic_name.clone()) + .await?; + + log::trace!("{:?} main loop", self); + loop { + tokio::select! { + next_control_msg = self.receiver.recv() => { + log::trace!("Received control message: {:?}", next_control_msg); + match next_control_msg { + None => break, + Some((GossipRequest::Exit, reply_channel)) => { + if let Err(_) = reply_channel.send(GossipReply::Exiting) { + anyhow::bail!("Failed sending EXITING reply") + } + break + }, + + Some((GossipRequest::Ping, reply_channel)) => { + log::trace!("Replying with Pong"); + if let Err(_) = reply_channel.send(GossipReply::Pong) { + anyhow::bail!("Failed sending PONG reply") + } + }, + + Some((GossipRequest::PublishMe, reply_channel)) => self.publish_me(reply_channel).await?, + + Some((GossipRequest::Connect(addr), reply_channel)) => { + let reply = GossipReply::ConnectResult(self.connect(addr.clone()).await); + if let Err(_) = Self::send_gossip_reply(reply_channel, reply) { + anyhow::bail!("Failed sending Connect({}) reply", addr) + } + }, + } + } + + next_gossip_message = subscription_stream.next() => { + if let Some(msg) = next_gossip_message { + log::trace!("Received gossip message"); + match serde_json::from_slice(&msg.data) { + Ok(m) => S::handle_gossip_message(self.profile.clone(), msg.source, m).await?, + Err(e) => log::trace!("Failed to deserialize gossip message from {}", msg.source), + } + } else { + log::trace!("Gossip stream closed, breaking reactor loop"); + break; + } + } + } + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use std::convert::TryFrom; + use std::sync::Arc; + use tokio::sync::RwLock; + + use crate::config::Config; + + #[tokio::test] + async fn test_gossip_reactor_simple() { + let _ = env_logger::try_init(); + + let profile = Profile::new_inmemory(Config::default(), "test-gossip-reactor-simple").await; + assert!(profile.is_ok()); + let profile = Arc::new(RwLock::new(profile.unwrap())); + + let gossip_topic_name = String::from("test-gossip-reactor-simple-topic"); + let (rx, tx) = tokio::sync::mpsc::unbounded_channel(); + let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name).build_with_receiver(tx); + + let (reply_sender, mut reply_receiver) = tokio::sync::mpsc::unbounded_channel(); + rx.send((GossipRequest::Ping, reply_sender)).unwrap(); + + let mut pong_received = false; + let _ = tokio::spawn(async move { + reactor.run().await + }); + + match reply_receiver.recv().await { + Some(GossipReply::Pong) => { + pong_received = true; + log::trace!("Pong received!"); + let (reply_sender, mut reply_receiver) = tokio::sync::mpsc::unbounded_channel(); + rx.send((GossipRequest::Exit, reply_sender)).unwrap(); + } + Some(r) => { + assert!(false, "Expected ReactorReply::Pong, got: {:?}", r); + } + None => { + // nothing + } + } + + assert!(pong_received, "No pong received"); + } + + #[tokio::test] + async fn test_gossip_reactor_gossipping() { + let _ = env_logger::try_init(); + + let gossip_topic_name = String::from("test-gossip-reactor-gossipping-topic"); + let (left_profile, left_reactor, left_rx) = { + let profile = Profile::new_inmemory(Config::default(), "test-gossip-reactor-simple-left").await; + assert!(profile.is_ok()); + let profile = Arc::new(RwLock::new(profile.unwrap())); + + let (rx, tx) = tokio::sync::mpsc::unbounded_channel(); + let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name.clone()).build_with_receiver(tx); + (profile, reactor, rx) + }; + log::trace!("Built left GossipReactor"); + + let (right_profile, right_reactor, right_rx) = { + let profile = Profile::new_inmemory(Config::default(), "test-gossip-reactor-simple-right").await; + assert!(profile.is_ok()); + let profile = Arc::new(RwLock::new(profile.unwrap())); + + let (rx, tx) = tokio::sync::mpsc::unbounded_channel(); + let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name.clone()).build_with_receiver(tx); + (profile, reactor, rx) + }; + log::trace!("Built right GossipReactor"); + + async fn get_peer_id(profile: Arc<RwLock<Profile>>) -> Result<ipfs::MultiaddrWithPeerId> { + profile.read() + .await + .client() + .ipfs + .identity() + .await + .map(|(pubkey, addr)| (pubkey.into_peer_id(), addr)) + .and_then(|(peerid, mut addr)| { + ipfs::MultiaddrWithPeerId::try_from({ + addr.pop().expect("At least one address for client") + }) + .map_err(anyhow::Error::from) + }) + } + + let left_running_reactor = tokio::spawn(async move { + left_reactor.run().await + }); + + let right_running_reactor = tokio::spawn(async move { + right_reactor.run().await + }); + + let left_peer_id = get_peer_id(left_profile.clone()).await; + log::trace!("Left GossipReactor = {:?}", left_peer_id); + assert!(left_peer_id.is_ok(), "Not ok: {:?}", left_peer_id); + let left_peer_id = left_peer_id.unwrap(); + + let right_peer_id = get_peer_id(right_profile.clone()).await; + log::trace!("Right GossipReactor = {:?}", right_peer_id); + assert!(right_peer_id.is_ok(), "Not ok: {:?}", right_peer_id); + let right_peer_id = right_peer_id.unwrap(); + + let (right_reply_sender, mut right_reply_receiver) = tokio::sync::mpsc::unbounded_channel(); + + log::trace!("Right GossipReactor should now connect to left GossipReactor"); + right_rx.send((GossipRequest::Connect(left_peer_id), right_reply_sender)).unwrap(); + + log::trace!("Right GossipReactor should now connect to left GossipReactor... waiting for reply"); + match tokio::time::timeout(std::time::Duration::from_secs(5), right_reply_receiver.recv()).await { + Err(_) => assert!(false, "Timeout elapsed when waiting for connection status"), + Ok(Some(GossipReply::ConnectResult(Ok(())))) => { + log::trace!("Right GossipReactor is connected"); + assert!(true) + }, + Ok(Some(other)) => assert!(false, "Expected ConnectResult(Ok(())), recv: {:?}", other), + Ok(None) => assert!(false, "No reply from right reactor received"), + } + } +} diff --git a/lib/src/reactor/gossip/msg.rs b/lib/src/reactor/gossip/msg.rs new file mode 100644 index 0000000..049fc68 --- /dev/null +++ b/lib/src/reactor/gossip/msg.rs @@ -0,0 +1,17 @@ +use anyhow::Result; + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub enum GossipMessage { + CurrentProfileState { + peer_id: Vec<u8>, + cid: Vec<u8>, + }, +} + +impl GossipMessage { + pub(super) fn into_bytes(self) -> Result<Vec<u8>> { + serde_json::to_string(&self) + .map(String::into_bytes) + .map_err(anyhow::Error::from) + } +} diff --git a/lib/src/reactor/gossip/strategy.rs b/lib/src/reactor/gossip/strategy.rs new file mode 100644 index 0000000..b54dddc --- /dev/null +++ b/lib/src/reactor/gossip/strategy.rs @@ -0,0 +1,31 @@ +use std::sync::Arc; + +use anyhow::Result; +use tokio::sync::RwLock; + +use crate::profile::Profile; +use crate::reactor::gossip::msg::GossipMessage; + +#[async_trait::async_trait] +pub trait GossipHandlingStrategy: Sync + Send { + async fn handle_gossip_message(profile: Arc<RwLock<Profile>>, source: ipfs::PeerId, msg: GossipMessage) -> Result<()>; +} + +pub struct LogStrategy; + +#[async_trait::async_trait] +impl GossipHandlingStrategy for LogStrategy { + async fn handle_gossip_message(profile: Arc<RwLock<Profile>>, source: ipfs::PeerId, msg: GossipMessage) -> Result<()> { + use std::convert::TryFrom; + match msg { + GossipMessage::CurrentProfileState { peer_id, cid } => { + let peer_id = ipfs::PeerId::from_bytes(&peer_id); + let cid = cid::Cid::try_from(&*cid); + + log::trace!("{:?} told me that {:?} is at {:?}", source, peer_id, cid); + } + } + + Ok(()) + } +} diff --git a/lib/src/reactor/mod.rs b/lib/src/reactor/mod.rs new file mode 100644 index 0000000..5299851 --- /dev/null +++ b/lib/src/reactor/mod.rs @@ -0,0 +1,28 @@ +use std::sync::Arc; +use std::fmt::Debug; + +use anyhow::Result; +use tokio::sync::RwLock; + +use crate::profile::Profile; + +mod gossip; +mod device; +mod account; +mod ctrl; + +use ctrl::ReactorReceiver; + +#[async_trait::async_trait] +pub trait Reactor { + type Request: Debug + Send + Sync; + type Reply: Debug + Send + Sync; + + async fn run(self) -> Result<()>; +} + +pub trait ReactorBuilder { + type Reactor: Reactor; + + fn build_with_receiver(self, rr: ReactorReceiver<<Self::Reactor as Reactor>::Request, <Self::Reactor as Reactor>::Reply>) -> Self::Reactor; +} |