diff options
Diffstat (limited to 'crates/core/tedge_api/examples')
-rw-r--r-- | crates/core/tedge_api/examples/heartbeat.rs | 121 | ||||
-rw-r--r-- | crates/core/tedge_api/examples/universal_log.rs | 123 |
2 files changed, 132 insertions, 112 deletions
diff --git a/crates/core/tedge_api/examples/heartbeat.rs b/crates/core/tedge_api/examples/heartbeat.rs index a9305fde..454be2ea 100644 --- a/crates/core/tedge_api/examples/heartbeat.rs +++ b/crates/core/tedge_api/examples/heartbeat.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, time::Duration}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use async_trait::async_trait; use futures::FutureExt; @@ -9,6 +9,7 @@ use tedge_api::{ Address, CancellationToken, Plugin, PluginBuilder, PluginConfiguration, PluginDirectory, PluginError, }; +use tokio::sync::RwLock; /// A message that represents a heartbeat that gets sent to plugins #[derive(Debug)] @@ -294,11 +295,17 @@ impl Plugin for CriticalService { // /// Helper type for keeping information about plugins during runtime -#[derive(Debug)] struct PluginInfo { types: Vec<MessageType>, - receiver: Option<tedge_api::address::MessageReceiver>, - sender: tedge_api::address::MessageSender, + sender: Arc<RwLock<Option<Box<tedge_api::address::MessageFutureProducer>>>>, +} + +impl std::fmt::Debug for PluginInfo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PluginInfo") + .field("types", &self.types) + .finish_non_exhaustive() + } } /// The type that provides the communication infrastructure to the plugins. @@ -309,13 +316,12 @@ struct Communication { impl Communication { pub fn declare_plugin<PB: PluginBuilder<Self>>(&mut self, name: &str) { - let (sender, receiver) = tokio::sync::mpsc::channel(10); + let sender = Arc::new(RwLock::new(None)); self.plugins.insert( name.to_owned(), PluginInfo { types: PB::kind_message_types().into_types(), sender, - receiver: Some(receiver), }, ); } @@ -349,7 +355,9 @@ impl PluginDirectory for Communication { asked_types, name, plug.types, ); } else { - Ok(Address::new(plug.sender.clone())) + Ok(Address::new(tedge_api::address::InnerMessageSender::new( + plug.sender.clone(), + ))) } } @@ -418,59 +426,55 @@ async fn main() { let cancel_token = CancellationToken::new(); - let mut heartbeat = build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await; - let mut critical_service = build_critical_plugin(&mut comms, cancel_token.child_token()).await; + let mut heartbeat = Arc::new(RwLock::new( + build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await, + )); + let mut critical_service = Arc::new(RwLock::new( + build_critical_plugin(&mut comms, cancel_token.child_token()).await, + )); - heartbeat.plugin_mut().start().await.unwrap(); - critical_service.plugin_mut().start().await.unwrap(); - - let mut recv = comms - .plugins - .get_mut("heartbeat") - .unwrap() - .receiver - .take() + heartbeat.write().await.plugin_mut().start().await.unwrap(); + critical_service + .write() + .await + .plugin_mut() + .start() + .await .unwrap(); - let hb_cancel_token = cancel_token.child_token(); - let hb_handle = tokio::task::spawn(async move { - let hb = heartbeat; + let recv = comms.plugins.get("heartbeat").unwrap(); - loop { - tokio::select! { - Some(msg) = recv.recv() => { - hb.handle_message(msg).await.unwrap(); - } - _ = hb_cancel_token.cancelled() => break, + { + let mut lock = recv.sender.write().await; + let heartbeat = heartbeat.clone(); + + *lock = Some(Box::new(move |msg, _wait_kind| { + let heartbeat = heartbeat.clone(); + async move { + let heartbeat = heartbeat.read().await; + heartbeat.handle_message(msg).await; + Ok(()) } - } - - hb - }); - - let mut recv = comms - .plugins - .get_mut("critical-service") - .unwrap() - .receiver - .take() - .unwrap(); + .boxed() + })); + } - let cs_cancel_token = cancel_token.child_token(); - let cs_handle = tokio::task::spawn(async move { - let cs = critical_service; + let recv = comms.plugins.get("critical-service").unwrap(); - loop { - tokio::select! { - Some(msg) = recv.recv() => { - cs.handle_message(msg).await.unwrap(); - } - _ = cs_cancel_token.cancelled() => break, + { + let mut lock = recv.sender.write().await; + let critical_service = critical_service.clone(); + + *lock = Some(Box::new(move |msg, _wait_kind| { + let critical_service = critical_service.clone(); + async move { + let critical_service = critical_service.read().await; + critical_service.handle_message(msg).await; + Ok(()) } - } - - cs - }); + .boxed() + })); + } println!("Core: Stopping everything in 10 seconds!"); tokio::time::sleep(Duration::from_secs(12)).await; @@ -478,11 +482,16 @@ async fn main() { println!("Core: SHUTTING DOWN"); cancel_token.cancel(); - let (heartbeat, critical_service) = tokio::join!(hb_handle, cs_handle); - - heartbeat.unwrap().plugin_mut().shutdown().await.unwrap(); + heartbeat + .write() + .await + .plugin_mut() + .shutdown() + .await + .unwrap(); critical_service - .unwrap() + .write() + .await .plugin_mut() .shutdown() .await diff --git a/crates/core/tedge_api/examples/universal_log.rs b/crates/core/tedge_api/examples/universal_log.rs index f2b3dcea..82ff3130 100644 --- a/crates/core/tedge_api/examples/universal_log.rs +++ b/crates/core/tedge_api/examples/universal_log.rs @@ -1,6 +1,7 @@ -use std::{collections::HashMap, time::Duration}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use async_trait::async_trait; +use futures::FutureExt; use tedge_api::{ address::ReplySenderFor, message::{AnyMessage, MessageType}, @@ -8,6 +9,7 @@ use tedge_api::{ Address, CancellationToken, Plugin, PluginBuilder, PluginConfiguration, PluginDirectory, PluginError, }; +use tokio::sync::RwLock; /// A message that represents a heartbeat that gets sent to plugins #[derive(Debug)] @@ -229,6 +231,7 @@ impl Plugin for LogService { } } +// // The following pieces of code would be implemented by a "core" component, that is responsible for // setting up plugins and their communication. // @@ -238,11 +241,17 @@ impl Plugin for LogService { // /// Helper type for keeping information about plugins during runtime -#[derive(Debug)] struct PluginInfo { types: Vec<MessageType>, - receiver: Option<tedge_api::address::MessageReceiver>, - sender: tedge_api::address::MessageSender, + sender: Arc<RwLock<Option<Box<tedge_api::address::MessageFutureProducer>>>>, +} + +impl std::fmt::Debug for PluginInfo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PluginInfo") + .field("types", &self.types) + .finish_non_exhaustive() + } } /// The type that provides the communication infrastructure to the plugins. @@ -253,13 +262,12 @@ struct Communication { impl Communication { pub fn declare_plugin<PB: PluginBuilder<Self>>(&mut self, name: &str) { - let (sender, receiver) = tokio::sync::mpsc::channel(10); + let sender = Arc::new(RwLock::new(None)); self.plugins.insert( name.to_owned(), PluginInfo { types: PB::kind_message_types().into_types(), sender, - receiver: Some(receiver), }, ); } @@ -293,7 +301,9 @@ impl PluginDirectory for Communication { asked_types, name, plug.types, ); } else { - Ok(Address::new(plug.sender.clone())) + Ok(Address::new(tedge_api::address::InnerMessageSender::new( + plug.sender.clone(), + ))) } } @@ -356,59 +366,55 @@ async fn main() { let cancel_token = CancellationToken::new(); - let mut heartbeat = build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await; - let mut critical_service = build_critical_plugin(&mut comms, cancel_token.child_token()).await; - - heartbeat.plugin_mut().start().await.unwrap(); - critical_service.plugin_mut().start().await.unwrap(); + let mut heartbeat = Arc::new(RwLock::new( + build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await, + )); + let mut critical_service = Arc::new(RwLock::new( + build_critical_plugin(&mut comms, cancel_token.child_token()).await, + )); - let mut recv = comms - .plugins - .get_mut("heartbeat") - .unwrap() - .receiver - .take() + heartbeat.write().await.plugin_mut().start().await.unwrap(); + critical_service + .write() + .await + .plugin_mut() + .start() + .await .unwrap(); - let hb_cancel_token = cancel_token.child_token(); - let hb_handle = tokio::task::spawn(async move { - let hb = heartbeat; + let recv = comms.plugins.get("heartbeat").unwrap(); - loop { - tokio::select! { - Some(msg) = recv.recv() => { - hb.handle_message(msg).await.unwrap(); - } - _ = hb_cancel_token.cancelled() => break, + { + let mut lock = recv.sender.write().await; + let heartbeat = heartbeat.clone(); + + *lock = Some(Box::new(move |msg, _wait_kind| { + let heartbeat = heartbeat.clone(); + async move { + let heartbeat = heartbeat.read().await; + heartbeat.handle_message(msg).await; + Ok(()) } - } - - hb - }); - - let mut recv = comms - .plugins - .get_mut("critical-service") - .unwrap() - .receiver - .take() - .unwrap(); + .boxed() + })); + } - let cs_cancel_token = cancel_token.child_token(); - let cs_handle = tokio::task::spawn(async move { - let cs = critical_service; + let recv = comms.plugins.get("critical-service").unwrap(); - loop { - tokio::select! { - Some(msg) = recv.recv() => { - cs.handle_message(msg).await.unwrap(); - } - _ = cs_cancel_token.cancelled() => break, + { + let mut lock = recv.sender.write().await; + let critical_service = critical_service.clone(); + + *lock = Some(Box::new(move |msg, _wait_kind| { + let critical_service = critical_service.clone(); + async move { + let critical_service = critical_service.read().await; + critical_service.handle_message(msg).await; + Ok(()) } - } - - cs - }); + .boxed() + })); + } println!("Core: Stopping everything in 10 seconds!"); tokio::time::sleep(Duration::from_secs(12)).await; @@ -416,11 +422,16 @@ async fn main() { println!("Core: SHUTTING DOWN"); cancel_token.cancel(); - let (heartbeat, critical_service) = tokio::join!(hb_handle, cs_handle); - - heartbeat.unwrap().plugin_mut().shutdown().await.unwrap(); + heartbeat + .write() + .await + .plugin_mut() + .shutdown() + .await + .unwrap(); critical_service - .unwrap() + .write() + .await .plugin_mut() .shutdown() .await |