summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2021-12-10 16:48:08 +0100
committerMatthias Beyer <mail@beyermatthias.de>2021-12-10 16:48:08 +0100
commitec205af0d61ad834bd3033219c2590a7d83fdc9b (patch)
tree170ef5f40e08b5c9c0d3e9320efa9c976bc06b47
parentea0f2c399320b7bf0f51ada34451bc21100dbafa (diff)
Add abstract reactor implementation
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
-rw-r--r--lib/src/reactor/mod.rs67
1 files changed, 52 insertions, 15 deletions
diff --git a/lib/src/reactor/mod.rs b/lib/src/reactor/mod.rs
index d55f5c6..b4aa597 100644
--- a/lib/src/reactor/mod.rs
+++ b/lib/src/reactor/mod.rs
@@ -1,4 +1,5 @@
use std::sync::Arc;
+use std::fmt::Debug;
use anyhow::Result;
use tokio::sync::RwLock;
@@ -8,21 +9,39 @@ use crate::profile::Profile;
mod gossip;
mod device;
mod account;
+mod ctrl;
+
+pub use ctrl::ReactorReceiver;
+pub use ctrl::ReactorReply;
+pub use ctrl::ReactorRequest;
+pub use ctrl::ReactorSender;
+pub use ctrl::ReplyChannel;
/// Reactor type, for running the application logic
///
/// The Reactor runs the whole application logic, that is syncing with other devices, fetching and
/// keeping profile updates of other accounts, communication on the gossipsub topics... etc
#[derive(Debug)]
-pub struct Reactor {
+pub(super) struct Reactor<CustomReactorRequest, CustomReactorReply>
+ where CustomReactorRequest: Debug + Send + Sync,
+ CustomReactorReply: Debug + Send + Sync
+{
profile: Arc<RwLock<Profile>>,
+ rx: ReactorReceiver<CustomReactorRequest, CustomReactorReply>,
}
-impl Reactor {
- pub fn new(profile: Profile) -> Self {
- Reactor {
- profile: Arc::new(RwLock::new(profile)),
- }
+impl<CustomReactorRequest, CustomReactorReply> Reactor<CustomReactorRequest, CustomReactorReply>
+ where CustomReactorRequest: Debug + Send + Sync,
+ CustomReactorReply: Debug + Send + Sync
+{
+ pub(super) fn new(profile: Arc<RwLock<Profile>>) -> (Self, ReactorSender<CustomReactorRequest, CustomReactorReply>) {
+ let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
+ let reactor = Reactor {
+ profile,
+ rx,
+ };
+
+ (reactor, tx)
}
pub async fn head(&self) -> Option<cid::Cid> {
@@ -47,15 +66,33 @@ impl Reactor {
}
}
- /// Run the reactor
- ///
- /// Starts all inner functionality and exposes things
- ///
- /// # Return
- ///
- /// Return types are WIP, as this must return "running" objects that can be communicated with
- pub async fn run(self) -> Result<()> {
- unimplemented!()
+ pub(super) async fn receive_next_message(&mut self) -> Option<(ReactorRequest<CustomReactorRequest>, ReplyChannel<CustomReactorReply>)> {
+ self.rx.recv().await
+ }
+
+ /// Process the request if it is not a specialized request,
+ /// return the specialized request if it is one and cannot be processed by this reactor
+ /// implementation
+ pub(super) async fn process_reactor_message(&mut self, request: (ReactorRequest<CustomReactorRequest>, ReplyChannel<CustomReactorReply>)) -> Result<Option<(CustomReactorRequest, ReplyChannel<CustomReactorReply>)>> {
+ match request {
+ (ReactorRequest::Ping, reply_channel) => {
+ if let Err(_) = reply_channel.send(ReactorReply::Pong) {
+ anyhow::bail!("Failed sending PONG reply")
+ }
+ Ok(None)
+ },
+
+ (ReactorRequest::Exit, reply_channel) => {
+ if let Err(_) = reply_channel.send(ReactorReply::Exiting) {
+ anyhow::bail!("Failed sending EXITING reply")
+ }
+ Ok(None)
+ },
+
+ (ReactorRequest::Custom(c), reply_channel) => {
+ Ok(Some((c, reply_channel)))
+ }
+ }
}
}