summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2021-12-20 09:29:35 +0100
committerMatthias Beyer <mail@beyermatthias.de>2021-12-20 09:29:35 +0100
commita23b897b5c3c9ee721b793e26401d1863d97f84e (patch)
treed22a4fb5735bc72704f2622a4748ec3ba5c6189e
parent860177170ce583be7c7d86d8d81fdc6a7c402dc4 (diff)
parent5a58021581187d11fb26d0de9d19514e6383956f (diff)
Merge branch 'gossipping-gui'
-rw-r--r--gui/src/app/message.rs44
-rw-r--r--gui/src/app/mod.rs262
-rw-r--r--gui/src/gossip.rs52
-rw-r--r--gui/src/main.rs1
-rw-r--r--lib/src/client.rs4
-rw-r--r--lib/src/gossip/deserializer.rs57
-rw-r--r--lib/src/gossip/handler.rs73
-rw-r--r--lib/src/gossip/mod.rs6
-rw-r--r--lib/src/gossip/msg.rs (renamed from lib/src/reactor/gossip/msg.rs)4
-rw-r--r--lib/src/lib.rs2
-rw-r--r--lib/src/profile/mod.rs24
-rw-r--r--lib/src/reactor/account.rs19
-rw-r--r--lib/src/reactor/ctrl.rs15
-rw-r--r--lib/src/reactor/device.rs19
-rw-r--r--lib/src/reactor/gossip/ctrl.rs19
-rw-r--r--lib/src/reactor/gossip/mod.rs329
-rw-r--r--lib/src/reactor/gossip/strategy.rs31
-rw-r--r--lib/src/reactor/mod.rs28
18 files changed, 475 insertions, 514 deletions
diff --git a/gui/src/app/message.rs b/gui/src/app/message.rs
index f530774..dcb7477 100644
--- a/gui/src/app/message.rs
+++ b/gui/src/app/message.rs
@@ -2,14 +2,26 @@ use std::sync::Arc;
use cid::Cid;
+use distrox_lib::gossip::GossipMessage;
use distrox_lib::profile::Profile;
use distrox_lib::types::Payload;
-#[derive(Debug, Clone)]
+use crate::gossip::GossipRecipe;
+
+#[derive(Clone, Debug)]
pub enum Message {
Loaded(Arc<Profile>),
FailedToLoad(String),
+ ToggleLog,
+
+ GossipSubscriptionFailed(String),
+ GossipHandled(GossipMessage),
+
+ PublishGossipAboutMe,
+ OwnStateGossipped,
+ GossippingFailed(String),
+
InputChanged(String),
CreatePost,
@@ -21,3 +33,33 @@ pub enum Message {
TimelineScrolled(f32),
}
+
+impl Message {
+ pub fn description(&self) -> &'static str {
+ match self {
+ Message::Loaded(_) => "Loaded",
+ Message::FailedToLoad(_) => "FailedToLoad",
+
+ Message::ToggleLog => "ToggleLog",
+
+ Message::GossipSubscriptionFailed(_) => "GossipSubscriptionFailed",
+ Message::GossipHandled(_) => "GossipHandled",
+
+ Message::PublishGossipAboutMe => "PublishGossipAboutMe",
+ Message::OwnStateGossipped => "OwnStateGossipped",
+ Message::GossippingFailed(_) => "GossippingFailed",
+
+ Message::InputChanged(_) => "InputChanged",
+ Message::CreatePost => "CreatePost",
+
+ Message::PostCreated(_) => "PostCreated",
+ Message::PostCreationFailed(_) => "PostCreationFailed",
+
+ Message::PostLoaded(_) => "PostLoaded",
+ Message::PostLoadingFailed => "PostLoadingFailed",
+
+ Message::TimelineScrolled(_) => "TimelineScrolled",
+ }
+ }
+}
+
diff --git a/gui/src/app/mod.rs b/gui/src/app/mod.rs
index 8fefdbc..77f6c9d 100644
--- a/gui/src/app/mod.rs
+++ b/gui/src/app/mod.rs
@@ -1,10 +1,12 @@
use std::sync::Arc;
+use std::sync::RwLock as StdRwLock;
use anyhow::Result;
use iced::Application;
use iced::Column;
use iced::Container;
use iced::Length;
+use iced::Row;
use iced::Scrollable;
use iced::TextInput;
use iced::scrollable;
@@ -17,16 +19,24 @@ use crate::timeline::PostLoadingRecipe;
mod message;
pub use message::Message;
+use crate::gossip::GossipRecipe;
+
#[derive(Debug)]
enum Distrox {
- Loading,
+ Loading {
+ gossip_subscription_recv: StdRwLock<tokio::sync::oneshot::Receiver<GossipRecipe>>,
+ },
Loaded {
profile: Arc<Profile>,
+ gossip_subscription_recv: StdRwLock<tokio::sync::oneshot::Receiver<GossipRecipe>>,
scroll: scrollable::State,
input: text_input::State,
input_value: String,
timeline: Timeline,
+
+ log_visible: bool,
+ log: std::collections::VecDeque<String>,
},
FailedToStart,
}
@@ -37,15 +47,33 @@ impl Application for Distrox {
type Flags = String;
fn new(name: String) -> (Self, iced::Command<Self::Message>) {
+ let (gossip_subscription_sender, gossip_subscription_recv) = tokio::sync::oneshot::channel();
(
- Distrox::Loading,
+ Distrox::Loading {
+ gossip_subscription_recv: StdRwLock::new(gossip_subscription_recv),
+ },
+
iced::Command::perform(async move {
- match Profile::load(&name).await {
- Err(e) => Message::FailedToLoad(e.to_string()),
- Ok(instance) => {
- Message::Loaded(Arc::new(instance))
- }
+ let profile = match Profile::load(&name).await {
+ Err(e) => return Message::FailedToLoad(e.to_string()),
+ Ok(instance) => Arc::new(instance),
+ };
+
+ if let Err(e) = profile.client()
+ .pubsub_subscribe("distrox".to_string())
+ .await
+ .map_err(anyhow::Error::from)
+ .map(|stream| {
+ log::trace!("Subscription to 'distrox' pubsub channel worked");
+ GossipRecipe::new(profile.clone(), stream)
+ })
+ .and_then(|s| gossip_subscription_sender.send(s).map_err(|_| anyhow::anyhow!("Failed to initialize gossipping module")))
+ {
+ log::error!("Failed to load gossip recipe");
+ return Message::FailedToLoad(e.to_string())
}
+
+ Message::Loaded(profile)
}, |m: Message| -> Message { m })
)
}
@@ -55,33 +83,33 @@ impl Application for Distrox {
}
fn update(&mut self, message: Self::Message) -> iced::Command<Self::Message> {
+ log::trace!("Received message: {}", message.description());
match self {
- Distrox::Loading => {
- match message {
- Message::Loaded(profile) => {
- *self = Distrox::Loaded {
- profile,
- scroll: scrollable::State::default(),
- input: text_input::State::default(),
- input_value: String::default(),
- timeline: Timeline::new(),
- };
- }
+ Distrox::Loading { gossip_subscription_recv } => {
+ if let Message::Loaded(profile) = message {
+ *self = Distrox::Loaded {
+ profile,
+
+ // Don't even try to think what hoops I am jumping through here...
+ gossip_subscription_recv: std::mem::replace(gossip_subscription_recv, StdRwLock::new(tokio::sync::oneshot::channel().1)),
+ scroll: scrollable::State::default(),
+ input: text_input::State::default(),
+ input_value: String::default(),
+ timeline: Timeline::new(),
+ log_visible: false,
+ log: std::collections::VecDeque::with_capacity(1000),
+ };
- Message::FailedToLoad(e) => {
- log::error!("Failed to load: {}", e);
- *self = Distrox::FailedToStart;
- }
-
- _ => {}
}
- }
+ iced::Command::none()
+ },
- Distrox::Loaded { profile, ref mut input_value, timeline, .. } => {
+ Distrox::Loaded { profile, ref mut input_value, timeline, log_visible, log, .. } => {
match message {
Message::InputChanged(input) => {
*input_value = input;
+ iced::Command::none()
}
Message::CreatePost => {
@@ -96,32 +124,84 @@ impl Application for Distrox {
|res| match res {
Ok(cid) => Message::PostCreated(cid),
Err(e) => Message::PostCreationFailed(e.to_string())
- });
+ })
+ } else {
+ iced::Command::none()
}
}
Message::PostCreated(cid) => {
*input_value = String::new();
log::info!("Post created: {}", cid);
+ iced::Command::none()
}
Message::PostCreationFailed(err) => {
log::error!("Post creation failed: {}", err);
+ iced::Command::none()
}
Message::PostLoaded((payload, content)) => {
timeline.push(payload, content);
+ iced::Command::none()
}
Message::PostLoadingFailed => {
log::error!("Failed to load some post, TODO: Better error logging");
+ iced::Command::none()
}
Message::TimelineScrolled(f) => {
log::trace!("Timeline scrolled: {}", f);
+ iced::Command::none()
+ }
+
+ Message::ToggleLog => {
+ log::trace!("Log toggled");
+ *log_visible = !*log_visible;
+ iced::Command::none()
}
- _ => {}
+ Message::GossipHandled(msg) => {
+ use distrox_lib::gossip::GossipMessage;
+
+ log::trace!("Gossip handled, adding to log: {:?}", msg);
+ let msg = match msg {
+ GossipMessage::CurrentProfileState { peer_id, cid } => {
+ format!("Peer {:?} is at {:?}", peer_id, cid)
+ }
+ };
+ log.push_back(msg);
+ while log.len() > 1000 {
+ let _ = log.pop_front();
+ }
+ iced::Command::none()
+ }
+
+ Message::PublishGossipAboutMe => {
+ let profile = profile.clone();
+ iced::Command::perform(async move {
+ if let Err(e) = profile.gossip_own_state("distrox".to_string()).await {
+ Message::GossippingFailed(e.to_string())
+ } else {
+ Message::OwnStateGossipped
+ }
+ }, |m: Message| -> Message { m })
+ }
+
+ Message::OwnStateGossipped => {
+ log::trace!("Gossipped own state");
+ log.push_back("Gossipped own state".to_string());
+ iced::Command::none()
+ }
+
+ Message::GossippingFailed(e) => {
+ log::trace!("Gossipped failed: {}", e);
+ log.push_back(format!("Gossipped failed: {}", e));
+ iced::Command::none()
+ }
+
+ _ => iced::Command::none(),
}
}
@@ -129,43 +209,79 @@ impl Application for Distrox {
unimplemented!()
}
}
- iced::Command::none()
}
fn view(&mut self) -> iced::Element<Self::Message> {
match self {
- Distrox::Loading => {
+ Distrox::Loading { .. } => {
let text = iced::Text::new("Loading");
let content = Column::new()
- .max_width(800)
.spacing(20)
.push(text);
Container::new(content)
.width(Length::Fill)
+ .height(Length::Fill)
.center_x()
+ .center_y()
.into()
}
- Distrox::Loaded { input, input_value, timeline, scroll, .. } => {
- let input = TextInput::new(
- input,
- "What do you want to tell the world?",
- input_value,
- Message::InputChanged,
- )
- .padding(15)
- .size(12)
- .on_submit(Message::CreatePost);
-
- let timeline = timeline.view();
-
- Scrollable::new(scroll)
- .padding(40)
- .push(input)
- .push(timeline)
- .into()
+ Distrox::Loaded { input, input_value, timeline, scroll, log_visible, log, .. } => {
+ let left_column = Column::new()
+ .into();
+
+ let mid_column = Column::new()
+ .push({
+ let input = TextInput::new(
+ input,
+ "What do you want to tell the world?",
+ input_value,
+ Message::InputChanged,
+ )
+ .padding(15)
+ .size(12)
+ .on_submit(Message::CreatePost);
+
+ let timeline = timeline.view();
+
+ Scrollable::new(scroll)
+ .padding(40)
+ .push(input)
+ .push(timeline)
+ })
+ .into();
+
+ let right_column = Column::new()
+ .into();
+
+ let content = Row::with_children(vec![
+ left_column,
+ mid_column,
+ right_column
+ ])
+ .spacing(20)
+ .height(Length::Fill)
+ .width(Length::Fill);
+
+ let content = Column::new()
+ .height(Length::Fill)
+ .width(Length::Fill)
+ .push(content);
+
+ if *log_visible {
+ let log = Column::with_children({
+ log.iter()
+ .map(iced::Text::new)
+ .map(|txt| txt.size(8))
+ .map(iced::Element::from)
+ .collect()
+ });
+ content.push(log)
+ } else {
+ content
+ }.into()
}
Distrox::FailedToStart => {
@@ -175,7 +291,7 @@ impl Application for Distrox {
}
fn subscription(&self) -> iced::Subscription<Self::Message> {
- match self {
+ let post_loading_subs = match self {
Distrox::Loaded { profile, .. } => {
let head = profile.head();
@@ -189,7 +305,53 @@ impl Application for Distrox {
}
}
_ => iced::Subscription::none(),
+ };
+
+ let keyboard_subs = {
+ use iced_native::event::Event;
+
+ iced_native::subscription::events_with(|event, _| {
+ match event {
+ Event::Keyboard(iced_native::keyboard::Event::KeyPressed { key_code, .. }) => {
+ if key_code == iced_native::keyboard::KeyCode::F11 {
+ Some(Message::ToggleLog)
+ } else {
+ None
+ }
+ },
+ _ => None,
+ }
+ })
+ };
+
+ let gossip_sub = match self {
+ Distrox::Loaded { gossip_subscription_recv, .. } => {
+ match gossip_subscription_recv.write().ok() {
+ Some(mut sub) => sub.try_recv()
+ .ok() // Either empty or closed, ignore both
+ .map(|sub| iced::Subscription::from_recipe(sub)),
+ None => None
+ }
+ },
+ _ => None,
+ };
+
+ let gossip_sending_sub = {
+ iced::time::every(std::time::Duration::from_millis(100))
+ .map(|_| Message::PublishGossipAboutMe)
+ };
+
+ let mut subscriptions = vec![
+ post_loading_subs,
+ keyboard_subs,
+ gossip_sending_sub,
+ ];
+
+ if let Some(gossip_sub) = gossip_sub {
+ subscriptions.push(gossip_sub);
}
+
+ iced::Subscription::batch(subscriptions)
}
}
diff --git a/gui/src/gossip.rs b/gui/src/gossip.rs
new file mode 100644
index 0000000..ac6bab0
--- /dev/null
+++ b/gui/src/gossip.rs
@@ -0,0 +1,52 @@
+use std::sync::Arc;
+
+use futures::StreamExt;
+
+use distrox_lib::profile::Profile;
+use distrox_lib::client::Client;
+
+use crate::app::Message;
+
+#[derive(Clone, Debug)]
+pub struct GossipRecipe {
+ profile: Arc<Profile>,
+ subscription: Arc<ipfs::SubscriptionStream>,
+}
+
+impl GossipRecipe {
+ pub fn new(profile: Arc<Profile>, subscription: ipfs::SubscriptionStream) -> Self {
+ Self { profile, subscription: Arc::new(subscription) }
+ }
+}
+
+
+// Make sure iced can use our download stream
+impl<H, I> iced_native::subscription::Recipe<H, I> for GossipRecipe
+where
+ H: std::hash::Hasher,
+{
+ type Output = Message;
+
+ fn hash(&self, state: &mut H) {
+ use std::hash::Hash;
+ struct Marker;
+ std::any::TypeId::of::<Marker>().hash(state);
+ }
+
+ fn stream(self: Box<Self>, _input: futures::stream::BoxStream<'static, I>) -> futures::stream::BoxStream<'static, Self::Output> {
+ use distrox_lib::gossip::deserializer;
+ use distrox_lib::gossip::handler;
+
+ // TODO: Do "right", whatever this means...
+ let stream = Arc::try_unwrap(self.subscription).unwrap();
+
+ Box::pin({
+ let stream = deserializer::GossipDeserializer::<deserializer::LogStrategy>::new().run(stream);
+ let stream = handler::GossipHandler::<handler::LogStrategy>::new(self.profile.clone()).run(stream);
+
+ stream.map(|(gossip_message, _handling_result)| {
+ Message::GossipHandled(gossip_message)
+ })
+ })
+ }
+}
diff --git a/gui/src/main.rs b/gui/src/main.rs
index 6120152..486f007 100644
--- a/gui/src/main.rs
+++ b/gui/src/main.rs
@@ -4,6 +4,7 @@ mod app;
mod cli;
mod timeline;
mod post;
+mod gossip;
fn main() -> Result<()> {
let _ = env_logger::try_init()?;
diff --git a/lib/src/client.rs b/lib/src/client.rs
index d985769..2128f76 100644
--- a/lib/src/client.rs
+++ b/lib/src/client.rs
@@ -122,6 +122,10 @@ impl Client {
self.get::<S>(cid).await.map(|v| v.0)
}
+
+ pub async fn pubsub_subscribe(&self, topic: String) -> Result<ipfs::SubscriptionStream> {
+ self.ipfs.pubsub_subscribe(topic).await.map_err(anyhow::Error::from)
+ }
}
fn now() -> DateTime {
diff --git a/lib/src/gossip/deserializer.rs b/lib/src/gossip/deserializer.rs
new file mode 100644
index 0000000..dcd5082
--- /dev/null
+++ b/lib/src/gossip/deserializer.rs
@@ -0,0 +1,57 @@
+use std::sync::Arc;
+
+use anyhow::Result;
+use futures::Stream;
+use futures::StreamExt;
+
+use crate::gossip::GossipMessage;
+
+pub struct GossipDeserializer<ErrStrategy = LogStrategy>
+ where ErrStrategy: GossipDeserializerErrorStrategy
+{
+ strategy: std::marker::PhantomData<ErrStrategy>,
+}
+
+impl<ErrStrategy> GossipDeserializer<ErrStrategy>
+ where ErrStrategy: GossipDeserializerErrorStrategy
+{
+ pub fn new() -> Self {
+ Self {
+ strategy: std::marker::PhantomData,
+ }
+ }
+
+ pub fn run<S>(self, input: S) -> impl Stream<Item = (ipfs::PeerId, GossipMessage)>
+ where S: Stream<Item = Arc<ipfs::PubsubMessage>>
+ {
+ input.filter_map(|message| async move {
+ log::trace!("Received gossip message");
+
+ match serde_json::from_slice(&message.data).map_err(anyhow::Error::from) {
+ Ok(m) => Some((message.source, m)),
+ Err(e) => {
+ ErrStrategy::handle_error(e);
+ None
+ }
+ }
+ })
+ }
+}
+
+pub trait GossipDeserializerErrorStrategy {
+ fn handle_error(err: anyhow::Error);
+}
+
+pub struct LogStrategy;
+impl GossipDeserializerErrorStrategy for LogStrategy {
+ fn handle_error(err: anyhow::Error) {
+ log::trace!("Error: {}", err);
+ }
+}
+
+pub struct IgnoreStrategy;
+impl GossipDeserializerErrorStrategy for IgnoreStrategy {
+ fn handle_error(_: anyhow::Error) {
+ ()
+ }
+}
diff --git a/lib/src/gossip/handler.rs b/lib/src/gossip/handler.rs
new file mode 100644
index 0000000..7c9ffa6
--- /dev/null
+++ b/lib/src/gossip/handler.rs
@@ -0,0 +1,73 @@
+//! Low-level module for gossip'ing code
+//!
+//! This module implements the low-level gossiping functionality that other modules use to
+//! implement actual behaviours on
+//!
+
+use std::sync::Arc;
+
+use anyhow::Result;
+use futures::Stream;
+use futures::StreamExt;
+use tokio::sync::RwLock;
+
+use crate::profile::Profile;
+use crate::gossip::GossipMessage;
+
+#[derive(Debug)]
+pub struct GossipHandler<Strategy = LogStrategy>
+ where Strategy: GossipHandlingStrategy + Sync + Send
+{
+ profile: Arc<Profile>,
+ strategy: std::marker::PhantomData<Strategy>,
+}
+
+impl<Strat> GossipHandler<Strat>
+ where Strat: GossipHandlingStrategy + Sync + Send
+{
+ pub fn new(profile: Arc<Profile>) -> Self {
+ Self {
+ profile,
+ strategy: std::marker::PhantomData,
+ }
+ }
+
+ pub fn run<S>(self, input: S) -> impl Stream<Item = (GossipMessage, Result<()>)>
+ where S: Stream<Item = (ipfs::PeerId, GossipMessage)>
+ {
+ input.then(move |(source, msg)| {
+ let pr = self.profile.clone();
+ async move {
+ log::trace!("Received gossip message from {}: {:?}", source, msg);
+ let res = Strat::handle_gossip_message(pr.clone(), &source, &msg).await;
+ (msg, res)
+ }
+ })
+ }
+}
+
+#[async_trait::async_trait]
+pub trait GossipHandlingStrategy: Sync + Send {
+ async fn handle_gossip_message(profile: Arc<Profile>, source: &ipfs::PeerId, msg: &GossipMessage) -> Result<()>;
+}
+
+pub struct LogStrategy;
+
+#[async_trait::async_trait]
+impl GossipHandlingStrategy for LogStrategy {
+ async fn handle_gossip_message(_profile: Arc<Profile>, source: &ipfs::PeerId, msg: &GossipMessage) -> Result<()> {
+ use std::convert::TryFrom;
+ use std::ops::Deref;
+
+ match msg {
+ GossipMessage::CurrentProfileState { peer_id, cid } => {
+ let peer_id = ipfs::PeerId::from_bytes(peer_id);
+ let cid = cid::Cid::try_from(cid.deref());
+
+ log::trace!("{:?} told me that {:?} is at {:?}", source, peer_id, cid);
+ }
+ }
+
+ Ok(())
+ }
+}
diff --git a/lib/src/gossip/mod.rs b/lib/src/gossip/mod.rs
new file mode 100644
index 0000000..4f4d143
--- /dev/null
+++ b/lib/src/gossip/mod.rs
@@ -0,0 +1,6 @@
+mod msg;
+pub use msg::GossipMessage;
+
+pub mod deserializer;
+pub mod handler;
+
diff --git a/lib/src/reactor/gossip/msg.rs b/lib/src/gossip/msg.rs
index 049fc68..8a6e6d2 100644
--- a/lib/src/reactor/gossip/msg.rs
+++ b/lib/src/gossip/msg.rs
@@ -1,6 +1,6 @@
use anyhow::Result;
-#[derive(Debug, serde::Serialize, serde::Deserialize)]
+#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub enum GossipMessage {
CurrentProfileState {
peer_id: Vec<u8>,
@@ -9,7 +9,7 @@ pub enum GossipMessage {
}
impl GossipMessage {
- pub(super) fn into_bytes(self) -> Result<Vec<u8>> {
+ pub(crate) fn into_bytes(self) -> Result<Vec<u8>> {
serde_json::to_string(&self)
.map(String::into_bytes)
.map_err(anyhow::Error::from)
diff --git a/lib/src/lib.rs b/lib/src/lib.rs
index b7b05e2..e836c37 100644
--- a/lib/src/lib.rs
+++ b/lib/src/lib.rs
@@ -4,4 +4,4 @@ pub mod ipfs_client;
pub mod profile;
pub mod stream;
pub mod types;
-pub mod reactor;
+pub mod gossip;
diff --git a/lib/src/profile/mod.rs b/lib/src/profile/mod.rs
index d1ab90f..3b1e063 100644
--- a/lib/src/profile/mod.rs
+++ b/lib/src/profile/mod.rs
@@ -169,6 +169,30 @@ impl Profile {
pub fn add_device(&mut self, d: Device) -> Result<()> {
self.state.add_device(d)
}
+
+ pub async fn gossip_own_state(&self, topic: String) -> Result<()> {
+ let cid = self.state
+ .profile_head()
+ .as_ref()
+ .ok_or_else(|| anyhow::anyhow!("Profile has no HEAD yet"))?
+ .to_bytes();
+
+ let peer_id = self.client
+ .own_id()
+ .await?
+ .to_bytes();
+
+ self.client
+ .ipfs
+ .pubsub_publish(topic, {
+ crate::gossip::GossipMessage::CurrentProfileState {
+ peer_id,
+ cid,
+ }.into_bytes()?
+ })
+ .await
+ .map_err(anyhow::Error::from)
+ }
}
diff --git a/lib/src/reactor/account.rs b/lib/src/reactor/account.rs
deleted file mode 100644
index 59913b5..0000000
--- a/lib/src/reactor/account.rs
+++ /dev/null
@@ -1,19 +0,0 @@
-//! Module for account handling (following accounts, caching account updates) using the gossip
-//! module for the lower-level handling
-
-use std::sync::Arc;
-
-use anyhow::Result;
-use tokio::sync::RwLock;
-
-use crate::profile::Profile;
-use crate::reactor::gossip::GossipReactor;
-
-#[derive(Debug)]
-pub struct AccountReactor(GossipReactor);
-
-impl AccountReactor {
- pub(super) fn new(profile: Arc<RwLock<Profile>>) -> Self {
- unimplemented!()
- }
-}
diff --git a/lib/src/reactor/ctrl.rs b/lib/src/reactor/ctrl.rs
deleted file mode 100644
index a32f1c3..0000000
--- a/lib/src/reactor/ctrl.rs
+++ /dev/null
@@ -1,15 +0,0 @@
-use tokio::sync::mpsc::UnboundedSender as Sender;
-use tokio::sync::mpsc::UnboundedReceiver as Receiver;
-
-/// Type for sending messages to a reactor
-pub type ReactorSender<Request, Reply> = Sender<(Request, ReplySender<Reply>)>;
-
-/// Type that is used by a reactor for receiving messages
-pub type ReactorReceiver<Request, Reply> = Receiver<(Request, ReplySender<Reply>)>;
-
-/// Type that represents the channel that has to be send with a request to a reactor for getting an
-/// answer back
-pub type ReplySender<Reply> = Sender<Reply>;
-
-pub type ReplyReceiver<Reply> = Receiver<Reply>;
-
diff --git a/lib/src/reactor/device.rs b/lib/src/reactor/device.rs
deleted file mode 100644
index 1014ca1..0000000
--- a/lib/src/reactor/device.rs
+++ /dev/null
@@ -1,19 +0,0 @@
-//! Module for multi-device support functionality,
-//! which uses the gossip module for the lower-level handling
-
-use std::sync::Arc;
-
-use anyhow::Result;
-use tokio::sync::RwLock;
-
-use crate::profile::Profile;
-use crate::reactor::gossip::GossipReactor;
-
-#[derive(Debug)]
-pub struct DeviceReactor(GossipReactor);
-
-impl DeviceReactor {
- pub(super) fn new(profile: Arc<RwLock<Profile>>) -> Self {
- unimplemented!()
- }
-}
diff --git a/lib/src/reactor/gossip/ctrl.rs b/lib/src/reactor/gossip/ctrl.rs
deleted file mode 100644
index 68fbf06..0000000
--- a/lib/src/reactor/gossip/ctrl.rs
+++ /dev/null
@@ -1,19 +0,0 @@
-use anyhow::Result;
-
-#[derive(Debug)]
-pub enum GossipRequest {
- Exit,
- Ping,
- PublishMe,
- Connect(ipfs::MultiaddrWithPeerId),
-}
-
-#[derive(Debug)]
-pub enum GossipReply {
- Exiting,
- Pong,
- NoHead,
- PublishMeResult(Result<()>),
- ConnectResult(Result<()>),
-}
-
diff --git a/lib/src/reactor/gossip/mod.rs b/lib/src/reactor/gossip/mod.rs
deleted file mode 100644
index 7658509..0000000
--- a/lib/src/reactor/gossip/mod.rs
+++ /dev/null
@@ -1,329 +0,0 @@
-//! Low-level module for gossip'ing code
-//!
-//! This module implements the low-level gossiping functionality that other modules use to
-//! implement actual behaviours on
-//!
-
-use std::sync::Arc;
-
-use anyhow::Result;
-use tokio::sync::RwLock;
-
-use crate::profile::Profile;
-use crate::reactor::Reactor;
-use crate::reactor::ReactorBuilder;
-use crate::reactor::ctrl::ReactorReceiver;
-use crate::reactor::ctrl::ReactorSender;
-use crate::reactor::ctrl::ReplySender;
-
-mod ctrl;
-pub use ctrl::GossipRequest;
-pub use ctrl::GossipReply;
-
-mod msg;
-pub use msg::GossipMessage;
-
-mod strategy;
-pub use strategy::GossipHandlingStrategy;
-pub use strategy::LogStrategy;
-
-#[derive(Debug)]
-pub struct GossipReactorBuilder {
- profile: Arc<RwLock<Profile>>,
- gossip_topic_name: String,
-}
-
-impl GossipReactorBuilder {
- pub fn new(profile: Arc<RwLock<Profile>>, gossip_topic_name: String) -> Self {
- Self { profile, gossip_topic_name }
- }
-}
-
-impl ReactorBuilder for GossipReactorBuilder {
- type Reactor = GossipReactor;
-
- fn build_with_receiver(self, rr: ReactorReceiver<GossipRequest, GossipReply>) -> Self::Reactor {
- GossipReactor {
- profile: self.profile,
- gossip_topic_name: self.gossip_topic_name,
- receiver: rr,
- strategy: std::marker::PhantomData,
- }
- }
-}
-
-pub struct GossipReactor<Strategy = LogStrategy>
- where Strategy: GossipHandlingStrategy + Sync + Send
-{
- profile: Arc<RwLock<Profile>>,
- gossip_topic_name: String,
- receiver: ReactorReceiver<GossipRequest, GossipReply>,
- strategy: std::marker::PhantomData<Strategy>,
-}
-
-impl<S> std::fmt::Debug for GossipReactor<S>
- where S: GossipHandlingStrategy + Sync + Send
-{
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(f, "GossipReactor {{ topic: '{}' }}", self.gossip_topic_name)
- }
-}
-
-impl<S> GossipReactor<S>
- where S: GossipHandlingStrategy + Sync + Send
-{
- fn send_gossip_reply(channel: ReplySender<GossipReply>, reply: GossipReply) -> Result<()> {
- if let Err(_) = channel.send(reply) {