summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2021-12-18 22:53:09 +0100
committerMatthias Beyer <mail@beyermatthias.de>2021-12-19 22:36:48 +0100
commitfdd50d1bf5da9cb177ba0134b96f72a93441062e (patch)
tree40a9488dcc8ac426ab3b0d8386318bce8b1f76c9
parent542e1e9dc50a96a36ab9d4236293cd0a4f5d22c3 (diff)
Implement gossip reactor loading via oneshot channels for subscription initialization
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
-rw-r--r--gui/src/app/message.rs8
-rw-r--r--gui/src/app/mod.rs110
-rw-r--r--gui/src/gossip.rs51
-rw-r--r--gui/src/main.rs1
-rw-r--r--lib/src/client.rs4
-rw-r--r--lib/src/gossip/deserializer.rs8
-rw-r--r--lib/src/gossip/handler.rs8
-rw-r--r--lib/src/gossip/mod.rs7
-rw-r--r--lib/src/gossip/msg.rs2
9 files changed, 153 insertions, 46 deletions
diff --git a/gui/src/app/message.rs b/gui/src/app/message.rs
index 68be044..309a0bc 100644
--- a/gui/src/app/message.rs
+++ b/gui/src/app/message.rs
@@ -2,16 +2,22 @@ use std::sync::Arc;
use cid::Cid;
+use distrox_lib::gossip::GossipMessage;
use distrox_lib::profile::Profile;
use distrox_lib::types::Payload;
-#[derive(Debug, Clone)]
+use crate::gossip::GossipRecipe;
+
+#[derive(Clone, Debug)]
pub enum Message {
Loaded(Arc<Profile>),
FailedToLoad(String),
ToggleLog,
+ GossipSubscriptionFailed(String),
+ GossipHandled(GossipMessage),
+
InputChanged(String),
CreatePost,
diff --git a/gui/src/app/mod.rs b/gui/src/app/mod.rs
index dfd5677..878fabf 100644
--- a/gui/src/app/mod.rs
+++ b/gui/src/app/mod.rs
@@ -1,4 +1,5 @@
use std::sync::Arc;
+use std::sync::RwLock as StdRwLock;
use anyhow::Result;
use iced::Application;
@@ -18,11 +19,16 @@ use crate::timeline::PostLoadingRecipe;
mod message;
pub use message::Message;
+use crate::gossip::GossipRecipe;
+
#[derive(Debug)]
enum Distrox {
- Loading,
+ Loading {
+ gossip_subscription_recv: StdRwLock<tokio::sync::oneshot::Receiver<GossipRecipe>>,
+ },
Loaded {
profile: Arc<Profile>,
+ gossip_subscription_recv: StdRwLock<tokio::sync::oneshot::Receiver<GossipRecipe>>,
scroll: scrollable::State,
input: text_input::State,
@@ -40,15 +46,33 @@ impl Application for Distrox {
type Flags = String;
fn new(name: String) -> (Self, iced::Command<Self::Message>) {
+ let (gossip_subscription_sender, gossip_subscription_recv) = tokio::sync::oneshot::channel();
(
- Distrox::Loading,
+ Distrox::Loading {
+ gossip_subscription_recv: StdRwLock::new(gossip_subscription_recv),
+ },
+
iced::Command::perform(async move {
- match Profile::load(&name).await {
- Err(e) => Message::FailedToLoad(e.to_string()),
- Ok(instance) => {
- Message::Loaded(Arc::new(instance))
- }
+ let profile = match Profile::load(&name).await {
+ Err(e) => return Message::FailedToLoad(e.to_string()),
+ Ok(instance) => Arc::new(instance),
+ };
+
+ if let Err(e) = profile.client()
+ .pubsub_subscribe("distrox".to_string())
+ .await
+ .map_err(anyhow::Error::from)
+ .map(|stream| {
+ log::trace!("Subscription to 'distrox' pubsub channel worked");
+ GossipRecipe::new(profile.clone(), stream)
+ })
+ .and_then(|s| gossip_subscription_sender.send(s).map_err(|_| anyhow::anyhow!("Failed to initialize gossipping module")))
+ {
+ log::error!("Failed to load gossip recipe");
+ return Message::FailedToLoad(e.to_string())
}
+
+ Message::Loaded(profile)
}, |m: Message| -> Message { m })
)
}
@@ -59,33 +83,30 @@ impl Application for Distrox {
fn update(&mut self, message: Self::Message) -> iced::Command<Self::Message> {
match self {
- Distrox::Loading => {
- match message {
- Message::Loaded(profile) => {
- *self = Distrox::Loaded {
- profile,
- scroll: scrollable::State::default(),
- input: text_input::State::default(),
- input_value: String::default(),
- timeline: Timeline::new(),
- log_visible: false
- };
- }
+ Distrox::Loading { gossip_subscription_recv } => {
+ if let Message::Loaded(profile) = message {
+ *self = Distrox::Loaded {
+ profile,
- Message::FailedToLoad(e) => {
- log::error!("Failed to load: {}", e);
- *self = Distrox::FailedToStart;
- }
+ // Don't even try to think what hoops I am jumping through here...
+ gossip_subscription_recv: std::mem::replace(gossip_subscription_recv, StdRwLock::new(tokio::sync::oneshot::channel().1)),
+ scroll: scrollable::State::default(),
+ input: text_input::State::default(),
+ input_value: String::default(),
+ timeline: Timeline::new(),
+ log_visible: false
+ };
- _ => {}
}
- }
+ iced::Command::none()
+ },
Distrox::Loaded { profile, ref mut input_value, timeline, log_visible, .. } => {
match message {
Message::InputChanged(input) => {
*input_value = input;
+ iced::Command::none()
}
Message::CreatePost => {
@@ -100,37 +121,45 @@ impl Application for Distrox {
|res| match res {
Ok(cid) => Message::PostCreated(cid),
Err(e) => Message::PostCreationFailed(e.to_string())
- });
+ })
+ } else {
+ iced::Command::none()
}
}
Message::PostCreated(cid) => {
*input_value = String::new();
log::info!("Post created: {}", cid);
+ iced::Command::none()
}
Message::PostCreationFailed(err) => {
log::error!("Post creation failed: {}", err);
+ iced::Command::none()
}
Message::PostLoaded((payload, content)) => {
timeline.push(payload, content);
+ iced::Command::none()
}
Message::PostLoadingFailed => {
log::error!("Failed to load some post, TODO: Better error logging");
+ iced::Command::none()
}
Message::TimelineScrolled(f) => {
log::trace!("Timeline scrolled: {}", f);
+ iced::Command::none()
}
Message::ToggleLog => {
log::trace!("Log toggled");
*log_visible = !*log_visible;
+ iced::Command::none()
}
- _ => {}
+ _ => iced::Command::none(),
}
}
@@ -138,12 +167,11 @@ impl Application for Distrox {
unimplemented!()
}
}
- iced::Command::none()
}
fn view(&mut self) -> iced::Element<Self::Message> {
match self {
- Distrox::Loading => {
+ Distrox::Loading { .. } => {
let text = iced::Text::new("Loading");
let content = Column::new()
@@ -252,10 +280,28 @@ impl Application for Distrox {
})
};
- iced::Subscription::batch(vec![
+ let gossip_sub = match self {
+ Distrox::Loaded { gossip_subscription_recv, .. } => {
+ match gossip_subscription_recv.write().ok() {
+ Some(mut sub) => sub.try_recv()
+ .ok() // Either empty or closed, ignore both
+ .map(|sub| iced::Subscription::from_recipe(sub)),
+ None => None
+ }
+ },
+ _ => None,
+ };
+
+ let mut subscriptions = vec![
post_loading_subs,
- keyboard_subs
- ])
+ keyboard_subs,
+ ];
+
+ if let Some(gossip_sub) = gossip_sub {
+ subscriptions.push(gossip_sub);
+ }
+
+ iced::Subscription::batch(subscriptions)
}
}
diff --git a/gui/src/gossip.rs b/gui/src/gossip.rs
new file mode 100644
index 0000000..4fef601
--- /dev/null
+++ b/gui/src/gossip.rs
@@ -0,0 +1,51 @@
+use std::sync::Arc;
+
+use futures::StreamExt;
+
+use distrox_lib::profile::Profile;
+use distrox_lib::client::Client;
+
+use crate::app::Message;
+
+#[derive(Clone, Debug)]
+pub struct GossipRecipe {
+ profile: Arc<Profile>,
+ subscription: Arc<ipfs::SubscriptionStream>,
+}
+
+impl GossipRecipe {
+ pub fn new(profile: Arc<Profile>, subscription: ipfs::SubscriptionStream) -> Self {
+ Self { profile, subscription: Arc::new(subscription) }
+ }
+}
+
+
+// Make sure iced can use our download stream
+impl<H, I> iced_native::subscription::Recipe<H, I> for GossipRecipe
+where
+ H: std::hash::Hasher,
+{
+ type Output = Message;
+
+ fn hash(&self, state: &mut H) {
+ use std::hash::Hash;
+ unimplemented!()
+ }
+
+ fn stream(self: Box<Self>, _input: futures::stream::BoxStream<'static, I>) -> futures::stream::BoxStream<'static, Self::Output> {
+ use distrox_lib::gossip::deserializer;
+ use distrox_lib::gossip::handler;
+
+ // TODO: Do "right", whatever this means...
+ let stream = Arc::try_unwrap(self.subscription).unwrap();
+
+ Box::pin({
+ let stream = deserializer::GossipDeserializer::<deserializer::LogStrategy>::new().run(stream);
+ let stream = handler::GossipHandler::<handler::LogStrategy>::new(self.profile.clone()).run(stream);
+
+ stream.map(|(gossip_message, _handling_result)| {
+ Message::GossipHandled(gossip_message)
+ })
+ })
+ }
+}
diff --git a/gui/src/main.rs b/gui/src/main.rs
index 6120152..486f007 100644
--- a/gui/src/main.rs
+++ b/gui/src/main.rs
@@ -4,6 +4,7 @@ mod app;
mod cli;
mod timeline;
mod post;
+mod gossip;
fn main() -> Result<()> {
let _ = env_logger::try_init()?;
diff --git a/lib/src/client.rs b/lib/src/client.rs
index d985769..2128f76 100644
--- a/lib/src/client.rs
+++ b/lib/src/client.rs
@@ -122,6 +122,10 @@ impl Client {
self.get::<S>(cid).await.map(|v| v.0)
}
+
+ pub async fn pubsub_subscribe(&self, topic: String) -> Result<ipfs::SubscriptionStream> {
+ self.ipfs.pubsub_subscribe(topic).await.map_err(anyhow::Error::from)
+ }
}
fn now() -> DateTime {
diff --git a/lib/src/gossip/deserializer.rs b/lib/src/gossip/deserializer.rs
index a50644f..dcd5082 100644
--- a/lib/src/gossip/deserializer.rs
+++ b/lib/src/gossip/deserializer.rs
@@ -1,3 +1,5 @@
+use std::sync::Arc;
+
use anyhow::Result;
use futures::Stream;
use futures::StreamExt;
@@ -19,14 +21,14 @@ impl<ErrStrategy> GossipDeserializer<ErrStrategy>
}
}
- pub fn run<S>(mut self, input: S) -> impl Stream<Item = GossipMessage>
- where S: Stream<Item = ipfs::PubsubMessage>
+ pub fn run<S>(self, input: S) -> impl Stream<Item = (ipfs::PeerId, GossipMessage)>
+ where S: Stream<Item = Arc<ipfs::PubsubMessage>>
{
input.filter_map(|message| async move {
log::trace!("Received gossip message");
match serde_json::from_slice(&message.data).map_err(anyhow::Error::from) {
- Ok(m) => Some(m),
+ Ok(m) => Some((message.source, m)),
Err(e) => {
ErrStrategy::handle_error(e);
None
diff --git a/lib/src/gossip/handler.rs b/lib/src/gossip/handler.rs
index e524da8..7c9ffa6 100644
--- a/lib/src/gossip/handler.rs
+++ b/lib/src/gossip/handler.rs
@@ -18,14 +18,14 @@ use crate::gossip::GossipMessage;
pub struct GossipHandler<Strategy = LogStrategy>
where Strategy: GossipHandlingStrategy + Sync + Send
{
- profile: Arc<RwLock<Profile>>,
+ profile: Arc<Profile>,
strategy: std::marker::PhantomData<Strategy>,
}
impl<Strat> GossipHandler<Strat>
where Strat: GossipHandlingStrategy + Sync + Send
{
- pub fn new(profile: Arc<RwLock<Profile>>) -> Self {
+ pub fn new(profile: Arc<Profile>) -> Self {
Self {
profile,
strategy: std::marker::PhantomData,
@@ -48,14 +48,14 @@ impl<Strat> GossipHandler<Strat>
#[async_trait::async_trait]
pub trait GossipHandlingStrategy: Sync + Send {
- async fn handle_gossip_message(profile: Arc<RwLock<Profile>>, source: &ipfs::PeerId, msg: &GossipMessage) -> Result<()>;
+ async fn handle_gossip_message(profile: Arc<Profile>, source: &ipfs::PeerId, msg: &GossipMessage) -> Result<()>;
}
pub struct LogStrategy;
#[async_trait::async_trait]
impl GossipHandlingStrategy for LogStrategy {
- async fn handle_gossip_message(_profile: Arc<RwLock<Profile>>, source: &ipfs::PeerId, msg: &GossipMessage) -> Result<()> {
+ async fn handle_gossip_message(_profile: Arc<Profile>, source: &ipfs::PeerId, msg: &GossipMessage) -> Result<()> {
use std::convert::TryFrom;
use std::ops::Deref;
diff --git a/lib/src/gossip/mod.rs b/lib/src/gossip/mod.rs
index d6a6963..4f4d143 100644
--- a/lib/src/gossip/mod.rs
+++ b/lib/src/gossip/mod.rs
@@ -1,9 +1,6 @@
mod msg;
pub use msg::GossipMessage;
-mod handler;
-pub use handler::*;
-
-mod deserializer;
-pub use deserializer::*;
+pub mod deserializer;
+pub mod handler;
diff --git a/lib/src/gossip/msg.rs b/lib/src/gossip/msg.rs
index 049fc68..f364762 100644
--- a/lib/src/gossip/msg.rs
+++ b/lib/src/gossip/msg.rs
@@ -1,6 +1,6 @@
use anyhow::Result;
-#[derive(Debug, serde::Serialize, serde::Deserialize)]
+#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub enum GossipMessage {
CurrentProfileState {
peer_id: Vec<u8>,