From e0b49f03d6f9c21adc1150f2dfb90c40e68884ce Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Fri, 8 Apr 2022 20:32:55 +0200 Subject: Add plugin_measurement_filter This patch initially adds a "measurement_filter" plugin, which can be used to extract values from measurement messages and apply a filter function on them. If the filter matches, the plugin forwards the message to a certain other plugin, if not it can optionally send the message to an alternative plugin. Signed-off-by: Matthias Beyer --- Cargo.lock | 86 ++++++++++++ Cargo.toml | 1 + plugins/plugin_measurement_filter/Cargo.toml | 21 +++ plugins/plugin_measurement_filter/README.md | 55 ++++++++ plugins/plugin_measurement_filter/src/builder.rs | 70 ++++++++++ plugins/plugin_measurement_filter/src/config.rs | 36 +++++ plugins/plugin_measurement_filter/src/extractor.rs | 152 +++++++++++++++++++++ plugins/plugin_measurement_filter/src/filter.rs | 122 +++++++++++++++++ plugins/plugin_measurement_filter/src/lib.rs | 8 ++ plugins/plugin_measurement_filter/src/plugin.rs | 79 +++++++++++ 10 files changed, 630 insertions(+) create mode 100644 plugins/plugin_measurement_filter/Cargo.toml create mode 100644 plugins/plugin_measurement_filter/README.md create mode 100644 plugins/plugin_measurement_filter/src/builder.rs create mode 100644 plugins/plugin_measurement_filter/src/config.rs create mode 100644 plugins/plugin_measurement_filter/src/extractor.rs create mode 100644 plugins/plugin_measurement_filter/src/filter.rs create mode 100644 plugins/plugin_measurement_filter/src/lib.rs create mode 100644 plugins/plugin_measurement_filter/src/plugin.rs diff --git a/Cargo.lock b/Cargo.lock index 02435664..37f9ca4c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -688,6 +688,41 @@ dependencies = [ "syn 1.0.82", ] +[[package]] +name = "darling" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a01d95850c592940db9b8194bc39f4bc0e89dee5c4265e4b1807c34a9aba453c" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "859d65a907b6852c9361e3185c862aae7fafd2887876799fa55f5f99dc40d610" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2 1.0.32", + "quote 1.0.10", + "strsim 0.10.0", + "syn 1.0.82", +] + +[[package]] +name = "darling_macro" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c972679f83bdf9c42bd905396b6c3588a843a17f0f16dfcfa3e2c5d57441835" +dependencies = [ + "darling_core", + "quote 1.0.10", + "syn 1.0.82", +] + [[package]] name = "data-encoding" version = "2.3.2" @@ -1270,6 +1305,12 @@ dependencies = [ "tokio-io-timeout", ] +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "idna" version = "0.2.3" @@ -2063,6 +2104,22 @@ dependencies = [ "tracing", ] +[[package]] +name = "plugin_measurement_filter" +version = "0.1.0" +dependencies = [ + "async-trait", + "miette", + "serde", + "serde_with", + "tedge_api", + "tedge_lib", + "tokio", + "tokio-util 0.7.0", + "toml", + "tracing", +] + [[package]] name = "plugin_sm" version = "0.5.2" @@ -2654,6 +2711,12 @@ dependencies = [ "base64", ] +[[package]] +name = "rustversion" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2cc38e8fa666e2de3c4aba7edeb5ffc5246c1c2ed0e3d17e560aeeba736b23f" + [[package]] name = "rusty-fork" version = "0.3.0" @@ -2803,6 +2866,29 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_with" +version = "1.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "946fa04a8ac43ff78a1f4b811990afb9ddbdf5890b46d6dda0ba1998230138b7" +dependencies = [ + "rustversion", + "serde", + "serde_with_macros", +] + +[[package]] +name = "serde_with_macros" +version = "1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e182d6ec6f05393cc0e5ed1bf81ad6db3a8feedf8ee515ecdd369809bcce8082" +dependencies = [ + "darling", + "proc-macro2 1.0.32", + "quote 1.0.10", + "syn 1.0.82", +] + [[package]] name = "serial_test" version = "0.5.1" diff --git a/Cargo.toml b/Cargo.toml index 42724e2d..09770b9a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ members = [ "plugins/plugin_httpstop", "plugins/plugin_inotify", "plugins/plugin_log", + "plugins/plugin_measurement_filter", "plugins/plugin_sysstat", "plugins/tedge_apt_plugin", "plugins/tedge_dummy_plugin", diff --git a/plugins/plugin_measurement_filter/Cargo.toml b/plugins/plugin_measurement_filter/Cargo.toml new file mode 100644 index 00000000..bb80818d --- /dev/null +++ b/plugins/plugin_measurement_filter/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "plugin_measurement_filter" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +miette = "4.4" +async-trait = "0.1" +tracing = "0.1" +serde = { version = "1", features = ["derive"] } +serde_with = "1.12" +tokio = { version = "1", features = ["rt"] } +tokio-util = "0.7.0" + +tedge_api = { path = "../../crates/core/tedge_api" } +tedge_lib = { path = "../../crates/core/tedge_lib" } + +[dev-dependencies] +toml = "0.5" diff --git a/plugins/plugin_measurement_filter/README.md b/plugins/plugin_measurement_filter/README.md new file mode 100644 index 00000000..d695150b --- /dev/null +++ b/plugins/plugin_measurement_filter/README.md @@ -0,0 +1,55 @@ +# plugin_measurement_filter + +The "measurement_filter" plugin can be used to extract values from measurement +messages and apply a filter function on them. If the filter matches, the plugin +forwards the message to a certain plugin, if not it can optionally send the +message to an alternative plugin. + + +## Configuration + +The plugin configuration is made out of four values: + +* The target plugin to send messages to that are filtered +* An optional alternative plugin that messages get send to that are "filtered + out" +* An extractor, that must be used to extract a value from a measurement message + for filtering +* A filter predicate + +An example would look like this: + +```toml +target = "logger" +filtered_target = "some_other_plugin" # optional + +# Extract the value from messages named "temperature" at field "fahrenheit" +extractor = "temperature.fahrenheit" + +# Messages with fahrenheit > 70 are send to "target", all others to +# "filtered_target" or dropped +more_than = 70.0 +``` + + +## Available filter predicates + +```toml +# Boolean +is = true + +# Float +less_than = 10.0 +more_than = 10.0 + +# String +contains = "foo" +excludes = "foo" +``` + +One of them must be used, using multiple is not supported. + +If a filter does not match the expected type (e.g. the value to filter is a +boolean, but you try to filter with `more_than = 10.0`) the filter +implementation will return false. + diff --git a/plugins/plugin_measurement_filter/src/builder.rs b/plugins/plugin_measurement_filter/src/builder.rs new file mode 100644 index 00000000..05b7e369 --- /dev/null +++ b/plugins/plugin_measurement_filter/src/builder.rs @@ -0,0 +1,70 @@ +use async_trait::async_trait; +use tokio_util::sync::CancellationToken; + +use tedge_api::plugin::BuiltPlugin; +use tedge_api::plugin::HandleTypes; +use tedge_api::plugin::PluginExt; +use tedge_api::PluginBuilder; +use tedge_api::PluginConfiguration; +use tedge_api::PluginDirectory; +use tedge_api::PluginError; +use tedge_lib::measurement::Measurement; + +use crate::config::MeasurementFilterConfig; +use crate::plugin::MeasurementFilterPlugin; + +tedge_api::make_receiver_bundle!(pub struct MeasurementReceiver(Measurement)); + +pub struct MeasurementFilterPluginBuilder; + +#[async_trait] +impl PluginBuilder for MeasurementFilterPluginBuilder +where + PD: PluginDirectory, +{ + fn kind_name() -> &'static str { + "measurement_filter" + } + + fn kind_message_types() -> HandleTypes + where + Self: Sized, + { + MeasurementFilterPlugin::get_handled_types() + } + + async fn verify_configuration( + &self, + config: &PluginConfiguration, + ) -> Result<(), tedge_api::error::PluginError> { + config + .clone() + .try_into() + .map(|_: MeasurementFilterConfig| ()) + .map_err(|_| miette::miette!("Failed to parse filter configuration")) + .map_err(PluginError::from) + } + + async fn instantiate( + &self, + config: PluginConfiguration, + _cancellation_token: CancellationToken, + plugin_dir: &PD, + ) -> Result { + let config = config + .try_into::() + .map_err(|_| miette::miette!("Failed to parse filter configuration"))?; + + let main_addr = plugin_dir.get_address_for(&config.target)?; + let filtered_addr = config + .filtered_target + .as_ref() + .map(|filtered| plugin_dir.get_address_for(filtered)) + .transpose()?; + + Ok({ + MeasurementFilterPlugin::new(main_addr, filtered_addr, config.extractor, config.filter) + .finish() + }) + } +} diff --git a/plugins/plugin_measurement_filter/src/config.rs b/plugins/plugin_measurement_filter/src/config.rs new file mode 100644 index 00000000..46536bd3 --- /dev/null +++ b/plugins/plugin_measurement_filter/src/config.rs @@ -0,0 +1,36 @@ +#[derive(Debug, serde::Deserialize)] +#[serde_with::serde_as] +pub struct MeasurementFilterConfig { + pub(crate) target: String, + pub(crate) filtered_target: Option, + + #[serde_as(as = "TryFromInto")] + pub(crate) extractor: crate::extractor::Extractor, + + #[serde(flatten)] + pub(crate) filter: crate::filter::Filter, +} + +#[cfg(test)] +mod tests { + use super::MeasurementFilterConfig; + use crate::extractor::Token; + use crate::filter::Filter; + + #[test] + fn test_deserialize() { + let s = r#" + target = "foo" + filtered_target = "bar" + extractor = "foo.bar" + is = true + "#; + + let c: MeasurementFilterConfig = toml::from_str(s).unwrap(); + assert_eq!(c.target, "foo"); + assert_eq!(c.filtered_target, Some("bar".to_string())); + assert_eq!(c.extractor.0, vec![Token::Key("foo".to_string()), Token::Key("bar".to_string())]); + assert_eq!(c.filter, Filter::Is(true)); + } +} + diff --git a/plugins/plugin_measurement_filter/src/extractor.rs b/plugins/plugin_measurement_filter/src/extractor.rs new file mode 100644 index 00000000..08a533ba --- /dev/null +++ b/plugins/plugin_measurement_filter/src/extractor.rs @@ -0,0 +1,152 @@ +use tracing::trace; + +#[derive(Debug, serde_with::DeserializeFromStr)] +pub struct Extractor(pub(crate) Vec); + +impl From for String { + fn from(e: Extractor) -> String { + e.0.iter().map(Token::to_string).collect() + } +} + +impl std::str::FromStr for Extractor { + type Err = String; + + fn from_str(s: &str) -> Result { + // TODO: Make this implementation bullet-proof with nom or something like that + let v = s.split('.') + .map(Token::try_from) + .collect::, _>>()?; + + if v.is_empty() || v.get(0).map(|t| t.is_empty()).unwrap_or(false) { + return Err("Empty extractor".to_string()) + } + + Ok(Extractor(v)) + } +} + +#[derive(Debug, serde::Deserialize)] +#[cfg_attr(test, derive(PartialEq))] +pub enum Token { + Key(String), + Index(usize), +} + +impl Token { + /// Helper for checking whether a Token::Key is empty + fn is_empty(&self) -> bool { + match self { + Token::Key(s) => s.is_empty(), + _ => false, + } + } +} + +impl ToString for Token { + fn to_string(&self) -> String { + match self { + Token::Key(s) => s.to_string(), + Token::Index(u) => u.to_string(), + } + } +} + +impl TryFrom<&str> for Token { + type Error = String; + + fn try_from(s: &str) -> Result { + use std::str::FromStr; + match usize::from_str(s) { + Ok(u) => Ok(Token::Index(u)), + Err(_) => Ok(Token::Key(s.to_string())), + } + } +} + +pub trait Extractable { + type Output: Sized; + fn extract<'a>(&'a self, extractor: &[Token]) -> Option<&'a Self::Output>; +} + +use tedge_lib::measurement::Measurement; +use tedge_lib::measurement::MeasurementValue; + +impl Extractable for Measurement { + type Output = MeasurementValue; + + fn extract<'a>(&'a self, extractor: &[Token]) -> Option<&'a Self::Output> { + if let Some(next_token) = extractor.get(0) { + match next_token { + Token::Index(_) => None, + Token::Key(key) => if key == self.name() { + self.value().extract(&extractor[1..]) + } else { + None + } + } + } else { + None + } + } +} + +impl Extractable for tedge_lib::measurement::MeasurementValue { + type Output = tedge_lib::measurement::MeasurementValue; + + fn extract<'a>(&'a self, extractor: &[Token]) -> Option<&'a Self::Output> { + if let Some(next_token) = extractor.get(0) { + match (next_token, self) { + (Token::Index(idx), MeasurementValue::List(lst)) => { + trace!("Fetching '{}' from {:?}", idx, lst); + lst.get(*idx).and_then(|v| v.extract(&extractor[1..])) + }, + (Token::Key(key), MeasurementValue::Map(map)) => { + trace!("Fetching '{}' from {:?}", key, map); + map.get(key).and_then(|v| v.extract(&extractor[1..])) + }, + + (_, MeasurementValue::Bool(_)) => None, + (_, MeasurementValue::Float(_)) => None, + (_, MeasurementValue::Text(_)) => None, + (Token::Index(_), _) => None, + (Token::Key(_), _) => None, + } + } else { + trace!("Found value: {:?}", self); + Some(self) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::str::FromStr; + + #[test] + fn test_extracting_empty_string() { + assert!(Extractor::from_str("").is_err()); + } + + #[test] + fn test_extracting_single_key() { + let ext = Extractor::from_str("foo").unwrap().0; + let exp = vec![Token::Key("foo".to_string())]; + assert_eq!(ext, exp); + } + + #[test] + fn test_extracting_key_index() { + let ext = Extractor::from_str("foo.5").unwrap().0; + let exp = vec![Token::Key("foo".to_string()), Token::Index(5)]; + assert_eq!(ext, exp); + } + + #[test] + fn test_extracting_key_index_key() { + let ext = Extractor::from_str("foo.5.bar").unwrap().0; + let exp = vec![Token::Key("foo".to_string()), Token::Index(5), Token::Key("bar".to_string())]; + assert_eq!(ext, exp); + } +} diff --git a/plugins/plugin_measurement_filter/src/filter.rs b/plugins/plugin_measurement_filter/src/filter.rs new file mode 100644 index 00000000..7ccf7341 --- /dev/null +++ b/plugins/plugin_measurement_filter/src/filter.rs @@ -0,0 +1,122 @@ +use tedge_lib::measurement::MeasurementValue; +use tracing::trace; + +#[derive(Debug, serde::Deserialize)] +#[cfg_attr(test, derive(PartialEq))] +pub enum Filter { + #[serde(rename = "is")] + Is(bool), + + #[serde(rename = "less_than")] + LessThan(f64), + + #[serde(rename = "more_than")] + MoreThan(f64), + + #[serde(rename = "contains")] + Contains(String), + + #[serde(rename = "excludes")] + Excludes(String), +} + +pub trait Filterable { + fn apply_filter(&self, filter: &Filter) -> bool; +} + +impl Filterable for MeasurementValue { + fn apply_filter(&self, filter: &Filter) -> bool { + trace!("Filtering with {:?}: {:?}", filter, self); + match (self, filter) { + (MeasurementValue::Bool(b1), Filter::Is(b2)) => b1 == b2, + (MeasurementValue::Bool(_), _) => false, + + (MeasurementValue::Float(f1), Filter::LessThan(f2)) => f1 < f2, + (MeasurementValue::Float(f1), Filter::MoreThan(f2)) => f1 > f2, + (MeasurementValue::Float(_), _) => false, + + (MeasurementValue::Text(t1), Filter::Contains(t2)) => t1.contains(t2), + (MeasurementValue::Text(t1), Filter::Excludes(t2)) => !t1.contains(t2), + (MeasurementValue::Text(_), _) => false, + + (_, _) => false, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_filter_msmt_bool() { + let msmt = MeasurementValue::Bool(false); + let filt = Filter::Is(false); + assert!(msmt.apply_filter(&filt)); + + let filt = Filter::Is(true); + assert!(!msmt.apply_filter(&filt)); + } + + #[test] + fn test_filter_msmt_lt() { + let msmt = MeasurementValue::Float(10.0); + let filt = Filter::LessThan(20.0); + assert!(msmt.apply_filter(&filt)); + + let filt = Filter::LessThan(5.0); + assert!(!msmt.apply_filter(&filt)); + } + + #[test] + fn test_filter_msmt_gt() { + let msmt = MeasurementValue::Float(10.0); + let filt = Filter::MoreThan(20.0); + assert!(!msmt.apply_filter(&filt)); + + let filt = Filter::MoreThan(5.0); + assert!(msmt.apply_filter(&filt)); + } + + #[test] + fn test_filter_msmt_contains() { + let msmt = MeasurementValue::Text("foobar".to_string()); + let filt = Filter::Contains("oob".to_string()); + assert!(msmt.apply_filter(&filt)); + + let filt = Filter::Contains("kittens".to_string()); + assert!(!msmt.apply_filter(&filt)); + } + + #[test] + fn test_filter_msmt_excludes() { + let msmt = MeasurementValue::Text("foobar".to_string()); + let filt = Filter::Excludes("oob".to_string()); + assert!(!msmt.apply_filter(&filt)); + + let filt = Filter::Excludes("kittens".to_string()); + assert!(msmt.apply_filter(&filt)); + } + + #[test] + fn test_filter_nonmatching_bool() { + let msmt = MeasurementValue::Bool(false); + let filt = Filter::Excludes("kittens".to_string()); + assert!(!msmt.apply_filter(&filt)); + } + + #[test] + fn test_filter_nonmatching_float() { + let msmt = MeasurementValue::Float(1.0); + let filt = Filter::Excludes("kittens".to_string()); + assert!(!msmt.apply_filter(&filt)); + } + + #[test] + fn test_filter_nonmatching_string() { + let msmt = MeasurementValue::Text("foobar".to_string()); + let filt = Filter::Is(true); + assert!(!msmt.apply_filter(&filt)); + } + +} diff --git a/plugins/plugin_measurement_filter/src/lib.rs b/plugins/plugin_measurement_filter/src/lib.rs new file mode 100644 index 00000000..95bd51e3 --- /dev/null +++ b/plugins/plugin_measurement_filter/src/lib.rs @@ -0,0 +1,8 @@ +mod builder; +mod config; +mod extractor; +mod filter; +mod plugin; + +pub use builder::MeasurementFilterPluginBuilder; +pub use plugin::MeasurementFilterPlugin; diff --git a/plugins/plugin_measurement_filter/src/plugin.rs b/plugins/plugin_measurement_filter/src/plugin.rs new file mode 100644 index 00000000..02a4f396 --- /dev/null +++ b/plugins/plugin_measurement_filter/src/plugin.rs @@ -0,0 +1,79 @@ +use async_trait::async_trait; +use tracing::debug; + +use tedge_api::address::Address; +use tedge_api::address::ReplySender; +use tedge_api::error::PluginError; +use tedge_api::plugin::Handle; +use tedge_api::plugin::Message; +use tedge_api::plugin::Plugin; +use tedge_lib::measurement::Measurement; +use tracing::trace; + +use crate::builder::MeasurementReceiver; +use crate::extractor::Extractable; +use crate::filter::Filterable; + +pub struct MeasurementFilterPlugin { + target: Address, + filtered_target: Option>, + + extractor: crate::extractor::Extractor, + filter: crate::filter::Filter, +} + +impl MeasurementFilterPlugin { + pub fn new( + target: Address, + filtered_target: Option>, + extractor: crate::extractor::Extractor, + filter: crate::filter::Filter, + ) -> Self { + Self { + target, + filtered_target, + extractor, + filter, + } + } +} + +impl tedge_api::plugin::PluginDeclaration for MeasurementFilterPlugin { + type HandledMessages = (Measurement,); +} + +#[async_trait] +impl Plugin for MeasurementFilterPlugin { + async fn start(&mut self) -> Result<(), PluginError> { + debug!("Setting up filter plugin"); + Ok(()) + } + + async fn shutdown(&mut self) -> Result<(), PluginError> { + debug!("Shutting down filter plugin!"); + Ok(()) + } +} + +#[async_trait] +impl Handle for MeasurementFilterPlugin { + async fn handle_message( + &self, + message: Measurement, + _sender: ReplySender<::Reply>, + ) -> Result<(), PluginError> { + trace!("Extracting with {:?} from {:?}", self.extractor, message); + if let Some(value) = message.extract(&self.extractor.0) { + trace!("Applying filter {:?} to value {:?}", self.filter, value); + if value.apply_filter(&self.filter) { + let _ = self.target.send(message).await; + } else { + if let Some(ftarget) = self.filtered_target.as_ref() { + let _ = ftarget.send(message).await; + } + } + } + Ok(()) + } +} + -- cgit v1.2.3 From a2b9bdd90f9be9393a6b42a475c6b74fc51da0df Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Tue, 12 Apr 2022 14:41:40 +0200 Subject: Add measurement-filter plugin in CLI Signed-off-by: Matthias Beyer --- Cargo.lock | 1 + tedge/Cargo.toml | 3 +++ tedge/example-config.toml | 23 ++++++++++++++++++++++- tedge/src/main.rs | 6 ++++++ 4 files changed, 32 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 37f9ca4c..0e58ab56 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3102,6 +3102,7 @@ dependencies = [ "plugin_httpstop", "plugin_inotify", "plugin_log", + "plugin_measurement_filter", "plugin_sysstat", "tedge_api", "tedge_core", diff --git a/tedge/Cargo.toml b/tedge/Cargo.toml index a7eb3b40..15efdb9e 100644 --- a/tedge/Cargo.toml +++ b/tedge/Cargo.toml @@ -26,6 +26,7 @@ plugin_log = { path = "../plugins/plugin_log", optional = true } plugin_sysstat = { path = "../plugins/plugin_sysstat", optional = true } plugin_inotify = { path = "../plugins/plugin_inotify", optional = true } plugin_httpstop = { path = "../plugins/plugin_httpstop", optional = true } +plugin_measurement_filter = { path = "../plugins/plugin_measurement_filter", optional = true } [features] @@ -36,6 +37,7 @@ default = [ "builtin_plugin_sysstat", "builtin_plugin_inotify", "builtin_plugin_httpstop", + "builtin_plugin_measurement_filter", ] core_debugging = ["console-subscriber"] @@ -45,4 +47,5 @@ builtin_plugin_log = ["plugin_log"] builtin_plugin_sysstat = ["plugin_sysstat"] builtin_plugin_inotify = ["plugin_inotify"] builtin_plugin_httpstop = ["plugin_httpstop"] +builtin_plugin_measurement_filter = ["plugin_measurement_filter"] diff --git a/tedge/example-config.toml b/tedge/example-config.toml index 3195b9b7..54e0c8a8 100644 --- a/tedge/example-config.toml +++ b/tedge/example-config.toml @@ -15,11 +15,32 @@ acknowledge = false setup_logger = false +[plugins.warn-logging] +kind = "log" + +[plugins.warn-logging.configuration] +name = "logging" +level = "warn" +acknowledge = false +setup_logger = false + + +[plugins.filter-high-cpu] +kind = "measurement_filter" + +[plugins.filter-high-cpu.configuration] +target = "warn-logging" +filtered_target = "logging" +extractor = "global_processor_info.cpu_usage" +more_than = 12.0 + + + [plugins.syscpu] kind = "sysinfo" [plugins.syscpu.configuration.cpu] -send_to = ["logging"] +send_to = ["filter-high-cpu"] interval_ms = 1000 diff --git a/tedge/src/main.rs b/tedge/src/main.rs index 53fe68ec..8296dcff 100644 --- a/tedge/src/main.rs +++ b/tedge/src/main.rs @@ -88,6 +88,12 @@ async fn main() -> miette::Result<()> { plugin_httpstop::HttpStopPluginBuilder, plugin_httpstop::HttpStopPluginBuilder ); + let application = register_plugin!( + application, + "builtin_plugin_measurement_filter", + plugin_measurement_filter::MeasurementFilterPluginBuilder, + plugin_measurement_filter::MeasurementFilterPluginBuilder + ); let (cancel_sender, application) = application.with_config(config).into_diagnostic()?; info!("Application built"); -- cgit v1.2.3