diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2021-12-10 18:35:02 +0100 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2021-12-10 18:36:19 +0100 |
commit | 4ed7c5aa6279e74d21829003f0a901e5ff3945f7 (patch) | |
tree | 95693b07bdbacbadc3d025fbda0c61ae5530a6b9 | |
parent | 88450e0d8c2ae0046998180469345c0ce7f3a5aa (diff) |
Add simple reactor-stopping mechanism
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
-rw-r--r-- | lib/src/reactor/gossip/mod.rs | 5 | ||||
-rw-r--r-- | lib/src/reactor/mod.rs | 6 |
2 files changed, 10 insertions, 1 deletions
diff --git a/lib/src/reactor/gossip/mod.rs b/lib/src/reactor/gossip/mod.rs index 7056b26..9ee19e5 100644 --- a/lib/src/reactor/gossip/mod.rs +++ b/lib/src/reactor/gossip/mod.rs @@ -123,6 +123,7 @@ impl GossipReactor { pub async fn run(mut self) -> Result<()> { use futures::stream::StreamExt; + self.inner.set_running(true); let mut subscription_stream = self.inner.profile() .read() .await @@ -148,6 +149,10 @@ impl GossipReactor { } } } + + if !self.inner.running() { + break; + } } Ok(()) } diff --git a/lib/src/reactor/mod.rs b/lib/src/reactor/mod.rs index c35cecc..1f8159d 100644 --- a/lib/src/reactor/mod.rs +++ b/lib/src/reactor/mod.rs @@ -43,11 +43,13 @@ pub enum ReactorReply<CustomReply: Debug + Send + Sync> { /// /// The Reactor runs the whole application logic, that is syncing with other devices, fetching and /// keeping profile updates of other accounts, communication on the gossipsub topics... etc -#[derive(Debug)] +#[derive(Debug, getset::Getters, getset::Setters)] pub(super) struct Reactor<CustomReactorRequest, CustomReactorReply> where CustomReactorRequest: Debug + Send + Sync, CustomReactorReply: Debug + Send + Sync { + #[getset(get = "pub", set = "pub")] + running: bool, profile: Arc<RwLock<Profile>>, rx: ReactorReceiver<CustomReactorRequest, CustomReactorReply>, } @@ -59,6 +61,7 @@ impl<CustomReactorRequest, CustomReactorReply> Reactor<CustomReactorRequest, Cus pub(super) fn new(profile: Arc<RwLock<Profile>>) -> (Self, ReactorSender<CustomReactorRequest, CustomReactorReply>) { let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); let reactor = Reactor { + running: true, profile, rx, }; @@ -105,6 +108,7 @@ impl<CustomReactorRequest, CustomReactorReply> Reactor<CustomReactorRequest, Cus }, (ReactorRequest::Exit, reply_channel) => { + self.running = false; if let Err(_) = reply_channel.send(ReactorReply::Exiting) { anyhow::bail!("Failed sending EXITING reply") } |