diff options
author | Matthias Beyer <matthias.beyer@ifm.com> | 2022-08-05 14:31:04 +0200 |
---|---|---|
committer | Matthias Beyer <matthias.beyer@ifm.com> | 2022-08-30 13:54:48 +0200 |
commit | 33dd6e68f81e0ed51b95c90d175bebb567df0d8b (patch) | |
tree | 446afefdab9932dab5698dcaff5220422ba1ca66 | |
parent | 08a32f3294d65b4011e54838fc67c792cf8f3ff0 (diff) |
Add plugin_maxpost-merge/plugin-max
This patch adds a "max" plugin for filtering a timeframe for a maximum
measurement.
Signed-off-by: Matthias Beyer <matthias.beyer@ifm.com>
-rw-r--r-- | Cargo.lock | 16 | ||||
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | plugins/plugin_max/Cargo.toml | 20 | ||||
-rw-r--r-- | plugins/plugin_max/README.md | 31 | ||||
-rw-r--r-- | plugins/plugin_max/src/lib.rs | 174 |
5 files changed, 242 insertions, 0 deletions
@@ -2498,6 +2498,22 @@ dependencies = [ ] [[package]] +name = "plugin_max" +version = "0.1.0" +dependencies = [ + "async-trait", + "miette", + "serde", + "tedge_api", + "tedge_lib", + "thiserror", + "tokio", + "tokio-util 0.7.3", + "toml", + "tracing", +] + +[[package]] name = "plugin_measurement_filter" version = "0.1.0" dependencies = [ @@ -10,6 +10,7 @@ members = [ "plugins/plugin_httpstop", "plugins/plugin_inotify", "plugins/plugin_log", + "plugins/plugin_max", "plugins/plugin_measurement_filter", "plugins/plugin_mqtt", "plugins/plugin_mqtt_measurement_bridge", diff --git a/plugins/plugin_max/Cargo.toml b/plugins/plugin_max/Cargo.toml new file mode 100644 index 00000000..a045e578 --- /dev/null +++ b/plugins/plugin_max/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "plugin_max" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-trait = "0.1.57" +miette = "4.4" +serde = { version = "1.0.136", features = ["derive"] } +thiserror = "1" +tokio = { version = "1", features = ["macros", "rt", "sync", "time"] } +tokio-util = "0.7.0" +toml = "0.5" +tracing = "0.1" + +tedge_api = { path = "../../crates/core/tedge_api" } +tedge_lib = { path = "../../crates/core/tedge_lib" } + diff --git a/plugins/plugin_max/README.md b/plugins/plugin_max/README.md new file mode 100644 index 00000000..414759d9 --- /dev/null +++ b/plugins/plugin_max/README.md @@ -0,0 +1,31 @@ +# plugin_max + +This showcases a rather simplistic "max" plugin, that collects incoming +measuremens (messages of kind `Measurement`) and periodically sends out the +maximum of the collected values of the last timeframe. + +## Note + +Currently, only integer measurements are supported. + + +## Configuration + +The configuration of the plugin can have the following fields + +* `timeframe`: How long to collect messages before sending them out. + E.G.: "1min" +* `target`: Whom to send the average to + + +Example configuration: + +```toml +# For a reference what format is supported here, please see +# https://docs.rs/humantime/latest/humantime/ +timeframe = "1min" + +target = "my_other_plugin" +``` + + diff --git a/plugins/plugin_max/src/lib.rs b/plugins/plugin_max/src/lib.rs new file mode 100644 index 00000000..6fab9001 --- /dev/null +++ b/plugins/plugin_max/src/lib.rs @@ -0,0 +1,174 @@ +use std::sync::Arc; +use std::time::Duration; + +use tedge_api::plugin::PluginExt; +use tokio_util::sync::CancellationToken; + +use tedge_api::address::ReplySenderFor; +use tedge_api::plugin::Handle; +use tedge_api::Address; +use tedge_api::Plugin; +use tedge_api::PluginBuilder; +use tedge_api::PluginConfiguration; +use tedge_api::PluginDirectory; +use tedge_api::PluginError; +use tedge_lib::measurement::Measurement; +use tedge_lib::measurement::MeasurementValue; + +use tokio::sync::RwLock; + +pub struct MaxPluginBuilder; + +#[derive(serde::Deserialize, Debug, tedge_api::Config)] +struct MaxConfig { + /// The duration of the time window to calculate the average for + timeframe: tedge_lib::config::Humantime, + + /// The name of the plugin to send the result to + target: tedge_lib::config::Address, +} + +#[derive(Debug, miette::Diagnostic, thiserror::Error)] +enum Error { + #[error("Failed to parse configuration")] + ConfigParseFailed(#[from] toml::de::Error), + + #[error("Failed to send value to target address")] + SendingValueFailed, +} + +#[async_trait::async_trait] +impl<PD: PluginDirectory> PluginBuilder<PD> for MaxPluginBuilder { + fn kind_name() -> &'static str { + "max" + } + + fn kind_configuration() -> Option<tedge_api::ConfigDescription> { + Some(<MaxConfig as tedge_api::AsConfig>::as_config()) + } + + async fn verify_configuration( + &self, + config: &PluginConfiguration, + ) -> Result<(), tedge_api::error::PluginError> { + config + .clone() + .try_into() + .map(|_: MaxConfig| ()) + .map_err(Error::from) + .map_err(PluginError::from) + } + + async fn instantiate( + &self, + config: PluginConfiguration, + cancellation_token: CancellationToken, + plugin_dir: &PD, + ) -> Result<tedge_api::plugin::BuiltPlugin, PluginError> { + let config = config.try_into::<MaxConfig>().map_err(Error::from)?; + + let address = config.target.build(plugin_dir)?; + Ok(MaxPlugin::new(address, config.timeframe.into_duration(), cancellation_token).finish()) + } + + fn kind_message_types() -> tedge_api::plugin::HandleTypes + where + Self: Sized, + { + MaxPlugin::get_handled_types() + } +} + +tedge_api::make_receiver_bundle!(struct MeasurementReceiver(Measurement)); + +#[derive(Debug)] +struct MaxPlugin { + addr: Address<MeasurementReceiver>, + timeframe: Duration, + value: Arc<RwLock<f64>>, + cancellation_token: CancellationToken, +} + +impl tedge_api::plugin::PluginDeclaration for MaxPlugin { + type HandledMessages = (Measurement,); +} + +impl MaxPlugin { + fn new(addr: Address<MeasurementReceiver>, timeframe: Duration, cancellation_token: CancellationToken) -> Self { + Self { + addr, + timeframe, + value: Arc::new(RwLock::new(0.0)), + cancellation_token, + } + } +} + +#[async_trait::async_trait] +impl Plugin for MaxPlugin { + #[tracing::instrument(name = "plugin.max.main")] + async fn main(&self) -> Result<(), PluginError> { + loop { + match tokio::time::timeout(self.timeframe, self.cancellation_token.cancelled()).await { + Err(_elapsed) => { + let msmt = { + let max = self.value.write().await; + Measurement::new("max".to_string(), MeasurementValue::Float(*max)) + }; + + self.addr + .send_and_wait(msmt) + .await + .map_err(|_| Error::SendingValueFailed)?; + } + Ok(_) => { // cancelled + break + }, + } + } + + Ok(()) + } +} + +#[async_trait::async_trait] +impl Handle<Measurement> for MaxPlugin { + #[tracing::instrument(name = "plugin.max.handle_message")] + async fn handle_message( + &self, + message: Measurement, + _reply: ReplySenderFor<Measurement>, + ) -> Result<(), PluginError> { + let value = match message.value() { + MeasurementValue::Float(f) => Some(f), + other => { + tracing::error!( + "Received measurement that I cannot handle: {} = {}", + message.name(), + measurement_to_str(other) + ); + None + } + }; + + if let Some(value) = value { + let mut val = self.value.write().await; + if *value > *val { + *val = *value; + } + } + + Ok(()) + } +} + +fn measurement_to_str(val: &MeasurementValue) -> &'static str { + match val { + MeasurementValue::Bool(_) => "Bool", + MeasurementValue::Float(_) => "Float", + MeasurementValue::Text(_) => "Str", + MeasurementValue::List(_) => "List", + MeasurementValue::Map(_) => "Map", + _ => "Unknown", + } +} |