diff options
author | Matthias Beyer <matthias.beyer@ifm.com> | 2022-04-08 12:30:52 +0200 |
---|---|---|
committer | Matthias Beyer <matthias.beyer@ifm.com> | 2022-05-10 15:24:56 +0200 |
commit | 274717453df9f210483180a8c3bf2de036ce7ba0 (patch) | |
tree | cd59743abe02b75c9e99d4a5190c440e7860155f | |
parent | 2e76f2727129387343d82917bc5881227bb5f881 (diff) |
Add new plugin for notifications
This patch adds a plugin that can be used to generate a notification if
a message is received.
This is helpful in combination with the filter plugin:
* Filter measurements for value > 100, if value is greater (condition
matches), forward the message to the notification plugin
Otherwise send to plugin P
* The notification plugin generates a notification if it receives a
message and sends this warning to a configured plugin.
The message itself is forwarded (to P).
Signed-off-by: Matthias Beyer <matthias.beyer@ifm.com>
-rw-r--r-- | Cargo.lock | 26 | ||||
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | plugins/plugin_notification/Cargo.toml | 24 | ||||
-rw-r--r-- | plugins/plugin_notification/README.md | 17 | ||||
-rw-r--r-- | plugins/plugin_notification/src/builder.rs | 61 | ||||
-rw-r--r-- | plugins/plugin_notification/src/config.rs | 56 | ||||
-rw-r--r-- | plugins/plugin_notification/src/lib.rs | 6 | ||||
-rw-r--r-- | plugins/plugin_notification/src/plugin.rs | 69 |
8 files changed, 249 insertions, 11 deletions
@@ -2217,30 +2217,35 @@ dependencies = [ ] [[package]] -name = "plugin_measurement_threshold" +name = "plugin_mqtt" version = "0.1.0" dependencies = [ - "anyhow", "async-trait", + "backoff 0.4.0", + "futures", "log", + "miette", + "paho-mqtt", "serde", "tedge_api", "tedge_lib", + "thiserror", + "tokio", "tokio-util 0.7.0", + "toml", "tracing", ] [[package]] -name = "plugin_mqtt" +name = "plugin_mqtt_measurement_bridge" version = "0.1.0" dependencies = [ "async-trait", - "backoff 0.4.0", - "futures", "log", "miette", - "paho-mqtt", + "plugin_mqtt", "serde", + "serde_json", "tedge_api", "tedge_lib", "thiserror", @@ -2251,19 +2256,18 @@ dependencies = [ ] [[package]] -name = "plugin_mqtt_measurement_bridge" +name = "plugin_notification" version = "0.1.0" dependencies = [ "async-trait", + "float-cmp 0.9.0", "log", "miette", - "plugin_mqtt", + "nom", "serde", - "serde_json", + "serde_with", "tedge_api", "tedge_lib", - "thiserror", - "tokio", "tokio-util 0.7.0", "toml", "tracing", @@ -11,6 +11,7 @@ members = [ "plugins/plugin_measurement_filter", "plugins/plugin_mqtt", "plugins/plugin_mqtt_measurement_bridge", + "plugins/plugin_notification", "plugins/plugin_sysstat", "plugins/tedge_apt_plugin", "plugins/tedge_dummy_plugin", diff --git a/plugins/plugin_notification/Cargo.toml b/plugins/plugin_notification/Cargo.toml new file mode 100644 index 00000000..b08e97f7 --- /dev/null +++ b/plugins/plugin_notification/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "plugin_notification" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-trait = "0.1" +log = { version = "0.4", features = ["serde"] } +nom = "7" +miette = "4.4" +serde = { version = "1.0.136", features = ["derive"] } +serde_with = "1" +tokio-util = "0.7.0" +tracing = "0.1" + +tedge_api = { path = "../../crates/core/tedge_api" } +tedge_lib = { path = "../../crates/core/tedge_lib" } + +[dev-dependencies] +float-cmp = "0.9" +toml = "0.5" + diff --git a/plugins/plugin_notification/README.md b/plugins/plugin_notification/README.md new file mode 100644 index 00000000..cd73d675 --- /dev/null +++ b/plugins/plugin_notification/README.md @@ -0,0 +1,17 @@ +# plugin_notification + +This plugin receives measurements that it forwards to another plugin. +Every time it receives a measurement, a third plugin is notified. + + +## Configuration + +Example configuration of the plugin: + +```toml +forward_to = "other_plugin_name" +notify = "to_notify_plugin_name" +raise = "info" # one of "info", "warning", "error" +raise_message = "Some freeform text" +``` + diff --git a/plugins/plugin_notification/src/builder.rs b/plugins/plugin_notification/src/builder.rs new file mode 100644 index 00000000..c4d68ae0 --- /dev/null +++ b/plugins/plugin_notification/src/builder.rs @@ -0,0 +1,61 @@ +use async_trait::async_trait; +use tokio_util::sync::CancellationToken; + +use tedge_api::PluginBuilder; +use tedge_api::PluginConfiguration; +use tedge_api::PluginDirectory; +use tedge_api::PluginError; +use tedge_api::plugin::BuiltPlugin; +use tedge_api::plugin::HandleTypes; +use tedge_api::plugin::PluginExt; +use tedge_lib::measurement::Measurement; +use tedge_lib::notification::Notification; + +use crate::config::Config; +use crate::plugin::NotificationPlugin; + +pub struct NotificationPluginBuilder; + +tedge_api::make_receiver_bundle!(pub struct MeasurementReceiver(Measurement)); +tedge_api::make_receiver_bundle!(pub struct NotificationReceiver(Notification)); + +#[async_trait] +impl<PD: PluginDirectory> PluginBuilder<PD> for NotificationPluginBuilder { + fn kind_name() -> &'static str { + "notification" + } + + fn kind_message_types() -> HandleTypes + where + Self: Sized, + { + NotificationPlugin::get_handled_types() + } + + async fn verify_configuration( + &self, + config: &PluginConfiguration, + ) -> Result<(), tedge_api::error::PluginError> { + config + .clone() + .try_into::<Config>() + .map_err(|_| miette::miette!("Failed to parse measurement threshold configuration")) + .map(|_| ()) + } + + async fn instantiate( + &self, + config: PluginConfiguration, + _cancellation_token: CancellationToken, + plugin_dir: &PD, + ) -> Result<BuiltPlugin, PluginError> { + let config = config + .try_into::<Config>() + .map_err(|_| miette::miette!("Failed to parse measurement threshold configuration"))?; + + let forward_addr = plugin_dir.get_address_for(&config.forward_to)?; + let notify_addr = plugin_dir.get_address_for(&config.notify)?; + Ok(NotificationPlugin::new(forward_addr, notify_addr, config.raise, config.raise_message).finish()) + } +} + diff --git a/plugins/plugin_notification/src/config.rs b/plugins/plugin_notification/src/config.rs new file mode 100644 index 00000000..95078a7a --- /dev/null +++ b/plugins/plugin_notification/src/config.rs @@ -0,0 +1,56 @@ +use tedge_lib::notification::Notification; + +#[derive(Debug, serde::Deserialize)] +pub struct Config { + pub(crate) forward_to: String, + + pub(crate) notify: String, + + pub(crate) raise: NotificationType, + pub(crate) raise_message: String, +} + +#[derive(Clone, Copy, Debug, serde::Deserialize)] +#[cfg_attr(test, derive(PartialEq))] +pub enum NotificationType { + #[serde(rename = "info")] + Info, + + #[serde(rename = "warning")] + Warning, + + #[serde(rename = "error")] + Error, +} + +impl NotificationType { + pub(crate) fn into_notification(self, message: String) -> Notification { + match self { + NotificationType::Info => Notification::info(message), + NotificationType::Warning => Notification::warning(message), + NotificationType::Error => Notification::error(message), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_config_valid() { + let config = r#" + forward_to = "foo" + notify = "bar" + raise = "info" + raise_message = "it is getting warm here" + "#; + + let cfg: Config = toml::from_str(config).unwrap(); + assert_eq!(cfg.forward_to, "foo"); + assert_eq!(cfg.notify, "bar"); + assert_eq!(cfg.raise, NotificationType::Info); + assert_eq!(cfg.raise_message, String::from("it is getting warm here")); + } +} + diff --git a/plugins/plugin_notification/src/lib.rs b/plugins/plugin_notification/src/lib.rs new file mode 100644 index 00000000..85114ef8 --- /dev/null +++ b/plugins/plugin_notification/src/lib.rs @@ -0,0 +1,6 @@ +mod builder; +mod config; +mod plugin; + +pub use crate::builder::NotificationPluginBuilder; +pub use crate::plugin::NotificationPlugin; diff --git a/plugins/plugin_notification/src/plugin.rs b/plugins/plugin_notification/src/plugin.rs new file mode 100644 index 00000000..09c8a930 --- /dev/null +++ b/plugins/plugin_notification/src/plugin.rs @@ -0,0 +1,69 @@ +use async_trait::async_trait; + +use tedge_api::plugin::Handle; +use tedge_api::Address; +use tedge_api::Plugin; +use tedge_api::PluginError; +use tedge_lib::measurement::Measurement; +use tracing::trace; + +use crate::builder::MeasurementReceiver; +use crate::builder::NotificationReceiver; +use crate::config::NotificationType; + +pub struct NotificationPlugin { + target_addr: Address<MeasurementReceiver>, + notify_addr: Address<NotificationReceiver>, + + raise: NotificationType, + raise_msg: String, +} + +impl tedge_api::plugin::PluginDeclaration for NotificationPlugin { + type HandledMessages = (Measurement,); +} + +impl NotificationPlugin { + pub fn new( + target_addr: Address<MeasurementReceiver>, + notify_addr: Address<NotificationReceiver>, + raise: NotificationType, + raise_msg: String, + ) -> Self { + Self { + target_addr, + notify_addr, + raise, + raise_msg, + } + } +} + +#[async_trait] +impl Plugin for NotificationPlugin { + async fn start(&mut self) -> Result<(), PluginError> { + Ok(()) + } + + async fn shutdown(&mut self) -> Result<(), PluginError> { + trace!("Shutdown"); + Ok(()) + } +} + +#[async_trait] +impl Handle<Measurement> for NotificationPlugin { + async fn handle_message( + &self, + message: Measurement, + _sender: tedge_api::address::ReplySenderFor<Measurement>, + ) -> Result<(), PluginError> { + trace!("Received measurement = {:?}", message); + trace!("Sending notification for measurement = {:?}", message); + let _ = self.notify_addr.send_and_wait(self.raise.clone().into_notification(self.raise_msg.to_string())).await; + + trace!("Forwarding measurement = {:?}", message); + let _ = self.target_addr.send_and_wait(message).await; + Ok(()) + } +} |