summaryrefslogtreecommitdiffstats
path: root/crates/core
diff options
context:
space:
mode:
authorMatthias Beyer <matthias.beyer@ifm.com>2022-07-15 17:28:05 +0200
committerMatthias Beyer <matthias.beyer@ifm.com>2022-08-30 13:52:16 +0200
commit49aac61a251a95db7bc8e8b3589caa1a99db682d (patch)
tree835ae93fa00b14f09a38f687ad5cd0aca1921646 /crates/core
parent1f898e1459a1ab451aac7ef09bd1bf0585172124 (diff)
parented1808a9be40210b4d4d3e304f2582a7883b3211 (diff)
Merge 'feature/add_tedge_api_only' into integrate-feature-api
This merge introduces the thin-edge.io API definition to the codebase. The merge is done to an integration-branch because of the longevity of the `feature/add_tedge_api_only` branch, which branched off of `main` in February 2022. Because of the long development time, `Cargo.lock` has now conflicts. We cannot rebase the feature branch anymore, as it would only be effort that can be prevented by an integration branch such as this. So, here we introduce the API. This API does _only_ define an interface how thin-edge.io plugins can be written in a way so they can be used within the thin-edge.io ecosystem. That means, by using this API, a plugin author gets the following things for free (these are high-level bullet points): * Running their plugin concurrently to other components/plugins of thin-edge.io * Sending messages to other components of thin-edge.io * These messages are typed. This means that they are normal rust objects. A plugin author does not have to concern themselves with serialization and deserialization of messages, as there is none * Message passing is lightweight, no external processes are needed and sending tons (literally thousands) of messages is not a problem * Receiving messages from other components of thin-edge.io * Concurrently * handling them in an asynchronous way * without de/serialization overhead * Safety from crashes. If a plugin crashes, it does not bring down the rest of the application. All other plugins continue to run * Compatibility of components is a compiletime assurance! * A clear shutdown path, if a shutdown was requested for the application Besides these rather technical points, we get the following (very high level) benefits: * Error handling is uniform and streamlined. Providing excellent error messages to the user is a matter of writing down good error messages, not of implementing good error handling * Logging is uniform over all components of the software * Tracing how a message propagated through the software is easily doable, as only one process is involved and the excellent `tracing` ecosystem provides the heavy lifting for us * runtime performance analytics a developer might want to do is easy with `tracing` as well * High performance is ensured via the `tokio` runtime, having tons of tasks running in a concurrent way is handled by the runtime, a developer has not to concern themselves with it All changes that are merged with this commit are made under the DCO and _not_ under any CLA. DCO: Developer Certificate of Origin Version 1.1 Copyright (C) 2004, 2006 The Linux Foundation and its contributors. Everyone is permitted to copy and distribute verbatim copies of this license document, but changing it is not allowed. Developer's Certificate of Origin 1.1 By making a contribution to this project, I certify that: (a) The contribution was created in whole or in part by me and I have the right to submit it under the open source license indicated in the file; or (b) The contribution is based upon previous work that, to the best of my knowledge, is covered under an appropriate open source license and I have the right under that license to submit that work with modifications, whether created in whole or in part by me, under the same open source license (unless I am permitted to submit under a different license), as indicated in the file; or (c) The contribution was provided directly to me by some other person who certified (a), (b) or (c) and I have not modified it. (d) I understand and agree that this project and the contribution are public and that a record of the contribution (including all personal information I submit with it, including my sign-off) is maintained indefinitely and may be redistributed consistent with this project or the open source license(s) involved. Signed-off-by: Marcel Müller <m.mueller@ifm.com> Signed-off-by: Matthias Beyer <matthias.beyer@ifm.com>
Diffstat (limited to 'crates/core')
-rw-r--r--crates/core/tedge_api/Cargo.toml29
-rw-r--r--crates/core/tedge_api/README.md137
-rw-r--r--crates/core/tedge_api/examples/heartbeat.rs498
-rw-r--r--crates/core/tedge_api/examples/print_config.rs150
-rw-r--r--crates/core/tedge_api/examples/universal_log.rs444
-rw-r--r--crates/core/tedge_api/goals.md363
-rw-r--r--crates/core/tedge_api/src/address.rs472
-rw-r--r--crates/core/tedge_api/src/config.rs387
-rw-r--r--crates/core/tedge_api/src/error.rs17
-rw-r--r--crates/core/tedge_api/src/lib.rs39
-rw-r--r--crates/core/tedge_api/src/message.rs187
-rw-r--r--crates/core/tedge_api/src/plugin.rs699
-rw-r--r--crates/core/tedge_api/tedge_config_derive/Cargo.toml15
-rw-r--r--crates/core/tedge_api/tedge_config_derive/src/lib.rs382
-rw-r--r--crates/core/tedge_api/tests/derive_config.rs76
15 files changed, 3895 insertions, 0 deletions
diff --git a/crates/core/tedge_api/Cargo.toml b/crates/core/tedge_api/Cargo.toml
new file mode 100644
index 00000000..06184c32
--- /dev/null
+++ b/crates/core/tedge_api/Cargo.toml
@@ -0,0 +1,29 @@
+[package]
+name = "tedge_api"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+async-trait = "0.1.52"
+downcast-rs = "1.2.0"
+futures = "0.3.21"
+miette = "4.4.0"
+thiserror = "1.0.30"
+tokio = { version = "1.16.1", features = ["sync", "time"] }
+tokio-util = "0.7.0"
+toml = "0.5.8"
+serde = { version = "1.0.136", features = ["derive"] }
+pretty = { version = "0.11.3", features = ["termcolor"] }
+termcolor = "1.1.3"
+termimad = "0.20.1"
+nu-ansi-term = "0.45.1"
+tedge_config_derive = { version = "0.1.0", path = "tedge_config_derive" }
+tracing = "0.1"
+bevy_reflect = "0.7.0"
+
+[dev-dependencies]
+pretty_assertions = "1.2.1"
+static_assertions = "1.1.0"
+tokio = { version = "1.16.1", features = ["full"] }
diff --git a/crates/core/tedge_api/README.md b/crates/core/tedge_api/README.md
new file mode 100644
index 00000000..07a4a5bc
--- /dev/null
+++ b/crates/core/tedge_api/README.md
@@ -0,0 +1,137 @@
+# Thin Edge API
+
+thin-edge is made up out of "Plugins"[^1] which pass messages to eachother.
+These plugins run on a "Core", which handles the message passing and the plugin
+lifecycle.
+This crate defines the interfaces a plugin author needs to implement so that a
+plugin can be built into thin-edge.
+
+
+## What a Plugin is
+
+A Plugin is a piece of code that covers some usecase. That usecase has to be
+exposed to the thin-edge.io ecosystem with the following functionalities (from a
+high level):
+
+* There's a function to "instantiate" a `Plugin`
+* There's a function to "start" a `Plugin`
+* A plugin can expose a number of API endpoints that it can receive messages on
+* There's a function for cleanly shutting a `Plugin` down
+
+The implementation of what we call the "core" of thin-edge.io is then
+responsible of instantiating a plugin upon user request, start it, and send it
+messages or forward messages the plugin emits to other plugins.
+
+Messages are just objects that are defined via Rust structures. See below for
+more information.
+
+If shutdown of thin-edge.io is requested, the core is also responsible of
+shutting a plugin down.
+
+The core is responsible to protect each plugin from crashes of other plugins and
+guarantee safe operation of all plugins in the ecosystem.
+
+
+## API to implement
+
+To implement a Plugin and bring functionality to the thin-edge.io ecosystem,
+the following API endpoints need to be implemented:
+
+* The `PluginBuilder` trait defines the interface that will be used to
+ instantiate a `Plugin`
+* The `Plugin` trait defines the interface that is used to start the
+ instantiated `Plugin` and to shut it down
+* The `Handle` trait defines the interfaces a `Plugin` instance can receive
+ messages on. This trait can be implemented multiple times for each `Plugin`,
+ one time for each message type this plugin is able to receive.
+
+The following (simplified) diagram should describe this in a visual way:
+
+<!--
+the "aquamarine" crate does not yet support rendering this in rustdoc.
+See: https://github.com/mersinvald/aquamarine/issues/19
+-->
+```mermaid
+classDiagram
+ class PluginBuilder
+ <<interface>> PluginBuilder
+ PluginBuilder : +instantiate() ~Plugin~
+
+ class Plugin
+ <<interface>> Plugin
+ Plugin : +start()
+ Plugin : +shutdown()
+
+ class Handle
+ <<interface>> Handle~Message~
+ Handle : +handle_message(~Message~ message)
+
+ class MyPluginBuilder
+ MyPluginBuilder <|-- PluginBuilder : implements
+
+ class MyPlugin
+ MyPlugin <|-- Plugin : implements
+ MyPlugin <|-- Handle~MyMessage~ : implements
+```
+
+## What a Message is
+
+A message can be anything that is able to implement the `Message` trait.
+
+This trait does not require the message to implement functionality, it just
+requires an implementing structure to implement `Debug` and `Send` and some
+others.
+
+For example:
+
+```rust
+use tedge_api::Message;
+
+#[derive(Debug, bevy_reflect::TypeUuid)]
+#[uuid = "b60dd50c-ccef-4204-b370-18bbbb68d6e2"]
+struct Value(f64);
+
+impl Message for Value {}
+```
+
+## How messages are send
+
+Messages can be send between plugins, but can also be send to the core of
+thin-edge.io and to the plugin itself (a Plugin can send messages to itself).
+
+To be able to send a Message, an `Address` of that Plugin needs to be known.
+That `Address` can be fetched during the instantiation of the Plugin. The
+`PluginBuilder::instantiate()` function gets a reference to a `PluginDirectory`.
+That `PluginDirectory` is basically an "address book", that can be asked for
+addresses to other plugins, by specifying their name.
+That name usually has to be configured.
+
+
+## Plugin lifecycle
+
+The above illustrated how a plugin lifecycle looks like. The following diagram
+is here to visualize the words from above (minus some details that are not
+required for understanding the grand scheme of things):
+
+```mermaid
+sequenceDiagram
+ thinedge->>+PluginBuilder: instantiate()
+ PluginBuilder->>Configuration: get_target_address()
+ Configuration->>PluginBuilder: target_address
+ PluginBuilder->>PluginDirectory: get_address_for<MeasurementReceiver>(target_address)
+ PluginDirectory->>PluginBuilder: Address<MeasurementReceiver>
+ PluginBuilder->>+Plugin: new(Address<MeasurementReceiver>)
+ Plugin-->PluginBuilder: Plugin
+ PluginBuilder->thinedge : Plugin
+ thinedge->>+Plugin : start()
+ Plugin-->-thinedge : start
+ loop Lifecycle
+ thinedge-->Plugin : handle_message()
+ end
+ thinedge->>+Plugin : shutdown()
+ Plugin-->-thinedge : shutdown
+```
+
+
+[^1]: Name is subject to change.
+
diff --git a/crates/core/tedge_api/examples/heartbeat.rs b/crates/core/tedge_api/examples/heartbeat.rs
new file mode 100644
index 00000000..41dc7e74
--- /dev/null
+++ b/crates/core/tedge_api/examples/heartbeat.rs
@@ -0,0 +1,498 @@
+use std::{collections::HashMap, sync::Arc, time::Duration};
+
+use async_trait::async_trait;
+use bevy_reflect::TypeUuid;
+use futures::FutureExt;
+use tedge_api::{
+ address::ReplySenderFor,
+ message::{AcceptsReplies, Message, MessageType},
+ plugin::{BuiltPlugin, Handle, PluginDeclaration, PluginExt},
+ Address, CancellationToken, Plugin, PluginBuilder, PluginConfiguration, PluginDirectory,
+ PluginError,
+};
+use tokio::sync::RwLock;
+
+#[derive(Debug, TypeUuid)]
+#[uuid = "94916be9-17ba-4bca-a3a0-408d33136fed"]
+/// A message that represents a heartbeat that gets sent to plugins
+struct Heartbeat;
+impl Message for Heartbeat {}
+impl AcceptsReplies for Heartbeat {
+ type Reply = HeartbeatStatus;
+}
+
+#[derive(Debug, TypeUuid)]
+#[uuid = "a6d03c65-51bf-4f89-b383-c67c9ed8533b"]
+/// The reply for a heartbeat
+enum HeartbeatStatus {
+ Alive,
+ Degraded,
+}
+impl Message for HeartbeatStatus {}
+
+/// A PluginBuilder that gets used to build a HeartbeatService plugin instance
+#[derive(Debug)]
+struct HeartbeatServiceBuilder;
+
+#[derive(miette::Diagnostic, thiserror::Error, Debug)]
+enum HeartbeatBuildError {
+ #[error(transparent)]
+ TomlParse(#[from] toml::de::Error),
+}
+
+#[async_trait]
+impl<PD: PluginDirectory> PluginBuilder<PD> for HeartbeatServiceBuilder {
+ fn kind_name() -> &'static str {
+ todo!()
+ }
+
+ fn kind_message_types() -> tedge_api::plugin::HandleTypes
+ where
+ Self: Sized,
+ {
+ HeartbeatService::get_handled_types()
+ }
+
+ async fn verify_configuration(
+ &self,
+ _config: &PluginConfiguration,
+ ) -> Result<(), tedge_api::error::PluginError> {
+ Ok(())
+ }
+
+ async fn instantiate(
+ &self,
+ config: PluginConfiguration,
+ cancellation_token: CancellationToken,
+ plugin_dir: &PD,
+ ) -> Result<BuiltPlugin, PluginError>
+ where
+ PD: 'async_trait,
+ {
+ let hb_config: HeartbeatConfig =
+ toml::Value::try_into(config).map_err(HeartbeatBuildError::from)?;
+ let monitored_services = hb_config
+ .plugins
+ .iter()
+ .map(|name| {
+ plugin_dir
+ .get_address_for::<HeartbeatMessages>(name)
+ .map(|addr| (name.clone(), addr))
+ })
+ .collect::<Result<Vec<_>, _>>()?;
+ Ok(HeartbeatService::new(
+ Duration::from_millis(hb_config.interval),
+ monitored_services,
+ cancellation_token,
+ )
+ .finish())
+ }
+}
+
+/// The configuration a HeartbeatServices can receive is represented by this type
+#[derive(serde::Deserialize, Debug)]
+struct HeartbeatConfig {
+ interval: u64,
+ plugins: Vec<String>,
+}
+
+/// The HeartbeatService type represents the actual plugin
+struct HeartbeatService {
+ interval_duration: Duration,
+ monitored_services: Vec<(String, Address<HeartbeatMessages>)>,
+ cancel_token: CancellationToken,
+}
+
+impl PluginDeclaration for HeartbeatService {
+ type HandledMessages = ();
+}
+
+#[async_trait]
+impl Plugin for HeartbeatService {
+ /// The setup function of the HeartbeatService can be used by the plugin author to setup for
+ /// example a connection to an external service. In this example, it is simply used to send the
+ /// heartbeat
+ ///
+ /// Because this example is _simple_, we do not spawn a background task that periodically sends
+ /// the heartbeat. In a real world scenario, that background task would be started here.
+ async fn start(&mut self) -> Result<(), PluginError> {
+ println!(
+ "HeartbeatService: Setting up heartbeat service with interval: {:?}!",
+ self.interval_duration
+ );
+
+ for service in &self.monitored_services {
+ let mut interval = tokio::time::interval(self.interval_duration);
+ let service = service.clone();
+ let cancel_token = self.cancel_token.child_token();
+ tokio::spawn(async move {
+ loop {
+ tokio::select! {
+ _ = interval.tick() => {}
+ _ = cancel_token.cancelled() => {
+ break
+ }
+ }
+ println!(
+ "HeartbeatService: Sending heartbeat to service: {:?}",
+ service
+ );
+ tokio::select! {
+ reply = service
+ .1
+ .send_and_wait(Heartbeat)
+ .then(|answer| {
+ answer.unwrap()
+ .wait_for_reply(Duration::from_millis(100))}
+ ) => {
+ match reply
+ {
+ Ok(HeartbeatStatus::Alive) => {
+ println!("HeartbeatService: Received all is well!")
+ }
+ Ok(HeartbeatStatus::Degraded) => {
+ println!(
+ "HeartbeatService: Oh-oh! Plugin '{}' is not doing well",
+ service.0
+ )
+ }
+
+ Err(reply_error) => {
+ println!(
+ "HeartbeatService: Critical error for '{}'! {reply_error}",
+ service.0
+ )
+ }
+ }
+ }
+
+ _ = cancel_token.cancelled() => {
+ break
+ }
+ }
+ }
+ });
+ }
+ Ok(())
+ }
+
+ /// A plugin author can use this shutdown function to clean resources when thin-edge shuts down
+ async fn shutdown(&mut self) -> Result<(), PluginError> {
+ println!("HeartbeatService: Shutting down heartbeat service!");
+ Ok(())
+ }
+}
+
+impl HeartbeatService {
+ fn new(
+ interval_duration: Duration,
+ monitored_services: Vec<(String, Address<HeartbeatMessages>)>,
+ cancel_token: CancellationToken,
+ ) -> Self {
+ Self {
+ interval_duration,
+ monitored_services,
+ cancel_token,
+ }
+ }
+}
+
+/// A plugin that receives heartbeats
+struct CriticalServiceBuilder;
+
+// declare a set of messages that the CriticalService can receive.
+// In this example, it can only receive a Heartbeat.
+tedge_api::make_receiver_bundle!(struct HeartbeatMessages(Heartbeat));
+
+#[async_trait]
+impl<PD: PluginDirectory> PluginBuilder<PD> for CriticalServiceBuilder {
+ fn kind_name() -> &'static str {
+ todo!()
+ }
+
+ fn kind_message_types() -> tedge_api::plugin::HandleTypes
+ where
+ Self: Sized,
+ {
+ CriticalService::get_handled_types()
+ }
+
+ async fn verify_configuration(
+ &self,
+ _config: &PluginConfiguration,
+ ) -> Result<(), tedge_api::error::PluginError> {
+ Ok(())
+ }
+
+ async fn instantiate(
+ &self,
+ _config: PluginConfiguration,
+ _cancellation_token: CancellationToken,
+ _plugin_dir: &PD,
+ ) -> Result<BuiltPlugin, PluginError>
+ where
+ PD: 'async_trait,
+ {
+ Ok(CriticalService {
+ status: tokio::sync::Mutex::new(true),
+ }
+ .finish())
+ }
+}
+
+/// The actual "critical" plugin implementation
+struct CriticalService {
+ status: tokio::sync::Mutex<bool>,
+}
+
+/// The CriticalService can receive Heartbeat objects, thus it needs a Handle<Heartbeat>
+/// implementation
+#[async_trait]
+impl Handle<Heartbeat> for CriticalService {
+ async fn handle_message(
+ &self,
+ _message: Heartbeat,
+ sender: ReplySenderFor<Heartbeat>,
+ ) -> Result<(), PluginError> {
+ println!("CriticalService: Received Heartbeat!");
+ let mut status = self.status.lock().await;
+
+ let _ = sender.reply(if *status {
+ println!("CriticalService: Sending back alive!");
+ HeartbeatStatus::Alive
+ } else {
+ println!("CriticalService: Sending back degraded!");
+ HeartbeatStatus::Degraded
+ });
+
+ *status = !*status;
+ Ok(())
+ }
+}
+
+impl PluginDeclaration for CriticalService {
+ type HandledMessages = (Heartbeat,);
+}
+
+/// Because the CriticalService is of course a Plugin, it needs an implementation for that as well.
+#[async_trait]
+impl Plugin for CriticalService {
+ async fn start(&mut self) -> Result<(), PluginError> {
+ println!("CriticalService: Setting up critical service!");
+ Ok(())
+ }
+
+ async fn shutdown(&mut self) -> Result<(), PluginError> {
+ println!("CriticalService: Shutting down critical service service!");
+ Ok(())
+ }
+}
+
+//
+// The following pieces of code would be implemented by a "core" component, that is responsible for
+// setting up plugins and their communication.
+//
+// Plugin authors do not need to write this code, but need a basic understanding what it does and
+// how it works.
+// As this is an example, we implement it here to showcase how it is done.
+//
+
+/// Helper type for keeping information about plugins during runtime
+struct PluginInfo {
+ types: Vec<MessageType>,
+ sender: Arc<RwLock<Option<Box<tedge_api::address::MessageFutureProducer>>>>,
+}
+
+impl std::fmt::Debug for PluginInfo {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("PluginInfo")
+ .field("types", &self.types)
+ .finish_non_exhaustive()
+ }
+}
+
+/// The type that provides the communication infrastructure to the plugins.
+#[derive(Debug)]
+struct Communication {
+ plugins: HashMap<String, PluginInfo>,
+}
+
+impl Communication {
+ pub fn declare_plugin<PB: PluginBuilder<Self>>(&mut self, name: &str) {
+ let sender = Arc::new(RwLock::new(None));
+ self.plugins.insert(
+ name.to_owned(),
+ PluginInfo {
+ types: PB::kind_message_types().into_types(),
+ sender,
+ },
+ );
+ }
+}
+
+impl PluginDirectory for Communication {
+ fn get_address_for<MB: tedge_api::address::ReceiverBundle>(
+ &self,
+ name: &str,
+ ) -> Result<Address<MB>, tedge_api::error::DirectoryError> {
+ let asked_types: Vec<_> = MB::get_ids().into_iter().collect();
+
+ let plug = self.plugins.get(name).unwrap_or_else(|| {
+ // This is an example, so we panic!() here.
+ // In real-world, we would do some reporting and return an error
+ panic!(
+ "Didn't find plugin with name {}, got: {:?}",
+ name,
+ self.plugins.keys().collect::<Vec<_>>()
+ )
+ });
+
+ if !asked_types
+ .iter()
+ .all(|req_type| plug.types.iter().any(|ty| ty.satisfy(req_type)))
+ {
+ // This is an example, so we panic!() here
+ // In real-world, we would do some reporting and return an error
+ panic!(
+ "Asked for {:#?} but plugin {} only has types {:#?}",
+ asked_types, name, plug.types,
+ );
+ } else {
+ Ok(Address::new(tedge_api::address::InnerMessageSender::new(
+ plug.sender.clone(),
+ )))
+ }
+ }
+
+ fn get_address_for_core(&self) -> Address<tedge_api::CoreMessages> {
+ todo!()
+ }
+}
+
+/// Helper function
+async fn build_critical_plugin(
+ comms: &mut Communication,
+ cancel_token: CancellationToken,
+) -> BuiltPlugin {
+ let csb = CriticalServiceBuilder;
+
+ let config = toml::from_str("").unwrap();
+
+ csb.instantiate(config, cancel_token, comms).await.unwrap()
+}
+
+/// Helper function
+async fn build_heartbeat_plugin(
+ comms: &mut Communication,
+ cancel_token: CancellationToken,
+) -> BuiltPlugin {
+ let hsb = HeartbeatServiceBuilder;
+
+ let config = toml::from_str(
+ r#"
+ interval = 5000
+ plugins = ["critical-service"]
+ "#,
+ )
+ .unwrap();
+
+ hsb.instantiate(config, cancel_token, comms).await.unwrap()
+}
+
+#[tokio::main]
+async fn main() {
+ // This implementation now ties everything together
+ //
+ // This would be implemented in a CLI binary using the "core" implementation to boot things up.
+ //
+ // Here, we just tie everything together in the minimal possible way, to showcase how such a
+ // setup would basically work.
+
+ let mut comms = Communication {
+ plugins: HashMap::new(),
+ };
+
+ // in a main(), the core would be told what plugins are available.
+ // This would, in a real-world scenario, not happen on the "communication" type directly.
+ // Still, this needs to be done by a main()-author.
+ comms.declare_plugin::<CriticalServiceBuilder>("critical-service");
+ comms.declare_plugin::<HeartbeatServiceBuilder>("heartbeat");
+
+ // The following would all be handled by the core implementation, a main() author would only
+ // need to call some kind of "run everything" function
+
+ let cancel_token = CancellationToken::new();
+
+ let mut heartbeat = Arc::new(RwLock::new(
+ build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await,
+ ));
+ let mut critical_service = Arc::new(RwLock::new(
+ build_critical_plugin(&mut comms, cancel_token.child_token()).await,
+ ));
+
+ heartbeat.write().await.plugin_mut().start().await.unwrap();
+ critical_service
+ .write()
+ .await
+ .plugin_mut()
+ .start()
+ .await
+ .unwrap();
+
+ let recv = comms.plugins.get("heartbeat").unwrap();
+
+ {
+ let mut lock = recv.sender.write().await;
+ let heartbeat = heartbeat.clone();
+
+ *lock = Some(Box::new(move |msg, _wait_kind| {
+ let heartbeat = heartbeat.clone();
+ async move {
+ let heartbeat = heartbeat.read().await;
+ heartbeat.handle_message(msg).await;
+ Ok(())
+ }
+ .boxed()
+ }));
+ }
+
+ let recv = comms.plugins.get("critical-service").unwrap();
+
+ {
+ let mut lock = recv.sender.write().await;
+ let critical_service = critical_service.clone();
+
+ *lock = Some(Box::new(move |msg, _wait_kind| {
+ let critical_service = critical_service.clone();
+ async move {
+ let critical_service = critical_service.read().await;
+ critical_service.handle_message(msg).await;
+ Ok(())
+ }
+ .boxed()
+ }));
+ }
+
+ println!("Core: Stopping everything in 10 seconds!");
+ tokio::time::sleep(Duration::from_secs(12)).await;
+
+ println!("Core: SHUTTING DOWN");
+ cancel_token.cancel();
+
+ heartbeat
+ .write()
+ .await
+ .plugin_mut()
+ .shutdown()
+ .await
+ .unwrap();
+ critical_service
+ .write()
+ .await
+ .plugin_mut()
+ .shutdown()
+ .await
+ .unwrap();
+
+ println!("Core: Shut down");
+}
diff --git a/crates/core/tedge_api/examples/print_config.rs b/crates/core/tedge_api/examples/print_config.rs
new file mode 100644
index 00000000..8be855e0
--- /dev/null
+++ b/crates/core/tedge_api/examples/print_config.rs
@@ -0,0 +1,150 @@
+#![allow(dead_code, unused)]
+use std::collections::HashMap;
+
+use nu_ansi_term::Color;
+use pretty::Arena;
+use tedge_api::{
+ config::{AsConfig, ConfigDescription, ConfigKind},
+ Config,
+};
+struct Port(u64);
+
+impl AsConfig for Port {
+ fn as_config() -> ConfigDescription {
+ ConfigDescription::new(
+ String::from("Integer"),
+ ConfigKind::Integer,
+ Some("A TCP port number is an integer between 0 and 65535"),
+ )
+ }
+}
+
+struct VHost;
+
+impl AsConfig for VHost {
+ fn as_config() -> ConfigDescription {
+ ConfigDescription::new(
+ String::from("VHost"),
+ ConfigKind::Struct(vec![("name", None, String::as_config())]),
+ Some("A virtual host definition"),
+ )
+ }
+}
+
+fn main() {
+ let arena = Arena::new();
+
+ let doc = Vec::<String>::as_config();
+ let rendered_doc = doc.as_terminal_doc(&arena);
+
+ let mut output = String::new();
+
+ rendered_doc.render_fmt(80, &mut output).unwrap();
+
+ println!(
+ "------- Output for {}",
+ std::any::type_name::<Vec<String>>()
+ );
+ println!("{}", output);
+
+ let arena = Arena::new();
+
+ let doc = ConfigDescription::new(
+ String::from("ServerConfig"),
+ ConfigKind::Struct(vec![
+ ("port", None, Port::as_config()),
+ ("interface", None, String::as_config()),
+ ("virtual_hosts", None, Vec::<VHost>::as_config()),
+ ("headers", None, HashMap::<String, String>::as_config()),
+ ]),
+ Some("Specify how the server should be started\n\n## Note\n\nThis is a reallly really loooooooooooooooooong loooooooooooooooooooong new *line*."),
+ );
+ let rendered_doc = doc.as_terminal_doc(&arena);
+
+ let mut output = String::new();
+
+ rendered_doc.render_fmt(80, &mut output).unwrap();
+
+ println!(
+ "Configuration for {} plugin kinds",
+ Color::White.bold().paint(doc.name())
+ );
+ println!(
+ "{}",
+ Color::White.dimmed().bold().paint(format!(
+ "=================={}=============",
+ std::iter::repeat('=')
+ .take(doc.name().len())
+ .collect::<String>()
+ ))
+ );
+ println!("------- Output for ServerConfig");
+ println!("{}", output);
+ let arena = Arena::new();
+
+ #[derive(Config)]
+ #[config(tag = "type")]
+ /// An Nginx virtual host