summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_api/examples/heartbeat.rs
diff options
context:
space:
mode:
authorMatthias Beyer <matthias.beyer@ifm.com>2022-03-18 09:07:27 +0100
committerMarcel Müller <m.mueller@ifm.com>2022-03-21 08:53:49 +0100
commitc50d5c73d4d8cdcf6827a8702d0da618cfe5111e (patch)
treed0b289d3102d99345b1b2d646f691e5b431d38b1 /crates/core/tedge_api/examples/heartbeat.rs
parentfcb446ddea5e9f51eb985cd6303170c6956d505e (diff)
Add documentation to example
Signed-off-by: Matthias Beyer <matthias.beyer@ifm.com>
Diffstat (limited to 'crates/core/tedge_api/examples/heartbeat.rs')
-rw-r--r--crates/core/tedge_api/examples/heartbeat.rs111
1 files changed, 87 insertions, 24 deletions
diff --git a/crates/core/tedge_api/examples/heartbeat.rs b/crates/core/tedge_api/examples/heartbeat.rs
index 5ef99d53..12554c17 100644
--- a/crates/core/tedge_api/examples/heartbeat.rs
+++ b/crates/core/tedge_api/examples/heartbeat.rs
@@ -1,18 +1,21 @@
use std::{
any::TypeId,
collections::{HashMap, HashSet},
+ time::Duration,
};
use async_trait::async_trait;
use tedge_api::{
plugin::{BuiltPlugin, Handle, HandleTypes, Message, PluginExt},
- Address, PluginDirectory, Plugin, PluginBuilder, PluginConfiguration, PluginError,
+ Address, Plugin, PluginBuilder, PluginConfiguration, PluginDirectory, PluginError,
};
+/// A message that represents a heartbeat that gets sent to plugins
#[derive(Debug)]
struct Heartbeat;
impl Message for Heartbeat {}
+/// The reply for a heartbeat
#[derive(Debug)]
enum HeartbeatStatusReply {
Alive,
@@ -20,14 +23,13 @@ enum HeartbeatStatusReply {
}
impl Message for HeartbeatStatusReply {}
+/// A PluginBuilder that gets used to build a HeartbeatService plugin instance
#[derive(Debug)]
struct HeartbeatServiceBuilder;
-type HeartbeatMessages = (HeartbeatStatusReply,);
-
#[async_trait]
impl<PD: PluginDirectory> PluginBuilder<PD> for HeartbeatServiceBuilder {
- fn kind_name(&self) -> &'static str {
+ fn kind_name() -> &'static str {
todo!()
}
@@ -57,41 +59,59 @@ impl<PD: PluginDirectory> PluginBuilder<PD> for HeartbeatServiceBuilder {
let monitored_services = hb_config
.plugins
.iter()
- .map(|name| tedge_comms.get_address_for::<CriticalServiceMessage>(name))
+ .map(|name| tedge_comms.get_address_for::<HeartbeatMessages>(name))
.collect::<Result<Vec<_>, _>>()?;
- Ok(
- HeartbeatService::new(hb_config, monitored_services)
- .into_untyped::<HeartbeatMessages>(),
+ Ok(HeartbeatService::new(
+ Duration::from_millis(hb_config.interval),
+ monitored_services,
)
+ .into_untyped::<(HeartbeatStatusReply,)>())
}
}
+/// The configuration a HeartbeatServices can receive is represented by this type
#[derive(serde::Deserialize, Debug)]
struct HeartbeatConfig {
interval: u64,
plugins: Vec<String>,
}
+/// The HeartbeatService type represents the actual plugin
struct HeartbeatService {
- config: HeartbeatConfig,
- monitored_services: Vec<Address<CriticalServiceMessage>>,
+ interval_duration: Duration,
+ monitored_services: Vec<Address<HeartbeatMessages>>,
}
#[async_trait]
impl Plugin for HeartbeatService {
+ /// The setup function of the HeartbeatService can be used by the plugin author to setup for
+ /// example a connection to an external service. In this example, it is simply used to send the
+ /// heartbeat
+ ///
+ /// Because this example is _simple_, we do not spawn a background task that periodically sends
+ /// the heartbeat. In a real world scenario, that background task would be started here.
async fn setup(&mut self) -> Result<(), PluginError> {
println!(
- "Setting up heartbeat service with interval: {}!",
- self.config.interval
+ "Setting up heartbeat service with interval: {:?}!",
+ self.interval_duration
);
- for service in &self.monitored_services {
- println!("Sending heartbeat to service");
- service.send(Heartbeat).await.unwrap();
- }
+ let mut interval = tokio::time::interval(self.interval_duration);
+ let services = self.monitored_services.clone();
+
+ tokio::spawn(async move {
+ loop {
+ interval.tick().await;
+ for service in &services {
+ println!("Sending heartbeat to service: {:?}", service);
+ service.send(Heartbeat).await.unwrap();
+ }
+ }
+ });
Ok(())
}
+ /// A plugin author can use this shutdown function to clean resources when thin-edge shuts down
async fn shutdown(&mut self) -> Result<(), PluginError> {
println!("Shutting down heartbeat service!");
Ok(())
@@ -100,16 +120,18 @@ impl Plugin for HeartbeatService {
impl HeartbeatService {
fn new(
- config: HeartbeatConfig,
- monitored_services: Vec<Address<CriticalServiceMessage>>,
+ interval_duration: Duration,
+ monitored_services: Vec<Address<HeartbeatMessages>>,
) -> Self {
Self {
- config,
+ interval_duration,
monitored_services,
}
}
}
+/// The Handle<HeartbeatStatusReply> implementation is called when the HeartbeatService receives a
+/// HeartbeatStatusReply
#[async_trait]
impl Handle<HeartbeatStatusReply> for HeartbeatService {
async fn handle_message(&self, _message: HeartbeatStatusReply) -> Result<(), PluginError> {
@@ -118,13 +140,16 @@ impl Handle<HeartbeatStatusReply> for HeartbeatService {
}
}
+/// A plugin that receives heartbeats
struct CriticalServiceBuilder;
-tedge_api::make_message_bundle!(struct CriticalServiceMessage(Heartbeat));
+// declare a set of messages that the CriticalService can receive.
+// In this example, it can only receive a Heartbeat.
+tedge_api::make_message_bundle!(struct HeartbeatMessages(Heartbeat));
#[async_trait]
impl<PD: PluginDirectory> PluginBuilder<PD> for CriticalServiceBuilder {
- fn kind_name(&self) -> &'static str {
+ fn kind_name() -> &'static str {
todo!()
}
@@ -154,8 +179,11 @@ impl<PD: PluginDirectory> PluginBuilder<PD> for CriticalServiceBuilder {
}
}
+/// The actual "critical" plugin implementation
struct CriticalService;
+/// The CriticalService can receive Heartbeat objects, thus it needs a Handle<Heartbeat>
+/// implementation
#[async_trait]
impl Handle<Heartbeat> for CriticalService {
async fn handle_message(&self, _message: Heartbeat) -> Result<(), PluginError> {
@@ -164,6 +192,7 @@ impl Handle<Heartbeat> for CriticalService {
}
}
+/// Because the CriticalService is of course a Plugin, it needs an implementation for that as well.
#[async_trait]
impl Plugin for CriticalService {
async fn setup(&mut self) -> Result<(), PluginError> {
@@ -177,6 +206,16 @@ impl Plugin for CriticalService {
}
}
+//
+// The following pieces of code would be implemented by a "core" component, that is responsible for
+// setting up plugins and their communication.
+//
+// Plugin authors do not need to write this code, but need a basic understanding what it does and
+// how it works.
+// As this is an example, we implement it here to showcase how it is done.
+//
+
+/// Helper type for keeping information about plugins during runtime
#[derive(Debug)]
struct PluginInfo {
types: HashSet<(&'static str, TypeId)>,
@@ -194,6 +233,7 @@ impl Clone for PluginInfo {
}
}
+/// The type that provides the communication infrastructure to the plugins.
#[derive(Clone, Debug)]
struct Communication {
plugins: HashMap<String, PluginInfo>,
@@ -221,6 +261,8 @@ impl PluginDirectory for Communication {
let types = MB::get_ids().into_iter().collect();
let plug = self.plugins.get(name).unwrap_or_else(|| {
+ // This is an example, so we panic!() here.
+ // In real-world, we would do some reporting and return an error
panic!(
"Didn't find plugin with name {}, got: {:?}",
name,
@@ -229,6 +271,8 @@ impl PluginDirectory for Communication {
});
if !plug.types.is_superset(&types) {
+ // This is an example, so we panic!() here
+ // In real-world, we would do some reporting and return an error
panic!(
"Asked for {:#?} but plugin {} only has types {:#?}",
types, name, plug.types,
@@ -237,8 +281,13 @@ impl PluginDirectory for Communication {
Ok(Address::new(plug.sender.clone()))
}
}
+
+ fn get_address_for_core(&self) -> Result<Address<tedge_api::CoreMessages>, PluginError> {
+ todo!()
+ }
}
+/// Helper function
async fn build_critical_plugin(comms: &mut Communication) -> BuiltPlugin {
let csb = CriticalServiceBuilder;
@@ -247,12 +296,13 @@ async fn build_critical_plugin(comms: &mut Communication) -> BuiltPlugin {
csb.instantiate(config, comms).await.unwrap()
}
+/// Helper function
async fn build_heartbeat_plugin(comms: &mut Communication) -> BuiltPlugin {
let hsb = HeartbeatServiceBuilder;
let config = toml::from_str(
r#"
- interval = 200
+ interval = 500
plugins = ["critical-service"]
"#,
)
@@ -263,13 +313,26 @@ async fn build_heartbeat_plugin(comms: &mut Communication) -> BuiltPlugin {
#[tokio::main]
async fn main() {
+ // This implementation now ties everything together
+ //
+ // This would be implemented in a CLI binary using the "core" implementation to boot things up.
+ //
+ // Here, we just tie everything together in the minimal possible way, to showcase how such a
+ // setup would basically work.
+
let mut comms = Communication {
plugins: HashMap::new(),
};
+ // in a main(), the core would be told what plugins are available.
+ // This would, in a real-world scenario, not happen on the "communication" type directly.
+ // Still, this needs to be done by a main()-author.
comms.declare_plugin::<CriticalServiceBuilder>("critical-service");
comms.declare_plugin::<HeartbeatServiceBuilder>("heartbeat");
+ // The following would all be handled by the core implementation, a main() author would only
+ // need to call some kind of "run everything" function
+
let mut heartbeat = build_heartbeat_plugin(&mut comms).await;
let mut critical_service = build_critical_plugin(&mut comms).await;
@@ -287,7 +350,7 @@ async fn main() {
let hb_handle = tokio::task::spawn(async move {
let hb = heartbeat;
- for msg in recv.recv().await {
+ while let Some(msg) = recv.recv().await {
hb.handle_message(msg).await.unwrap();
}
@@ -305,7 +368,7 @@ async fn main() {
let cs_handle = tokio::task::spawn(async move {
let cs = critical_service;
- for msg in recv.recv().await {
+ while let Some(msg) = recv.recv().await {
println!("Critical service received message!");
cs.handle_message(msg).await.unwrap();
}