diff options
author | Marcel Müller <m.mueller@ifm.com> | 2022-03-21 10:21:59 +0100 |
---|---|---|
committer | Marcel Müller <m.mueller@ifm.com> | 2022-03-21 10:21:59 +0100 |
commit | 760bd96d459947dc10fcb9e415ae46669f1a04e5 (patch) | |
tree | 5924f7b02ce54bf311de9e3f50fa53502b599139 | |
parent | 065762123d9d85e8d02256ed6c698581f6f267ef (diff) |
Add cancellation token
Signed-off-by: Marcel Müller <m.mueller@ifm.com>
-rw-r--r-- | Cargo.lock | 21 | ||||
-rw-r--r-- | crates/core/tedge_api/Cargo.toml | 1 | ||||
-rw-r--r-- | crates/core/tedge_api/examples/heartbeat.rs | 111 | ||||
-rw-r--r-- | crates/core/tedge_api/src/plugin.rs | 2 |
4 files changed, 101 insertions, 34 deletions
@@ -995,7 +995,7 @@ dependencies = [ "indexmap", "slab", "tokio", - "tokio-util", + "tokio-util 0.6.9", "tracing", ] @@ -2162,7 +2162,7 @@ dependencies = [ "serde_urlencoded", "tokio", "tokio-rustls 0.23.2", - "tokio-util", + "tokio-util 0.6.9", "url", "wasm-bindgen", "wasm-bindgen-futures", @@ -2704,6 +2704,7 @@ dependencies = [ "static_assertions", "thiserror", "tokio", + "tokio-util 0.7.0", "toml", "uuid", ] @@ -3067,6 +3068,20 @@ dependencies = [ ] [[package]] +name = "tokio-util" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64910e1b9c1901aaf5375561e35b9c057d95ff41a44ede043a03e09279eabaf1" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "log", + "pin-project-lite", + "tokio", +] + +[[package]] name = "toml" version = "0.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -3366,7 +3381,7 @@ dependencies = [ "tokio", "tokio-stream", "tokio-tungstenite", - "tokio-util", + "tokio-util 0.6.9", "tower-service", "tracing", ] diff --git a/crates/core/tedge_api/Cargo.toml b/crates/core/tedge_api/Cargo.toml index 9309f2bc..26cfac70 100644 --- a/crates/core/tedge_api/Cargo.toml +++ b/crates/core/tedge_api/Cargo.toml @@ -12,6 +12,7 @@ downcast-rs = "1.2.0" futures = "0.3.21" thiserror = "1.0.30" tokio = { version = "1.16.1", features = ["sync"] } +tokio-util = "0.7.0" toml = "0.5.8" uuid = { version = "0.8", features = ["serde", "v4"] } diff --git a/crates/core/tedge_api/examples/heartbeat.rs b/crates/core/tedge_api/examples/heartbeat.rs index 3a680b41..ec12393b 100644 --- a/crates/core/tedge_api/examples/heartbeat.rs +++ b/crates/core/tedge_api/examples/heartbeat.rs @@ -5,12 +5,14 @@ use std::{ }; use async_trait::async_trait; +use futures::FutureExt; use tedge_api::{ address::ReplySender, message::NoReply, plugin::{BuiltPlugin, Handle, HandleTypes, Message, PluginExt}, Address, Plugin, PluginBuilder, PluginConfiguration, PluginDirectory, PluginError, }; +use tokio_util::sync::CancellationToken; /// A message that represents a heartbeat that gets sent to plugins #[derive(Debug)] @@ -56,6 +58,7 @@ impl<PD: PluginDirectory> PluginBuilder<PD> for HeartbeatServiceBuilder { async fn instantiate( &self, config: PluginConfiguration, + cancellation_token: CancellationToken, tedge_comms: &PD, ) -> Result<BuiltPlugin, PluginError> where @@ -74,6 +77,7 @@ impl<PD: PluginDirectory> PluginBuilder<PD> for HeartbeatServiceBuilder { Ok(HeartbeatService::new( Duration::from_millis(hb_config.interval), monitored_services, + cancellation_token, ) .into_untyped::<()>()) } @@ -90,6 +94,7 @@ struct HeartbeatConfig { struct HeartbeatService { interval_duration: Duration, monitored_services: Vec<(String, Address<HeartbeatMessages>)>, + cancel_token: CancellationToken, } #[async_trait] @@ -109,36 +114,50 @@ impl Plugin for HeartbeatService { for service in &self.monitored_services { let mut interval = tokio::time::interval(self.interval_duration); let service = service.clone(); + let cancel_token = self.cancel_token.child_token(); tokio::spawn(async move { loop { - interval.tick().await; + tokio::select! { + _ = interval.tick() => {} + _ = cancel_token.cancelled() => { + break + } + } println!( "HeartbeatService: Sending heartbeat to service: {:?}", service ); - match service + tokio::select! { + reply = service .1 .send(Heartbeat) - .await - .unwrap() - .wait_for_reply(Duration::from_millis(100)) - .await - { - Ok(HeartbeatStatus::Alive) => { - println!("HeartbeatService: Received all is well!") - } - Ok(HeartbeatStatus::Degraded) => { - println!( - "HeartbeatService: Oh-oh! Plugin '{}' is not doing well", - service.0 - ) + .then(|answer| { + answer.unwrap() + .wait_for_reply(Duration::from_millis(100))} + ) => { + match reply + { + Ok(HeartbeatStatus::Alive) => { + println!("HeartbeatService: Received all is well!") + } + Ok(HeartbeatStatus::Degraded) => { + println!( + "HeartbeatService: Oh-oh! Plugin '{}' is not doing well", + service.0 + ) + } + + Err(reply_error) => { + println!( + "HeartbeatService: Critical error for '{}'! {reply_error}", + service.0 + ) + } + } } - Err(reply_error) => { - println!( - "HeartbeatService: Critical error for '{}'! {reply_error}", - service.0 - ) + _ = cancel_token.cancelled() => { + break } } } @@ -158,10 +177,12 @@ impl HeartbeatService { fn new( interval_duration: Duration, monitored_services: Vec<(String, Address<HeartbeatMessages>)>, + cancel_token: CancellationToken, ) -> Self { Self { interval_duration, monitored_services, + cancel_token, } } } @@ -196,6 +217,7 @@ impl<PD: PluginDirectory> PluginBuilder<PD> for CriticalServiceBuilder { async fn instantiate( &self, _config: PluginConfiguration, + _cancellation_token: CancellationToken, _tedge_comms: &PD, ) -> Result<BuiltPlugin, PluginError> where @@ -334,16 +356,22 @@ impl PluginDirectory for Communication { } /// Helper function -async fn build_critical_plugin(comms: &mut Communication) -> BuiltPlugin { +async fn build_critical_plugin( + comms: &mut Communication, + cancel_token: CancellationToken, +) -> BuiltPlugin { let csb = CriticalServiceBuilder; let config = toml::from_str("").unwrap(); - csb.instantiate(config, comms).await.unwrap() + csb.instantiate(config, cancel_token, comms).await.unwrap() } /// Helper function -async fn build_heartbeat_plugin(comms: &mut Communication) -> BuiltPlugin { +async fn build_heartbeat_plugin( + comms: &mut Communication, + cancel_token: CancellationToken, +) -> BuiltPlugin { let hsb = HeartbeatServiceBuilder; let config = toml::from_str( @@ -354,7 +382,7 @@ async fn build_heartbeat_plugin(comms: &mut Communication) -> BuiltPlugin { ) .unwrap(); - hsb.instantiate(config, comms).await.unwrap() + hsb.instantiate(config, cancel_token, comms).await.unwrap() } #[tokio::main] @@ -379,8 +407,10 @@ async fn main() { // The following would all be handled by the core implementation, a main() author would only // need to call some kind of "run everything" function - let mut heartbeat = build_heartbeat_plugin(&mut comms).await; - let mut critical_service = build_critical_plugin(&mut comms).await; + let cancel_token = CancellationToken::new(); + + let mut heartbeat = build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await; + let mut critical_service = build_critical_plugin(&mut comms, cancel_token.child_token()).await; heartbeat.plugin_mut().setup().await.unwrap(); critical_service.plugin_mut().setup().await.unwrap(); @@ -393,11 +423,17 @@ async fn main() { .take() .unwrap(); + let hb_cancel_token = cancel_token.child_token(); let hb_handle = tokio::task::spawn(async move { let hb = heartbeat; - while let Some(msg) = recv.recv().await { - hb.handle_message(msg).await.unwrap(); + loop { + tokio::select! { + Some(msg) = recv.recv() => { + hb.handle_message(msg).await.unwrap(); + } + _ = hb_cancel_token.cancelled() => break, + } } hb @@ -411,17 +447,28 @@ async fn main() { .take() .unwrap(); + let cs_cancel_token = cancel_token.child_token(); let cs_handle = tokio::task::spawn(async move { let cs = critical_service; - while let Some(msg) = recv.recv().await { - println!("Critical service received message!"); - cs.handle_message(msg).await.unwrap(); + loop { + tokio::select! { + Some(msg) = recv.recv() => { + cs.handle_message(msg).await.unwrap(); + } + _ = cs_cancel_token.cancelled() => break, + } } cs }); + println!("Core: Stopping everything in 10 seconds!"); + tokio::time::sleep(Duration::from_secs(12)).await; + + println!("Core: SHUTTING DOWN"); + cancel_token.cancel(); + let (heartbeat, critical_service) = tokio::join!(hb_handle, cs_handle); heartbeat.unwrap().plugin_mut().shutdown().await.unwrap(); @@ -431,4 +478,6 @@ async fn main() { .shutdown() .await .unwrap(); + + println!("Core: Shut down"); } diff --git a/crates/core/tedge_api/src/plugin.rs b/crates/core/tedge_api/src/plugin.rs index 3be6bacb..34a3ceef 100644 --- a/crates/core/tedge_api/src/plugin.rs +++ b/crates/core/tedge_api/src/plugin.rs @@ -5,6 +5,7 @@ //! 2. Create your plugin struct that implements `Plugin` use futures::future::BoxFuture; +use tokio_util::sync::CancellationToken; use std::{ any::{Any, TypeId}, collections::HashSet, @@ -228,6 +229,7 @@ pub trait PluginBuilder<PD: PluginDirectory>: Sync + Send + 'static { async fn instantiate( &self, config: PluginConfiguration, + cancellation_token: CancellationToken, core_comms: &PD, ) -> Result<BuiltPlugin, PluginError> where |