summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2021-12-29 20:50:53 +0100
committerMatthias Beyer <mail@beyermatthias.de>2021-12-29 20:50:53 +0100
commitb7f7ae681ff9c4e6ee1822f4749eceddc3d3972a (patch)
treec176faac85a2f17b90a00bcce204f2989a327983
parent6265a4a9372e651732f53cd6a129dd50e491245e (diff)
parent41154413d411048207cce3c22d68442d795e6a6f (diff)
Merge branch 'no-gossip-handler'
-rw-r--r--cli/src/profile.rs4
-rw-r--r--gui/src/app/message.rs2
-rw-r--r--gui/src/app/mod.rs7
-rw-r--r--gui/src/gossip.rs14
-rw-r--r--lib/src/gossip/handler.rs73
-rw-r--r--lib/src/gossip/mod.rs6
6 files changed, 19 insertions, 87 deletions
diff --git a/cli/src/profile.rs b/cli/src/profile.rs
index cf1a2f0..d751116 100644
--- a/cli/src/profile.rs
+++ b/cli/src/profile.rs
@@ -90,8 +90,8 @@ async fn profile_serve(matches: &ArgMatches) -> Result<()> {
.pubsub_subscribe("distrox".to_string())
.await
.map(|stream| {
- use distrox_lib::gossip::deserializer::GossipDeserializer;
- use distrox_lib::gossip::deserializer::LogStrategy;
+ use distrox_lib::gossip::GossipDeserializer;
+ use distrox_lib::gossip::LogStrategy;
GossipDeserializer::<LogStrategy>::new().run(stream)
})?
diff --git a/gui/src/app/message.rs b/gui/src/app/message.rs
index f19a2dd..60d9433 100644
--- a/gui/src/app/message.rs
+++ b/gui/src/app/message.rs
@@ -18,6 +18,7 @@ pub enum Message {
ToggleLog,
+ GossipMessage(ipfs::PeerId, GossipMessage),
GossipSubscriptionFailed(String),
GossipHandled(GossipMessage),
@@ -47,6 +48,7 @@ impl Message {
Message::ToggleLog => "ToggleLog",
+ Message::GossipMessage(_, _) => "GossipMessage",
Message::GossipSubscriptionFailed(_) => "GossipSubscriptionFailed",
Message::GossipHandled(_) => "GossipHandled",
diff --git a/gui/src/app/mod.rs b/gui/src/app/mod.rs
index a1428cd..ac9e43d 100644
--- a/gui/src/app/mod.rs
+++ b/gui/src/app/mod.rs
@@ -183,6 +183,13 @@ impl Application for Distrox {
iced::Command::none()
}
+ Message::GossipMessage(source, msg) => {
+ log::trace!("Received Gossip from {}: {:?}", source, msg);
+ iced::Command::perform(async {
+ Message::GossipHandled(msg)
+ }, |m: Message| -> Message { m })
+ }
+
Message::GossipHandled(msg) => {
use distrox_lib::gossip::GossipMessage;
diff --git a/gui/src/gossip.rs b/gui/src/gossip.rs
index c88c29c..ae941f5 100644
--- a/gui/src/gossip.rs
+++ b/gui/src/gossip.rs
@@ -5,6 +5,8 @@ use tokio::sync::RwLock;
use distrox_lib::profile::Profile;
use distrox_lib::client::Client;
+use distrox_lib::gossip::GossipDeserializer;
+use distrox_lib::gossip::LogStrategy;
use crate::app::Message;
@@ -35,19 +37,13 @@ where
}
fn stream(self: Box<Self>, _input: futures::stream::BoxStream<'static, I>) -> futures::stream::BoxStream<'static, Self::Output> {
- use distrox_lib::gossip::deserializer;
- use distrox_lib::gossip::handler;
-
// TODO: Do "right", whatever this means...
let stream = Arc::try_unwrap(self.subscription).unwrap();
Box::pin({
- let stream = deserializer::GossipDeserializer::<deserializer::LogStrategy>::new().run(stream);
- let stream = handler::GossipHandler::<handler::LogStrategy>::new(self.profile.clone()).run(stream);
-
- stream.map(|(gossip_message, _handling_result)| {
- Message::GossipHandled(gossip_message)
- })
+ GossipDeserializer::<LogStrategy>::new()
+ .run(stream)
+ .map(|(source, msg)| Message::GossipMessage(source, msg))
})
}
}
diff --git a/lib/src/gossip/handler.rs b/lib/src/gossip/handler.rs
deleted file mode 100644
index e524da8..0000000
--- a/lib/src/gossip/handler.rs
+++ /dev/null
@@ -1,73 +0,0 @@
-//! Low-level module for gossip'ing code
-//!
-//! This module implements the low-level gossiping functionality that other modules use to
-//! implement actual behaviours on
-//!
-
-use std::sync::Arc;
-
-use anyhow::Result;
-use futures::Stream;
-use futures::StreamExt;
-use tokio::sync::RwLock;
-
-use crate::profile::Profile;
-use crate::gossip::GossipMessage;
-
-#[derive(Debug)]
-pub struct GossipHandler<Strategy = LogStrategy>
- where Strategy: GossipHandlingStrategy + Sync + Send
-{
- profile: Arc<RwLock<Profile>>,
- strategy: std::marker::PhantomData<Strategy>,
-}
-
-impl<Strat> GossipHandler<Strat>
- where Strat: GossipHandlingStrategy + Sync + Send
-{
- pub fn new(profile: Arc<RwLock<Profile>>) -> Self {
- Self {
- profile,
- strategy: std::marker::PhantomData,
- }
- }
-
- pub fn run<S>(self, input: S) -> impl Stream<Item = (GossipMessage, Result<()>)>
- where S: Stream<Item = (ipfs::PeerId, GossipMessage)>
- {
- input.then(move |(source, msg)| {
- let pr = self.profile.clone();
- async move {
- log::trace!("Received gossip message from {}: {:?}", source, msg);
- let res = Strat::handle_gossip_message(pr.clone(), &source, &msg).await;
- (msg, res)
- }
- })
- }
-}
-
-#[async_trait::async_trait]
-pub trait GossipHandlingStrategy: Sync + Send {
- async fn handle_gossip_message(profile: Arc<RwLock<Profile>>, source: &ipfs::PeerId, msg: &GossipMessage) -> Result<()>;
-}
-
-pub struct LogStrategy;
-
-#[async_trait::async_trait]
-impl GossipHandlingStrategy for LogStrategy {
- async fn handle_gossip_message(_profile: Arc<RwLock<Profile>>, source: &ipfs::PeerId, msg: &GossipMessage) -> Result<()> {
- use std::convert::TryFrom;
- use std::ops::Deref;
-
- match msg {
- GossipMessage::CurrentProfileState { peer_id, cid } => {
- let peer_id = ipfs::PeerId::from_bytes(peer_id);
- let cid = cid::Cid::try_from(cid.deref());
-
- log::trace!("{:?} told me that {:?} is at {:?}", source, peer_id, cid);
- }
- }
-
- Ok(())
- }
-}
diff --git a/lib/src/gossip/mod.rs b/lib/src/gossip/mod.rs
index 4f4d143..0c60ba8 100644
--- a/lib/src/gossip/mod.rs
+++ b/lib/src/gossip/mod.rs
@@ -1,6 +1,6 @@
+mod deserializer;
+pub use deserializer::*;
+
mod msg;
pub use msg::GossipMessage;
-pub mod deserializer;
-pub mod handler;
-