diff options
author | Matthias Beyer <matthias.beyer@ifm.com> | 2022-04-21 12:40:53 +0200 |
---|---|---|
committer | Matthias Beyer <matthias.beyer@ifm.com> | 2022-04-21 13:10:13 +0200 |
commit | 589aa039d9980f8cf3d5052d22a04ebd4bd888a3 (patch) | |
tree | d570ef6adb234e05ad7d5739c319d1e7abd0bf88 | |
parent | 11b361d02e2fd9201660e3ac3dadf88cc15cfbc4 (diff) |
Add moneo mapper plugin
Signed-off-by: Matthias Beyer <matthias.beyer@ifm.com>
-rw-r--r-- | Cargo.lock | 46 | ||||
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | plugins/plugin_moneo_mapper/Cargo.toml | 24 | ||||
-rw-r--r-- | plugins/plugin_moneo_mapper/src/builder.rs | 62 | ||||
-rw-r--r-- | plugins/plugin_moneo_mapper/src/config.rs | 5 | ||||
-rw-r--r-- | plugins/plugin_moneo_mapper/src/error.rs | 9 | ||||
-rw-r--r-- | plugins/plugin_moneo_mapper/src/lib.rs | 8 | ||||
-rw-r--r-- | plugins/plugin_moneo_mapper/src/plugin.rs | 118 |
8 files changed, 273 insertions, 0 deletions
@@ -1118,6 +1118,18 @@ dependencies = [ ] [[package]] +name = "getset" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e45727250e75cc04ff2846a66397da8ef2b3db8e40e0cef4df67950a07621eb9" +dependencies = [ + "proc-macro-error", + "proc-macro2 1.0.32", + "quote 1.0.10", + "syn 1.0.82", +] + +[[package]] name = "gimli" version = "0.26.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1682,6 +1694,20 @@ dependencies = [ ] [[package]] +name = "moneo-mqtt" +version = "0.1.0" +source = "git+https://github.com/matthiasbeyer/moneo-mqtt?branch=master#582c7a2d02d8830890356a70dfe164ac2e34724a" +dependencies = [ + "chrono", + "getset", + "serde", + "serde_json", + "serde_with", + "thiserror", + "uuid", +] + +[[package]] name = "mqtt_channel" version = "0.5.2" dependencies = [ @@ -2209,6 +2235,25 @@ dependencies = [ ] [[package]] +name = "plugin_moneo_mapper" +version = "0.1.0" +dependencies = [ + "async-trait", + "futures", + "miette", + "moneo-mqtt", + "plugin_mqtt", + "serde", + "serde_json", + "tedge_api", + "tedge_lib", + "thiserror", + "tokio", + "tokio-util 0.7.0", + "tracing", +] + +[[package]] name = "plugin_mqtt" version = "0.1.0" dependencies = [ @@ -3000,6 +3045,7 @@ version = "1.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "946fa04a8ac43ff78a1f4b811990afb9ddbdf5890b46d6dda0ba1998230138b7" dependencies = [ + "chrono", "rustversion", "serde", "serde_with_macros", @@ -9,6 +9,7 @@ members = [ "plugins/plugin_inotify", "plugins/plugin_log", "plugins/plugin_measurement_filter", + "plugins/plugin_moneo_mapper", "plugins/plugin_mqtt", "plugins/plugin_mqtt_measurement_bridge", "plugins/plugin_sysstat", diff --git a/plugins/plugin_moneo_mapper/Cargo.toml b/plugins/plugin_moneo_mapper/Cargo.toml new file mode 100644 index 00000000..bab6caa2 --- /dev/null +++ b/plugins/plugin_moneo_mapper/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "plugin_moneo_mapper" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-trait = "0.1" +miette = "4.4" +thiserror = "1" +futures = "0.3" +serde = { version = "1.0.136", features = ["derive"] } +serde_json = "1" +tokio = { version = "1", features = [] } +tokio-util = "0.7.0" +tracing = "0.1" + +tedge_api = { path = "../../crates/core/tedge_api" } +tedge_lib = { path = "../../crates/core/tedge_lib" } + +plugin_mqtt = { path = "../../plugins/plugin_mqtt" } +moneo-mqtt = { git = "https://github.com/matthiasbeyer/moneo-mqtt", branch = "master" } + diff --git a/plugins/plugin_moneo_mapper/src/builder.rs b/plugins/plugin_moneo_mapper/src/builder.rs new file mode 100644 index 00000000..7e934717 --- /dev/null +++ b/plugins/plugin_moneo_mapper/src/builder.rs @@ -0,0 +1,62 @@ +use async_trait::async_trait; +use tokio_util::sync::CancellationToken; + +use tedge_api::plugin::BuiltPlugin; +use tedge_api::plugin::HandleTypes; +use tedge_api::plugin::PluginExt; +use tedge_api::PluginBuilder; +use tedge_api::PluginConfiguration; +use tedge_api::PluginDirectory; +use tedge_api::PluginError; +use tedge_lib::measurement::Measurement; + +use crate::config::MoneoMapperConfig; +use crate::plugin::MoneoMapperPlugin; + +tedge_api::make_receiver_bundle!(pub struct MeasurementReceiver(Measurement)); + +pub struct MoneoMapperPluginBuilder; + +#[async_trait] +impl<PD> PluginBuilder<PD> for MoneoMapperPluginBuilder +where + PD: PluginDirectory, +{ + fn kind_name() -> &'static str { + "moneo_mapper" + } + + fn kind_message_types() -> HandleTypes + where + Self: Sized, + { + MoneoMapperPlugin::get_handled_types() + } + + async fn verify_configuration( + &self, + config: &PluginConfiguration, + ) -> Result<(), tedge_api::error::PluginError> { + config + .clone() + .try_into() + .map(|_: MoneoMapperConfig| ()) + .map_err(|_| miette::miette!("Failed to parse moneo mapper configuration")) + .map_err(PluginError::from) + } + + async fn instantiate( + &self, + config: PluginConfiguration, + _cancellation_token: CancellationToken, + plugin_dir: &PD, + ) -> Result<BuiltPlugin, PluginError> { + let config = config + .try_into::<MoneoMapperConfig>() + .map_err(|_| miette::miette!("Failed to parse moneo mapper configuration"))?; + + let target = plugin_dir.get_address_for(&config.target)?; + Ok(MoneoMapperPlugin::new(target).finish()) + } +} + diff --git a/plugins/plugin_moneo_mapper/src/config.rs b/plugins/plugin_moneo_mapper/src/config.rs new file mode 100644 index 00000000..1ba464c3 --- /dev/null +++ b/plugins/plugin_moneo_mapper/src/config.rs @@ -0,0 +1,5 @@ +#[derive(Debug, serde::Deserialize)] +pub struct MoneoMapperConfig { + pub(crate) target: String, +} + diff --git a/plugins/plugin_moneo_mapper/src/error.rs b/plugins/plugin_moneo_mapper/src/error.rs new file mode 100644 index 00000000..33b252f1 --- /dev/null +++ b/plugins/plugin_moneo_mapper/src/error.rs @@ -0,0 +1,9 @@ +#[derive(Debug, miette::Diagnostic, thiserror::Error)] +#[diagnostic()] +pub enum Error { + #[error("Failed to deserialize")] + Deserialize(#[from] serde_json::Error), + + #[error("Failed to send message")] + SendingFailed, +} diff --git a/plugins/plugin_moneo_mapper/src/lib.rs b/plugins/plugin_moneo_mapper/src/lib.rs new file mode 100644 index 00000000..613448df --- /dev/null +++ b/plugins/plugin_moneo_mapper/src/lib.rs @@ -0,0 +1,8 @@ +mod builder; +mod config; +mod error; +mod plugin; + +pub use crate::builder::MoneoMapperPluginBuilder; +pub use crate::plugin::MoneoMapperPlugin; + diff --git a/plugins/plugin_moneo_mapper/src/plugin.rs b/plugins/plugin_moneo_mapper/src/plugin.rs new file mode 100644 index 00000000..3e9c32bc --- /dev/null +++ b/plugins/plugin_moneo_mapper/src/plugin.rs @@ -0,0 +1,118 @@ +use async_trait::async_trait; +use futures::stream::StreamExt; +use tedge_api::address::Address; +use tedge_api::address::ReplySender; +use tedge_api::error::PluginError; +use tedge_api::plugin::Handle; +use tedge_api::plugin::Message; +use tedge_api::plugin::Plugin; +use tedge_lib::measurement::Measurement; +use tedge_lib::measurement::MeasurementValue; +use tracing::debug; + +use crate::error::Error; + +tedge_api::make_receiver_bundle!(pub struct MeasurementReceiver(Measurement)); + +pub struct MoneoMapperPlugin { + target: Address<MeasurementReceiver>, +} + +impl MoneoMapperPlugin { + pub fn new(target: Address<MeasurementReceiver>) -> Self { + Self { target } + } +} + +impl tedge_api::plugin::PluginDeclaration for MoneoMapperPlugin { + type HandledMessages = (plugin_mqtt::IncomingMessage,); +} + +#[async_trait] +impl Plugin for MoneoMapperPlugin { + async fn start(&mut self) -> Result<(), PluginError> { + debug!("Setting up moneo mapper plugin"); + Ok(()) + } + + async fn shutdown(&mut self) -> Result<(), PluginError> { + debug!("Shutting down moneo mapper plugin!"); + Ok(()) + } +} + +#[async_trait] +impl Handle<plugin_mqtt::IncomingMessage> for MoneoMapperPlugin { + async fn handle_message( + &self, + message: plugin_mqtt::IncomingMessage, + _sender: ReplySender<<plugin_mqtt::IncomingMessage as Message>::Reply>, + ) -> Result<(), PluginError> { + let payloads: Vec<moneo_mqtt::payload::Payload> = + serde_json::from_value(message.into_payload()).map_err(Error::Deserialize)?; + + for payload in payloads { + let measurement_name = payload.device_path().clone().into_string(); + payload + .values() + .iter() + .map(moneo_mqtt::payload::Value::clone) + .map(moneo_mqtt::payload::Value::into_inner) + .map(map_serde_value_to_measurement) + .collect::<Result<Vec<_>, Error>>()? + .into_iter() + .filter_map(|o| o) + .map(|msmv| { + let measurement = Measurement::new(measurement_name.clone(), msmv); + async { + self.target + .send(measurement) + .await + .map_err(|_| Error::SendingFailed) + } + }) + .collect::<futures::stream::FuturesUnordered<_>>() + .collect::<Vec<_>>() + .await; + } + + Ok(()) + } +} + +fn map_serde_value_to_measurement(v: serde_json::Value) -> Result<Option<MeasurementValue>, Error> { + match v { + serde_json::Value::Null => Ok(None), + serde_json::Value::Bool(b) => Ok(Some(MeasurementValue::Bool(b))), + serde_json::Value::Number(num) => { + let f = num + .as_f64() + .or_else(|| num.as_i64().map(|i| i as f64)) + .or_else(|| num.as_u64().map(|u| u as f64)) + .unwrap(); // TODO + Ok(Some(MeasurementValue::Float(f))) + } + + serde_json::Value::String(s) => Ok(Some(MeasurementValue::Text(s))), + serde_json::Value::Array(v) => { + let v = v + .into_iter() + .map(map_serde_value_to_measurement) + .collect::<Result<Vec<_>, Error>>()? + .into_iter() + .filter_map(|o| o) + .collect(); + Ok(Some(MeasurementValue::List(v))) + } + serde_json::Value::Object(m) => { + let m = m + .into_iter() + .map(|(k, v)| map_serde_value_to_measurement(v).map(|v| v.map(|v| (k, v)))) + .collect::<Result<Vec<Option<(String, MeasurementValue)>>, Error>>()? + .into_iter() + .filter_map(|o| o) + .collect(); + Ok(Some(MeasurementValue::Map(m))) + } + } +} |