From c50d5c73d4d8cdcf6827a8702d0da618cfe5111e Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Fri, 18 Mar 2022 09:07:27 +0100 Subject: Add documentation to example Signed-off-by: Matthias Beyer --- crates/core/tedge_api/examples/heartbeat.rs | 111 ++++++++++++++++++++++------ 1 file changed, 87 insertions(+), 24 deletions(-) (limited to 'crates/core/tedge_api/examples/heartbeat.rs') diff --git a/crates/core/tedge_api/examples/heartbeat.rs b/crates/core/tedge_api/examples/heartbeat.rs index 5ef99d53..12554c17 100644 --- a/crates/core/tedge_api/examples/heartbeat.rs +++ b/crates/core/tedge_api/examples/heartbeat.rs @@ -1,18 +1,21 @@ use std::{ any::TypeId, collections::{HashMap, HashSet}, + time::Duration, }; use async_trait::async_trait; use tedge_api::{ plugin::{BuiltPlugin, Handle, HandleTypes, Message, PluginExt}, - Address, PluginDirectory, Plugin, PluginBuilder, PluginConfiguration, PluginError, + Address, Plugin, PluginBuilder, PluginConfiguration, PluginDirectory, PluginError, }; +/// A message that represents a heartbeat that gets sent to plugins #[derive(Debug)] struct Heartbeat; impl Message for Heartbeat {} +/// The reply for a heartbeat #[derive(Debug)] enum HeartbeatStatusReply { Alive, @@ -20,14 +23,13 @@ enum HeartbeatStatusReply { } impl Message for HeartbeatStatusReply {} +/// A PluginBuilder that gets used to build a HeartbeatService plugin instance #[derive(Debug)] struct HeartbeatServiceBuilder; -type HeartbeatMessages = (HeartbeatStatusReply,); - #[async_trait] impl PluginBuilder for HeartbeatServiceBuilder { - fn kind_name(&self) -> &'static str { + fn kind_name() -> &'static str { todo!() } @@ -57,41 +59,59 @@ impl PluginBuilder for HeartbeatServiceBuilder { let monitored_services = hb_config .plugins .iter() - .map(|name| tedge_comms.get_address_for::(name)) + .map(|name| tedge_comms.get_address_for::(name)) .collect::, _>>()?; - Ok( - HeartbeatService::new(hb_config, monitored_services) - .into_untyped::(), + Ok(HeartbeatService::new( + Duration::from_millis(hb_config.interval), + monitored_services, ) + .into_untyped::<(HeartbeatStatusReply,)>()) } } +/// The configuration a HeartbeatServices can receive is represented by this type #[derive(serde::Deserialize, Debug)] struct HeartbeatConfig { interval: u64, plugins: Vec, } +/// The HeartbeatService type represents the actual plugin struct HeartbeatService { - config: HeartbeatConfig, - monitored_services: Vec>, + interval_duration: Duration, + monitored_services: Vec>, } #[async_trait] impl Plugin for HeartbeatService { + /// The setup function of the HeartbeatService can be used by the plugin author to setup for + /// example a connection to an external service. In this example, it is simply used to send the + /// heartbeat + /// + /// Because this example is _simple_, we do not spawn a background task that periodically sends + /// the heartbeat. In a real world scenario, that background task would be started here. async fn setup(&mut self) -> Result<(), PluginError> { println!( - "Setting up heartbeat service with interval: {}!", - self.config.interval + "Setting up heartbeat service with interval: {:?}!", + self.interval_duration ); - for service in &self.monitored_services { - println!("Sending heartbeat to service"); - service.send(Heartbeat).await.unwrap(); - } + let mut interval = tokio::time::interval(self.interval_duration); + let services = self.monitored_services.clone(); + + tokio::spawn(async move { + loop { + interval.tick().await; + for service in &services { + println!("Sending heartbeat to service: {:?}", service); + service.send(Heartbeat).await.unwrap(); + } + } + }); Ok(()) } + /// A plugin author can use this shutdown function to clean resources when thin-edge shuts down async fn shutdown(&mut self) -> Result<(), PluginError> { println!("Shutting down heartbeat service!"); Ok(()) @@ -100,16 +120,18 @@ impl Plugin for HeartbeatService { impl HeartbeatService { fn new( - config: HeartbeatConfig, - monitored_services: Vec>, + interval_duration: Duration, + monitored_services: Vec>, ) -> Self { Self { - config, + interval_duration, monitored_services, } } } +/// The Handle implementation is called when the HeartbeatService receives a +/// HeartbeatStatusReply #[async_trait] impl Handle for HeartbeatService { async fn handle_message(&self, _message: HeartbeatStatusReply) -> Result<(), PluginError> { @@ -118,13 +140,16 @@ impl Handle for HeartbeatService { } } +/// A plugin that receives heartbeats struct CriticalServiceBuilder; -tedge_api::make_message_bundle!(struct CriticalServiceMessage(Heartbeat)); +// declare a set of messages that the CriticalService can receive. +// In this example, it can only receive a Heartbeat. +tedge_api::make_message_bundle!(struct HeartbeatMessages(Heartbeat)); #[async_trait] impl PluginBuilder for CriticalServiceBuilder { - fn kind_name(&self) -> &'static str { + fn kind_name() -> &'static str { todo!() } @@ -154,8 +179,11 @@ impl PluginBuilder for CriticalServiceBuilder { } } +/// The actual "critical" plugin implementation struct CriticalService; +/// The CriticalService can receive Heartbeat objects, thus it needs a Handle +/// implementation #[async_trait] impl Handle for CriticalService { async fn handle_message(&self, _message: Heartbeat) -> Result<(), PluginError> { @@ -164,6 +192,7 @@ impl Handle for CriticalService { } } +/// Because the CriticalService is of course a Plugin, it needs an implementation for that as well. #[async_trait] impl Plugin for CriticalService { async fn setup(&mut self) -> Result<(), PluginError> { @@ -177,6 +206,16 @@ impl Plugin for CriticalService { } } +// +// The following pieces of code would be implemented by a "core" component, that is responsible for +// setting up plugins and their communication. +// +// Plugin authors do not need to write this code, but need a basic understanding what it does and +// how it works. +// As this is an example, we implement it here to showcase how it is done. +// + +/// Helper type for keeping information about plugins during runtime #[derive(Debug)] struct PluginInfo { types: HashSet<(&'static str, TypeId)>, @@ -194,6 +233,7 @@ impl Clone for PluginInfo { } } +/// The type that provides the communication infrastructure to the plugins. #[derive(Clone, Debug)] struct Communication { plugins: HashMap, @@ -221,6 +261,8 @@ impl PluginDirectory for Communication { let types = MB::get_ids().into_iter().collect(); let plug = self.plugins.get(name).unwrap_or_else(|| { + // This is an example, so we panic!() here. + // In real-world, we would do some reporting and return an error panic!( "Didn't find plugin with name {}, got: {:?}", name, @@ -229,6 +271,8 @@ impl PluginDirectory for Communication { }); if !plug.types.is_superset(&types) { + // This is an example, so we panic!() here + // In real-world, we would do some reporting and return an error panic!( "Asked for {:#?} but plugin {} only has types {:#?}", types, name, plug.types, @@ -237,8 +281,13 @@ impl PluginDirectory for Communication { Ok(Address::new(plug.sender.clone())) } } + + fn get_address_for_core(&self) -> Result, PluginError> { + todo!() + } } +/// Helper function async fn build_critical_plugin(comms: &mut Communication) -> BuiltPlugin { let csb = CriticalServiceBuilder; @@ -247,12 +296,13 @@ async fn build_critical_plugin(comms: &mut Communication) -> BuiltPlugin { csb.instantiate(config, comms).await.unwrap() } +/// Helper function async fn build_heartbeat_plugin(comms: &mut Communication) -> BuiltPlugin { let hsb = HeartbeatServiceBuilder; let config = toml::from_str( r#" - interval = 200 + interval = 500 plugins = ["critical-service"] "#, ) @@ -263,13 +313,26 @@ async fn build_heartbeat_plugin(comms: &mut Communication) -> BuiltPlugin { #[tokio::main] async fn main() { + // This implementation now ties everything together + // + // This would be implemented in a CLI binary using the "core" implementation to boot things up. + // + // Here, we just tie everything together in the minimal possible way, to showcase how such a + // setup would basically work. + let mut comms = Communication { plugins: HashMap::new(), }; + // in a main(), the core would be told what plugins are available. + // This would, in a real-world scenario, not happen on the "communication" type directly. + // Still, this needs to be done by a main()-author. comms.declare_plugin::("critical-service"); comms.declare_plugin::("heartbeat"); + // The following would all be handled by the core implementation, a main() author would only + // need to call some kind of "run everything" function + let mut heartbeat = build_heartbeat_plugin(&mut comms).await; let mut critical_service = build_critical_plugin(&mut comms).await; @@ -287,7 +350,7 @@ async fn main() { let hb_handle = tokio::task::spawn(async move { let hb = heartbeat; - for msg in recv.recv().await { + while let Some(msg) = recv.recv().await { hb.handle_message(msg).await.unwrap(); } @@ -305,7 +368,7 @@ async fn main() { let cs_handle = tokio::task::spawn(async move { let cs = critical_service; - for msg in recv.recv().await { + while let Some(msg) = recv.recv().await { println!("Critical service received message!"); cs.handle_message(msg).await.unwrap(); } -- cgit v1.2.3