summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_api/examples
diff options
context:
space:
mode:
authorMarcel Müller <m.mueller@ifm.com>2022-03-21 10:21:59 +0100
committerMarcel Müller <m.mueller@ifm.com>2022-03-21 10:21:59 +0100
commit760bd96d459947dc10fcb9e415ae46669f1a04e5 (patch)
tree5924f7b02ce54bf311de9e3f50fa53502b599139 /crates/core/tedge_api/examples
parent065762123d9d85e8d02256ed6c698581f6f267ef (diff)
Add cancellation token
Signed-off-by: Marcel Müller <m.mueller@ifm.com>
Diffstat (limited to 'crates/core/tedge_api/examples')
-rw-r--r--crates/core/tedge_api/examples/heartbeat.rs111
1 files changed, 80 insertions, 31 deletions
diff --git a/crates/core/tedge_api/examples/heartbeat.rs b/crates/core/tedge_api/examples/heartbeat.rs
index 3a680b41..ec12393b 100644
--- a/crates/core/tedge_api/examples/heartbeat.rs
+++ b/crates/core/tedge_api/examples/heartbeat.rs
@@ -5,12 +5,14 @@ use std::{
};
use async_trait::async_trait;
+use futures::FutureExt;
use tedge_api::{
address::ReplySender,
message::NoReply,
plugin::{BuiltPlugin, Handle, HandleTypes, Message, PluginExt},
Address, Plugin, PluginBuilder, PluginConfiguration, PluginDirectory, PluginError,
};
+use tokio_util::sync::CancellationToken;
/// A message that represents a heartbeat that gets sent to plugins
#[derive(Debug)]
@@ -56,6 +58,7 @@ impl<PD: PluginDirectory> PluginBuilder<PD> for HeartbeatServiceBuilder {
async fn instantiate(
&self,
config: PluginConfiguration,
+ cancellation_token: CancellationToken,
tedge_comms: &PD,
) -> Result<BuiltPlugin, PluginError>
where
@@ -74,6 +77,7 @@ impl<PD: PluginDirectory> PluginBuilder<PD> for HeartbeatServiceBuilder {
Ok(HeartbeatService::new(
Duration::from_millis(hb_config.interval),
monitored_services,
+ cancellation_token,
)
.into_untyped::<()>())
}
@@ -90,6 +94,7 @@ struct HeartbeatConfig {
struct HeartbeatService {
interval_duration: Duration,
monitored_services: Vec<(String, Address<HeartbeatMessages>)>,
+ cancel_token: CancellationToken,
}
#[async_trait]
@@ -109,36 +114,50 @@ impl Plugin for HeartbeatService {
for service in &self.monitored_services {
let mut interval = tokio::time::interval(self.interval_duration);
let service = service.clone();
+ let cancel_token = self.cancel_token.child_token();
tokio::spawn(async move {
loop {
- interval.tick().await;
+ tokio::select! {
+ _ = interval.tick() => {}
+ _ = cancel_token.cancelled() => {
+ break
+ }
+ }
println!(
"HeartbeatService: Sending heartbeat to service: {:?}",
service
);
- match service
+ tokio::select! {
+ reply = service
.1
.send(Heartbeat)
- .await
- .unwrap()
- .wait_for_reply(Duration::from_millis(100))
- .await
- {
- Ok(HeartbeatStatus::Alive) => {
- println!("HeartbeatService: Received all is well!")
- }
- Ok(HeartbeatStatus::Degraded) => {
- println!(
- "HeartbeatService: Oh-oh! Plugin '{}' is not doing well",
- service.0
- )
+ .then(|answer| {
+ answer.unwrap()
+ .wait_for_reply(Duration::from_millis(100))}
+ ) => {
+ match reply
+ {
+ Ok(HeartbeatStatus::Alive) => {
+ println!("HeartbeatService: Received all is well!")
+ }
+ Ok(HeartbeatStatus::Degraded) => {
+ println!(
+ "HeartbeatService: Oh-oh! Plugin '{}' is not doing well",
+ service.0
+ )
+ }
+
+ Err(reply_error) => {
+ println!(
+ "HeartbeatService: Critical error for '{}'! {reply_error}",
+ service.0
+ )
+ }
+ }
}
- Err(reply_error) => {
- println!(
- "HeartbeatService: Critical error for '{}'! {reply_error}",
- service.0
- )
+ _ = cancel_token.cancelled() => {
+ break
}
}
}
@@ -158,10 +177,12 @@ impl HeartbeatService {
fn new(
interval_duration: Duration,
monitored_services: Vec<(String, Address<HeartbeatMessages>)>,
+ cancel_token: CancellationToken,
) -> Self {
Self {
interval_duration,
monitored_services,
+ cancel_token,
}
}
}
@@ -196,6 +217,7 @@ impl<PD: PluginDirectory> PluginBuilder<PD> for CriticalServiceBuilder {
async fn instantiate(
&self,
_config: PluginConfiguration,
+ _cancellation_token: CancellationToken,
_tedge_comms: &PD,
) -> Result<BuiltPlugin, PluginError>
where
@@ -334,16 +356,22 @@ impl PluginDirectory for Communication {
}
/// Helper function
-async fn build_critical_plugin(comms: &mut Communication) -> BuiltPlugin {
+async fn build_critical_plugin(
+ comms: &mut Communication,
+ cancel_token: CancellationToken,
+) -> BuiltPlugin {
let csb = CriticalServiceBuilder;
let config = toml::from_str("").unwrap();
- csb.instantiate(config, comms).await.unwrap()
+ csb.instantiate(config, cancel_token, comms).await.unwrap()
}
/// Helper function
-async fn build_heartbeat_plugin(comms: &mut Communication) -> BuiltPlugin {
+async fn build_heartbeat_plugin(
+ comms: &mut Communication,
+ cancel_token: CancellationToken,
+) -> BuiltPlugin {
let hsb = HeartbeatServiceBuilder;
let config = toml::from_str(
@@ -354,7 +382,7 @@ async fn build_heartbeat_plugin(comms: &mut Communication) -> BuiltPlugin {
)
.unwrap();
- hsb.instantiate(config, comms).await.unwrap()
+ hsb.instantiate(config, cancel_token, comms).await.unwrap()
}
#[tokio::main]
@@ -379,8 +407,10 @@ async fn main() {
// The following would all be handled by the core implementation, a main() author would only
// need to call some kind of "run everything" function
- let mut heartbeat = build_heartbeat_plugin(&mut comms).await;
- let mut critical_service = build_critical_plugin(&mut comms).await;
+ let cancel_token = CancellationToken::new();
+
+ let mut heartbeat = build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await;
+ let mut critical_service = build_critical_plugin(&mut comms, cancel_token.child_token()).await;
heartbeat.plugin_mut().setup().await.unwrap();
critical_service.plugin_mut().setup().await.unwrap();
@@ -393,11 +423,17 @@ async fn main() {
.take()
.unwrap();
+ let hb_cancel_token = cancel_token.child_token();
let hb_handle = tokio::task::spawn(async move {
let hb = heartbeat;
- while let Some(msg) = recv.recv().await {
- hb.handle_message(msg).await.unwrap();
+ loop {
+ tokio::select! {
+ Some(msg) = recv.recv() => {
+ hb.handle_message(msg).await.unwrap();
+ }
+ _ = hb_cancel_token.cancelled() => break,
+ }
}
hb
@@ -411,17 +447,28 @@ async fn main() {
.take()
.unwrap();
+ let cs_cancel_token = cancel_token.child_token();
let cs_handle = tokio::task::spawn(async move {
let cs = critical_service;
- while let Some(msg) = recv.recv().await {
- println!("Critical service received message!");
- cs.handle_message(msg).await.unwrap();
+ loop {
+ tokio::select! {
+ Some(msg) = recv.recv() => {
+ cs.handle_message(msg).await.unwrap();
+ }
+ _ = cs_cancel_token.cancelled() => break,
+ }
}
cs
});
+ println!("Core: Stopping everything in 10 seconds!");
+ tokio::time::sleep(Duration::from_secs(12)).await;
+
+ println!("Core: SHUTTING DOWN");
+ cancel_token.cancel();
+
let (heartbeat, critical_service) = tokio::join!(hb_handle, cs_handle);
heartbeat.unwrap().plugin_mut().shutdown().await.unwrap();
@@ -431,4 +478,6 @@ async fn main() {
.shutdown()
.await
.unwrap();
+
+ println!("Core: Shut down");
}