summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <matthias.beyer@ifm.com>2022-04-14 15:58:53 +0200
committerMatthias Beyer <matthias.beyer@ifm.com>2022-04-14 15:58:53 +0200
commit8ba3d69e75e602f90abb8489d17658ca37aed444 (patch)
tree11df14938cb8a15dc9489841283e9604c3e771e5
parentbb8f1c8267fe48dfd6221f59f86f590d2fe51300 (diff)
parenta2b9bdd90f9be9393a6b42a475c6b74fc51da0df (diff)
Merge branch 'feature/add_tedge_api/plugin-filter' into feature/add_tedge_api_impl
Integrate the "measurement_filter" plugin into the implementation branch. Signed-off-by: Matthias Beyer <matthias.beyer@ifm.com>
-rw-r--r--Cargo.lock87
-rw-r--r--Cargo.toml1
-rw-r--r--plugins/plugin_measurement_filter/Cargo.toml21
-rw-r--r--plugins/plugin_measurement_filter/README.md55
-rw-r--r--plugins/plugin_measurement_filter/src/builder.rs70
-rw-r--r--plugins/plugin_measurement_filter/src/config.rs36
-rw-r--r--plugins/plugin_measurement_filter/src/extractor.rs152
-rw-r--r--plugins/plugin_measurement_filter/src/filter.rs122
-rw-r--r--plugins/plugin_measurement_filter/src/lib.rs8
-rw-r--r--plugins/plugin_measurement_filter/src/plugin.rs79
-rw-r--r--tedge/Cargo.toml3
-rw-r--r--tedge/example-config.toml23
-rw-r--r--tedge/src/main.rs6
13 files changed, 662 insertions, 1 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 02435664..0e58ab56 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -689,6 +689,41 @@ dependencies = [
]
[[package]]
+name = "darling"
+version = "0.13.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a01d95850c592940db9b8194bc39f4bc0e89dee5c4265e4b1807c34a9aba453c"
+dependencies = [
+ "darling_core",
+ "darling_macro",
+]
+
+[[package]]
+name = "darling_core"
+version = "0.13.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "859d65a907b6852c9361e3185c862aae7fafd2887876799fa55f5f99dc40d610"
+dependencies = [
+ "fnv",
+ "ident_case",
+ "proc-macro2 1.0.32",
+ "quote 1.0.10",
+ "strsim 0.10.0",
+ "syn 1.0.82",
+]
+
+[[package]]
+name = "darling_macro"
+version = "0.13.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9c972679f83bdf9c42bd905396b6c3588a843a17f0f16dfcfa3e2c5d57441835"
+dependencies = [
+ "darling_core",
+ "quote 1.0.10",
+ "syn 1.0.82",
+]
+
+[[package]]
name = "data-encoding"
version = "2.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1271,6 +1306,12 @@ dependencies = [
]
[[package]]
+name = "ident_case"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39"
+
+[[package]]
name = "idna"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2064,6 +2105,22 @@ dependencies = [
]
[[package]]
+name = "plugin_measurement_filter"
+version = "0.1.0"
+dependencies = [
+ "async-trait",
+ "miette",
+ "serde",
+ "serde_with",
+ "tedge_api",
+ "tedge_lib",
+ "tokio",
+ "tokio-util 0.7.0",
+ "toml",
+ "tracing",
+]
+
+[[package]]
name = "plugin_sm"
version = "0.5.2"
dependencies = [
@@ -2655,6 +2712,12 @@ dependencies = [
]
[[package]]
+name = "rustversion"
+version = "1.0.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f2cc38e8fa666e2de3c4aba7edeb5ffc5246c1c2ed0e3d17e560aeeba736b23f"
+
+[[package]]
name = "rusty-fork"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2804,6 +2867,29 @@ dependencies = [
]
[[package]]
+name = "serde_with"
+version = "1.12.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "946fa04a8ac43ff78a1f4b811990afb9ddbdf5890b46d6dda0ba1998230138b7"
+dependencies = [
+ "rustversion",
+ "serde",
+ "serde_with_macros",
+]
+
+[[package]]
+name = "serde_with_macros"
+version = "1.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e182d6ec6f05393cc0e5ed1bf81ad6db3a8feedf8ee515ecdd369809bcce8082"
+dependencies = [
+ "darling",
+ "proc-macro2 1.0.32",
+ "quote 1.0.10",
+ "syn 1.0.82",
+]
+
+[[package]]
name = "serial_test"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -3016,6 +3102,7 @@ dependencies = [
"plugin_httpstop",
"plugin_inotify",
"plugin_log",
+ "plugin_measurement_filter",
"plugin_sysstat",
"tedge_api",
"tedge_core",
diff --git a/Cargo.toml b/Cargo.toml
index 42724e2d..09770b9a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -8,6 +8,7 @@ members = [
"plugins/plugin_httpstop",
"plugins/plugin_inotify",
"plugins/plugin_log",
+ "plugins/plugin_measurement_filter",
"plugins/plugin_sysstat",
"plugins/tedge_apt_plugin",
"plugins/tedge_dummy_plugin",
diff --git a/plugins/plugin_measurement_filter/Cargo.toml b/plugins/plugin_measurement_filter/Cargo.toml
new file mode 100644
index 00000000..bb80818d
--- /dev/null
+++ b/plugins/plugin_measurement_filter/Cargo.toml
@@ -0,0 +1,21 @@
+[package]
+name = "plugin_measurement_filter"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+miette = "4.4"
+async-trait = "0.1"
+tracing = "0.1"
+serde = { version = "1", features = ["derive"] }
+serde_with = "1.12"
+tokio = { version = "1", features = ["rt"] }
+tokio-util = "0.7.0"
+
+tedge_api = { path = "../../crates/core/tedge_api" }
+tedge_lib = { path = "../../crates/core/tedge_lib" }
+
+[dev-dependencies]
+toml = "0.5"
diff --git a/plugins/plugin_measurement_filter/README.md b/plugins/plugin_measurement_filter/README.md
new file mode 100644
index 00000000..d695150b
--- /dev/null
+++ b/plugins/plugin_measurement_filter/README.md
@@ -0,0 +1,55 @@
+# plugin_measurement_filter
+
+The "measurement_filter" plugin can be used to extract values from measurement
+messages and apply a filter function on them. If the filter matches, the plugin
+forwards the message to a certain plugin, if not it can optionally send the
+message to an alternative plugin.
+
+
+## Configuration
+
+The plugin configuration is made out of four values:
+
+* The target plugin to send messages to that are filtered
+* An optional alternative plugin that messages get send to that are "filtered
+ out"
+* An extractor, that must be used to extract a value from a measurement message
+ for filtering
+* A filter predicate
+
+An example would look like this:
+
+```toml
+target = "logger"
+filtered_target = "some_other_plugin" # optional
+
+# Extract the value from messages named "temperature" at field "fahrenheit"
+extractor = "temperature.fahrenheit"
+
+# Messages with fahrenheit > 70 are send to "target", all others to
+# "filtered_target" or dropped
+more_than = 70.0
+```
+
+
+## Available filter predicates
+
+```toml
+# Boolean
+is = true
+
+# Float
+less_than = 10.0
+more_than = 10.0
+
+# String
+contains = "foo"
+excludes = "foo"
+```
+
+One of them must be used, using multiple is not supported.
+
+If a filter does not match the expected type (e.g. the value to filter is a
+boolean, but you try to filter with `more_than = 10.0`) the filter
+implementation will return false.
+
diff --git a/plugins/plugin_measurement_filter/src/builder.rs b/plugins/plugin_measurement_filter/src/builder.rs
new file mode 100644
index 00000000..05b7e369
--- /dev/null
+++ b/plugins/plugin_measurement_filter/src/builder.rs
@@ -0,0 +1,70 @@
+use async_trait::async_trait;
+use tokio_util::sync::CancellationToken;
+
+use tedge_api::plugin::BuiltPlugin;
+use tedge_api::plugin::HandleTypes;
+use tedge_api::plugin::PluginExt;
+use tedge_api::PluginBuilder;
+use tedge_api::PluginConfiguration;
+use tedge_api::PluginDirectory;
+use tedge_api::PluginError;
+use tedge_lib::measurement::Measurement;
+
+use crate::config::MeasurementFilterConfig;
+use crate::plugin::MeasurementFilterPlugin;
+
+tedge_api::make_receiver_bundle!(pub struct MeasurementReceiver(Measurement));
+
+pub struct MeasurementFilterPluginBuilder;
+
+#[async_trait]
+impl<PD> PluginBuilder<PD> for MeasurementFilterPluginBuilder
+where
+ PD: PluginDirectory,
+{
+ fn kind_name() -> &'static str {
+ "measurement_filter"
+ }
+
+ fn kind_message_types() -> HandleTypes
+ where
+ Self: Sized,
+ {
+ MeasurementFilterPlugin::get_handled_types()
+ }
+
+ async fn verify_configuration(
+ &self,
+ config: &PluginConfiguration,
+ ) -> Result<(), tedge_api::error::PluginError> {
+ config
+ .clone()
+ .try_into()
+ .map(|_: MeasurementFilterConfig| ())
+ .map_err(|_| miette::miette!("Failed to parse filter configuration"))
+ .map_err(PluginError::from)
+ }
+
+ async fn instantiate(
+ &self,
+ config: PluginConfiguration,
+ _cancellation_token: CancellationToken,
+ plugin_dir: &PD,
+ ) -> Result<BuiltPlugin, PluginError> {
+ let config = config
+ .try_into::<MeasurementFilterConfig>()
+ .map_err(|_| miette::miette!("Failed to parse filter configuration"))?;
+
+ let main_addr = plugin_dir.get_address_for(&config.target)?;
+ let filtered_addr = config
+ .filtered_target
+ .as_ref()
+ .map(|filtered| plugin_dir.get_address_for(filtered))
+ .transpose()?;
+
+ Ok({
+ MeasurementFilterPlugin::new(main_addr, filtered_addr, config.extractor, config.filter)
+ .finish()
+ })
+ }
+}
diff --git a/plugins/plugin_measurement_filter/src/config.rs b/plugins/plugin_measurement_filter/src/config.rs
new file mode 100644
index 00000000..46536bd3
--- /dev/null
+++ b/plugins/plugin_measurement_filter/src/config.rs
@@ -0,0 +1,36 @@
+#[derive(Debug, serde::Deserialize)]
+#[serde_with::serde_as]
+pub struct MeasurementFilterConfig {
+ pub(crate) target: String,
+ pub(crate) filtered_target: Option<String>,
+
+ #[serde_as(as = "TryFromInto<String>")]
+ pub(crate) extractor: crate::extractor::Extractor,
+
+ #[serde(flatten)]
+ pub(crate) filter: crate::filter::Filter,
+}
+
+#[cfg(test)]
+mod tests {
+ use super::MeasurementFilterConfig;
+ use crate::extractor::Token;
+ use crate::filter::Filter;
+
+ #[test]
+ fn test_deserialize() {
+ let s = r#"
+ target = "foo"
+ filtered_target = "bar"
+ extractor = "foo.bar"
+ is = true
+ "#;
+
+ let c: MeasurementFilterConfig = toml::from_str(s).unwrap();
+ assert_eq!(c.target, "foo");
+ assert_eq!(c.filtered_target, Some("bar".to_string()));
+ assert_eq!(c.extractor.0, vec![Token::Key("foo".to_string()), Token::Key("bar".to_string())]);
+ assert_eq!(c.filter, Filter::Is(true));
+ }
+}
+
diff --git a/plugins/plugin_measurement_filter/src/extractor.rs b/plugins/plugin_measurement_filter/src/extractor.rs
new file mode 100644
index 00000000..08a533ba
--- /dev/null
+++ b/plugins/plugin_measurement_filter/src/extractor.rs
@@ -0,0 +1,152 @@
+use tracing::trace;
+
+#[derive(Debug, serde_with::DeserializeFromStr)]
+pub struct Extractor(pub(crate) Vec<Token>);
+
+impl From<Extractor> for String {
+ fn from(e: Extractor) -> String {
+ e.0.iter().map(Token::to_string).collect()
+ }
+}
+
+impl std::str::FromStr for Extractor {
+ type Err = String;
+
+ fn from_str(s: &str) -> Result<Extractor, Self::Err> {
+ // TODO: Make this implementation bullet-proof with nom or something like that
+ let v = s.split('.')
+ .map(Token::try_from)
+ .collect::<Result<Vec<_>, _>>()?;
+
+ if v.is_empty() || v.get(0).map(|t| t.is_empty()).unwrap_or(false) {
+ return Err("Empty extractor".to_string())
+ }
+
+ Ok(Extractor(v))
+ }
+}
+
+#[derive(Debug, serde::Deserialize)]
+#[cfg_attr(test, derive(PartialEq))]
+pub enum Token {
+ Key(String),
+ Index(usize),
+}
+
+impl Token {
+ /// Helper for checking whether a Token::Key is empty
+ fn is_empty(&self) -> bool {
+ match self {
+ Token::Key(s) => s.is_empty(),
+ _ => false,
+ }
+ }
+}
+
+impl ToString for Token {
+ fn to_string(&self) -> String {
+ match self {
+ Token::Key(s) => s.to_string(),
+ Token::Index(u) => u.to_string(),
+ }
+ }
+}
+
+impl TryFrom<&str> for Token {
+ type Error = String;
+
+ fn try_from(s: &str) -> Result<Token, Self::Error> {
+ use std::str::FromStr;
+ match usize::from_str(s) {
+ Ok(u) => Ok(Token::Index(u)),
+ Err(_) => Ok(Token::Key(s.to_string())),
+ }
+ }
+}
+
+pub trait Extractable {
+ type Output: Sized;
+ fn extract<'a>(&'a self, extractor: &[Token]) -> Option<&'a Self::Output>;
+}
+
+use tedge_lib::measurement::Measurement;
+use tedge_lib::measurement::MeasurementValue;
+
+impl Extractable for Measurement {
+ type Output = MeasurementValue;
+
+ fn extract<'a>(&'a self, extractor: &[Token]) -> Option<&'a Self::Output> {
+ if let Some(next_token) = extractor.get(0) {
+ match next_token {
+ Token::Index(_) => None,
+ Token::Key(key) => if key == self.name() {
+ self.value().extract(&extractor[1..])
+ } else {
+ None
+ }
+ }
+ } else {
+ None
+ }
+ }
+}
+
+impl Extractable for tedge_lib::measurement::MeasurementValue {
+ type Output = tedge_lib::measurement::MeasurementValue;
+
+ fn extract<'a>(&'a self, extractor: &[Token]) -> Option<&'a Self::Output> {
+ if let Some(next_token) = extractor.get(0) {
+ match (next_token, self) {
+ (Token::Index(idx), MeasurementValue::List(lst)) => {
+ trace!("Fetching '{}' from {:?}", idx, lst);
+ lst.get(*idx).and_then(|v| v.extract(&extractor[1..]))
+ },
+ (Token::Key(key), MeasurementValue::Map(map)) => {
+ trace!("Fetching '{}' from {:?}", key, map);
+ map.get(key).and_then(|v| v.extract(&extractor[1..]))
+ },
+
+ (_, MeasurementValue::Bool(_)) => None,
+ (_, MeasurementValue::Float(_)) => None,
+ (_, MeasurementValue::Text(_)) => None,
+ (Token::Index(_), _) => None,
+ (Token::Key(_), _) => None,
+ }
+ } else {
+ trace!("Found value: {:?}", self);
+ Some(self)
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::str::FromStr;
+
+ #[test]
+ fn test_extracting_empty_string() {
+ assert!(Extractor::from_str("").is_err());
+ }
+
+ #[test]
+ fn test_extracting_single_key() {
+ let ext = Extractor::from_str("foo").unwrap().0;
+ let exp = vec![Token::Key("foo".to_string())];
+ assert_eq!(ext, exp);
+ }
+
+ #[test]
+ fn test_extracting_key_index() {
+ let ext = Extractor::from_str("foo.5").unwrap().0;
+ let exp = vec![Token::Key("foo".to_string()), Token::Index(5)];
+ assert_eq!(ext, exp);
+ }
+
+ #[test]
+ fn test_extracting_key_index_key() {
+ let ext = Extractor::from_str("foo.5.bar").unwrap().0;
+ let exp = vec![Token::Key("foo".to_string()), Token::Index(5), Token::Key("bar".to_string())];
+ assert_eq!(ext, exp);
+ }
+}
diff --git a/plugins/plugin_measurement_filter/src/filter.rs b/plugins/plugin_measurement_filter/src/filter.rs
new file mode 100644
index 00000000..7ccf7341
--- /dev/null
+++ b/plugins/plugin_measurement_filter/src/filter.rs
@@ -0,0 +1,122 @@
+use tedge_lib::measurement::MeasurementValue;
+use tracing::trace;
+
+#[derive(Debug, serde::Deserialize)]
+#[cfg_attr(test, derive(PartialEq))]
+pub enum Filter {
+ #[serde(rename = "is")]
+ Is(bool),
+
+ #[serde(rename = "less_than")]
+ LessThan(f64),
+
+ #[serde(rename = "more_than")]
+ MoreThan(f64),
+
+ #[serde(rename = "contains")]
+ Contains(String),
+
+ #[serde(rename = "excludes")]
+ Excludes(String),
+}
+
+pub trait Filterable {
+ fn apply_filter(&self, filter: &Filter) -> bool;
+}
+
+impl Filterable for MeasurementValue {
+ fn apply_filter(&self, filter: &Filter) -> bool {
+ trace!("Filtering with {:?}: {:?}", filter, self);
+ match (self, filter) {
+ (MeasurementValue::Bool(b1), Filter::Is(b2)) => b1 == b2,
+ (MeasurementValue::Bool(_), _) => false,
+
+ (MeasurementValue::Float(f1), Filter::LessThan(f2)) => f1 < f2,
+ (MeasurementValue::Float(f1), Filter::MoreThan(f2)) => f1 > f2,
+ (MeasurementValue::Float(_), _) => false,
+
+ (MeasurementValue::Text(t1), Filter::Contains(t2)) => t1.contains(t2),
+ (MeasurementValue::Text(t1), Filter::Excludes(t2)) => !t1.contains(t2),
+ (MeasurementValue::Text(_), _) => false,
+
+ (_, _) => false,
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_filter_msmt_bool() {
+ let msmt = MeasurementValue::Bool(false);
+ let filt = Filter::Is(false);
+ assert!(msmt.apply_filter(&filt));
+
+ let filt = Filter::Is(true);
+ assert!(!msmt.apply_filter(&filt));
+ }
+
+ #[test]
+ fn test_filter_msmt_lt() {
+ let msmt = MeasurementValue::Float(10.0);
+ let filt = Filter::LessThan(20.0);
+ assert!(msmt.apply_filter(&filt));
+
+ let filt = Filter::LessThan(5.0);
+ assert!(!msmt.apply_filter(&filt));
+ }
+
+ #[test]
+ fn test_filter_msmt_gt() {
+ let msmt = MeasurementValue::Float(10.0);
+ let filt = Filter::MoreThan(20.0);
+ assert!(!msmt.apply_filter(&filt));
+
+ let filt = Filter::MoreThan(5.0);
+ assert!(msmt.apply_filter(&filt));
+ }
+
+ #[test]
+ fn test_filter_msmt_contains() {
+ let msmt = MeasurementValue::Text("foobar".to_string());
+ let filt = Filter::Contains("oob".to_string());
+ assert!(msmt.apply_filter(&filt));
+
+ let filt = Filter::Contains("kittens".to_string());
+ assert!(!msmt.apply_filter(&filt));
+ }
+
+ #[test]
+ fn test_filter_msmt_excludes() {
+ let msmt = MeasurementValue::Text("foobar".to_string());
+ let filt = Filter::Excludes("oob".to_string());
+ assert!(!msmt.apply_filter(&filt));
+
+ let filt = Filter::Excludes("kittens".to_string());
+ assert!(msmt.apply_filter(&filt));
+ }
+
+ #[test]
+ fn test_filter_nonmatching_bool() {
+ let msmt = MeasurementValue::Bool(false);
+ let filt = Filter::Excludes("kittens".to_string());
+ assert!(!msmt.apply_filter(&filt));
+ }
+
+ #[test]
+ fn test_filter_nonmatching_float() {
+ let msmt = MeasurementValue::Float(1.0);
+ let filt = Filter::Excludes("kittens".to_string());
+ assert!(!msmt.apply_filter(&filt));
+ }
+
+ #[test]
+ fn test_filter_nonmatching_string() {
+ let msmt = MeasurementValue::Text("foobar".to_string());
+ let filt = Filter::Is(true);
+ assert!(!msmt.apply_filter(&filt));
+ }
+
+}
diff --git a/plugins/plugin_measurement_filter/src/lib.rs b/plugins/plugin_measurement_filter/src/lib.rs
new file mode 100644
index 00000000..95bd51e3
--- /dev/null
+++ b/plugins/plugin_measurement_filter/src/lib.rs
@@ -0,0 +1,8 @@
+mod builder;
+mod config;
+mod extractor;
+mod filter;
+mod plugin;
+
+pub use builder::MeasurementFilterPluginBuilder;
+pub use plugin::MeasurementFilterPlugin;
diff --git a/plugins/plugin_measurement_filter/src/plugin.rs b/plugins/plugin_measurement_filter/src/plugin.rs
new file mode 100644
index 00000000..02a4f396
--- /dev/null
+++ b/plugins/plugin_measurement_filter/src/plugin.rs
@@ -0,0 +1,79 @@
+use async_trait::async_trait;
+use tracing::debug;
+
+use tedge_api::address::Address;
+use tedge_api::address::ReplySender;
+use tedge_api::error::PluginError;
+use tedge_api::plugin::Handle;
+use tedge_api::plugin::Message;
+use tedge_api::plugin::Plugin;
+use tedge_lib::measurement::Measurement;
+use tracing::trace;
+
+use crate::builder::MeasurementReceiver;
+use crate::extractor::Extractable;
+use crate::filter::Filterable;
+
+pub struct MeasurementFilterPlugin {
+ target: Address<MeasurementReceiver>,
+ filtered_target: Option<Address<MeasurementReceiver>>,
+
+ extractor: crate::extractor::Extractor,
+ filter: crate::filter::Filter,
+}
+
+impl MeasurementFilterPlugin {
+ pub fn new(
+ target: Address<MeasurementReceiver>,
+ filtered_target: Option<Address<MeasurementReceiver>>,
+ extractor: crate::extractor::Extractor,
+ filter: crate::filter::Filter,
+ ) -> Self {
+ Self {
+ target,
+ filtered_target,
+ extractor,
+ filter,
+ }
+ }
+}
+
+impl tedge_api::plugin::PluginDeclaration for MeasurementFilterPlugin {
+ type HandledMessages = (Measurement,);
+}
+
+#[async_trait]
+impl Plugin for MeasurementFilterPlugin {
+ async fn start(&mut self) -> Result<(), PluginError> {
+ debug!("Setting up filter plugin");
+ Ok(())
+ }
+
+ async fn shutdown(&mut self) -> Result<(), PluginError> {
+ debug!("Shutting down filter plugin!");
+ Ok(())
+ }
+}
+
+#[async_trait]
+impl Handle<Measurement> for MeasurementFilterPlugin {
+ async fn handle_message(
+ &self,
+ message: Measurement,
+ _sender: ReplySender<<Measurement as Message>::Reply>,
+ ) -> Result<(), PluginError> {
+ trace!("Extracting with {:?} from {:?}", self.extractor, message);
+ if let Some(value) = message.extract(&self.extractor.0) {
+ trace!("Applying filter {:?} to value {:?}", self.filter, value);
+ if value.apply_filter(&self.filter) {
+ let _ = self.target.send(message).await;
+ } else {
+ if let Some(ftarget) = self.filtered_target.as_ref() {
+ let _ = ftarget.send(message).await;
+ }
+ }
+ }
+ Ok(())
+ }
+}
+
diff --git a/tedge/Cargo.toml b/tedge/Cargo.toml
index a7eb3b40..15efdb9e 100644
--- a/tedge/Cargo.toml
+++ b/tedge/Cargo.toml
@@ -26,6 +26,7 @@ plugin_log = { path = "../plugins/plugin_log", optional = true }
plugin_sysstat = { path = "../plugins/plugin_sysstat", optional = true }
plugin_inotify = { path = "../plugins/plugin_inotify", optional = true }
plugin_httpstop = { path = "../plugins/plugin_httpstop", optional = true }
+plugin_measurement_filter = { path = "../plugins/plugin_measurement_filter", optional = true }
[features]
@@ -36,6 +37,7 @@ default = [
"builtin_plugin_sysstat",
"builtin_plugin_inotify",
"builtin_plugin_httpstop",
+ "builtin_plugin_measurement_filter",
]
core_debugging = ["console-subscriber"]
@@ -45,4 +47,5 @@ builtin_plugin_log = ["plugin_log"]
builtin_plugin_sysstat = ["plugin_sysstat"]
builtin_plugin_inotify = ["plugin_inotify"]
builtin_plugin_httpstop = ["plugin_httpstop"]
+builtin_plugin_measurement_filter = ["plugin_measurement_filter"]
diff --git a/tedge/example-config.toml b/tedge/example-config.toml
index 3195b9b7..54e0c8a8 100644
--- a/tedge/example-config.toml
+++ b/tedge/example-config.toml
@@ -15,11 +15,32 @@ acknowledge = false
setup_logger = false
+[plugins.warn-logging]
+kind = "log"
+
+[plugins.warn-logging.configuration]
+name = "logging"
+level = "warn"
+acknowledge = false
+setup_logger = false
+
+
+[plugins.filter-high-cpu]
+kind = "measurement_filter"
+
+[plugins.filter-high-cpu.configuration]
+target = "warn-logging"
+filtered_target = "logging"
+extractor = "global_processor_info.cpu_usage"
+more_than = 12.0
+
+
+
[plugins.syscpu]
kind = "sysinfo"
[plugins.syscpu.configuration.cpu]
-send_to = ["logging"]
+send_to = ["filter-high-cpu"]
interval_ms = 1000
diff --git a/tedge/src/main.rs b/tedge/src/main.rs
index 53fe68ec..8296dcff 100644
--- a/tedge/src/main.rs
+++ b/tedge/src/main.rs
@@ -88,6 +88,12 @@ async fn main() -> miette::Result<()> {
plugin_httpstop::HttpStopPluginBuilder,
plugin_httpstop::HttpStopPluginBuilder
);
+ let application = register_plugin!(
+ application,
+ "builtin_plugin_measurement_filter",
+ plugin_measurement_filter::MeasurementFilterPluginBuilder,
+ plugin_measurement_filter::MeasurementFilterPluginBuilder
+ );
let (cancel_sender, application) = application.with_config(config).into_diagnostic()?;
info!("Application built");