diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2021-12-18 22:53:09 +0100 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2021-12-19 22:36:48 +0100 |
commit | fdd50d1bf5da9cb177ba0134b96f72a93441062e (patch) | |
tree | 40a9488dcc8ac426ab3b0d8386318bce8b1f76c9 | |
parent | 542e1e9dc50a96a36ab9d4236293cd0a4f5d22c3 (diff) |
Implement gossip reactor loading via oneshot channels for subscription initialization
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
-rw-r--r-- | gui/src/app/message.rs | 8 | ||||
-rw-r--r-- | gui/src/app/mod.rs | 110 | ||||
-rw-r--r-- | gui/src/gossip.rs | 51 | ||||
-rw-r--r-- | gui/src/main.rs | 1 | ||||
-rw-r--r-- | lib/src/client.rs | 4 | ||||
-rw-r--r-- | lib/src/gossip/deserializer.rs | 8 | ||||
-rw-r--r-- | lib/src/gossip/handler.rs | 8 | ||||
-rw-r--r-- | lib/src/gossip/mod.rs | 7 | ||||
-rw-r--r-- | lib/src/gossip/msg.rs | 2 |
9 files changed, 153 insertions, 46 deletions
diff --git a/gui/src/app/message.rs b/gui/src/app/message.rs index 68be044..309a0bc 100644 --- a/gui/src/app/message.rs +++ b/gui/src/app/message.rs @@ -2,16 +2,22 @@ use std::sync::Arc; use cid::Cid; +use distrox_lib::gossip::GossipMessage; use distrox_lib::profile::Profile; use distrox_lib::types::Payload; -#[derive(Debug, Clone)] +use crate::gossip::GossipRecipe; + +#[derive(Clone, Debug)] pub enum Message { Loaded(Arc<Profile>), FailedToLoad(String), ToggleLog, + GossipSubscriptionFailed(String), + GossipHandled(GossipMessage), + InputChanged(String), CreatePost, diff --git a/gui/src/app/mod.rs b/gui/src/app/mod.rs index dfd5677..878fabf 100644 --- a/gui/src/app/mod.rs +++ b/gui/src/app/mod.rs @@ -1,4 +1,5 @@ use std::sync::Arc; +use std::sync::RwLock as StdRwLock; use anyhow::Result; use iced::Application; @@ -18,11 +19,16 @@ use crate::timeline::PostLoadingRecipe; mod message; pub use message::Message; +use crate::gossip::GossipRecipe; + #[derive(Debug)] enum Distrox { - Loading, + Loading { + gossip_subscription_recv: StdRwLock<tokio::sync::oneshot::Receiver<GossipRecipe>>, + }, Loaded { profile: Arc<Profile>, + gossip_subscription_recv: StdRwLock<tokio::sync::oneshot::Receiver<GossipRecipe>>, scroll: scrollable::State, input: text_input::State, @@ -40,15 +46,33 @@ impl Application for Distrox { type Flags = String; fn new(name: String) -> (Self, iced::Command<Self::Message>) { + let (gossip_subscription_sender, gossip_subscription_recv) = tokio::sync::oneshot::channel(); ( - Distrox::Loading, + Distrox::Loading { + gossip_subscription_recv: StdRwLock::new(gossip_subscription_recv), + }, + iced::Command::perform(async move { - match Profile::load(&name).await { - Err(e) => Message::FailedToLoad(e.to_string()), - Ok(instance) => { - Message::Loaded(Arc::new(instance)) - } + let profile = match Profile::load(&name).await { + Err(e) => return Message::FailedToLoad(e.to_string()), + Ok(instance) => Arc::new(instance), + }; + + if let Err(e) = profile.client() + .pubsub_subscribe("distrox".to_string()) + .await + .map_err(anyhow::Error::from) + .map(|stream| { + log::trace!("Subscription to 'distrox' pubsub channel worked"); + GossipRecipe::new(profile.clone(), stream) + }) + .and_then(|s| gossip_subscription_sender.send(s).map_err(|_| anyhow::anyhow!("Failed to initialize gossipping module"))) + { + log::error!("Failed to load gossip recipe"); + return Message::FailedToLoad(e.to_string()) } + + Message::Loaded(profile) }, |m: Message| -> Message { m }) ) } @@ -59,33 +83,30 @@ impl Application for Distrox { fn update(&mut self, message: Self::Message) -> iced::Command<Self::Message> { match self { - Distrox::Loading => { - match message { - Message::Loaded(profile) => { - *self = Distrox::Loaded { - profile, - scroll: scrollable::State::default(), - input: text_input::State::default(), - input_value: String::default(), - timeline: Timeline::new(), - log_visible: false - }; - } + Distrox::Loading { gossip_subscription_recv } => { + if let Message::Loaded(profile) = message { + *self = Distrox::Loaded { + profile, - Message::FailedToLoad(e) => { - log::error!("Failed to load: {}", e); - *self = Distrox::FailedToStart; - } + // Don't even try to think what hoops I am jumping through here... + gossip_subscription_recv: std::mem::replace(gossip_subscription_recv, StdRwLock::new(tokio::sync::oneshot::channel().1)), + scroll: scrollable::State::default(), + input: text_input::State::default(), + input_value: String::default(), + timeline: Timeline::new(), + log_visible: false + }; - _ => {} } - } + iced::Command::none() + }, Distrox::Loaded { profile, ref mut input_value, timeline, log_visible, .. } => { match message { Message::InputChanged(input) => { *input_value = input; + iced::Command::none() } Message::CreatePost => { @@ -100,37 +121,45 @@ impl Application for Distrox { |res| match res { Ok(cid) => Message::PostCreated(cid), Err(e) => Message::PostCreationFailed(e.to_string()) - }); + }) + } else { + iced::Command::none() } } Message::PostCreated(cid) => { *input_value = String::new(); log::info!("Post created: {}", cid); + iced::Command::none() } Message::PostCreationFailed(err) => { log::error!("Post creation failed: {}", err); + iced::Command::none() } Message::PostLoaded((payload, content)) => { timeline.push(payload, content); + iced::Command::none() } Message::PostLoadingFailed => { log::error!("Failed to load some post, TODO: Better error logging"); + iced::Command::none() } Message::TimelineScrolled(f) => { log::trace!("Timeline scrolled: {}", f); + iced::Command::none() } Message::ToggleLog => { log::trace!("Log toggled"); *log_visible = !*log_visible; + iced::Command::none() } - _ => {} + _ => iced::Command::none(), } } @@ -138,12 +167,11 @@ impl Application for Distrox { unimplemented!() } } - iced::Command::none() } fn view(&mut self) -> iced::Element<Self::Message> { match self { - Distrox::Loading => { + Distrox::Loading { .. } => { let text = iced::Text::new("Loading"); let content = Column::new() @@ -252,10 +280,28 @@ impl Application for Distrox { }) }; - iced::Subscription::batch(vec![ + let gossip_sub = match self { + Distrox::Loaded { gossip_subscription_recv, .. } => { + match gossip_subscription_recv.write().ok() { + Some(mut sub) => sub.try_recv() + .ok() // Either empty or closed, ignore both + .map(|sub| iced::Subscription::from_recipe(sub)), + None => None + } + }, + _ => None, + }; + + let mut subscriptions = vec![ post_loading_subs, - keyboard_subs - ]) + keyboard_subs, + ]; + + if let Some(gossip_sub) = gossip_sub { + subscriptions.push(gossip_sub); + } + + iced::Subscription::batch(subscriptions) } } diff --git a/gui/src/gossip.rs b/gui/src/gossip.rs new file mode 100644 index 0000000..4fef601 --- /dev/null +++ b/gui/src/gossip.rs @@ -0,0 +1,51 @@ +use std::sync::Arc; + +use futures::StreamExt; + +use distrox_lib::profile::Profile; +use distrox_lib::client::Client; + +use crate::app::Message; + +#[derive(Clone, Debug)] +pub struct GossipRecipe { + profile: Arc<Profile>, + subscription: Arc<ipfs::SubscriptionStream>, +} + +impl GossipRecipe { + pub fn new(profile: Arc<Profile>, subscription: ipfs::SubscriptionStream) -> Self { + Self { profile, subscription: Arc::new(subscription) } + } +} + + +// Make sure iced can use our download stream +impl<H, I> iced_native::subscription::Recipe<H, I> for GossipRecipe +where + H: std::hash::Hasher, +{ + type Output = Message; + + fn hash(&self, state: &mut H) { + use std::hash::Hash; + unimplemented!() + } + + fn stream(self: Box<Self>, _input: futures::stream::BoxStream<'static, I>) -> futures::stream::BoxStream<'static, Self::Output> { + use distrox_lib::gossip::deserializer; + use distrox_lib::gossip::handler; + + // TODO: Do "right", whatever this means... + let stream = Arc::try_unwrap(self.subscription).unwrap(); + + Box::pin({ + let stream = deserializer::GossipDeserializer::<deserializer::LogStrategy>::new().run(stream); + let stream = handler::GossipHandler::<handler::LogStrategy>::new(self.profile.clone()).run(stream); + + stream.map(|(gossip_message, _handling_result)| { + Message::GossipHandled(gossip_message) + }) + }) + } +} diff --git a/gui/src/main.rs b/gui/src/main.rs index 6120152..486f007 100644 --- a/gui/src/main.rs +++ b/gui/src/main.rs @@ -4,6 +4,7 @@ mod app; mod cli; mod timeline; mod post; +mod gossip; fn main() -> Result<()> { let _ = env_logger::try_init()?; diff --git a/lib/src/client.rs b/lib/src/client.rs index d985769..2128f76 100644 --- a/lib/src/client.rs +++ b/lib/src/client.rs @@ -122,6 +122,10 @@ impl Client { self.get::<S>(cid).await.map(|v| v.0) } + + pub async fn pubsub_subscribe(&self, topic: String) -> Result<ipfs::SubscriptionStream> { + self.ipfs.pubsub_subscribe(topic).await.map_err(anyhow::Error::from) + } } fn now() -> DateTime { diff --git a/lib/src/gossip/deserializer.rs b/lib/src/gossip/deserializer.rs index a50644f..dcd5082 100644 --- a/lib/src/gossip/deserializer.rs +++ b/lib/src/gossip/deserializer.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use anyhow::Result; use futures::Stream; use futures::StreamExt; @@ -19,14 +21,14 @@ impl<ErrStrategy> GossipDeserializer<ErrStrategy> } } - pub fn run<S>(mut self, input: S) -> impl Stream<Item = GossipMessage> - where S: Stream<Item = ipfs::PubsubMessage> + pub fn run<S>(self, input: S) -> impl Stream<Item = (ipfs::PeerId, GossipMessage)> + where S: Stream<Item = Arc<ipfs::PubsubMessage>> { input.filter_map(|message| async move { log::trace!("Received gossip message"); match serde_json::from_slice(&message.data).map_err(anyhow::Error::from) { - Ok(m) => Some(m), + Ok(m) => Some((message.source, m)), Err(e) => { ErrStrategy::handle_error(e); None diff --git a/lib/src/gossip/handler.rs b/lib/src/gossip/handler.rs index e524da8..7c9ffa6 100644 --- a/lib/src/gossip/handler.rs +++ b/lib/src/gossip/handler.rs @@ -18,14 +18,14 @@ use crate::gossip::GossipMessage; pub struct GossipHandler<Strategy = LogStrategy> where Strategy: GossipHandlingStrategy + Sync + Send { - profile: Arc<RwLock<Profile>>, + profile: Arc<Profile>, strategy: std::marker::PhantomData<Strategy>, } impl<Strat> GossipHandler<Strat> where Strat: GossipHandlingStrategy + Sync + Send { - pub fn new(profile: Arc<RwLock<Profile>>) -> Self { + pub fn new(profile: Arc<Profile>) -> Self { Self { profile, strategy: std::marker::PhantomData, @@ -48,14 +48,14 @@ impl<Strat> GossipHandler<Strat> #[async_trait::async_trait] pub trait GossipHandlingStrategy: Sync + Send { - async fn handle_gossip_message(profile: Arc<RwLock<Profile>>, source: &ipfs::PeerId, msg: &GossipMessage) -> Result<()>; + async fn handle_gossip_message(profile: Arc<Profile>, source: &ipfs::PeerId, msg: &GossipMessage) -> Result<()>; } pub struct LogStrategy; #[async_trait::async_trait] impl GossipHandlingStrategy for LogStrategy { - async fn handle_gossip_message(_profile: Arc<RwLock<Profile>>, source: &ipfs::PeerId, msg: &GossipMessage) -> Result<()> { + async fn handle_gossip_message(_profile: Arc<Profile>, source: &ipfs::PeerId, msg: &GossipMessage) -> Result<()> { use std::convert::TryFrom; use std::ops::Deref; diff --git a/lib/src/gossip/mod.rs b/lib/src/gossip/mod.rs index d6a6963..4f4d143 100644 --- a/lib/src/gossip/mod.rs +++ b/lib/src/gossip/mod.rs @@ -1,9 +1,6 @@ mod msg; pub use msg::GossipMessage; -mod handler; -pub use handler::*; - -mod deserializer; -pub use deserializer::*; +pub mod deserializer; +pub mod handler; diff --git a/lib/src/gossip/msg.rs b/lib/src/gossip/msg.rs index 049fc68..f364762 100644 --- a/lib/src/gossip/msg.rs +++ b/lib/src/gossip/msg.rs @@ -1,6 +1,6 @@ use anyhow::Result; -#[derive(Debug, serde::Serialize, serde::Deserialize)] +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub enum GossipMessage { CurrentProfileState { peer_id: Vec<u8>, |