From ec205af0d61ad834bd3033219c2590a7d83fdc9b Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Fri, 10 Dec 2021 16:48:08 +0100 Subject: Add abstract reactor implementation Signed-off-by: Matthias Beyer --- lib/src/reactor/mod.rs | 67 +++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 52 insertions(+), 15 deletions(-) diff --git a/lib/src/reactor/mod.rs b/lib/src/reactor/mod.rs index d55f5c6..b4aa597 100644 --- a/lib/src/reactor/mod.rs +++ b/lib/src/reactor/mod.rs @@ -1,4 +1,5 @@ use std::sync::Arc; +use std::fmt::Debug; use anyhow::Result; use tokio::sync::RwLock; @@ -8,21 +9,39 @@ use crate::profile::Profile; mod gossip; mod device; mod account; +mod ctrl; + +pub use ctrl::ReactorReceiver; +pub use ctrl::ReactorReply; +pub use ctrl::ReactorRequest; +pub use ctrl::ReactorSender; +pub use ctrl::ReplyChannel; /// Reactor type, for running the application logic /// /// The Reactor runs the whole application logic, that is syncing with other devices, fetching and /// keeping profile updates of other accounts, communication on the gossipsub topics... etc #[derive(Debug)] -pub struct Reactor { +pub(super) struct Reactor + where CustomReactorRequest: Debug + Send + Sync, + CustomReactorReply: Debug + Send + Sync +{ profile: Arc>, + rx: ReactorReceiver, } -impl Reactor { - pub fn new(profile: Profile) -> Self { - Reactor { - profile: Arc::new(RwLock::new(profile)), - } +impl Reactor + where CustomReactorRequest: Debug + Send + Sync, + CustomReactorReply: Debug + Send + Sync +{ + pub(super) fn new(profile: Arc>) -> (Self, ReactorSender) { + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + let reactor = Reactor { + profile, + rx, + }; + + (reactor, tx) } pub async fn head(&self) -> Option { @@ -47,15 +66,33 @@ impl Reactor { } } - /// Run the reactor - /// - /// Starts all inner functionality and exposes things - /// - /// # Return - /// - /// Return types are WIP, as this must return "running" objects that can be communicated with - pub async fn run(self) -> Result<()> { - unimplemented!() + pub(super) async fn receive_next_message(&mut self) -> Option<(ReactorRequest, ReplyChannel)> { + self.rx.recv().await + } + + /// Process the request if it is not a specialized request, + /// return the specialized request if it is one and cannot be processed by this reactor + /// implementation + pub(super) async fn process_reactor_message(&mut self, request: (ReactorRequest, ReplyChannel)) -> Result)>> { + match request { + (ReactorRequest::Ping, reply_channel) => { + if let Err(_) = reply_channel.send(ReactorReply::Pong) { + anyhow::bail!("Failed sending PONG reply") + } + Ok(None) + }, + + (ReactorRequest::Exit, reply_channel) => { + if let Err(_) = reply_channel.send(ReactorReply::Exiting) { + anyhow::bail!("Failed sending EXITING reply") + } + Ok(None) + }, + + (ReactorRequest::Custom(c), reply_channel) => { + Ok(Some((c, reply_channel))) + } + } } } -- cgit v1.2.3