diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2021-12-20 09:29:35 +0100 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2021-12-20 09:29:35 +0100 |
commit | a23b897b5c3c9ee721b793e26401d1863d97f84e (patch) | |
tree | d22a4fb5735bc72704f2622a4748ec3ba5c6189e | |
parent | 860177170ce583be7c7d86d8d81fdc6a7c402dc4 (diff) | |
parent | 5a58021581187d11fb26d0de9d19514e6383956f (diff) |
Merge branch 'gossipping-gui'
-rw-r--r-- | gui/src/app/message.rs | 44 | ||||
-rw-r--r-- | gui/src/app/mod.rs | 262 | ||||
-rw-r--r-- | gui/src/gossip.rs | 52 | ||||
-rw-r--r-- | gui/src/main.rs | 1 | ||||
-rw-r--r-- | lib/src/client.rs | 4 | ||||
-rw-r--r-- | lib/src/gossip/deserializer.rs | 57 | ||||
-rw-r--r-- | lib/src/gossip/handler.rs | 73 | ||||
-rw-r--r-- | lib/src/gossip/mod.rs | 6 | ||||
-rw-r--r-- | lib/src/gossip/msg.rs (renamed from lib/src/reactor/gossip/msg.rs) | 4 | ||||
-rw-r--r-- | lib/src/lib.rs | 2 | ||||
-rw-r--r-- | lib/src/profile/mod.rs | 24 | ||||
-rw-r--r-- | lib/src/reactor/account.rs | 19 | ||||
-rw-r--r-- | lib/src/reactor/ctrl.rs | 15 | ||||
-rw-r--r-- | lib/src/reactor/device.rs | 19 | ||||
-rw-r--r-- | lib/src/reactor/gossip/ctrl.rs | 19 | ||||
-rw-r--r-- | lib/src/reactor/gossip/mod.rs | 329 | ||||
-rw-r--r-- | lib/src/reactor/gossip/strategy.rs | 31 | ||||
-rw-r--r-- | lib/src/reactor/mod.rs | 28 |
18 files changed, 475 insertions, 514 deletions
diff --git a/gui/src/app/message.rs b/gui/src/app/message.rs index f530774..dcb7477 100644 --- a/gui/src/app/message.rs +++ b/gui/src/app/message.rs @@ -2,14 +2,26 @@ use std::sync::Arc; use cid::Cid; +use distrox_lib::gossip::GossipMessage; use distrox_lib::profile::Profile; use distrox_lib::types::Payload; -#[derive(Debug, Clone)] +use crate::gossip::GossipRecipe; + +#[derive(Clone, Debug)] pub enum Message { Loaded(Arc<Profile>), FailedToLoad(String), + ToggleLog, + + GossipSubscriptionFailed(String), + GossipHandled(GossipMessage), + + PublishGossipAboutMe, + OwnStateGossipped, + GossippingFailed(String), + InputChanged(String), CreatePost, @@ -21,3 +33,33 @@ pub enum Message { TimelineScrolled(f32), } + +impl Message { + pub fn description(&self) -> &'static str { + match self { + Message::Loaded(_) => "Loaded", + Message::FailedToLoad(_) => "FailedToLoad", + + Message::ToggleLog => "ToggleLog", + + Message::GossipSubscriptionFailed(_) => "GossipSubscriptionFailed", + Message::GossipHandled(_) => "GossipHandled", + + Message::PublishGossipAboutMe => "PublishGossipAboutMe", + Message::OwnStateGossipped => "OwnStateGossipped", + Message::GossippingFailed(_) => "GossippingFailed", + + Message::InputChanged(_) => "InputChanged", + Message::CreatePost => "CreatePost", + + Message::PostCreated(_) => "PostCreated", + Message::PostCreationFailed(_) => "PostCreationFailed", + + Message::PostLoaded(_) => "PostLoaded", + Message::PostLoadingFailed => "PostLoadingFailed", + + Message::TimelineScrolled(_) => "TimelineScrolled", + } + } +} + diff --git a/gui/src/app/mod.rs b/gui/src/app/mod.rs index 8fefdbc..77f6c9d 100644 --- a/gui/src/app/mod.rs +++ b/gui/src/app/mod.rs @@ -1,10 +1,12 @@ use std::sync::Arc; +use std::sync::RwLock as StdRwLock; use anyhow::Result; use iced::Application; use iced::Column; use iced::Container; use iced::Length; +use iced::Row; use iced::Scrollable; use iced::TextInput; use iced::scrollable; @@ -17,16 +19,24 @@ use crate::timeline::PostLoadingRecipe; mod message; pub use message::Message; +use crate::gossip::GossipRecipe; + #[derive(Debug)] enum Distrox { - Loading, + Loading { + gossip_subscription_recv: StdRwLock<tokio::sync::oneshot::Receiver<GossipRecipe>>, + }, Loaded { profile: Arc<Profile>, + gossip_subscription_recv: StdRwLock<tokio::sync::oneshot::Receiver<GossipRecipe>>, scroll: scrollable::State, input: text_input::State, input_value: String, timeline: Timeline, + + log_visible: bool, + log: std::collections::VecDeque<String>, }, FailedToStart, } @@ -37,15 +47,33 @@ impl Application for Distrox { type Flags = String; fn new(name: String) -> (Self, iced::Command<Self::Message>) { + let (gossip_subscription_sender, gossip_subscription_recv) = tokio::sync::oneshot::channel(); ( - Distrox::Loading, + Distrox::Loading { + gossip_subscription_recv: StdRwLock::new(gossip_subscription_recv), + }, + iced::Command::perform(async move { - match Profile::load(&name).await { - Err(e) => Message::FailedToLoad(e.to_string()), - Ok(instance) => { - Message::Loaded(Arc::new(instance)) - } + let profile = match Profile::load(&name).await { + Err(e) => return Message::FailedToLoad(e.to_string()), + Ok(instance) => Arc::new(instance), + }; + + if let Err(e) = profile.client() + .pubsub_subscribe("distrox".to_string()) + .await + .map_err(anyhow::Error::from) + .map(|stream| { + log::trace!("Subscription to 'distrox' pubsub channel worked"); + GossipRecipe::new(profile.clone(), stream) + }) + .and_then(|s| gossip_subscription_sender.send(s).map_err(|_| anyhow::anyhow!("Failed to initialize gossipping module"))) + { + log::error!("Failed to load gossip recipe"); + return Message::FailedToLoad(e.to_string()) } + + Message::Loaded(profile) }, |m: Message| -> Message { m }) ) } @@ -55,33 +83,33 @@ impl Application for Distrox { } fn update(&mut self, message: Self::Message) -> iced::Command<Self::Message> { + log::trace!("Received message: {}", message.description()); match self { - Distrox::Loading => { - match message { - Message::Loaded(profile) => { - *self = Distrox::Loaded { - profile, - scroll: scrollable::State::default(), - input: text_input::State::default(), - input_value: String::default(), - timeline: Timeline::new(), - }; - } + Distrox::Loading { gossip_subscription_recv } => { + if let Message::Loaded(profile) = message { + *self = Distrox::Loaded { + profile, + + // Don't even try to think what hoops I am jumping through here... + gossip_subscription_recv: std::mem::replace(gossip_subscription_recv, StdRwLock::new(tokio::sync::oneshot::channel().1)), + scroll: scrollable::State::default(), + input: text_input::State::default(), + input_value: String::default(), + timeline: Timeline::new(), + log_visible: false, + log: std::collections::VecDeque::with_capacity(1000), + }; - Message::FailedToLoad(e) => { - log::error!("Failed to load: {}", e); - *self = Distrox::FailedToStart; - } - - _ => {} } - } + iced::Command::none() + }, - Distrox::Loaded { profile, ref mut input_value, timeline, .. } => { + Distrox::Loaded { profile, ref mut input_value, timeline, log_visible, log, .. } => { match message { Message::InputChanged(input) => { *input_value = input; + iced::Command::none() } Message::CreatePost => { @@ -96,32 +124,84 @@ impl Application for Distrox { |res| match res { Ok(cid) => Message::PostCreated(cid), Err(e) => Message::PostCreationFailed(e.to_string()) - }); + }) + } else { + iced::Command::none() } } Message::PostCreated(cid) => { *input_value = String::new(); log::info!("Post created: {}", cid); + iced::Command::none() } Message::PostCreationFailed(err) => { log::error!("Post creation failed: {}", err); + iced::Command::none() } Message::PostLoaded((payload, content)) => { timeline.push(payload, content); + iced::Command::none() } Message::PostLoadingFailed => { log::error!("Failed to load some post, TODO: Better error logging"); + iced::Command::none() } Message::TimelineScrolled(f) => { log::trace!("Timeline scrolled: {}", f); + iced::Command::none() + } + + Message::ToggleLog => { + log::trace!("Log toggled"); + *log_visible = !*log_visible; + iced::Command::none() } - _ => {} + Message::GossipHandled(msg) => { + use distrox_lib::gossip::GossipMessage; + + log::trace!("Gossip handled, adding to log: {:?}", msg); + let msg = match msg { + GossipMessage::CurrentProfileState { peer_id, cid } => { + format!("Peer {:?} is at {:?}", peer_id, cid) + } + }; + log.push_back(msg); + while log.len() > 1000 { + let _ = log.pop_front(); + } + iced::Command::none() + } + + Message::PublishGossipAboutMe => { + let profile = profile.clone(); + iced::Command::perform(async move { + if let Err(e) = profile.gossip_own_state("distrox".to_string()).await { + Message::GossippingFailed(e.to_string()) + } else { + Message::OwnStateGossipped + } + }, |m: Message| -> Message { m }) + } + + Message::OwnStateGossipped => { + log::trace!("Gossipped own state"); + log.push_back("Gossipped own state".to_string()); + iced::Command::none() + } + + Message::GossippingFailed(e) => { + log::trace!("Gossipped failed: {}", e); + log.push_back(format!("Gossipped failed: {}", e)); + iced::Command::none() + } + + _ => iced::Command::none(), } } @@ -129,43 +209,79 @@ impl Application for Distrox { unimplemented!() } } - iced::Command::none() } fn view(&mut self) -> iced::Element<Self::Message> { match self { - Distrox::Loading => { + Distrox::Loading { .. } => { let text = iced::Text::new("Loading"); let content = Column::new() - .max_width(800) .spacing(20) .push(text); Container::new(content) .width(Length::Fill) + .height(Length::Fill) .center_x() + .center_y() .into() } - Distrox::Loaded { input, input_value, timeline, scroll, .. } => { - let input = TextInput::new( - input, - "What do you want to tell the world?", - input_value, - Message::InputChanged, - ) - .padding(15) - .size(12) - .on_submit(Message::CreatePost); - - let timeline = timeline.view(); - - Scrollable::new(scroll) - .padding(40) - .push(input) - .push(timeline) - .into() + Distrox::Loaded { input, input_value, timeline, scroll, log_visible, log, .. } => { + let left_column = Column::new() + .into(); + + let mid_column = Column::new() + .push({ + let input = TextInput::new( + input, + "What do you want to tell the world?", + input_value, + Message::InputChanged, + ) + .padding(15) + .size(12) + .on_submit(Message::CreatePost); + + let timeline = timeline.view(); + + Scrollable::new(scroll) + .padding(40) + .push(input) + .push(timeline) + }) + .into(); + + let right_column = Column::new() + .into(); + + let content = Row::with_children(vec![ + left_column, + mid_column, + right_column + ]) + .spacing(20) + .height(Length::Fill) + .width(Length::Fill); + + let content = Column::new() + .height(Length::Fill) + .width(Length::Fill) + .push(content); + + if *log_visible { + let log = Column::with_children({ + log.iter() + .map(iced::Text::new) + .map(|txt| txt.size(8)) + .map(iced::Element::from) + .collect() + }); + content.push(log) + } else { + content + }.into() } Distrox::FailedToStart => { @@ -175,7 +291,7 @@ impl Application for Distrox { } fn subscription(&self) -> iced::Subscription<Self::Message> { - match self { + let post_loading_subs = match self { Distrox::Loaded { profile, .. } => { let head = profile.head(); @@ -189,7 +305,53 @@ impl Application for Distrox { } } _ => iced::Subscription::none(), + }; + + let keyboard_subs = { + use iced_native::event::Event; + + iced_native::subscription::events_with(|event, _| { + match event { + Event::Keyboard(iced_native::keyboard::Event::KeyPressed { key_code, .. }) => { + if key_code == iced_native::keyboard::KeyCode::F11 { + Some(Message::ToggleLog) + } else { + None + } + }, + _ => None, + } + }) + }; + + let gossip_sub = match self { + Distrox::Loaded { gossip_subscription_recv, .. } => { + match gossip_subscription_recv.write().ok() { + Some(mut sub) => sub.try_recv() + .ok() // Either empty or closed, ignore both + .map(|sub| iced::Subscription::from_recipe(sub)), + None => None + } + }, + _ => None, + }; + + let gossip_sending_sub = { + iced::time::every(std::time::Duration::from_millis(100)) + .map(|_| Message::PublishGossipAboutMe) + }; + + let mut subscriptions = vec![ + post_loading_subs, + keyboard_subs, + gossip_sending_sub, + ]; + + if let Some(gossip_sub) = gossip_sub { + subscriptions.push(gossip_sub); } + + iced::Subscription::batch(subscriptions) } } diff --git a/gui/src/gossip.rs b/gui/src/gossip.rs new file mode 100644 index 0000000..ac6bab0 --- /dev/null +++ b/gui/src/gossip.rs @@ -0,0 +1,52 @@ +use std::sync::Arc; + +use futures::StreamExt; + +use distrox_lib::profile::Profile; +use distrox_lib::client::Client; + +use crate::app::Message; + +#[derive(Clone, Debug)] +pub struct GossipRecipe { + profile: Arc<Profile>, + subscription: Arc<ipfs::SubscriptionStream>, +} + +impl GossipRecipe { + pub fn new(profile: Arc<Profile>, subscription: ipfs::SubscriptionStream) -> Self { + Self { profile, subscription: Arc::new(subscription) } + } +} + + +// Make sure iced can use our download stream +impl<H, I> iced_native::subscription::Recipe<H, I> for GossipRecipe +where + H: std::hash::Hasher, +{ + type Output = Message; + + fn hash(&self, state: &mut H) { + use std::hash::Hash; + struct Marker; + std::any::TypeId::of::<Marker>().hash(state); + } + + fn stream(self: Box<Self>, _input: futures::stream::BoxStream<'static, I>) -> futures::stream::BoxStream<'static, Self::Output> { + use distrox_lib::gossip::deserializer; + use distrox_lib::gossip::handler; + + // TODO: Do "right", whatever this means... + let stream = Arc::try_unwrap(self.subscription).unwrap(); + + Box::pin({ + let stream = deserializer::GossipDeserializer::<deserializer::LogStrategy>::new().run(stream); + let stream = handler::GossipHandler::<handler::LogStrategy>::new(self.profile.clone()).run(stream); + + stream.map(|(gossip_message, _handling_result)| { + Message::GossipHandled(gossip_message) + }) + }) + } +} diff --git a/gui/src/main.rs b/gui/src/main.rs index 6120152..486f007 100644 --- a/gui/src/main.rs +++ b/gui/src/main.rs @@ -4,6 +4,7 @@ mod app; mod cli; mod timeline; mod post; +mod gossip; fn main() -> Result<()> { let _ = env_logger::try_init()?; diff --git a/lib/src/client.rs b/lib/src/client.rs index d985769..2128f76 100644 --- a/lib/src/client.rs +++ b/lib/src/client.rs @@ -122,6 +122,10 @@ impl Client { self.get::<S>(cid).await.map(|v| v.0) } + + pub async fn pubsub_subscribe(&self, topic: String) -> Result<ipfs::SubscriptionStream> { + self.ipfs.pubsub_subscribe(topic).await.map_err(anyhow::Error::from) + } } fn now() -> DateTime { diff --git a/lib/src/gossip/deserializer.rs b/lib/src/gossip/deserializer.rs new file mode 100644 index 0000000..dcd5082 --- /dev/null +++ b/lib/src/gossip/deserializer.rs @@ -0,0 +1,57 @@ +use std::sync::Arc; + +use anyhow::Result; +use futures::Stream; +use futures::StreamExt; + +use crate::gossip::GossipMessage; + +pub struct GossipDeserializer<ErrStrategy = LogStrategy> + where ErrStrategy: GossipDeserializerErrorStrategy +{ + strategy: std::marker::PhantomData<ErrStrategy>, +} + +impl<ErrStrategy> GossipDeserializer<ErrStrategy> + where ErrStrategy: GossipDeserializerErrorStrategy +{ + pub fn new() -> Self { + Self { + strategy: std::marker::PhantomData, + } + } + + pub fn run<S>(self, input: S) -> impl Stream<Item = (ipfs::PeerId, GossipMessage)> + where S: Stream<Item = Arc<ipfs::PubsubMessage>> + { + input.filter_map(|message| async move { + log::trace!("Received gossip message"); + + match serde_json::from_slice(&message.data).map_err(anyhow::Error::from) { + Ok(m) => Some((message.source, m)), + Err(e) => { + ErrStrategy::handle_error(e); + None + } + } + }) + } +} + +pub trait GossipDeserializerErrorStrategy { + fn handle_error(err: anyhow::Error); +} + +pub struct LogStrategy; +impl GossipDeserializerErrorStrategy for LogStrategy { + fn handle_error(err: anyhow::Error) { + log::trace!("Error: {}", err); + } +} + +pub struct IgnoreStrategy; +impl GossipDeserializerErrorStrategy for IgnoreStrategy { + fn handle_error(_: anyhow::Error) { + () + } +} diff --git a/lib/src/gossip/handler.rs b/lib/src/gossip/handler.rs new file mode 100644 index 0000000..7c9ffa6 --- /dev/null +++ b/lib/src/gossip/handler.rs @@ -0,0 +1,73 @@ +//! Low-level module for gossip'ing code +//! +//! This module implements the low-level gossiping functionality that other modules use to +//! implement actual behaviours on +//! + +use std::sync::Arc; + +use anyhow::Result; +use futures::Stream; +use futures::StreamExt; +use tokio::sync::RwLock; + +use crate::profile::Profile; +use crate::gossip::GossipMessage; + +#[derive(Debug)] +pub struct GossipHandler<Strategy = LogStrategy> + where Strategy: GossipHandlingStrategy + Sync + Send +{ + profile: Arc<Profile>, + strategy: std::marker::PhantomData<Strategy>, +} + +impl<Strat> GossipHandler<Strat> + where Strat: GossipHandlingStrategy + Sync + Send +{ + pub fn new(profile: Arc<Profile>) -> Self { + Self { + profile, + strategy: std::marker::PhantomData, + } + } + + pub fn run<S>(self, input: S) -> impl Stream<Item = (GossipMessage, Result<()>)> + where S: Stream<Item = (ipfs::PeerId, GossipMessage)> + { + input.then(move |(source, msg)| { + let pr = self.profile.clone(); + async move { + log::trace!("Received gossip message from {}: {:?}", source, msg); + let res = Strat::handle_gossip_message(pr.clone(), &source, &msg).await; + (msg, res) + } + }) + } +} + +#[async_trait::async_trait] +pub trait GossipHandlingStrategy: Sync + Send { + async fn handle_gossip_message(profile: Arc<Profile>, source: &ipfs::PeerId, msg: &GossipMessage) -> Result<()>; +} + +pub struct LogStrategy; + +#[async_trait::async_trait] +impl GossipHandlingStrategy for LogStrategy { + async fn handle_gossip_message(_profile: Arc<Profile>, source: &ipfs::PeerId, msg: &GossipMessage) -> Result<()> { + use std::convert::TryFrom; + use std::ops::Deref; + + match msg { + GossipMessage::CurrentProfileState { peer_id, cid } => { + let peer_id = ipfs::PeerId::from_bytes(peer_id); + let cid = cid::Cid::try_from(cid.deref()); + + log::trace!("{:?} told me that {:?} is at {:?}", source, peer_id, cid); + } + } + + Ok(()) + } +} diff --git a/lib/src/gossip/mod.rs b/lib/src/gossip/mod.rs new file mode 100644 index 0000000..4f4d143 --- /dev/null +++ b/lib/src/gossip/mod.rs @@ -0,0 +1,6 @@ +mod msg; +pub use msg::GossipMessage; + +pub mod deserializer; +pub mod handler; + diff --git a/lib/src/reactor/gossip/msg.rs b/lib/src/gossip/msg.rs index 049fc68..8a6e6d2 100644 --- a/lib/src/reactor/gossip/msg.rs +++ b/lib/src/gossip/msg.rs @@ -1,6 +1,6 @@ use anyhow::Result; -#[derive(Debug, serde::Serialize, serde::Deserialize)] +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub enum GossipMessage { CurrentProfileState { peer_id: Vec<u8>, @@ -9,7 +9,7 @@ pub enum GossipMessage { } impl GossipMessage { - pub(super) fn into_bytes(self) -> Result<Vec<u8>> { + pub(crate) fn into_bytes(self) -> Result<Vec<u8>> { serde_json::to_string(&self) .map(String::into_bytes) .map_err(anyhow::Error::from) diff --git a/lib/src/lib.rs b/lib/src/lib.rs index b7b05e2..e836c37 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -4,4 +4,4 @@ pub mod ipfs_client; pub mod profile; pub mod stream; pub mod types; -pub mod reactor; +pub mod gossip; diff --git a/lib/src/profile/mod.rs b/lib/src/profile/mod.rs index d1ab90f..3b1e063 100644 --- a/lib/src/profile/mod.rs +++ b/lib/src/profile/mod.rs @@ -169,6 +169,30 @@ impl Profile { pub fn add_device(&mut self, d: Device) -> Result<()> { self.state.add_device(d) } + + pub async fn gossip_own_state(&self, topic: String) -> Result<()> { + let cid = self.state + .profile_head() + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Profile has no HEAD yet"))? + .to_bytes(); + + let peer_id = self.client + .own_id() + .await? + .to_bytes(); + + self.client + .ipfs + .pubsub_publish(topic, { + crate::gossip::GossipMessage::CurrentProfileState { + peer_id, + cid, + }.into_bytes()? + }) + .await + .map_err(anyhow::Error::from) + } } diff --git a/lib/src/reactor/account.rs b/lib/src/reactor/account.rs deleted file mode 100644 index 59913b5..0000000 --- a/lib/src/reactor/account.rs +++ /dev/null @@ -1,19 +0,0 @@ -//! Module for account handling (following accounts, caching account updates) using the gossip -//! module for the lower-level handling - -use std::sync::Arc; - -use anyhow::Result; -use tokio::sync::RwLock; - -use crate::profile::Profile; -use crate::reactor::gossip::GossipReactor; - -#[derive(Debug)] -pub struct AccountReactor(GossipReactor); - -impl AccountReactor { - pub(super) fn new(profile: Arc<RwLock<Profile>>) -> Self { - unimplemented!() - } -} diff --git a/lib/src/reactor/ctrl.rs b/lib/src/reactor/ctrl.rs deleted file mode 100644 index a32f1c3..0000000 --- a/lib/src/reactor/ctrl.rs +++ /dev/null @@ -1,15 +0,0 @@ -use tokio::sync::mpsc::UnboundedSender as Sender; -use tokio::sync::mpsc::UnboundedReceiver as Receiver; - -/// Type for sending messages to a reactor -pub type ReactorSender<Request, Reply> = Sender<(Request, ReplySender<Reply>)>; - -/// Type that is used by a reactor for receiving messages -pub type ReactorReceiver<Request, Reply> = Receiver<(Request, ReplySender<Reply>)>; - -/// Type that represents the channel that has to be send with a request to a reactor for getting an -/// answer back -pub type ReplySender<Reply> = Sender<Reply>; - -pub type ReplyReceiver<Reply> = Receiver<Reply>; - diff --git a/lib/src/reactor/device.rs b/lib/src/reactor/device.rs deleted file mode 100644 index 1014ca1..0000000 --- a/lib/src/reactor/device.rs +++ /dev/null @@ -1,19 +0,0 @@ -//! Module for multi-device support functionality, -//! which uses the gossip module for the lower-level handling - -use std::sync::Arc; - -use anyhow::Result; -use tokio::sync::RwLock; - -use crate::profile::Profile; -use crate::reactor::gossip::GossipReactor; - -#[derive(Debug)] -pub struct DeviceReactor(GossipReactor); - -impl DeviceReactor { - pub(super) fn new(profile: Arc<RwLock<Profile>>) -> Self { - unimplemented!() - } -} diff --git a/lib/src/reactor/gossip/ctrl.rs b/lib/src/reactor/gossip/ctrl.rs deleted file mode 100644 index 68fbf06..0000000 --- a/lib/src/reactor/gossip/ctrl.rs +++ /dev/null @@ -1,19 +0,0 @@ -use anyhow::Result; - -#[derive(Debug)] -pub enum GossipRequest { - Exit, - Ping, - PublishMe, - Connect(ipfs::MultiaddrWithPeerId), -} - -#[derive(Debug)] -pub enum GossipReply { - Exiting, - Pong, - NoHead, - PublishMeResult(Result<()>), - ConnectResult(Result<()>), -} - diff --git a/lib/src/reactor/gossip/mod.rs b/lib/src/reactor/gossip/mod.rs deleted file mode 100644 index 7658509..0000000 --- a/lib/src/reactor/gossip/mod.rs +++ /dev/null @@ -1,329 +0,0 @@ -//! Low-level module for gossip'ing code -//! -//! This module implements the low-level gossiping functionality that other modules use to -//! implement actual behaviours on -//! - -use std::sync::Arc; - -use anyhow::Result; -use tokio::sync::RwLock; - -use crate::profile::Profile; -use crate::reactor::Reactor; -use crate::reactor::ReactorBuilder; -use crate::reactor::ctrl::ReactorReceiver; -use crate::reactor::ctrl::ReactorSender; -use crate::reactor::ctrl::ReplySender; - -mod ctrl; -pub use ctrl::GossipRequest; -pub use ctrl::GossipReply; - -mod msg; -pub use msg::GossipMessage; - -mod strategy; -pub use strategy::GossipHandlingStrategy; -pub use strategy::LogStrategy; - -#[derive(Debug)] -pub struct GossipReactorBuilder { - profile: Arc<RwLock<Profile>>, - gossip_topic_name: String, -} - -impl GossipReactorBuilder { - pub fn new(profile: Arc<RwLock<Profile>>, gossip_topic_name: String) -> Self { - Self { profile, gossip_topic_name } - } -} - -impl ReactorBuilder for GossipReactorBuilder { - type Reactor = GossipReactor; - - fn build_with_receiver(self, rr: ReactorReceiver<GossipRequest, GossipReply>) -> Self::Reactor { - GossipReactor { - profile: self.profile, - gossip_topic_name: self.gossip_topic_name, - receiver: rr, - strategy: std::marker::PhantomData, - } - } -} - -pub struct GossipReactor<Strategy = LogStrategy> - where Strategy: GossipHandlingStrategy + Sync + Send -{ - profile: Arc<RwLock<Profile>>, - gossip_topic_name: String, - receiver: ReactorReceiver<GossipRequest, GossipReply>, - strategy: std::marker::PhantomData<Strategy>, -} - -impl<S> std::fmt::Debug for GossipReactor<S> - where S: GossipHandlingStrategy + Sync + Send -{ - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "GossipReactor {{ topic: '{}' }}", self.gossip_topic_name) - } -} - -impl<S> GossipReactor<S> - where S: GossipHandlingStrategy + Sync + Send -{ - fn send_gossip_reply(channel: ReplySender<GossipReply>, reply: GossipReply) -> Result<()> { - if let Err(_) = channel.send(reply) { |