summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_api/examples
diff options
context:
space:
mode:
authorMarcel Müller <m.mueller@ifm.com>2022-03-19 12:18:12 +0100
committerMarcel Müller <m.mueller@ifm.com>2022-03-21 08:53:49 +0100
commit196f7600234bc131956fad295959928153ded638 (patch)
treea22ca17b606cd17c7b628ecc5f4acce1a43464b1 /crates/core/tedge_api/examples
parent2dc415263c691d021f8e60f5f72a0800e9b975de (diff)
Add reply functionality to messages
Signed-off-by: Marcel Müller <m.mueller@ifm.com>
Diffstat (limited to 'crates/core/tedge_api/examples')
-rw-r--r--crates/core/tedge_api/examples/heartbeat.rs122
1 files changed, 84 insertions, 38 deletions
diff --git a/crates/core/tedge_api/examples/heartbeat.rs b/crates/core/tedge_api/examples/heartbeat.rs
index 12554c17..d2fc2f21 100644
--- a/crates/core/tedge_api/examples/heartbeat.rs
+++ b/crates/core/tedge_api/examples/heartbeat.rs
@@ -6,6 +6,8 @@ use std::{
use async_trait::async_trait;
use tedge_api::{
+ address::ReplySender,
+ message::NoReply,
plugin::{BuiltPlugin, Handle, HandleTypes, Message, PluginExt},
Address, Plugin, PluginBuilder, PluginConfiguration, PluginDirectory, PluginError,
};
@@ -13,15 +15,19 @@ use tedge_api::{
/// A message that represents a heartbeat that gets sent to plugins
#[derive(Debug)]
struct Heartbeat;
-impl Message for Heartbeat {}
+impl Message for Heartbeat {
+ type Reply = HeartbeatStatus;
+}
/// The reply for a heartbeat
#[derive(Debug)]
-enum HeartbeatStatusReply {
+enum HeartbeatStatus {
Alive,
Degraded,
}
-impl Message for HeartbeatStatusReply {}
+impl Message for HeartbeatStatus {
+ type Reply = NoReply;
+}
/// A PluginBuilder that gets used to build a HeartbeatService plugin instance
#[derive(Debug)]
@@ -37,7 +43,7 @@ impl<PD: PluginDirectory> PluginBuilder<PD> for HeartbeatServiceBuilder {
where
Self: Sized,
{
- HandleTypes::get_handlers_for::<(HeartbeatStatusReply,), HeartbeatService>()
+ HandleTypes::empty()
}
async fn verify_configuration(
@@ -59,13 +65,17 @@ impl<PD: PluginDirectory> PluginBuilder<PD> for HeartbeatServiceBuilder {
let monitored_services = hb_config
.plugins
.iter()
- .map(|name| tedge_comms.get_address_for::<HeartbeatMessages>(name))
+ .map(|name| {
+ tedge_comms
+ .get_address_for::<HeartbeatMessages>(name)
+ .map(|addr| (name.clone(), addr))
+ })
.collect::<Result<Vec<_>, _>>()?;
Ok(HeartbeatService::new(
Duration::from_millis(hb_config.interval),
monitored_services,
)
- .into_untyped::<(HeartbeatStatusReply,)>())
+ .into_untyped::<()>())
}
}
@@ -79,7 +89,7 @@ struct HeartbeatConfig {
/// The HeartbeatService type represents the actual plugin
struct HeartbeatService {
interval_duration: Duration,
- monitored_services: Vec<Address<HeartbeatMessages>>,
+ monitored_services: Vec<(String, Address<HeartbeatMessages>)>,
}
#[async_trait]
@@ -92,28 +102,54 @@ impl Plugin for HeartbeatService {
/// the heartbeat. In a real world scenario, that background task would be started here.
async fn setup(&mut self) -> Result<(), PluginError> {
println!(
- "Setting up heartbeat service with interval: {:?}!",
+ "HeartbeatService: Setting up heartbeat service with interval: {:?}!",
self.interval_duration
);
- let mut interval = tokio::time::interval(self.interval_duration);
- let services = self.monitored_services.clone();
-
- tokio::spawn(async move {
- loop {
- interval.tick().await;
- for service in &services {
- println!("Sending heartbeat to service: {:?}", service);
- service.send(Heartbeat).await.unwrap();
+ for service in &self.monitored_services {
+ let mut interval = tokio::time::interval(self.interval_duration);
+ let service = service.clone();
+ tokio::spawn(async move {
+ loop {
+ interval.tick().await;
+ println!(
+ "HeartbeatService: Sending heartbeat to service: {:?}",
+ service
+ );
+ match service
+ .1
+ .send(Heartbeat)
+ .await
+ .unwrap()
+ .wait_for_reply(Duration::from_millis(100))
+ .await
+ {
+ Ok(HeartbeatStatus::Alive) => {
+ println!("HeartbeatService: Received all is well!")
+ }
+ Ok(HeartbeatStatus::Degraded) => {
+ println!(
+ "HeartbeatService: Oh-oh! Plugin '{}' is not doing well",
+ service.0
+ )
+ }
+
+ Err(reply_error) => {
+ println!(
+ "HeartbeatService: Critical error for '{}'! {reply_error}",
+ service.0
+ )
+ }
+ }
}
- }
- });
+ });
+ }
Ok(())
}
/// A plugin author can use this shutdown function to clean resources when thin-edge shuts down
async fn shutdown(&mut self) -> Result<(), PluginError> {
- println!("Shutting down heartbeat service!");
+ println!("HeartbeatService: Shutting down heartbeat service!");
Ok(())
}
}
@@ -121,7 +157,7 @@ impl Plugin for HeartbeatService {
impl HeartbeatService {
fn new(
interval_duration: Duration,
- monitored_services: Vec<Address<HeartbeatMessages>>,
+ monitored_services: Vec<(String, Address<HeartbeatMessages>)>,
) -> Self {
Self {
interval_duration,
@@ -130,16 +166,6 @@ impl HeartbeatService {
}
}
-/// The Handle<HeartbeatStatusReply> implementation is called when the HeartbeatService receives a
-/// HeartbeatStatusReply
-#[async_trait]
-impl Handle<HeartbeatStatusReply> for HeartbeatService {
- async fn handle_message(&self, _message: HeartbeatStatusReply) -> Result<(), PluginError> {
- println!("Received HeartbeatReply!");
- Ok(())
- }
-}
-
/// A plugin that receives heartbeats
struct CriticalServiceBuilder;
@@ -175,19 +201,39 @@ impl<PD: PluginDirectory> PluginBuilder<PD> for CriticalServiceBuilder {
where
PD: 'async_trait,
{
- Ok(CriticalService {}.into_untyped::<(Heartbeat,)>())
+ Ok(CriticalService {
+ status: tokio::sync::Mutex::new(true),
+ }
+ .into_untyped::<(Heartbeat,)>())
}
}
/// The actual "critical" plugin implementation
-struct CriticalService;
+struct CriticalService {
+ status: tokio::sync::Mutex<bool>,
+}
/// The CriticalService can receive Heartbeat objects, thus it needs a Handle<Heartbeat>
/// implementation
#[async_trait]
impl Handle<Heartbeat> for CriticalService {
- async fn handle_message(&self, _message: Heartbeat) -> Result<(), PluginError> {
- println!("Received Heartbeat!");
+ async fn handle_message(
+ &self,
+ _message: Heartbeat,
+ sender: ReplySender<HeartbeatStatus>,
+ ) -> Result<(), PluginError> {
+ println!("CriticalService: Received Heartbeat!");
+ let mut status = self.status.lock().await;
+
+ let _ = sender.reply(if *status {
+ println!("CriticalService: Sending back alive!");
+ HeartbeatStatus::Alive
+ } else {
+ println!("CriticalService: Sending back degraded!");
+ HeartbeatStatus::Degraded
+ });
+
+ *status = !*status;
Ok(())
}
}
@@ -196,12 +242,12 @@ impl Handle<Heartbeat> for CriticalService {
#[async_trait]
impl Plugin for CriticalService {
async fn setup(&mut self) -> Result<(), PluginError> {
- println!("Setting up critical service!");
+ println!("CriticalService: Setting up critical service!");
Ok(())
}
async fn shutdown(&mut self) -> Result<(), PluginError> {
- println!("Shutting down critical service service!");
+ println!("CriticalService: Shutting down critical service service!");
Ok(())
}
}
@@ -302,7 +348,7 @@ async fn build_heartbeat_plugin(comms: &mut Communication) -> BuiltPlugin {
let config = toml::from_str(
r#"
- interval = 500
+ interval = 5000
plugins = ["critical-service"]
"#,
)