summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <matthias.beyer@ifm.com>2022-08-05 14:31:04 +0200
committerMatthias Beyer <matthias.beyer@ifm.com>2022-08-30 13:54:48 +0200
commit33dd6e68f81e0ed51b95c90d175bebb567df0d8b (patch)
tree446afefdab9932dab5698dcaff5220422ba1ca66
parent08a32f3294d65b4011e54838fc67c792cf8f3ff0 (diff)
Add plugin_maxpost-merge/plugin-max
This patch adds a "max" plugin for filtering a timeframe for a maximum measurement. Signed-off-by: Matthias Beyer <matthias.beyer@ifm.com>
-rw-r--r--Cargo.lock16
-rw-r--r--Cargo.toml1
-rw-r--r--plugins/plugin_max/Cargo.toml20
-rw-r--r--plugins/plugin_max/README.md31
-rw-r--r--plugins/plugin_max/src/lib.rs174
5 files changed, 242 insertions, 0 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 0aa66dcb..619cc77a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2498,6 +2498,22 @@ dependencies = [
]
[[package]]
+name = "plugin_max"
+version = "0.1.0"
+dependencies = [
+ "async-trait",
+ "miette",
+ "serde",
+ "tedge_api",
+ "tedge_lib",
+ "thiserror",
+ "tokio",
+ "tokio-util 0.7.3",
+ "toml",
+ "tracing",
+]
+
+[[package]]
name = "plugin_measurement_filter"
version = "0.1.0"
dependencies = [
diff --git a/Cargo.toml b/Cargo.toml
index c2a5b5f6..ce762185 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,6 +10,7 @@ members = [
"plugins/plugin_httpstop",
"plugins/plugin_inotify",
"plugins/plugin_log",
+ "plugins/plugin_max",
"plugins/plugin_measurement_filter",
"plugins/plugin_mqtt",
"plugins/plugin_mqtt_measurement_bridge",
diff --git a/plugins/plugin_max/Cargo.toml b/plugins/plugin_max/Cargo.toml
new file mode 100644
index 00000000..a045e578
--- /dev/null
+++ b/plugins/plugin_max/Cargo.toml
@@ -0,0 +1,20 @@
+[package]
+name = "plugin_max"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+async-trait = "0.1.57"
+miette = "4.4"
+serde = { version = "1.0.136", features = ["derive"] }
+thiserror = "1"
+tokio = { version = "1", features = ["macros", "rt", "sync", "time"] }
+tokio-util = "0.7.0"
+toml = "0.5"
+tracing = "0.1"
+
+tedge_api = { path = "../../crates/core/tedge_api" }
+tedge_lib = { path = "../../crates/core/tedge_lib" }
+
diff --git a/plugins/plugin_max/README.md b/plugins/plugin_max/README.md
new file mode 100644
index 00000000..414759d9
--- /dev/null
+++ b/plugins/plugin_max/README.md
@@ -0,0 +1,31 @@
+# plugin_max
+
+This showcases a rather simplistic "max" plugin, that collects incoming
+measuremens (messages of kind `Measurement`) and periodically sends out the
+maximum of the collected values of the last timeframe.
+
+## Note
+
+Currently, only integer measurements are supported.
+
+
+## Configuration
+
+The configuration of the plugin can have the following fields
+
+* `timeframe`: How long to collect messages before sending them out.
+ E.G.: "1min"
+* `target`: Whom to send the average to
+
+
+Example configuration:
+
+```toml
+# For a reference what format is supported here, please see
+# https://docs.rs/humantime/latest/humantime/
+timeframe = "1min"
+
+target = "my_other_plugin"
+```
+
+
diff --git a/plugins/plugin_max/src/lib.rs b/plugins/plugin_max/src/lib.rs
new file mode 100644
index 00000000..6fab9001
--- /dev/null
+++ b/plugins/plugin_max/src/lib.rs
@@ -0,0 +1,174 @@
+use std::sync::Arc;
+use std::time::Duration;
+
+use tedge_api::plugin::PluginExt;
+use tokio_util::sync::CancellationToken;
+
+use tedge_api::address::ReplySenderFor;
+use tedge_api::plugin::Handle;
+use tedge_api::Address;
+use tedge_api::Plugin;
+use tedge_api::PluginBuilder;
+use tedge_api::PluginConfiguration;
+use tedge_api::PluginDirectory;
+use tedge_api::PluginError;
+use tedge_lib::measurement::Measurement;
+use tedge_lib::measurement::MeasurementValue;
+
+use tokio::sync::RwLock;
+
+pub struct MaxPluginBuilder;
+
+#[derive(serde::Deserialize, Debug, tedge_api::Config)]
+struct MaxConfig {
+ /// The duration of the time window to calculate the average for
+ timeframe: tedge_lib::config::Humantime,
+
+ /// The name of the plugin to send the result to
+ target: tedge_lib::config::Address,
+}
+
+#[derive(Debug, miette::Diagnostic, thiserror::Error)]
+enum Error {
+ #[error("Failed to parse configuration")]
+ ConfigParseFailed(#[from] toml::de::Error),
+
+ #[error("Failed to send value to target address")]
+ SendingValueFailed,
+}
+
+#[async_trait::async_trait]
+impl<PD: PluginDirectory> PluginBuilder<PD> for MaxPluginBuilder {
+ fn kind_name() -> &'static str {
+ "max"
+ }
+
+ fn kind_configuration() -> Option<tedge_api::ConfigDescription> {
+ Some(<MaxConfig as tedge_api::AsConfig>::as_config())
+ }
+
+ async fn verify_configuration(
+ &self,
+ config: &PluginConfiguration,
+ ) -> Result<(), tedge_api::error::PluginError> {
+ config
+ .clone()
+ .try_into()
+ .map(|_: MaxConfig| ())
+ .map_err(Error::from)
+ .map_err(PluginError::from)
+ }
+
+ async fn instantiate(
+ &self,
+ config: PluginConfiguration,
+ cancellation_token: CancellationToken,
+ plugin_dir: &PD,
+ ) -> Result<tedge_api::plugin::BuiltPlugin, PluginError> {
+ let config = config.try_into::<MaxConfig>().map_err(Error::from)?;
+
+ let address = config.target.build(plugin_dir)?;
+ Ok(MaxPlugin::new(address, config.timeframe.into_duration(), cancellation_token).finish())
+ }
+
+ fn kind_message_types() -> tedge_api::plugin::HandleTypes
+ where
+ Self: Sized,
+ {
+ MaxPlugin::get_handled_types()
+ }
+}
+
+tedge_api::make_receiver_bundle!(struct MeasurementReceiver(Measurement));
+
+#[derive(Debug)]
+struct MaxPlugin {
+ addr: Address<MeasurementReceiver>,
+ timeframe: Duration,
+ value: Arc<RwLock<f64>>,
+ cancellation_token: CancellationToken,
+}
+
+impl tedge_api::plugin::PluginDeclaration for MaxPlugin {
+ type HandledMessages = (Measurement,);
+}
+
+impl MaxPlugin {
+ fn new(addr: Address<MeasurementReceiver>, timeframe: Duration, cancellation_token: CancellationToken) -> Self {
+ Self {
+ addr,
+ timeframe,
+ value: Arc::new(RwLock::new(0.0)),
+ cancellation_token,
+ }
+ }
+}
+
+#[async_trait::async_trait]
+impl Plugin for MaxPlugin {
+ #[tracing::instrument(name = "plugin.max.main")]
+ async fn main(&self) -> Result<(), PluginError> {
+ loop {
+ match tokio::time::timeout(self.timeframe, self.cancellation_token.cancelled()).await {
+ Err(_elapsed) => {
+ let msmt = {
+ let max = self.value.write().await;
+ Measurement::new("max".to_string(), MeasurementValue::Float(*max))
+ };
+
+ self.addr
+ .send_and_wait(msmt)
+ .await
+ .map_err(|_| Error::SendingValueFailed)?;
+ }
+ Ok(_) => { // cancelled
+ break
+ },
+ }
+ }
+
+ Ok(())
+ }
+}
+
+#[async_trait::async_trait]
+impl Handle<Measurement> for MaxPlugin {
+ #[tracing::instrument(name = "plugin.max.handle_message")]
+ async fn handle_message(
+ &self,
+ message: Measurement,
+ _reply: ReplySenderFor<Measurement>,
+ ) -> Result<(), PluginError> {
+ let value = match message.value() {
+ MeasurementValue::Float(f) => Some(f),
+ other => {
+ tracing::error!(
+ "Received measurement that I cannot handle: {} = {}",
+ message.name(),
+ measurement_to_str(other)
+ );
+ None
+ }
+ };
+
+ if let Some(value) = value {
+ let mut val = self.value.write().await;
+ if *value > *val {
+ *val = *value;
+ }
+ }
+
+ Ok(())
+ }
+}
+
+fn measurement_to_str(val: &MeasurementValue) -> &'static str {
+ match val {
+ MeasurementValue::Bool(_) => "Bool",
+ MeasurementValue::Float(_) => "Float",
+ MeasurementValue::Text(_) => "Str",
+ MeasurementValue::List(_) => "List",
+ MeasurementValue::Map(_) => "Map",
+ _ => "Unknown",
+ }
+}