diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2021-12-29 20:50:53 +0100 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2021-12-29 20:50:53 +0100 |
commit | b7f7ae681ff9c4e6ee1822f4749eceddc3d3972a (patch) | |
tree | c176faac85a2f17b90a00bcce204f2989a327983 | |
parent | 6265a4a9372e651732f53cd6a129dd50e491245e (diff) | |
parent | 41154413d411048207cce3c22d68442d795e6a6f (diff) |
Merge branch 'no-gossip-handler'
-rw-r--r-- | cli/src/profile.rs | 4 | ||||
-rw-r--r-- | gui/src/app/message.rs | 2 | ||||
-rw-r--r-- | gui/src/app/mod.rs | 7 | ||||
-rw-r--r-- | gui/src/gossip.rs | 14 | ||||
-rw-r--r-- | lib/src/gossip/handler.rs | 73 | ||||
-rw-r--r-- | lib/src/gossip/mod.rs | 6 |
6 files changed, 19 insertions, 87 deletions
diff --git a/cli/src/profile.rs b/cli/src/profile.rs index cf1a2f0..d751116 100644 --- a/cli/src/profile.rs +++ b/cli/src/profile.rs @@ -90,8 +90,8 @@ async fn profile_serve(matches: &ArgMatches) -> Result<()> { .pubsub_subscribe("distrox".to_string()) .await .map(|stream| { - use distrox_lib::gossip::deserializer::GossipDeserializer; - use distrox_lib::gossip::deserializer::LogStrategy; + use distrox_lib::gossip::GossipDeserializer; + use distrox_lib::gossip::LogStrategy; GossipDeserializer::<LogStrategy>::new().run(stream) })? diff --git a/gui/src/app/message.rs b/gui/src/app/message.rs index f19a2dd..60d9433 100644 --- a/gui/src/app/message.rs +++ b/gui/src/app/message.rs @@ -18,6 +18,7 @@ pub enum Message { ToggleLog, + GossipMessage(ipfs::PeerId, GossipMessage), GossipSubscriptionFailed(String), GossipHandled(GossipMessage), @@ -47,6 +48,7 @@ impl Message { Message::ToggleLog => "ToggleLog", + Message::GossipMessage(_, _) => "GossipMessage", Message::GossipSubscriptionFailed(_) => "GossipSubscriptionFailed", Message::GossipHandled(_) => "GossipHandled", diff --git a/gui/src/app/mod.rs b/gui/src/app/mod.rs index a1428cd..ac9e43d 100644 --- a/gui/src/app/mod.rs +++ b/gui/src/app/mod.rs @@ -183,6 +183,13 @@ impl Application for Distrox { iced::Command::none() } + Message::GossipMessage(source, msg) => { + log::trace!("Received Gossip from {}: {:?}", source, msg); + iced::Command::perform(async { + Message::GossipHandled(msg) + }, |m: Message| -> Message { m }) + } + Message::GossipHandled(msg) => { use distrox_lib::gossip::GossipMessage; diff --git a/gui/src/gossip.rs b/gui/src/gossip.rs index c88c29c..ae941f5 100644 --- a/gui/src/gossip.rs +++ b/gui/src/gossip.rs @@ -5,6 +5,8 @@ use tokio::sync::RwLock; use distrox_lib::profile::Profile; use distrox_lib::client::Client; +use distrox_lib::gossip::GossipDeserializer; +use distrox_lib::gossip::LogStrategy; use crate::app::Message; @@ -35,19 +37,13 @@ where } fn stream(self: Box<Self>, _input: futures::stream::BoxStream<'static, I>) -> futures::stream::BoxStream<'static, Self::Output> { - use distrox_lib::gossip::deserializer; - use distrox_lib::gossip::handler; - // TODO: Do "right", whatever this means... let stream = Arc::try_unwrap(self.subscription).unwrap(); Box::pin({ - let stream = deserializer::GossipDeserializer::<deserializer::LogStrategy>::new().run(stream); - let stream = handler::GossipHandler::<handler::LogStrategy>::new(self.profile.clone()).run(stream); - - stream.map(|(gossip_message, _handling_result)| { - Message::GossipHandled(gossip_message) - }) + GossipDeserializer::<LogStrategy>::new() + .run(stream) + .map(|(source, msg)| Message::GossipMessage(source, msg)) }) } } diff --git a/lib/src/gossip/handler.rs b/lib/src/gossip/handler.rs deleted file mode 100644 index e524da8..0000000 --- a/lib/src/gossip/handler.rs +++ /dev/null @@ -1,73 +0,0 @@ -//! Low-level module for gossip'ing code -//! -//! This module implements the low-level gossiping functionality that other modules use to -//! implement actual behaviours on -//! - -use std::sync::Arc; - -use anyhow::Result; -use futures::Stream; -use futures::StreamExt; -use tokio::sync::RwLock; - -use crate::profile::Profile; -use crate::gossip::GossipMessage; - -#[derive(Debug)] -pub struct GossipHandler<Strategy = LogStrategy> - where Strategy: GossipHandlingStrategy + Sync + Send -{ - profile: Arc<RwLock<Profile>>, - strategy: std::marker::PhantomData<Strategy>, -} - -impl<Strat> GossipHandler<Strat> - where Strat: GossipHandlingStrategy + Sync + Send -{ - pub fn new(profile: Arc<RwLock<Profile>>) -> Self { - Self { - profile, - strategy: std::marker::PhantomData, - } - } - - pub fn run<S>(self, input: S) -> impl Stream<Item = (GossipMessage, Result<()>)> - where S: Stream<Item = (ipfs::PeerId, GossipMessage)> - { - input.then(move |(source, msg)| { - let pr = self.profile.clone(); - async move { - log::trace!("Received gossip message from {}: {:?}", source, msg); - let res = Strat::handle_gossip_message(pr.clone(), &source, &msg).await; - (msg, res) - } - }) - } -} - -#[async_trait::async_trait] -pub trait GossipHandlingStrategy: Sync + Send { - async fn handle_gossip_message(profile: Arc<RwLock<Profile>>, source: &ipfs::PeerId, msg: &GossipMessage) -> Result<()>; -} - -pub struct LogStrategy; - -#[async_trait::async_trait] -impl GossipHandlingStrategy for LogStrategy { - async fn handle_gossip_message(_profile: Arc<RwLock<Profile>>, source: &ipfs::PeerId, msg: &GossipMessage) -> Result<()> { - use std::convert::TryFrom; - use std::ops::Deref; - - match msg { - GossipMessage::CurrentProfileState { peer_id, cid } => { - let peer_id = ipfs::PeerId::from_bytes(peer_id); - let cid = cid::Cid::try_from(cid.deref()); - - log::trace!("{:?} told me that {:?} is at {:?}", source, peer_id, cid); - } - } - - Ok(()) - } -} diff --git a/lib/src/gossip/mod.rs b/lib/src/gossip/mod.rs index 4f4d143..0c60ba8 100644 --- a/lib/src/gossip/mod.rs +++ b/lib/src/gossip/mod.rs @@ -1,6 +1,6 @@ +mod deserializer; +pub use deserializer::*; + mod msg; pub use msg::GossipMessage; -pub mod deserializer; -pub mod handler; - |