From 6c03b953ee5172c52ec001cf24e8143f034a1828 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Sat, 18 Dec 2021 21:09:29 +0100 Subject: Adjust Loading view Signed-off-by: Matthias Beyer --- gui/src/app/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/gui/src/app/mod.rs b/gui/src/app/mod.rs index 8fefdbc..3cf45f9 100644 --- a/gui/src/app/mod.rs +++ b/gui/src/app/mod.rs @@ -138,13 +138,14 @@ impl Application for Distrox { let text = iced::Text::new("Loading"); let content = Column::new() - .max_width(800) .spacing(20) .push(text); Container::new(content) .width(Length::Fill) + .height(Length::Fill) .center_x() + .center_y() .into() } -- cgit v1.2.3 From 7675eba9244475c3847fd48eb13b5d8b54cf271c Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Sat, 18 Dec 2021 21:33:01 +0100 Subject: Add way to show application log Signed-off-by: Matthias Beyer --- gui/src/app/message.rs | 2 + gui/src/app/mod.rs | 107 +++++++++++++++++++++++++++++++++++++++---------- 2 files changed, 88 insertions(+), 21 deletions(-) diff --git a/gui/src/app/message.rs b/gui/src/app/message.rs index f530774..68be044 100644 --- a/gui/src/app/message.rs +++ b/gui/src/app/message.rs @@ -10,6 +10,8 @@ pub enum Message { Loaded(Arc), FailedToLoad(String), + ToggleLog, + InputChanged(String), CreatePost, diff --git a/gui/src/app/mod.rs b/gui/src/app/mod.rs index 3cf45f9..dfd5677 100644 --- a/gui/src/app/mod.rs +++ b/gui/src/app/mod.rs @@ -5,6 +5,7 @@ use iced::Application; use iced::Column; use iced::Container; use iced::Length; +use iced::Row; use iced::Scrollable; use iced::TextInput; use iced::scrollable; @@ -27,6 +28,8 @@ enum Distrox { input: text_input::State, input_value: String, timeline: Timeline, + + log_visible: bool, }, FailedToStart, } @@ -65,6 +68,7 @@ impl Application for Distrox { input: text_input::State::default(), input_value: String::default(), timeline: Timeline::new(), + log_visible: false }; } @@ -78,7 +82,7 @@ impl Application for Distrox { } } - Distrox::Loaded { profile, ref mut input_value, timeline, .. } => { + Distrox::Loaded { profile, ref mut input_value, timeline, log_visible, .. } => { match message { Message::InputChanged(input) => { *input_value = input; @@ -121,6 +125,11 @@ impl Application for Distrox { log::trace!("Timeline scrolled: {}", f); } + Message::ToggleLog => { + log::trace!("Log toggled"); + *log_visible = !*log_visible; + } + _ => {} } } @@ -149,24 +158,58 @@ impl Application for Distrox { .into() } - Distrox::Loaded { input, input_value, timeline, scroll, .. } => { - let input = TextInput::new( - input, - "What do you want to tell the world?", - input_value, - Message::InputChanged, - ) - .padding(15) - .size(12) - .on_submit(Message::CreatePost); - - let timeline = timeline.view(); - - Scrollable::new(scroll) - .padding(40) - .push(input) - .push(timeline) - .into() + Distrox::Loaded { input, input_value, timeline, scroll, log_visible, .. } => { + let left_column = Column::new() + .into(); + + let mid_column = Column::new() + .push({ + let input = TextInput::new( + input, + "What do you want to tell the world?", + input_value, + Message::InputChanged, + ) + .padding(15) + .size(12) + .on_submit(Message::CreatePost); + + let timeline = timeline.view(); + + Scrollable::new(scroll) + .padding(40) + .push(input) + .push(timeline) + }) + .into(); + + let right_column = Column::new() + .into(); + + let content = Row::with_children(vec![ + left_column, + mid_column, + right_column + ]) + .spacing(20) + .height(Length::Fill) + .width(Length::Fill); + + let content = Column::new() + .height(Length::Fill) + .width(Length::Fill) + .push(content); + + if *log_visible { + let log = Column::new() + .push({ + iced::Text::new("Here goes some log,... not yet implemented!") + .size(8) + }); + content.push(log) + } else { + content + }.into() } Distrox::FailedToStart => { @@ -176,7 +219,7 @@ impl Application for Distrox { } fn subscription(&self) -> iced::Subscription { - match self { + let post_loading_subs = match self { Distrox::Loaded { profile, .. } => { let head = profile.head(); @@ -190,7 +233,29 @@ impl Application for Distrox { } } _ => iced::Subscription::none(), - } + }; + + let keyboard_subs = { + use iced_native::event::Event; + + iced_native::subscription::events_with(|event, _| { + match event { + Event::Keyboard(iced_native::keyboard::Event::KeyPressed { key_code, .. }) => { + if key_code == iced_native::keyboard::KeyCode::F11 { + Some(Message::ToggleLog) + } else { + None + } + }, + _ => None, + } + }) + }; + + iced::Subscription::batch(vec![ + post_loading_subs, + keyboard_subs + ]) } } -- cgit v1.2.3 From 542e1e9dc50a96a36ab9d4236293cd0a4f5d22c3 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Sat, 18 Dec 2021 22:04:31 +0100 Subject: Shrink idea of "Reactors" A Reactor can be waaay less complex if we simply use it as "map"-helper for mapping over `Stream`s. If we map over a stream of Vec and deserialize them to GossipMessages in one step, and handle them appropriately in the next step, it is way less complex to implement these things and we do not have to care about this whole "how do I shut down the thing" because we can simply drop() everything and let the destructors do their job. This patch removes the Reactor nonsense. Signed-off-by: Matthias Beyer --- lib/src/gossip/deserializer.rs | 55 +++++++ lib/src/gossip/handler.rs | 73 ++++++++ lib/src/gossip/mod.rs | 9 + lib/src/gossip/msg.rs | 17 ++ lib/src/lib.rs | 2 +- lib/src/reactor/account.rs | 19 --- lib/src/reactor/ctrl.rs | 15 -- lib/src/reactor/device.rs | 19 --- lib/src/reactor/gossip/ctrl.rs | 19 --- lib/src/reactor/gossip/mod.rs | 329 ------------------------------------- lib/src/reactor/gossip/msg.rs | 17 -- lib/src/reactor/gossip/strategy.rs | 31 ---- lib/src/reactor/mod.rs | 28 ---- 13 files changed, 155 insertions(+), 478 deletions(-) create mode 100644 lib/src/gossip/deserializer.rs create mode 100644 lib/src/gossip/handler.rs create mode 100644 lib/src/gossip/mod.rs create mode 100644 lib/src/gossip/msg.rs delete mode 100644 lib/src/reactor/account.rs delete mode 100644 lib/src/reactor/ctrl.rs delete mode 100644 lib/src/reactor/device.rs delete mode 100644 lib/src/reactor/gossip/ctrl.rs delete mode 100644 lib/src/reactor/gossip/mod.rs delete mode 100644 lib/src/reactor/gossip/msg.rs delete mode 100644 lib/src/reactor/gossip/strategy.rs delete mode 100644 lib/src/reactor/mod.rs diff --git a/lib/src/gossip/deserializer.rs b/lib/src/gossip/deserializer.rs new file mode 100644 index 0000000..a50644f --- /dev/null +++ b/lib/src/gossip/deserializer.rs @@ -0,0 +1,55 @@ +use anyhow::Result; +use futures::Stream; +use futures::StreamExt; + +use crate::gossip::GossipMessage; + +pub struct GossipDeserializer + where ErrStrategy: GossipDeserializerErrorStrategy +{ + strategy: std::marker::PhantomData, +} + +impl GossipDeserializer + where ErrStrategy: GossipDeserializerErrorStrategy +{ + pub fn new() -> Self { + Self { + strategy: std::marker::PhantomData, + } + } + + pub fn run(mut self, input: S) -> impl Stream + where S: Stream + { + input.filter_map(|message| async move { + log::trace!("Received gossip message"); + + match serde_json::from_slice(&message.data).map_err(anyhow::Error::from) { + Ok(m) => Some(m), + Err(e) => { + ErrStrategy::handle_error(e); + None + } + } + }) + } +} + +pub trait GossipDeserializerErrorStrategy { + fn handle_error(err: anyhow::Error); +} + +pub struct LogStrategy; +impl GossipDeserializerErrorStrategy for LogStrategy { + fn handle_error(err: anyhow::Error) { + log::trace!("Error: {}", err); + } +} + +pub struct IgnoreStrategy; +impl GossipDeserializerErrorStrategy for IgnoreStrategy { + fn handle_error(_: anyhow::Error) { + () + } +} diff --git a/lib/src/gossip/handler.rs b/lib/src/gossip/handler.rs new file mode 100644 index 0000000..e524da8 --- /dev/null +++ b/lib/src/gossip/handler.rs @@ -0,0 +1,73 @@ +//! Low-level module for gossip'ing code +//! +//! This module implements the low-level gossiping functionality that other modules use to +//! implement actual behaviours on +//! + +use std::sync::Arc; + +use anyhow::Result; +use futures::Stream; +use futures::StreamExt; +use tokio::sync::RwLock; + +use crate::profile::Profile; +use crate::gossip::GossipMessage; + +#[derive(Debug)] +pub struct GossipHandler + where Strategy: GossipHandlingStrategy + Sync + Send +{ + profile: Arc>, + strategy: std::marker::PhantomData, +} + +impl GossipHandler + where Strat: GossipHandlingStrategy + Sync + Send +{ + pub fn new(profile: Arc>) -> Self { + Self { + profile, + strategy: std::marker::PhantomData, + } + } + + pub fn run(self, input: S) -> impl Stream)> + where S: Stream + { + input.then(move |(source, msg)| { + let pr = self.profile.clone(); + async move { + log::trace!("Received gossip message from {}: {:?}", source, msg); + let res = Strat::handle_gossip_message(pr.clone(), &source, &msg).await; + (msg, res) + } + }) + } +} + +#[async_trait::async_trait] +pub trait GossipHandlingStrategy: Sync + Send { + async fn handle_gossip_message(profile: Arc>, source: &ipfs::PeerId, msg: &GossipMessage) -> Result<()>; +} + +pub struct LogStrategy; + +#[async_trait::async_trait] +impl GossipHandlingStrategy for LogStrategy { + async fn handle_gossip_message(_profile: Arc>, source: &ipfs::PeerId, msg: &GossipMessage) -> Result<()> { + use std::convert::TryFrom; + use std::ops::Deref; + + match msg { + GossipMessage::CurrentProfileState { peer_id, cid } => { + let peer_id = ipfs::PeerId::from_bytes(peer_id); + let cid = cid::Cid::try_from(cid.deref()); + + log::trace!("{:?} told me that {:?} is at {:?}", source, peer_id, cid); + } + } + + Ok(()) + } +} diff --git a/lib/src/gossip/mod.rs b/lib/src/gossip/mod.rs new file mode 100644 index 0000000..d6a6963 --- /dev/null +++ b/lib/src/gossip/mod.rs @@ -0,0 +1,9 @@ +mod msg; +pub use msg::GossipMessage; + +mod handler; +pub use handler::*; + +mod deserializer; +pub use deserializer::*; + diff --git a/lib/src/gossip/msg.rs b/lib/src/gossip/msg.rs new file mode 100644 index 0000000..049fc68 --- /dev/null +++ b/lib/src/gossip/msg.rs @@ -0,0 +1,17 @@ +use anyhow::Result; + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub enum GossipMessage { + CurrentProfileState { + peer_id: Vec, + cid: Vec, + }, +} + +impl GossipMessage { + pub(super) fn into_bytes(self) -> Result> { + serde_json::to_string(&self) + .map(String::into_bytes) + .map_err(anyhow::Error::from) + } +} diff --git a/lib/src/lib.rs b/lib/src/lib.rs index b7b05e2..e836c37 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -4,4 +4,4 @@ pub mod ipfs_client; pub mod profile; pub mod stream; pub mod types; -pub mod reactor; +pub mod gossip; diff --git a/lib/src/reactor/account.rs b/lib/src/reactor/account.rs deleted file mode 100644 index 59913b5..0000000 --- a/lib/src/reactor/account.rs +++ /dev/null @@ -1,19 +0,0 @@ -//! Module for account handling (following accounts, caching account updates) using the gossip -//! module for the lower-level handling - -use std::sync::Arc; - -use anyhow::Result; -use tokio::sync::RwLock; - -use crate::profile::Profile; -use crate::reactor::gossip::GossipReactor; - -#[derive(Debug)] -pub struct AccountReactor(GossipReactor); - -impl AccountReactor { - pub(super) fn new(profile: Arc>) -> Self { - unimplemented!() - } -} diff --git a/lib/src/reactor/ctrl.rs b/lib/src/reactor/ctrl.rs deleted file mode 100644 index a32f1c3..0000000 --- a/lib/src/reactor/ctrl.rs +++ /dev/null @@ -1,15 +0,0 @@ -use tokio::sync::mpsc::UnboundedSender as Sender; -use tokio::sync::mpsc::UnboundedReceiver as Receiver; - -/// Type for sending messages to a reactor -pub type ReactorSender = Sender<(Request, ReplySender)>; - -/// Type that is used by a reactor for receiving messages -pub type ReactorReceiver = Receiver<(Request, ReplySender)>; - -/// Type that represents the channel that has to be send with a request to a reactor for getting an -/// answer back -pub type ReplySender = Sender; - -pub type ReplyReceiver = Receiver; - diff --git a/lib/src/reactor/device.rs b/lib/src/reactor/device.rs deleted file mode 100644 index 1014ca1..0000000 --- a/lib/src/reactor/device.rs +++ /dev/null @@ -1,19 +0,0 @@ -//! Module for multi-device support functionality, -//! which uses the gossip module for the lower-level handling - -use std::sync::Arc; - -use anyhow::Result; -use tokio::sync::RwLock; - -use crate::profile::Profile; -use crate::reactor::gossip::GossipReactor; - -#[derive(Debug)] -pub struct DeviceReactor(GossipReactor); - -impl DeviceReactor { - pub(super) fn new(profile: Arc>) -> Self { - unimplemented!() - } -} diff --git a/lib/src/reactor/gossip/ctrl.rs b/lib/src/reactor/gossip/ctrl.rs deleted file mode 100644 index 68fbf06..0000000 --- a/lib/src/reactor/gossip/ctrl.rs +++ /dev/null @@ -1,19 +0,0 @@ -use anyhow::Result; - -#[derive(Debug)] -pub enum GossipRequest { - Exit, - Ping, - PublishMe, - Connect(ipfs::MultiaddrWithPeerId), -} - -#[derive(Debug)] -pub enum GossipReply { - Exiting, - Pong, - NoHead, - PublishMeResult(Result<()>), - ConnectResult(Result<()>), -} - diff --git a/lib/src/reactor/gossip/mod.rs b/lib/src/reactor/gossip/mod.rs deleted file mode 100644 index 7658509..0000000 --- a/lib/src/reactor/gossip/mod.rs +++ /dev/null @@ -1,329 +0,0 @@ -//! Low-level module for gossip'ing code -//! -//! This module implements the low-level gossiping functionality that other modules use to -//! implement actual behaviours on -//! - -use std::sync::Arc; - -use anyhow::Result; -use tokio::sync::RwLock; - -use crate::profile::Profile; -use crate::reactor::Reactor; -use crate::reactor::ReactorBuilder; -use crate::reactor::ctrl::ReactorReceiver; -use crate::reactor::ctrl::ReactorSender; -use crate::reactor::ctrl::ReplySender; - -mod ctrl; -pub use ctrl::GossipRequest; -pub use ctrl::GossipReply; - -mod msg; -pub use msg::GossipMessage; - -mod strategy; -pub use strategy::GossipHandlingStrategy; -pub use strategy::LogStrategy; - -#[derive(Debug)] -pub struct GossipReactorBuilder { - profile: Arc>, - gossip_topic_name: String, -} - -impl GossipReactorBuilder { - pub fn new(profile: Arc>, gossip_topic_name: String) -> Self { - Self { profile, gossip_topic_name } - } -} - -impl ReactorBuilder for GossipReactorBuilder { - type Reactor = GossipReactor; - - fn build_with_receiver(self, rr: ReactorReceiver) -> Self::Reactor { - GossipReactor { - profile: self.profile, - gossip_topic_name: self.gossip_topic_name, - receiver: rr, - strategy: std::marker::PhantomData, - } - } -} - -pub struct GossipReactor - where Strategy: GossipHandlingStrategy + Sync + Send -{ - profile: Arc>, - gossip_topic_name: String, - receiver: ReactorReceiver, - strategy: std::marker::PhantomData, -} - -impl std::fmt::Debug for GossipReactor - where S: GossipHandlingStrategy + Sync + Send -{ - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "GossipReactor {{ topic: '{}' }}", self.gossip_topic_name) - } -} - -impl GossipReactor - where S: GossipHandlingStrategy + Sync + Send -{ - fn send_gossip_reply(channel: ReplySender, reply: GossipReply) -> Result<()> { - if let Err(_) = channel.send(reply) { - anyhow::bail!("Failed to send GossipReply::NoHead)") - } - - Ok(()) - } - - async fn publish_me(&self, reply_channel: ReplySender) -> Result<()> { - let profile = self.profile.read().await; - - let head = profile.head(); - if head.is_none() { - Self::send_gossip_reply(reply_channel, GossipReply::NoHead)?; - return Ok(()) - } - let head = head.unwrap().to_bytes(); - - let own_id = match profile.client().own_id().await { - Ok(id) => id, - Err(e) => { - Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Err(e)))?; - return Ok(()) // TODO: abort operation here for now, maybe not the best idea - } - }; - - let publish_res = profile - .client() - .ipfs - .pubsub_publish(self.gossip_topic_name.clone(), GossipMessage::CurrentProfileState { - peer_id: own_id.to_bytes(), - cid: head - }.into_bytes()?) - .await; - - match publish_res { - Ok(()) => Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Ok(()))), - Err(e) => Self::send_gossip_reply(reply_channel, GossipReply::PublishMeResult(Err(e))), - } - } - - async fn connect(&self, addr: ipfs::MultiaddrWithPeerId) -> Result<()> { - log::trace!("Connecting GossipReactor with {:?}", addr); - self.profile.read().await.client().connect(addr).await - } - - #[cfg(test)] - async fn is_connected_to(&self, addr: ipfs::MultiaddrWithPeerId) -> Result { - self.profile - .read() - .await - .client() - .ipfs - .peers() - .await - .map(|connections| { - connections.iter().any(|connection| connection.addr == addr) - }) - } - -} - -#[async_trait::async_trait] -impl Reactor for GossipReactor - where S: GossipHandlingStrategy + Sync + Send -{ - type Request = GossipRequest; - type Reply = GossipReply; - - async fn run(mut self) -> Result<()> { - use futures::stream::StreamExt; - - log::trace!("Booting {:?}", self); - let mut subscription_stream = self.profile - .read() - .await - .client() - .ipfs - .pubsub_subscribe(self.gossip_topic_name.clone()) - .await?; - - log::trace!("{:?} main loop", self); - loop { - tokio::select! { - next_control_msg = self.receiver.recv() => { - log::trace!("Received control message: {:?}", next_control_msg); - match next_control_msg { - None => break, - Some((GossipRequest::Exit, reply_channel)) => { - if let Err(_) = reply_channel.send(GossipReply::Exiting) { - anyhow::bail!("Failed sending EXITING reply") - } - break - }, - - Some((GossipRequest::Ping, reply_channel)) => { - log::trace!("Replying with Pong"); - if let Err(_) = reply_channel.send(GossipReply::Pong) { - anyhow::bail!("Failed sending PONG reply") - } - }, - - Some((GossipRequest::PublishMe, reply_channel)) => self.publish_me(reply_channel).await?, - - Some((GossipRequest::Connect(addr), reply_channel)) => { - let reply = GossipReply::ConnectResult(self.connect(addr.clone()).await); - if let Err(_) = Self::send_gossip_reply(reply_channel, reply) { - anyhow::bail!("Failed sending Connect({}) reply", addr) - } - }, - } - } - - next_gossip_message = subscription_stream.next() => { - if let Some(msg) = next_gossip_message { - log::trace!("Received gossip message"); - match serde_json::from_slice(&msg.data) { - Ok(m) => S::handle_gossip_message(self.profile.clone(), msg.source, m).await?, - Err(e) => log::trace!("Failed to deserialize gossip message from {}", msg.source), - } - } else { - log::trace!("Gossip stream closed, breaking reactor loop"); - break; - } - } - } - } - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - use std::convert::TryFrom; - use std::sync::Arc; - use tokio::sync::RwLock; - - #[tokio::test] - async fn test_gossip_reactor_simple() { - let _ = env_logger::try_init(); - - let profile = Profile::new_inmemory("test-gossip-reactor-simple").await; - assert!(profile.is_ok()); - let profile = Arc::new(RwLock::new(profile.unwrap())); - - let gossip_topic_name = String::from("test-gossip-reactor-simple-topic"); - let (rx, tx) = tokio::sync::mpsc::unbounded_channel(); - let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name).build_with_receiver(tx); - - let (reply_sender, mut reply_receiver) = tokio::sync::mpsc::unbounded_channel(); - rx.send((GossipRequest::Ping, reply_sender)).unwrap(); - - let mut pong_received = false; - let _ = tokio::spawn(async move { - reactor.run().await - }); - - match reply_receiver.recv().await { - Some(GossipReply::Pong) => { - pong_received = true; - log::trace!("Pong received!"); - let (reply_sender, mut reply_receiver) = tokio::sync::mpsc::unbounded_channel(); - rx.send((GossipRequest::Exit, reply_sender)).unwrap(); - } - Some(r) => { - assert!(false, "Expected ReactorReply::Pong, got: {:?}", r); - } - None => { - // nothing - } - } - - assert!(pong_received, "No pong received"); - } - - #[tokio::test] - async fn test_gossip_reactor_gossipping() { - let _ = env_logger::try_init(); - - let gossip_topic_name = String::from("test-gossip-reactor-gossipping-topic"); - let (left_profile, left_reactor, left_rx) = { - let profile = Profile::new_inmemory("test-gossip-reactor-simple-left").await; - assert!(profile.is_ok()); - let profile = Arc::new(RwLock::new(profile.unwrap())); - - let (rx, tx) = tokio::sync::mpsc::unbounded_channel(); - let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name.clone()).build_with_receiver(tx); - (profile, reactor, rx) - }; - log::trace!("Built left GossipReactor"); - - let (right_profile, right_reactor, right_rx) = { - let profile = Profile::new_inmemory("test-gossip-reactor-simple-right").await; - assert!(profile.is_ok()); - let profile = Arc::new(RwLock::new(profile.unwrap())); - - let (rx, tx) = tokio::sync::mpsc::unbounded_channel(); - let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name.clone()).build_with_receiver(tx); - (profile, reactor, rx) - }; - log::trace!("Built right GossipReactor"); - - async fn get_peer_id(profile: Arc>) -> Result { - profile.read() - .await - .client() - .ipfs - .identity() - .await - .map(|(pubkey, addr)| (pubkey.into_peer_id(), addr)) - .and_then(|(peerid, mut addr)| { - ipfs::MultiaddrWithPeerId::try_from({ - addr.pop().expect("At least one address for client") - }) - .map_err(anyhow::Error::from) - }) - } - - let left_running_reactor = tokio::spawn(async move { - left_reactor.run().await - }); - - let right_running_reactor = tokio::spawn(async move { - right_reactor.run().await - }); - - let left_peer_id = get_peer_id(left_profile.clone()).await; - log::trace!("Left GossipReactor = {:?}", left_peer_id); - assert!(left_peer_id.is_ok(), "Not ok: {:?}", left_peer_id); - let left_peer_id = left_peer_id.unwrap(); - - let right_peer_id = get_peer_id(right_profile.clone()).await; - log::trace!("Right GossipReactor = {:?}", right_peer_id); - assert!(right_peer_id.is_ok(), "Not ok: {:?}", right_peer_id); - let right_peer_id = right_peer_id.unwrap(); - - let (right_reply_sender, mut right_reply_receiver) = tokio::sync::mpsc::unbounded_channel(); - - log::trace!("Right GossipReactor should now connect to left GossipReactor"); - right_rx.send((GossipRequest::Connect(left_peer_id), right_reply_sender)).unwrap(); - - log::trace!("Right GossipReactor should now connect to left GossipReactor... waiting for reply"); - match tokio::time::timeout(std::time::Duration::from_secs(5), right_reply_receiver.recv()).await { - Err(_) => assert!(false, "Timeout elapsed when waiting for connection status"), - Ok(Some(GossipReply::ConnectResult(Ok(())))) => { - log::trace!("Right GossipReactor is connected"); - assert!(true) - }, - Ok(Some(other)) => assert!(false, "Expected ConnectResult(Ok(())), recv: {:?}", other), - Ok(None) => assert!(false, "No reply from right reactor received"), - } - } -} diff --git a/lib/src/reactor/gossip/msg.rs b/lib/src/reactor/gossip/msg.rs deleted file mode 100644 index 049fc68..0000000 --- a/lib/src/reactor/gossip/msg.rs +++ /dev/null @@ -1,17 +0,0 @@ -use anyhow::Result; - -#[derive(Debug, serde::Serialize, serde::Deserialize)] -pub enum GossipMessage { - CurrentProfileState { - peer_id: Vec, - cid: Vec, - }, -} - -impl GossipMessage { - pub(super) fn into_bytes(self) -> Result> { - serde_json::to_string(&self) - .map(String::into_bytes) - .map_err(anyhow::Error::from) - } -} diff --git a/lib/src/reactor/gossip/strategy.rs b/lib/src/reactor/gossip/strategy.rs deleted file mode 100644 index 6fe9d1a..0000000 --- a/lib/src/reactor/gossip/strategy.rs +++ /dev/null @@ -1,31 +0,0 @@ -use std::sync::Arc; - -use anyhow::Result; -use tokio::sync::RwLock; - -use crate::profile::Profile; -use crate::reactor::gossip::msg::GossipMessage; - -#[async_trait::async_trait] -pub trait GossipHandlingStrategy: Sync + Send { - async fn handle_gossip_message(profile: Arc>, source: ipfs::PeerId, msg: GossipMessage) -> Result<()>; -} - -pub struct LogStrategy; - -#[async_trait::async_trait] -impl GossipHandlingStrategy for LogStrategy { - async fn handle_gossip_message(_profile: Arc>, source: ipfs::PeerId, msg: GossipMessage) -> Result<()> { - use std::convert::TryFrom; - match msg { - GossipMessage::CurrentProfileState { peer_id, cid } => { - let peer_id = ipfs::PeerId::from_bytes(&peer_id); - let cid = cid::Cid::try_from(&*cid); - - log::trace!("{:?} told me that {:?} is at {:?}", source, peer_id, cid); - } - } - - Ok(()) - } -} diff --git a/lib/src/reactor/mod.rs b/lib/src/reactor/mod.rs deleted file mode 100644 index 5299851..0000000 --- a/lib/src/reactor/mod.rs +++ /dev/null @@ -1,28 +0,0 @@ -use std::sync::Arc; -use std::fmt::Debug; - -use anyhow::Result; -use tokio::sync::RwLock; - -use crate::profile::Profile; - -mod gossip; -mod device; -mod account; -mod ctrl; - -use ctrl::ReactorReceiver; - -#[async_trait::async_trait] -pub trait Reactor { - type Request: Debug + Send + Sync; - type Reply: Debug + Send + Sync; - - async fn run(self) -> Result<()>; -} - -pub trait ReactorBuilder { - type Reactor: Reactor; - - fn build_with_receiver(self, rr: ReactorReceiver<::Request, ::Reply>) -> Self::Reactor; -} -- cgit v1.2.3 From fdd50d1bf5da9cb177ba0134b96f72a93441062e Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Sat, 18 Dec 2021 22:53:09 +0100 Subject: Implement gossip reactor loading via oneshot channels for subscription initialization Signed-off-by: Matthias Beyer --- gui/src/app/message.rs | 8 ++- gui/src/app/mod.rs | 110 +++++++++++++++++++++++++++++------------ gui/src/gossip.rs | 51 +++++++++++++++++++ gui/src/main.rs | 1 + lib/src/client.rs | 4 ++ lib/src/gossip/deserializer.rs | 8 +-- lib/src/gossip/handler.rs | 8 +-- lib/src/gossip/mod.rs | 7 +-- lib/src/gossip/msg.rs | 2 +- 9 files changed, 153 insertions(+), 46 deletions(-) create mode 100644 gui/src/gossip.rs diff --git a/gui/src/app/message.rs b/gui/src/app/message.rs index 68be044..309a0bc 100644 --- a/gui/src/app/message.rs +++ b/gui/src/app/message.rs @@ -2,16 +2,22 @@ use std::sync::Arc; use cid::Cid; +use distrox_lib::gossip::GossipMessage; use distrox_lib::profile::Profile; use distrox_lib::types::Payload; -#[derive(Debug, Clone)] +use crate::gossip::GossipRecipe; + +#[derive(Clone, Debug)] pub enum Message { Loaded(Arc), FailedToLoad(String), ToggleLog, + GossipSubscriptionFailed(String), + GossipHandled(GossipMessage), + InputChanged(String), CreatePost, diff --git a/gui/src/app/mod.rs b/gui/src/app/mod.rs index dfd5677..878fabf 100644 --- a/gui/src/app/mod.rs +++ b/gui/src/app/mod.rs @@ -1,4 +1,5 @@ use std::sync::Arc; +use std::sync::RwLock as StdRwLock; use anyhow::Result; use iced::Application; @@ -18,11 +19,16 @@ use crate::timeline::PostLoadingRecipe; mod message; pub use message::Message; +use crate::gossip::GossipRecipe; + #[derive(Debug)] enum Distrox { - Loading, + Loading { + gossip_subscription_recv: StdRwLock>, + }, Loaded { profile: Arc, + gossip_subscription_recv: StdRwLock>, scroll: scrollable::State, input: text_input::State, @@ -40,15 +46,33 @@ impl Application for Distrox { type Flags = String; fn new(name: String) -> (Self, iced::Command) { + let (gossip_subscription_sender, gossip_subscription_recv) = tokio::sync::oneshot::channel(); ( - Distrox::Loading, + Distrox::Loading { + gossip_subscription_recv: StdRwLock::new(gossip_subscription_recv), + }, + iced::Command::perform(async move { - match Profile::load(&name).await { - Err(e) => Message::FailedToLoad(e.to_string()), - Ok(instance) => { - Message::Loaded(Arc::new(instance)) - } + let profile = match Profile::load(&name).await { + Err(e) => return Message::FailedToLoad(e.to_string()), + Ok(instance) => Arc::new(instance), + }; + + if let Err(e) = profile.client() + .pubsub_subscribe("distrox".to_string()) + .await + .map_err(anyhow::Error::from) + .map(|stream| { + log::trace!("Subscription to 'distrox' pubsub channel worked"); + GossipRecipe::new(profile.clone(), stream) + }) + .and_then(|s| gossip_subscription_sender.send(s).map_err(|_| anyhow::anyhow!("Failed to initialize gossipping module"))) + { + log::error!("Failed to load gossip recipe"); + return Message::FailedToLoad(e.to_string()) } + + Message::Loaded(profile) }, |m: Message| -> Message { m }) ) } @@ -59,33 +83,30 @@ impl Application for Distrox { fn update(&mut self, message: Self::Message) -> iced::Command { match self { - Distrox::Loading => { - match message { - Message::Loaded(profile) => { - *self = Distrox::Loaded { - profile, - scroll: scrollable::State::default(), - input: text_input::State::default(), - input_value: String::default(), - timeline: Timeline::new(), - log_visible: false - }; - } + Distrox::Loading { gossip_subscription_recv } => { + if let Message::Loaded(profile) = message { + *self = Distrox::Loaded { + profile, - Message::FailedToLoad(e) => { - log::error!("Failed to load: {}", e); - *self = Distrox::FailedToStart; - } + // Don't even try to think what hoops I am jumping through here... + gossip_subscription_recv: std::mem::replace(gossip_subscription_recv, StdRwLock::new(tokio::sync::oneshot::channel().1)), + scroll: scrollable::State::default(), + input: text_input::State::default(), + input_value: String::default(), + timeline: Timeline::new(), + log_visible: false + }; - _ => {} } - } + iced::Command::none() + }, Distrox::Loaded { profile, ref mut input_value, timeline, log_visible, .. } => { match message { Message::InputChanged(input) => { *input_value = input; + iced::Command::none() } Message::CreatePost => { @@ -100,37 +121,45 @@ impl Application for Distrox { |res| match res { Ok(cid) => Message::PostCreated(cid), Err(e) => Message::PostCreationFailed(e.to_string()) - }); + }) + } else { + iced::Command::none() } } Message::PostCreated(cid) => { *input_value = String::new(); log::info!("Post created: {}", cid); + iced::Command::none() } Message::PostCreationFailed(err) => { log::error!("Post creation failed: {}", err); + iced::Command::none() } Message::PostLoaded((payload, content)) => { timeline.push(payload, content); + iced::Command::none() } Message::PostLoadingFailed => { log::error!("Failed to load some post, TODO: Better error logging"); + iced::Command::none() } Message::TimelineScrolled(f) => { log::trace!("Timeline scrolled: {}", f); + iced::Command::none() } Message::ToggleLog => { log::trace!("Log toggled"); *log_visible = !*log_visible; + iced::Command::none() } - _ => {} + _ => iced::Command::none(), } } @@ -138,12 +167,11 @@ impl Application for Distrox { unimplemented!() } } - iced::Command::none() } fn view(&mut self) -> iced::Element { match self { - Distrox::Loading => { + Distrox::Loading { .. } => { let text = iced::Text::new("Loading"); let content = Column::new() @@ -252,10 +280,28 @@ impl Application for Distrox { }) }; - iced::Subscription::batch(vec![ + let gossip_sub = match self { + Distrox::Loaded { gossip_subscription_recv, .. } => { + match gossip_subscription_recv.write().ok() { + Some(mut sub) => sub.try_recv() + .ok() // Either empty or closed, ignore both + .map(|sub| iced::Subscription::from_recipe(sub)), + None => None + } + }, + _ => None, + }; + + let mut subscriptions = vec![ post_loading_subs, - keyboard_subs - ]) + keyboard_subs, + ]; + + if let Some(gossip_sub) = gossip_sub { + subscriptions.push(gossip_sub); + } + + iced::Subscription::batch(subscriptions) } } diff --git a/gui/src/gossip.rs b/gui/src/gossip.rs new file mode 100644 index 0000000..4fef601 --- /dev/null +++ b/gui/src/gossip.rs @@ -0,0 +1,51 @@ +use std::sync::Arc; + +use futures::StreamExt; + +use distrox_lib::profile::Profile; +use distrox_lib::client::Client; + +use crate::app::Message; + +#[derive(Clone, Debug)] +pub struct GossipRecipe { + profile: Arc, + subscription: Arc, +} + +impl GossipRecipe { + pub fn new(profile: Arc, subscription: ipfs::SubscriptionStream) -> Self { + Self { profile, subscription: Arc::new(subscription) } + } +} + + +// Make sure iced can use our download stream +impl iced_native::subscription::Recipe for GossipRecipe +where + H: std::hash::Hasher, +{ + type Output = Message; + + fn hash(&self, state: &mut H) { + use std::hash::Hash; + unimplemented!() + } + + fn stream(self: Box, _input: futures::stream::BoxStream<'static, I>) -> futures::stream::BoxStream<'static, Self::Output> { + use distrox_lib::gossip::deserializer; + use distrox_lib::gossip::handler; + + // TODO: Do "right", whatever this means... + let stream = Arc::try_unwrap(self.subscription).unwrap(); + + Box::pin({ + let stream = deserializer::GossipDeserializer::::new().run(stream); + let stream = handler::GossipHandler::::new(self.profile.clone()).run(stream); + + stream.map(|(gossip_message, _handling_result)| { + Message::GossipHandled(gossip_message) + }) + }) + } +} diff --git a/gui/src/main.rs b/gui/src/main.rs index 6120152..486f007 100644 --- a/gui/src/main.rs +++ b/gui/src/main.rs @@ -4,6 +4,7 @@ mod app; mod cli; mod timeline; mod post; +mod gossip; fn main() -> Result<()> { let _ = env_logger::try_init()?; diff --git a/lib/src/client.rs b/lib/src/client.rs index d985769..2128f76 100644 --- a/lib/src/client.rs +++ b/lib/src/client.rs @@ -122,6 +122,10 @@ impl Client { self.get::(cid).await.map(|v| v.0) } + + pub async fn pubsub_subscribe(&self, topic: String) -> Result { + self.ipfs.pubsub_subscribe(topic).await.map_err(anyhow::Error::from) + } } fn now() -> DateTime { diff --git a/lib/src/gossip/deserializer.rs b/lib/src/gossip/deserializer.rs index a50644f..dcd5082 100644 --- a/lib/src/gossip/deserializer.rs +++ b/lib/src/gossip/deserializer.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use anyhow::Result; use futures::Stream; use futures::StreamExt; @@ -19,14 +21,14 @@ impl GossipDeserializer } } - pub fn run(mut self, input: S) -> impl Stream - where S: Stream + pub fn run(self, input: S) -> impl Stream + where S: Stream> { input.filter_map(|message| async move { log::trace!("Received gossip message"); match serde_json::from_slice(&message.data).map_err(anyhow::Error::from) { - Ok(m) => Some(m), + Ok(m) => Some((message.source, m)), Err(e) => { ErrStrategy::handle_error(e); None diff --git a/lib/src/gossip/handler.rs b/lib/src/gossip/handler.rs index e524da8..7c9ffa6 100644 --- a/lib/src/gossip/handler.rs +++ b/lib/src/gossip/handler.rs @@ -18,14 +18,14 @@ use crate::gossip::GossipMessage; pub struct GossipHandler where Strategy: GossipHandlingStrategy + Sync + Send { - profile: Arc>, + profile: Arc, strategy: std::marker::PhantomData, } impl GossipHandler where Strat: GossipHandlingStrategy + Sync + Send { - pub fn new(profile: Arc>) -> Self { + pub fn new(profile: Arc) -> Self { Self { profile, strategy: std::marker::PhantomData, @@ -48,14 +48,14 @@ impl GossipHandler #[async_trait::async_trait] pub trait GossipHandlingStrategy: Sync + Send { - async fn handle_gossip_message(profile: Arc>, source: &ipfs::PeerId, msg: &GossipMessage) -> Result<()>; + async fn handle_gossip_message(profile: Arc, source: &ipfs::PeerId, msg: &GossipMessage) -> Result<()>; } pub struct LogStrategy; #[async_trait::async_trait] impl GossipHandlingStrategy for LogStrategy { - async fn handle_gossip_message(_profile: Arc>, source: &ipfs::PeerId, msg: &GossipMessage) -> Result<()> { + async fn handle_gossip_message(_profile: Arc, source: &ipfs::PeerId, msg: &GossipMessage) -> Result<()> { use std::convert::TryFrom; use std::ops::Deref; diff --git a/lib/src/gossip/mod.rs b/lib/src/gossip/mod.rs index d6a6963..4f4d143 100644 --- a/lib/src/gossip/mod.rs +++ b/lib/src/gossip/mod.rs @@ -1,9 +1,6 @@ mod msg; pub use msg::GossipMessage; -mod handler; -pub use handler::*; - -mod deserializer; -pub use deserializer::*; +pub mod deserializer; +pub mod handler; diff --git a/lib/src/gossip/msg.rs b/lib/src/gossip/msg.rs index 049fc68..f364762 100644 --- a/lib/src/gossip/msg.rs +++ b/lib/src/gossip/msg.rs @@ -1,6 +1,6 @@ use anyhow::Result; -#[derive(Debug, serde::Serialize, serde::Deserialize)] +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub enum GossipMessage { CurrentProfileState { peer_id: Vec, -- cgit v1.2.3 From 8aae93d4f47ff6a14120fab34a39b88c8fdb74a5 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Sun, 19 Dec 2021 22:28:35 +0100 Subject: Add message explanation logging Signed-off-by: Matthias Beyer --- gui/src/app/message.rs | 26 ++++++++++++++++++++++++++ gui/src/app/mod.rs | 1 + 2 files changed, 27 insertions(+) diff --git a/gui/src/app/message.rs b/gui/src/app/message.rs index 309a0bc..baffa31 100644 --- a/gui/src/app/message.rs +++ b/gui/src/app/message.rs @@ -29,3 +29,29 @@ pub enum Message { TimelineScrolled(f32), } + +impl Message { + pub fn description(&self) -> &'static str { + match self { + Message::Loaded(_) => "Loaded", + Message::FailedToLoad(_) => "FailedToLoad", + + Message::ToggleLog => "ToggleLog", + + Message::GossipSubscriptionFailed(_) => "GossipSubscriptionFailed", + Message::GossipHandled(_) => "GossipHandled", + + Message::InputChanged(_) => "InputChanged", + Message::CreatePost => "CreatePost", + + Message::PostCreated(_) => "PostCreated", + Message::PostCreationFailed(_) => "PostCreationFailed", + + Message::PostLoaded(_) => "PostLoaded", + Message::PostLoadingFailed => "PostLoadingFailed", + + Message::TimelineScrolled(_) => "TimelineScrolled", + } + } +} + diff --git a/gui/src/app/mod.rs b/gui/src/app/mod.rs index 878fabf..857debf 100644 --- a/gui/src/app/mod.rs +++ b/gui/src/app/mod.rs @@ -82,6 +82,7 @@ impl Application for Distrox { } fn update(&mut self, message: Self::Message) -> iced::Command { + log::trace!("Received message: {}", message.description()); match self { Distrox::Loading { gossip_subscription_recv } => { if let Message::Loaded(profile) = message { -- cgit v1.2.3 From 77b3592fba4fd9aed61efbcbfc72dceb31cfd2be Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Sun, 19 Dec 2021 22:36:13 +0100 Subject: Impl hash() for GossipRecipe Signed-off-by: Matthias Beyer --- gui/src/gossip.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/gui/src/gossip.rs b/gui/src/gossip.rs index 4fef601..ac6bab0 100644 --- a/gui/src/gossip.rs +++ b/gui/src/gossip.rs @@ -29,7 +29,8 @@ where fn hash(&self, state: &mut H) { use std::hash::Hash; - unimplemented!() + struct Marker; + std::any::TypeId::of::().hash(state); } fn stream(self: Box, _input: futures::stream::BoxStream<'static, I>) -> futures::stream::BoxStream<'static, Self::Output> { -- cgit v1.2.3 From 904e7a053c519d6d93ba802b7931da3fbc97cc19 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Sun, 19 Dec 2021 22:48:37 +0100 Subject: Add Gossip message logging Signed-off-by: Matthias Beyer --- gui/src/app/mod.rs | 37 +++++++++++++++++++++++++++++-------- 1 file changed, 29 insertions(+), 8 deletions(-) diff --git a/gui/src/app/mod.rs b/gui/src/app/mod.rs index 857debf..ba0675b 100644 --- a/gui/src/app/mod.rs +++ b/gui/src/app/mod.rs @@ -36,6 +36,7 @@ enum Distrox { timeline: Timeline, log_visible: bool, + log: std::collections::VecDeque, }, FailedToStart, } @@ -95,7 +96,8 @@ impl Application for Distrox { input: text_input::State::default(), input_value: String::default(), timeline: Timeline::new(), - log_visible: false + log_visible: false, + log: std::collections::VecDeque::with_capacity(1000), }; @@ -103,7 +105,7 @@ impl Application for Distrox { iced::Command::none() }, - Distrox::Loaded { profile, ref mut input_value, timeline, log_visible, .. } => { + Distrox::Loaded { profile, ref mut input_value, timeline, log_visible, log, .. } => { match message { Message::InputChanged(input) => { *input_value = input; @@ -160,6 +162,15 @@ impl Application for Distrox { iced::Command::none() } + Message::GossipHandled(msg) => { + log::trace!("Gossip handled, adding to log: {:?}", msg); + log.push_back(msg); + while log.len() > 1000 { + let _ = log.pop_front(); + } + iced::Command::none() + } + _ => iced::Command::none(), } } @@ -187,7 +198,7 @@ impl Application for Distrox { .into() } - Distrox::Loaded { input, input_value, timeline, scroll, log_visible, .. } => { + Distrox::Loaded { input, input_value, timeline, scroll, log_visible, log, .. } => { let left_column = Column::new() .into(); @@ -230,11 +241,21 @@ impl Application for Distrox { .push(content); if *log_visible { - let log = Column::new() - .push({ - iced::Text::new("Here goes some log,... not yet implemented!") - .size(8) - }); + let log = Column::with_children({ + log.iter() + .map(|msg| { + use distrox_lib::gossip::GossipMessage; + match msg { + GossipMessage::CurrentProfileState { peer_id, cid } => { + format!("Peer {:?} is at {:?}", peer_id, cid) + } + } + }) + .map(iced::Text::new) + .map(|txt| txt.size(8)) + .map(iced::Element::from) + .collect() + }); content.push(log) } else { content -- cgit v1.2.3 From bb978abee6022a5b65189d4cddfdd1acb3c30b79 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Sun, 19 Dec 2021 22:52:15 +0100 Subject: Store String in log, so we can write normal logging stuff to that log, too Signed-off-by: Matthias Beyer --- gui/src/app/mod.rs | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/gui/src/app/mod.rs b/gui/src/app/mod.rs index ba0675b..4426e2d 100644 --- a/gui/src/app/mod.rs +++ b/gui/src/app/mod.rs @@ -36,7 +36,7 @@ enum Distrox { timeline: Timeline, log_visible: bool, - log: std::collections::VecDeque, + log: std::collections::VecDeque, }, FailedToStart, } @@ -163,7 +163,14 @@ impl Application for Distrox { } Message::GossipHandled(msg) => { + use distrox_lib::gossip::GossipMessage; + log::trace!("Gossip handled, adding to log: {:?}", msg); + let msg = match msg { + GossipMessage::CurrentProfileState { peer_id, cid } => { + format!("Peer {:?} is at {:?}", peer_id, cid) + } + }; log.push_back(msg); while log.len() > 1000 { let _ = log.pop_front(); @@ -243,14 +250,6 @@ impl Application for Distrox { if *log_visible { let log = Column::with_children({ log.iter() - .map(|msg| { - use distrox_lib::gossip::GossipMessage; - match msg { - GossipMessage::CurrentProfileState { peer_id, cid } => { - format!("Peer {:?} is at {:?}", peer_id, cid) - } - } - }) .map(iced::Text::new) .map(|txt| txt.size(8)) .map(iced::Element::from) -- cgit v1.2.3 From f1fb13130d471912401d91b284456f325701f6f5 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Sun, 19 Dec 2021 23:07:48 +0100 Subject: Make GossipMessage::into_bytes() pub in module Signed-off-by: Matthias Beyer --- lib/src/gossip/msg.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/src/gossip/msg.rs b/lib/src/gossip/msg.rs index f364762..8a6e6d2 100644 --- a/lib/src/gossip/msg.rs +++ b/lib/src/gossip/msg.rs @@ -9,7 +9,7 @@ pub enum GossipMessage { } impl GossipMessage { - pub(super) fn into_bytes(self) -> Result> { + pub(crate) fn into_bytes(self) -> Result> { serde_json::to_string(&self) .map(String::into_bytes) .map_err(anyhow::Error::from) -- cgit v1.2.3 From dfe25bddacd903154d73ac5ac7054dd1c3f9d754 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Sun, 19 Dec 2021 23:08:02 +0100 Subject: Add Profile::gossip_own_state() Signed-off-by: Matthias Beyer --- lib/src/profile/mod.rs | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/lib/src/profile/mod.rs b/lib/src/profile/mod.rs index d1ab90f..3b1e063 100644 --- a/lib/src/profile/mod.rs +++ b/lib/src/profile/mod.rs @@ -169,6 +169,30 @@ impl Profile { pub fn add_device(&mut self, d: Device) -> Result<()> { self.state.add_device(d) } + + pub async fn gossip_own_state(&self, topic: String) -> Result<()> { + let cid = self.state + .profile_head() + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Profile has no HEAD yet"))? + .to_bytes(); + + let peer_id = self.client + .own_id() + .await? + .to_bytes(); + + self.client + .ipfs + .pubsub_publish(topic, { + crate::gossip::GossipMessage::CurrentProfileState { + peer_id, + cid, + }.into_bytes()? + }) + .await + .map_err(anyhow::Error::from) + } } -- cgit v1.2.3 From 5a58021581187d11fb26d0de9d19514e6383956f Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Sun, 19 Dec 2021 23:08:23 +0100 Subject: Add gossipping of own state in gui Signed-off-by: Matthias Beyer --- gui/src/app/message.rs | 8 ++++++++ gui/src/app/mod.rs | 29 +++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+) diff --git a/gui/src/app/message.rs b/gui/src/app/message.rs index baffa31..dcb7477 100644 --- a/gui/src/app/message.rs +++ b/gui/src/app/message.rs @@ -18,6 +18,10 @@ pub enum Message { GossipSubscriptionFailed(String), GossipHandled(GossipMessage), + PublishGossipAboutMe, + OwnStateGossipped, + GossippingFailed(String), + InputChanged(String), CreatePost, @@ -41,6 +45,10 @@ impl Message { Message::GossipSubscriptionFailed(_) => "GossipSubscriptionFailed", Message::GossipHandled(_) => "GossipHandled", + Message::PublishGossipAboutMe => "PublishGossipAboutMe", + Message::OwnStateGossipped => "OwnStateGossipped", + Message::GossippingFailed(_) => "GossippingFailed", + Message::InputChanged(_) => "InputChanged", Message::CreatePost => "CreatePost", diff --git a/gui/src/app/mod.rs b/gui/src/app/mod.rs index 4426e2d..77f6c9d 100644 --- a/gui/src/app/mod.rs +++ b/gui/src/app/mod.rs @@ -178,6 +178,29 @@ impl Application for Distrox { iced::Command::none() } + Message::PublishGossipAboutMe => { + let profile = profile.clone(); + iced::Command::perform(async move { + if let Err(e) = profile.gossip_own_state("distrox".to_string()).await { + Message::GossippingFailed(e.to_string()) + } else { + Message::OwnStateGossipped + } + }, |m: Message| -> Message { m }) + } + + Message::OwnStateGossipped => { + log::trace!("Gossipped own state"); + log.push_back("Gossipped own state".to_string()); + iced::Command::none() + } + + Message::GossippingFailed(e) => { + log::trace!("Gossipped failed: {}", e); + log.push_back(format!("Gossipped failed: {}", e)); + iced::Command::none() + } + _ => iced::Command::none(), } } @@ -313,9 +336,15 @@ impl Application for Distrox { _ => None, }; + let gossip_sending_sub = { + iced::time::every(std::time::Duration::from_millis(100)) + .map(|_| Message::PublishGossipAboutMe) + }; + let mut subscriptions = vec![ post_loading_subs, keyboard_subs, + gossip_sending_sub, ]; if let Some(gossip_sub) = gossip_sub { -- cgit v1.2.3