summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2021-12-10 18:35:02 +0100
committerMatthias Beyer <mail@beyermatthias.de>2021-12-10 18:36:19 +0100
commit4ed7c5aa6279e74d21829003f0a901e5ff3945f7 (patch)
tree95693b07bdbacbadc3d025fbda0c61ae5530a6b9
parent88450e0d8c2ae0046998180469345c0ce7f3a5aa (diff)
Add simple reactor-stopping mechanism
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
-rw-r--r--lib/src/reactor/gossip/mod.rs5
-rw-r--r--lib/src/reactor/mod.rs6
2 files changed, 10 insertions, 1 deletions
diff --git a/lib/src/reactor/gossip/mod.rs b/lib/src/reactor/gossip/mod.rs
index 7056b26..9ee19e5 100644
--- a/lib/src/reactor/gossip/mod.rs
+++ b/lib/src/reactor/gossip/mod.rs
@@ -123,6 +123,7 @@ impl GossipReactor {
pub async fn run(mut self) -> Result<()> {
use futures::stream::StreamExt;
+ self.inner.set_running(true);
let mut subscription_stream = self.inner.profile()
.read()
.await
@@ -148,6 +149,10 @@ impl GossipReactor {
}
}
}
+
+ if !self.inner.running() {
+ break;
+ }
}
Ok(())
}
diff --git a/lib/src/reactor/mod.rs b/lib/src/reactor/mod.rs
index c35cecc..1f8159d 100644
--- a/lib/src/reactor/mod.rs
+++ b/lib/src/reactor/mod.rs
@@ -43,11 +43,13 @@ pub enum ReactorReply<CustomReply: Debug + Send + Sync> {
///
/// The Reactor runs the whole application logic, that is syncing with other devices, fetching and
/// keeping profile updates of other accounts, communication on the gossipsub topics... etc
-#[derive(Debug)]
+#[derive(Debug, getset::Getters, getset::Setters)]
pub(super) struct Reactor<CustomReactorRequest, CustomReactorReply>
where CustomReactorRequest: Debug + Send + Sync,
CustomReactorReply: Debug + Send + Sync
{
+ #[getset(get = "pub", set = "pub")]
+ running: bool,
profile: Arc<RwLock<Profile>>,
rx: ReactorReceiver<CustomReactorRequest, CustomReactorReply>,
}
@@ -59,6 +61,7 @@ impl<CustomReactorRequest, CustomReactorReply> Reactor<CustomReactorRequest, Cus
pub(super) fn new(profile: Arc<RwLock<Profile>>) -> (Self, ReactorSender<CustomReactorRequest, CustomReactorReply>) {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let reactor = Reactor {
+ running: true,
profile,
rx,
};
@@ -105,6 +108,7 @@ impl<CustomReactorRequest, CustomReactorReply> Reactor<CustomReactorRequest, Cus
},
(ReactorRequest::Exit, reply_channel) => {
+ self.running = false;
if let Err(_) = reply_channel.send(ReactorReply::Exiting) {
anyhow::bail!("Failed sending EXITING reply")
}