From abab64c856c0c299b13fcc1857a5b78f449e4a9e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=BCller?= Date: Fri, 29 Apr 2022 09:30:10 +0200 Subject: Add AnyMessage & AnyMessages MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This patch adds the ability to send any kind of messages to plugins that declare handling `AnyMessage` + `AnyMessages`. It does it with these changes: - Remove the `Message::Reply` associated type - This allows `Message` to be used as a trait object - Add the `MessageType` which allows to identify messages - This permits `AnyMessage` to be received by plugins Signed-off-by: Marcel Müller --- crates/core/tedge_api/examples/heartbeat.rs | 46 +-- crates/core/tedge_api/examples/universal_log.rs | 430 ++++++++++++++++++++++++ 2 files changed, 447 insertions(+), 29 deletions(-) create mode 100644 crates/core/tedge_api/examples/universal_log.rs (limited to 'crates/core/tedge_api/examples') diff --git a/crates/core/tedge_api/examples/heartbeat.rs b/crates/core/tedge_api/examples/heartbeat.rs index 95e6ca15..37e9c8ec 100644 --- a/crates/core/tedge_api/examples/heartbeat.rs +++ b/crates/core/tedge_api/examples/heartbeat.rs @@ -1,15 +1,11 @@ -use std::{ - any::TypeId, - collections::{HashMap, HashSet}, - time::Duration, -}; +use std::{collections::HashMap, time::Duration}; use async_trait::async_trait; use futures::FutureExt; use tedge_api::{ - address::ReplySender, - message::NoReply, - plugin::{BuiltPlugin, Handle, Message, PluginDeclaration, PluginExt}, + address::ReplySenderFor, + message::MessageType, + plugin::{AcceptsReplies, BuiltPlugin, Handle, Message, PluginDeclaration, PluginExt}, Address, CancellationToken, Plugin, PluginBuilder, PluginConfiguration, PluginDirectory, PluginError, }; @@ -17,7 +13,8 @@ use tedge_api::{ /// A message that represents a heartbeat that gets sent to plugins #[derive(Debug)] struct Heartbeat; -impl Message for Heartbeat { +impl Message for Heartbeat {} +impl AcceptsReplies for Heartbeat { type Reply = HeartbeatStatus; } @@ -27,9 +24,7 @@ enum HeartbeatStatus { Alive, Degraded, } -impl Message for HeartbeatStatus { - type Reply = NoReply; -} +impl Message for HeartbeatStatus {} /// A PluginBuilder that gets used to build a HeartbeatService plugin instance #[derive(Debug)] @@ -253,7 +248,7 @@ impl Handle for CriticalService { async fn handle_message( &self, _message: Heartbeat, - sender: ReplySender, + sender: ReplySenderFor, ) -> Result<(), PluginError> { println!("CriticalService: Received Heartbeat!"); let mut status = self.status.lock().await; @@ -301,23 +296,13 @@ impl Plugin for CriticalService { /// Helper type for keeping information about plugins during runtime #[derive(Debug)] struct PluginInfo { - types: HashSet<(&'static str, TypeId)>, + types: Vec, receiver: Option, sender: tedge_api::address::MessageSender, } -impl Clone for PluginInfo { - fn clone(&self) -> Self { - PluginInfo { - types: self.types.clone(), - receiver: None, - sender: self.sender.clone(), - } - } -} - /// The type that provides the communication infrastructure to the plugins. -#[derive(Clone, Debug)] +#[derive(Debug)] struct Communication { plugins: HashMap, } @@ -328,7 +313,7 @@ impl Communication { self.plugins.insert( name.to_owned(), PluginInfo { - types: PB::kind_message_types().into(), + types: PB::kind_message_types().into_types(), sender, receiver: Some(receiver), }, @@ -341,7 +326,7 @@ impl PluginDirectory for Communication { &self, name: &str, ) -> Result, tedge_api::error::DirectoryError> { - let types = MB::get_ids().into_iter().collect(); + let asked_types: Vec<_> = MB::get_ids().into_iter().collect(); let plug = self.plugins.get(name).unwrap_or_else(|| { // This is an example, so we panic!() here. @@ -353,12 +338,15 @@ impl PluginDirectory for Communication { ) }); - if !plug.types.is_superset(&types) { + if !asked_types + .iter() + .all(|req_type| plug.types.iter().any(|ty| ty.satisfy(req_type))) + { // This is an example, so we panic!() here // In real-world, we would do some reporting and return an error panic!( "Asked for {:#?} but plugin {} only has types {:#?}", - types, name, plug.types, + asked_types, name, plug.types, ); } else { Ok(Address::new(plug.sender.clone())) diff --git a/crates/core/tedge_api/examples/universal_log.rs b/crates/core/tedge_api/examples/universal_log.rs new file mode 100644 index 00000000..f2b3dcea --- /dev/null +++ b/crates/core/tedge_api/examples/universal_log.rs @@ -0,0 +1,430 @@ +use std::{collections::HashMap, time::Duration}; + +use async_trait::async_trait; +use tedge_api::{ + address::ReplySenderFor, + message::{AnyMessage, MessageType}, + plugin::{AnyMessages, BuiltPlugin, Handle, Message, PluginDeclaration, PluginExt}, + Address, CancellationToken, Plugin, PluginBuilder, PluginConfiguration, PluginDirectory, + PluginError, +}; + +/// A message that represents a heartbeat that gets sent to plugins +#[derive(Debug)] +struct Heartbeat; +impl Message for Heartbeat {} + +#[derive(Debug)] +struct RandomData; +impl Message for RandomData {} + +/// A PluginBuilder that gets used to build a HeartbeatService plugin instance +#[derive(Debug)] +struct HeartbeatServiceBuilder; + +#[derive(miette::Diagnostic, thiserror::Error, Debug)] +enum HeartbeatBuildError { + #[error(transparent)] + TomlParse(#[from] toml::de::Error), +} + +#[async_trait] +impl PluginBuilder for HeartbeatServiceBuilder { + fn kind_name() -> &'static str { + todo!() + } + + fn kind_message_types() -> tedge_api::plugin::HandleTypes + where + Self: Sized, + { + HeartbeatService::get_handled_types() + } + + async fn verify_configuration( + &self, + _config: &PluginConfiguration, + ) -> Result<(), tedge_api::error::PluginError> { + Ok(()) + } + + async fn instantiate( + &self, + config: PluginConfiguration, + cancellation_token: CancellationToken, + plugin_dir: &PD, + ) -> Result + where + PD: 'async_trait, + { + let hb_config: HeartbeatConfig = + toml::Value::try_into(config).map_err(HeartbeatBuildError::from)?; + let monitored_services = hb_config + .plugins + .iter() + .map(|name| { + plugin_dir + .get_address_for::(name) + .map(|addr| (name.clone(), addr)) + }) + .collect::, _>>()?; + Ok(HeartbeatService::new( + Duration::from_millis(hb_config.interval), + monitored_services, + cancellation_token, + ) + .finish()) + } +} + +/// The configuration a HeartbeatServices can receive is represented by this type +#[derive(serde::Deserialize, Debug)] +struct HeartbeatConfig { + interval: u64, + plugins: Vec, +} + +/// The HeartbeatService type represents the actual plugin +struct HeartbeatService { + interval_duration: Duration, + monitored_services: Vec<(String, Address)>, + cancel_token: CancellationToken, +} + +impl PluginDeclaration for HeartbeatService { + type HandledMessages = (); +} + +#[async_trait] +impl Plugin for HeartbeatService { + /// The setup function of the HeartbeatService can be used by the plugin author to setup for + /// example a connection to an external service. In this example, it is simply used to send the + /// heartbeat + /// + /// Because this example is _simple_, we do not spawn a background task that periodically sends + /// the heartbeat. In a real world scenario, that background task would be started here. + async fn start(&mut self) -> Result<(), PluginError> { + println!( + "HeartbeatService: Setting up heartbeat service with interval: {:?}!", + self.interval_duration + ); + + for service in &self.monitored_services { + let mut interval = tokio::time::interval(self.interval_duration); + let service = service.clone(); + let cancel_token = self.cancel_token.child_token(); + tokio::spawn(async move { + loop { + tokio::select! { + _ = interval.tick() => {} + _ = cancel_token.cancelled() => { + break + } + } + println!( + "HeartbeatService: Sending heartbeat to service: {:?}", + service + ); + service.1.send_and_wait(Heartbeat).await.unwrap(); + service.1.send_and_wait(RandomData).await.unwrap(); + } + }); + } + Ok(()) + } + + /// A plugin author can use this shutdown function to clean resources when thin-edge shuts down + async fn shutdown(&mut self) -> Result<(), PluginError> { + println!("HeartbeatService: Shutting down heartbeat service!"); + Ok(()) + } +} + +impl HeartbeatService { + fn new( + interval_duration: Duration, + monitored_services: Vec<(String, Address)>, + cancel_token: CancellationToken, + ) -> Self { + Self { + interval_duration, + monitored_services, + cancel_token, + } + } +} + +/// A plugin that receives heartbeats +struct LogServiceBuilder; + +// declare a set of messages that the CriticalService can receive. +// In this example, it can only receive a Heartbeat. +tedge_api::make_receiver_bundle!(struct HeartbeatMessages(Heartbeat, RandomData)); + +#[async_trait] +impl PluginBuilder for LogServiceBuilder { + fn kind_name() -> &'static str { + todo!() + } + + fn kind_message_types() -> tedge_api::plugin::HandleTypes + where + Self: Sized, + { + LogService::get_handled_types() + } + + async fn verify_configuration( + &self, + _config: &PluginConfiguration, + ) -> Result<(), tedge_api::error::PluginError> { + Ok(()) + } + + async fn instantiate( + &self, + _config: PluginConfiguration, + _cancellation_token: CancellationToken, + _plugin_dir: &PD, + ) -> Result + where + PD: 'async_trait, + { + Ok(LogService {}.finish()) + } +} + +/// The actual "critical" plugin implementation +struct LogService {} + +/// The CriticalService can receive Heartbeat objects, thus it needs a Handle +/// implementation +#[async_trait] +impl Handle for LogService { + async fn handle_message( + &self, + message: AnyMessage, + _sender: ReplySenderFor, + ) -> Result<(), PluginError> { + println!("LogService: Received Message: {:?}", message); + Ok(()) + } +} + +impl PluginDeclaration for LogService { + type HandledMessages = AnyMessages; +} + +/// Because the CriticalService is of course a Plugin, it needs an implementation for that as well. +#[async_trait] +impl Plugin for LogService { + async fn start(&mut self) -> Result<(), PluginError> { + println!("CriticalService: Setting up critical service!"); + Ok(()) + } + + async fn shutdown(&mut self) -> Result<(), PluginError> { + println!("CriticalService: Shutting down critical service service!"); + Ok(()) + } +} + +// The following pieces of code would be implemented by a "core" component, that is responsible for +// setting up plugins and their communication. +// +// Plugin authors do not need to write this code, but need a basic understanding what it does and +// how it works. +// As this is an example, we implement it here to showcase how it is done. +// + +/// Helper type for keeping information about plugins during runtime +#[derive(Debug)] +struct PluginInfo { + types: Vec, + receiver: Option, + sender: tedge_api::address::MessageSender, +} + +/// The type that provides the communication infrastructure to the plugins. +#[derive(Debug)] +struct Communication { + plugins: HashMap, +} + +impl Communication { + pub fn declare_plugin>(&mut self, name: &str) { + let (sender, receiver) = tokio::sync::mpsc::channel(10); + self.plugins.insert( + name.to_owned(), + PluginInfo { + types: PB::kind_message_types().into_types(), + sender, + receiver: Some(receiver), + }, + ); + } +} + +impl PluginDirectory for Communication { + fn get_address_for( + &self, + name: &str, + ) -> Result, tedge_api::error::DirectoryError> { + let asked_types: Vec<_> = MB::get_ids().into_iter().collect(); + + let plug = self.plugins.get(name).unwrap_or_else(|| { + // This is an example, so we panic!() here. + // In real-world, we would do some reporting and return an error + panic!( + "Didn't find plugin with name {}, got: {:?}", + name, + self.plugins.keys().collect::>() + ) + }); + + if !asked_types + .iter() + .all(|req_type| plug.types.iter().any(|ty| ty.satisfy(req_type))) + { + // This is an example, so we panic!() here + // In real-world, we would do some reporting and return an error + panic!( + "Asked for {:#?} but plugin {} only has types {:#?}", + asked_types, name, plug.types, + ); + } else { + Ok(Address::new(plug.sender.clone())) + } + } + + fn get_address_for_core(&self) -> Address { + todo!() + } +} + +/// Helper function +async fn build_critical_plugin( + comms: &mut Communication, + cancel_token: CancellationToken, +) -> BuiltPlugin { + let csb = LogServiceBuilder; + + let config = toml::from_str("").unwrap(); + + csb.instantiate(config, cancel_token, comms).await.unwrap() +} + +/// Helper function +async fn build_heartbeat_plugin( + comms: &mut Communication, + cancel_token: CancellationToken, +) -> BuiltPlugin { + let hsb = HeartbeatServiceBuilder; + + let config = toml::from_str( + r#" + interval = 5000 + plugins = ["critical-service"] + "#, + ) + .unwrap(); + + hsb.instantiate(config, cancel_token, comms).await.unwrap() +} + +#[tokio::main] +async fn main() { + // This implementation now ties everything together + // + // This would be implemented in a CLI binary using the "core" implementation to boot things up. + // + // Here, we just tie everything together in the minimal possible way, to showcase how such a + // setup would basically work. + + let mut comms = Communication { + plugins: HashMap::new(), + }; + + // in a main(), the core would be told what plugins are available. + // This would, in a real-world scenario, not happen on the "communication" type directly. + // Still, this needs to be done by a main()-author. + comms.declare_plugin::("critical-service"); + comms.declare_plugin::("heartbeat"); + + // The following would all be handled by the core implementation, a main() author would only + // need to call some kind of "run everything" function + + let cancel_token = CancellationToken::new(); + + let mut heartbeat = build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await; + let mut critical_service = build_critical_plugin(&mut comms, cancel_token.child_token()).await; + + heartbeat.plugin_mut().start().await.unwrap(); + critical_service.plugin_mut().start().await.unwrap(); + + let mut recv = comms + .plugins + .get_mut("heartbeat") + .unwrap() + .receiver + .take() + .unwrap(); + + let hb_cancel_token = cancel_token.child_token(); + let hb_handle = tokio::task::spawn(async move { + let hb = heartbeat; + + loop { + tokio::select! { + Some(msg) = recv.recv() => { + hb.handle_message(msg).await.unwrap(); + } + _ = hb_cancel_token.cancelled() => break, + } + } + + hb + }); + + let mut recv = comms + .plugins + .get_mut("critical-service") + .unwrap() + .receiver + .take() + .unwrap(); + + let cs_cancel_token = cancel_token.child_token(); + let cs_handle = tokio::task::spawn(async move { + let cs = critical_service; + + loop { + tokio::select! { + Some(msg) = recv.recv() => { + cs.handle_message(msg).await.unwrap(); + } + _ = cs_cancel_token.cancelled() => break, + } + } + + cs + }); + + println!("Core: Stopping everything in 10 seconds!"); + tokio::time::sleep(Duration::from_secs(12)).await; + + println!("Core: SHUTTING DOWN"); + cancel_token.cancel(); + + let (heartbeat, critical_service) = tokio::join!(hb_handle, cs_handle); + + heartbeat.unwrap().plugin_mut().shutdown().await.unwrap(); + critical_service + .unwrap() + .plugin_mut() + .shutdown() + .await + .unwrap(); + + println!("Core: Shut down"); +} -- cgit v1.2.3