From 196f7600234bc131956fad295959928153ded638 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=BCller?= Date: Sat, 19 Mar 2022 12:18:12 +0100 Subject: Add reply functionality to messages MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Marcel Müller --- crates/core/tedge_api/examples/heartbeat.rs | 122 +++++++++++++++++++--------- 1 file changed, 84 insertions(+), 38 deletions(-) (limited to 'crates/core/tedge_api/examples') diff --git a/crates/core/tedge_api/examples/heartbeat.rs b/crates/core/tedge_api/examples/heartbeat.rs index 12554c17..d2fc2f21 100644 --- a/crates/core/tedge_api/examples/heartbeat.rs +++ b/crates/core/tedge_api/examples/heartbeat.rs @@ -6,6 +6,8 @@ use std::{ use async_trait::async_trait; use tedge_api::{ + address::ReplySender, + message::NoReply, plugin::{BuiltPlugin, Handle, HandleTypes, Message, PluginExt}, Address, Plugin, PluginBuilder, PluginConfiguration, PluginDirectory, PluginError, }; @@ -13,15 +15,19 @@ use tedge_api::{ /// A message that represents a heartbeat that gets sent to plugins #[derive(Debug)] struct Heartbeat; -impl Message for Heartbeat {} +impl Message for Heartbeat { + type Reply = HeartbeatStatus; +} /// The reply for a heartbeat #[derive(Debug)] -enum HeartbeatStatusReply { +enum HeartbeatStatus { Alive, Degraded, } -impl Message for HeartbeatStatusReply {} +impl Message for HeartbeatStatus { + type Reply = NoReply; +} /// A PluginBuilder that gets used to build a HeartbeatService plugin instance #[derive(Debug)] @@ -37,7 +43,7 @@ impl PluginBuilder for HeartbeatServiceBuilder { where Self: Sized, { - HandleTypes::get_handlers_for::<(HeartbeatStatusReply,), HeartbeatService>() + HandleTypes::empty() } async fn verify_configuration( @@ -59,13 +65,17 @@ impl PluginBuilder for HeartbeatServiceBuilder { let monitored_services = hb_config .plugins .iter() - .map(|name| tedge_comms.get_address_for::(name)) + .map(|name| { + tedge_comms + .get_address_for::(name) + .map(|addr| (name.clone(), addr)) + }) .collect::, _>>()?; Ok(HeartbeatService::new( Duration::from_millis(hb_config.interval), monitored_services, ) - .into_untyped::<(HeartbeatStatusReply,)>()) + .into_untyped::<()>()) } } @@ -79,7 +89,7 @@ struct HeartbeatConfig { /// The HeartbeatService type represents the actual plugin struct HeartbeatService { interval_duration: Duration, - monitored_services: Vec>, + monitored_services: Vec<(String, Address)>, } #[async_trait] @@ -92,28 +102,54 @@ impl Plugin for HeartbeatService { /// the heartbeat. In a real world scenario, that background task would be started here. async fn setup(&mut self) -> Result<(), PluginError> { println!( - "Setting up heartbeat service with interval: {:?}!", + "HeartbeatService: Setting up heartbeat service with interval: {:?}!", self.interval_duration ); - let mut interval = tokio::time::interval(self.interval_duration); - let services = self.monitored_services.clone(); - - tokio::spawn(async move { - loop { - interval.tick().await; - for service in &services { - println!("Sending heartbeat to service: {:?}", service); - service.send(Heartbeat).await.unwrap(); + for service in &self.monitored_services { + let mut interval = tokio::time::interval(self.interval_duration); + let service = service.clone(); + tokio::spawn(async move { + loop { + interval.tick().await; + println!( + "HeartbeatService: Sending heartbeat to service: {:?}", + service + ); + match service + .1 + .send(Heartbeat) + .await + .unwrap() + .wait_for_reply(Duration::from_millis(100)) + .await + { + Ok(HeartbeatStatus::Alive) => { + println!("HeartbeatService: Received all is well!") + } + Ok(HeartbeatStatus::Degraded) => { + println!( + "HeartbeatService: Oh-oh! Plugin '{}' is not doing well", + service.0 + ) + } + + Err(reply_error) => { + println!( + "HeartbeatService: Critical error for '{}'! {reply_error}", + service.0 + ) + } + } } - } - }); + }); + } Ok(()) } /// A plugin author can use this shutdown function to clean resources when thin-edge shuts down async fn shutdown(&mut self) -> Result<(), PluginError> { - println!("Shutting down heartbeat service!"); + println!("HeartbeatService: Shutting down heartbeat service!"); Ok(()) } } @@ -121,7 +157,7 @@ impl Plugin for HeartbeatService { impl HeartbeatService { fn new( interval_duration: Duration, - monitored_services: Vec>, + monitored_services: Vec<(String, Address)>, ) -> Self { Self { interval_duration, @@ -130,16 +166,6 @@ impl HeartbeatService { } } -/// The Handle implementation is called when the HeartbeatService receives a -/// HeartbeatStatusReply -#[async_trait] -impl Handle for HeartbeatService { - async fn handle_message(&self, _message: HeartbeatStatusReply) -> Result<(), PluginError> { - println!("Received HeartbeatReply!"); - Ok(()) - } -} - /// A plugin that receives heartbeats struct CriticalServiceBuilder; @@ -175,19 +201,39 @@ impl PluginBuilder for CriticalServiceBuilder { where PD: 'async_trait, { - Ok(CriticalService {}.into_untyped::<(Heartbeat,)>()) + Ok(CriticalService { + status: tokio::sync::Mutex::new(true), + } + .into_untyped::<(Heartbeat,)>()) } } /// The actual "critical" plugin implementation -struct CriticalService; +struct CriticalService { + status: tokio::sync::Mutex, +} /// The CriticalService can receive Heartbeat objects, thus it needs a Handle /// implementation #[async_trait] impl Handle for CriticalService { - async fn handle_message(&self, _message: Heartbeat) -> Result<(), PluginError> { - println!("Received Heartbeat!"); + async fn handle_message( + &self, + _message: Heartbeat, + sender: ReplySender, + ) -> Result<(), PluginError> { + println!("CriticalService: Received Heartbeat!"); + let mut status = self.status.lock().await; + + let _ = sender.reply(if *status { + println!("CriticalService: Sending back alive!"); + HeartbeatStatus::Alive + } else { + println!("CriticalService: Sending back degraded!"); + HeartbeatStatus::Degraded + }); + + *status = !*status; Ok(()) } } @@ -196,12 +242,12 @@ impl Handle for CriticalService { #[async_trait] impl Plugin for CriticalService { async fn setup(&mut self) -> Result<(), PluginError> { - println!("Setting up critical service!"); + println!("CriticalService: Setting up critical service!"); Ok(()) } async fn shutdown(&mut self) -> Result<(), PluginError> { - println!("Shutting down critical service service!"); + println!("CriticalService: Shutting down critical service service!"); Ok(()) } } @@ -302,7 +348,7 @@ async fn build_heartbeat_plugin(comms: &mut Communication) -> BuiltPlugin { let config = toml::from_str( r#" - interval = 500 + interval = 5000 plugins = ["critical-service"] "#, ) -- cgit v1.2.3