From 760bd96d459947dc10fcb9e415ae46669f1a04e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=BCller?= Date: Mon, 21 Mar 2022 10:21:59 +0100 Subject: Add cancellation token MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Marcel Müller --- crates/core/tedge_api/examples/heartbeat.rs | 111 ++++++++++++++++++++-------- 1 file changed, 80 insertions(+), 31 deletions(-) (limited to 'crates/core/tedge_api/examples') diff --git a/crates/core/tedge_api/examples/heartbeat.rs b/crates/core/tedge_api/examples/heartbeat.rs index 3a680b41..ec12393b 100644 --- a/crates/core/tedge_api/examples/heartbeat.rs +++ b/crates/core/tedge_api/examples/heartbeat.rs @@ -5,12 +5,14 @@ use std::{ }; use async_trait::async_trait; +use futures::FutureExt; use tedge_api::{ address::ReplySender, message::NoReply, plugin::{BuiltPlugin, Handle, HandleTypes, Message, PluginExt}, Address, Plugin, PluginBuilder, PluginConfiguration, PluginDirectory, PluginError, }; +use tokio_util::sync::CancellationToken; /// A message that represents a heartbeat that gets sent to plugins #[derive(Debug)] @@ -56,6 +58,7 @@ impl PluginBuilder for HeartbeatServiceBuilder { async fn instantiate( &self, config: PluginConfiguration, + cancellation_token: CancellationToken, tedge_comms: &PD, ) -> Result where @@ -74,6 +77,7 @@ impl PluginBuilder for HeartbeatServiceBuilder { Ok(HeartbeatService::new( Duration::from_millis(hb_config.interval), monitored_services, + cancellation_token, ) .into_untyped::<()>()) } @@ -90,6 +94,7 @@ struct HeartbeatConfig { struct HeartbeatService { interval_duration: Duration, monitored_services: Vec<(String, Address)>, + cancel_token: CancellationToken, } #[async_trait] @@ -109,36 +114,50 @@ impl Plugin for HeartbeatService { for service in &self.monitored_services { let mut interval = tokio::time::interval(self.interval_duration); let service = service.clone(); + let cancel_token = self.cancel_token.child_token(); tokio::spawn(async move { loop { - interval.tick().await; + tokio::select! { + _ = interval.tick() => {} + _ = cancel_token.cancelled() => { + break + } + } println!( "HeartbeatService: Sending heartbeat to service: {:?}", service ); - match service + tokio::select! { + reply = service .1 .send(Heartbeat) - .await - .unwrap() - .wait_for_reply(Duration::from_millis(100)) - .await - { - Ok(HeartbeatStatus::Alive) => { - println!("HeartbeatService: Received all is well!") - } - Ok(HeartbeatStatus::Degraded) => { - println!( - "HeartbeatService: Oh-oh! Plugin '{}' is not doing well", - service.0 - ) + .then(|answer| { + answer.unwrap() + .wait_for_reply(Duration::from_millis(100))} + ) => { + match reply + { + Ok(HeartbeatStatus::Alive) => { + println!("HeartbeatService: Received all is well!") + } + Ok(HeartbeatStatus::Degraded) => { + println!( + "HeartbeatService: Oh-oh! Plugin '{}' is not doing well", + service.0 + ) + } + + Err(reply_error) => { + println!( + "HeartbeatService: Critical error for '{}'! {reply_error}", + service.0 + ) + } + } } - Err(reply_error) => { - println!( - "HeartbeatService: Critical error for '{}'! {reply_error}", - service.0 - ) + _ = cancel_token.cancelled() => { + break } } } @@ -158,10 +177,12 @@ impl HeartbeatService { fn new( interval_duration: Duration, monitored_services: Vec<(String, Address)>, + cancel_token: CancellationToken, ) -> Self { Self { interval_duration, monitored_services, + cancel_token, } } } @@ -196,6 +217,7 @@ impl PluginBuilder for CriticalServiceBuilder { async fn instantiate( &self, _config: PluginConfiguration, + _cancellation_token: CancellationToken, _tedge_comms: &PD, ) -> Result where @@ -334,16 +356,22 @@ impl PluginDirectory for Communication { } /// Helper function -async fn build_critical_plugin(comms: &mut Communication) -> BuiltPlugin { +async fn build_critical_plugin( + comms: &mut Communication, + cancel_token: CancellationToken, +) -> BuiltPlugin { let csb = CriticalServiceBuilder; let config = toml::from_str("").unwrap(); - csb.instantiate(config, comms).await.unwrap() + csb.instantiate(config, cancel_token, comms).await.unwrap() } /// Helper function -async fn build_heartbeat_plugin(comms: &mut Communication) -> BuiltPlugin { +async fn build_heartbeat_plugin( + comms: &mut Communication, + cancel_token: CancellationToken, +) -> BuiltPlugin { let hsb = HeartbeatServiceBuilder; let config = toml::from_str( @@ -354,7 +382,7 @@ async fn build_heartbeat_plugin(comms: &mut Communication) -> BuiltPlugin { ) .unwrap(); - hsb.instantiate(config, comms).await.unwrap() + hsb.instantiate(config, cancel_token, comms).await.unwrap() } #[tokio::main] @@ -379,8 +407,10 @@ async fn main() { // The following would all be handled by the core implementation, a main() author would only // need to call some kind of "run everything" function - let mut heartbeat = build_heartbeat_plugin(&mut comms).await; - let mut critical_service = build_critical_plugin(&mut comms).await; + let cancel_token = CancellationToken::new(); + + let mut heartbeat = build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await; + let mut critical_service = build_critical_plugin(&mut comms, cancel_token.child_token()).await; heartbeat.plugin_mut().setup().await.unwrap(); critical_service.plugin_mut().setup().await.unwrap(); @@ -393,11 +423,17 @@ async fn main() { .take() .unwrap(); + let hb_cancel_token = cancel_token.child_token(); let hb_handle = tokio::task::spawn(async move { let hb = heartbeat; - while let Some(msg) = recv.recv().await { - hb.handle_message(msg).await.unwrap(); + loop { + tokio::select! { + Some(msg) = recv.recv() => { + hb.handle_message(msg).await.unwrap(); + } + _ = hb_cancel_token.cancelled() => break, + } } hb @@ -411,17 +447,28 @@ async fn main() { .take() .unwrap(); + let cs_cancel_token = cancel_token.child_token(); let cs_handle = tokio::task::spawn(async move { let cs = critical_service; - while let Some(msg) = recv.recv().await { - println!("Critical service received message!"); - cs.handle_message(msg).await.unwrap(); + loop { + tokio::select! { + Some(msg) = recv.recv() => { + cs.handle_message(msg).await.unwrap(); + } + _ = cs_cancel_token.cancelled() => break, + } } cs }); + println!("Core: Stopping everything in 10 seconds!"); + tokio::time::sleep(Duration::from_secs(12)).await; + + println!("Core: SHUTTING DOWN"); + cancel_token.cancel(); + let (heartbeat, critical_service) = tokio::join!(hb_handle, cs_handle); heartbeat.unwrap().plugin_mut().shutdown().await.unwrap(); @@ -431,4 +478,6 @@ async fn main() { .shutdown() .await .unwrap(); + + println!("Core: Shut down"); } -- cgit v1.2.3