summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <matthias.beyer@ifm.com>2022-04-08 12:30:52 +0200
committerMatthias Beyer <matthias.beyer@ifm.com>2022-05-10 15:24:56 +0200
commit274717453df9f210483180a8c3bf2de036ce7ba0 (patch)
treecd59743abe02b75c9e99d4a5190c440e7860155f
parent2e76f2727129387343d82917bc5881227bb5f881 (diff)
Add new plugin for notifications
This patch adds a plugin that can be used to generate a notification if a message is received. This is helpful in combination with the filter plugin: * Filter measurements for value > 100, if value is greater (condition matches), forward the message to the notification plugin Otherwise send to plugin P * The notification plugin generates a notification if it receives a message and sends this warning to a configured plugin. The message itself is forwarded (to P). Signed-off-by: Matthias Beyer <matthias.beyer@ifm.com>
-rw-r--r--Cargo.lock26
-rw-r--r--Cargo.toml1
-rw-r--r--plugins/plugin_notification/Cargo.toml24
-rw-r--r--plugins/plugin_notification/README.md17
-rw-r--r--plugins/plugin_notification/src/builder.rs61
-rw-r--r--plugins/plugin_notification/src/config.rs56
-rw-r--r--plugins/plugin_notification/src/lib.rs6
-rw-r--r--plugins/plugin_notification/src/plugin.rs69
8 files changed, 249 insertions, 11 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 538af395..17eae958 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2217,30 +2217,35 @@ dependencies = [
]
[[package]]
-name = "plugin_measurement_threshold"
+name = "plugin_mqtt"
version = "0.1.0"
dependencies = [
- "anyhow",
"async-trait",
+ "backoff 0.4.0",
+ "futures",
"log",
+ "miette",
+ "paho-mqtt",
"serde",
"tedge_api",
"tedge_lib",
+ "thiserror",
+ "tokio",
"tokio-util 0.7.0",
+ "toml",
"tracing",
]
[[package]]
-name = "plugin_mqtt"
+name = "plugin_mqtt_measurement_bridge"
version = "0.1.0"
dependencies = [
"async-trait",
- "backoff 0.4.0",
- "futures",
"log",
"miette",
- "paho-mqtt",
+ "plugin_mqtt",
"serde",
+ "serde_json",
"tedge_api",
"tedge_lib",
"thiserror",
@@ -2251,19 +2256,18 @@ dependencies = [
]
[[package]]
-name = "plugin_mqtt_measurement_bridge"
+name = "plugin_notification"
version = "0.1.0"
dependencies = [
"async-trait",
+ "float-cmp 0.9.0",
"log",
"miette",
- "plugin_mqtt",
+ "nom",
"serde",
- "serde_json",
+ "serde_with",
"tedge_api",
"tedge_lib",
- "thiserror",
- "tokio",
"tokio-util 0.7.0",
"toml",
"tracing",
diff --git a/Cargo.toml b/Cargo.toml
index 0fc7fc0f..ae3478ff 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,6 +11,7 @@ members = [
"plugins/plugin_measurement_filter",
"plugins/plugin_mqtt",
"plugins/plugin_mqtt_measurement_bridge",
+ "plugins/plugin_notification",
"plugins/plugin_sysstat",
"plugins/tedge_apt_plugin",
"plugins/tedge_dummy_plugin",
diff --git a/plugins/plugin_notification/Cargo.toml b/plugins/plugin_notification/Cargo.toml
new file mode 100644
index 00000000..b08e97f7
--- /dev/null
+++ b/plugins/plugin_notification/Cargo.toml
@@ -0,0 +1,24 @@
+[package]
+name = "plugin_notification"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+async-trait = "0.1"
+log = { version = "0.4", features = ["serde"] }
+nom = "7"
+miette = "4.4"
+serde = { version = "1.0.136", features = ["derive"] }
+serde_with = "1"
+tokio-util = "0.7.0"
+tracing = "0.1"
+
+tedge_api = { path = "../../crates/core/tedge_api" }
+tedge_lib = { path = "../../crates/core/tedge_lib" }
+
+[dev-dependencies]
+float-cmp = "0.9"
+toml = "0.5"
+
diff --git a/plugins/plugin_notification/README.md b/plugins/plugin_notification/README.md
new file mode 100644
index 00000000..cd73d675
--- /dev/null
+++ b/plugins/plugin_notification/README.md
@@ -0,0 +1,17 @@
+# plugin_notification
+
+This plugin receives measurements that it forwards to another plugin.
+Every time it receives a measurement, a third plugin is notified.
+
+
+## Configuration
+
+Example configuration of the plugin:
+
+```toml
+forward_to = "other_plugin_name"
+notify = "to_notify_plugin_name"
+raise = "info" # one of "info", "warning", "error"
+raise_message = "Some freeform text"
+```
+
diff --git a/plugins/plugin_notification/src/builder.rs b/plugins/plugin_notification/src/builder.rs
new file mode 100644
index 00000000..c4d68ae0
--- /dev/null
+++ b/plugins/plugin_notification/src/builder.rs
@@ -0,0 +1,61 @@
+use async_trait::async_trait;
+use tokio_util::sync::CancellationToken;
+
+use tedge_api::PluginBuilder;
+use tedge_api::PluginConfiguration;
+use tedge_api::PluginDirectory;
+use tedge_api::PluginError;
+use tedge_api::plugin::BuiltPlugin;
+use tedge_api::plugin::HandleTypes;
+use tedge_api::plugin::PluginExt;
+use tedge_lib::measurement::Measurement;
+use tedge_lib::notification::Notification;
+
+use crate::config::Config;
+use crate::plugin::NotificationPlugin;
+
+pub struct NotificationPluginBuilder;
+
+tedge_api::make_receiver_bundle!(pub struct MeasurementReceiver(Measurement));
+tedge_api::make_receiver_bundle!(pub struct NotificationReceiver(Notification));
+
+#[async_trait]
+impl<PD: PluginDirectory> PluginBuilder<PD> for NotificationPluginBuilder {
+ fn kind_name() -> &'static str {
+ "notification"
+ }
+
+ fn kind_message_types() -> HandleTypes
+ where
+ Self: Sized,
+ {
+ NotificationPlugin::get_handled_types()
+ }
+
+ async fn verify_configuration(
+ &self,
+ config: &PluginConfiguration,
+ ) -> Result<(), tedge_api::error::PluginError> {
+ config
+ .clone()
+ .try_into::<Config>()
+ .map_err(|_| miette::miette!("Failed to parse measurement threshold configuration"))
+ .map(|_| ())
+ }
+
+ async fn instantiate(
+ &self,
+ config: PluginConfiguration,
+ _cancellation_token: CancellationToken,
+ plugin_dir: &PD,
+ ) -> Result<BuiltPlugin, PluginError> {
+ let config = config
+ .try_into::<Config>()
+ .map_err(|_| miette::miette!("Failed to parse measurement threshold configuration"))?;
+
+ let forward_addr = plugin_dir.get_address_for(&config.forward_to)?;
+ let notify_addr = plugin_dir.get_address_for(&config.notify)?;
+ Ok(NotificationPlugin::new(forward_addr, notify_addr, config.raise, config.raise_message).finish())
+ }
+}
+
diff --git a/plugins/plugin_notification/src/config.rs b/plugins/plugin_notification/src/config.rs
new file mode 100644
index 00000000..95078a7a
--- /dev/null
+++ b/plugins/plugin_notification/src/config.rs
@@ -0,0 +1,56 @@
+use tedge_lib::notification::Notification;
+
+#[derive(Debug, serde::Deserialize)]
+pub struct Config {
+ pub(crate) forward_to: String,
+
+ pub(crate) notify: String,
+
+ pub(crate) raise: NotificationType,
+ pub(crate) raise_message: String,
+}
+
+#[derive(Clone, Copy, Debug, serde::Deserialize)]
+#[cfg_attr(test, derive(PartialEq))]
+pub enum NotificationType {
+ #[serde(rename = "info")]
+ Info,
+
+ #[serde(rename = "warning")]
+ Warning,
+
+ #[serde(rename = "error")]
+ Error,
+}
+
+impl NotificationType {
+ pub(crate) fn into_notification(self, message: String) -> Notification {
+ match self {
+ NotificationType::Info => Notification::info(message),
+ NotificationType::Warning => Notification::warning(message),
+ NotificationType::Error => Notification::error(message),
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_config_valid() {
+ let config = r#"
+ forward_to = "foo"
+ notify = "bar"
+ raise = "info"
+ raise_message = "it is getting warm here"
+ "#;
+
+ let cfg: Config = toml::from_str(config).unwrap();
+ assert_eq!(cfg.forward_to, "foo");
+ assert_eq!(cfg.notify, "bar");
+ assert_eq!(cfg.raise, NotificationType::Info);
+ assert_eq!(cfg.raise_message, String::from("it is getting warm here"));
+ }
+}
+
diff --git a/plugins/plugin_notification/src/lib.rs b/plugins/plugin_notification/src/lib.rs
new file mode 100644
index 00000000..85114ef8
--- /dev/null
+++ b/plugins/plugin_notification/src/lib.rs
@@ -0,0 +1,6 @@
+mod builder;
+mod config;
+mod plugin;
+
+pub use crate::builder::NotificationPluginBuilder;
+pub use crate::plugin::NotificationPlugin;
diff --git a/plugins/plugin_notification/src/plugin.rs b/plugins/plugin_notification/src/plugin.rs
new file mode 100644
index 00000000..09c8a930
--- /dev/null
+++ b/plugins/plugin_notification/src/plugin.rs
@@ -0,0 +1,69 @@
+use async_trait::async_trait;
+
+use tedge_api::plugin::Handle;
+use tedge_api::Address;
+use tedge_api::Plugin;
+use tedge_api::PluginError;
+use tedge_lib::measurement::Measurement;
+use tracing::trace;
+
+use crate::builder::MeasurementReceiver;
+use crate::builder::NotificationReceiver;
+use crate::config::NotificationType;
+
+pub struct NotificationPlugin {
+ target_addr: Address<MeasurementReceiver>,
+ notify_addr: Address<NotificationReceiver>,
+
+ raise: NotificationType,
+ raise_msg: String,
+}
+
+impl tedge_api::plugin::PluginDeclaration for NotificationPlugin {
+ type HandledMessages = (Measurement,);
+}
+
+impl NotificationPlugin {
+ pub fn new(
+ target_addr: Address<MeasurementReceiver>,
+ notify_addr: Address<NotificationReceiver>,
+ raise: NotificationType,
+ raise_msg: String,
+ ) -> Self {
+ Self {
+ target_addr,
+ notify_addr,
+ raise,
+ raise_msg,
+ }
+ }
+}
+
+#[async_trait]
+impl Plugin for NotificationPlugin {
+ async fn start(&mut self) -> Result<(), PluginError> {
+ Ok(())
+ }
+
+ async fn shutdown(&mut self) -> Result<(), PluginError> {
+ trace!("Shutdown");
+ Ok(())
+ }
+}
+
+#[async_trait]
+impl Handle<Measurement> for NotificationPlugin {
+ async fn handle_message(
+ &self,
+ message: Measurement,
+ _sender: tedge_api::address::ReplySenderFor<Measurement>,
+ ) -> Result<(), PluginError> {
+ trace!("Received measurement = {:?}", message);
+ trace!("Sending notification for measurement = {:?}", message);
+ let _ = self.notify_addr.send_and_wait(self.raise.clone().into_notification(self.raise_msg.to_string())).await;
+
+ trace!("Forwarding measurement = {:?}", message);
+ let _ = self.target_addr.send_and_wait(message).await;
+ Ok(())
+ }
+}