summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <matthias.beyer@ifm.com>2022-04-21 12:40:53 +0200
committerMatthias Beyer <matthias.beyer@ifm.com>2022-04-21 13:10:13 +0200
commit589aa039d9980f8cf3d5052d22a04ebd4bd888a3 (patch)
treed570ef6adb234e05ad7d5739c319d1e7abd0bf88
parent11b361d02e2fd9201660e3ac3dadf88cc15cfbc4 (diff)
Add moneo mapper plugin
Signed-off-by: Matthias Beyer <matthias.beyer@ifm.com>
-rw-r--r--Cargo.lock46
-rw-r--r--Cargo.toml1
-rw-r--r--plugins/plugin_moneo_mapper/Cargo.toml24
-rw-r--r--plugins/plugin_moneo_mapper/src/builder.rs62
-rw-r--r--plugins/plugin_moneo_mapper/src/config.rs5
-rw-r--r--plugins/plugin_moneo_mapper/src/error.rs9
-rw-r--r--plugins/plugin_moneo_mapper/src/lib.rs8
-rw-r--r--plugins/plugin_moneo_mapper/src/plugin.rs118
8 files changed, 273 insertions, 0 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 7273cc6a..5a13076d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1118,6 +1118,18 @@ dependencies = [
]
[[package]]
+name = "getset"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e45727250e75cc04ff2846a66397da8ef2b3db8e40e0cef4df67950a07621eb9"
+dependencies = [
+ "proc-macro-error",
+ "proc-macro2 1.0.32",
+ "quote 1.0.10",
+ "syn 1.0.82",
+]
+
+[[package]]
name = "gimli"
version = "0.26.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1682,6 +1694,20 @@ dependencies = [
]
[[package]]
+name = "moneo-mqtt"
+version = "0.1.0"
+source = "git+https://github.com/matthiasbeyer/moneo-mqtt?branch=master#582c7a2d02d8830890356a70dfe164ac2e34724a"
+dependencies = [
+ "chrono",
+ "getset",
+ "serde",
+ "serde_json",
+ "serde_with",
+ "thiserror",
+ "uuid",
+]
+
+[[package]]
name = "mqtt_channel"
version = "0.5.2"
dependencies = [
@@ -2209,6 +2235,25 @@ dependencies = [
]
[[package]]
+name = "plugin_moneo_mapper"
+version = "0.1.0"
+dependencies = [
+ "async-trait",
+ "futures",
+ "miette",
+ "moneo-mqtt",
+ "plugin_mqtt",
+ "serde",
+ "serde_json",
+ "tedge_api",
+ "tedge_lib",
+ "thiserror",
+ "tokio",
+ "tokio-util 0.7.0",
+ "tracing",
+]
+
+[[package]]
name = "plugin_mqtt"
version = "0.1.0"
dependencies = [
@@ -3000,6 +3045,7 @@ version = "1.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "946fa04a8ac43ff78a1f4b811990afb9ddbdf5890b46d6dda0ba1998230138b7"
dependencies = [
+ "chrono",
"rustversion",
"serde",
"serde_with_macros",
diff --git a/Cargo.toml b/Cargo.toml
index 0fc7fc0f..fc4e981f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -9,6 +9,7 @@ members = [
"plugins/plugin_inotify",
"plugins/plugin_log",
"plugins/plugin_measurement_filter",
+ "plugins/plugin_moneo_mapper",
"plugins/plugin_mqtt",
"plugins/plugin_mqtt_measurement_bridge",
"plugins/plugin_sysstat",
diff --git a/plugins/plugin_moneo_mapper/Cargo.toml b/plugins/plugin_moneo_mapper/Cargo.toml
new file mode 100644
index 00000000..bab6caa2
--- /dev/null
+++ b/plugins/plugin_moneo_mapper/Cargo.toml
@@ -0,0 +1,24 @@
+[package]
+name = "plugin_moneo_mapper"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+async-trait = "0.1"
+miette = "4.4"
+thiserror = "1"
+futures = "0.3"
+serde = { version = "1.0.136", features = ["derive"] }
+serde_json = "1"
+tokio = { version = "1", features = [] }
+tokio-util = "0.7.0"
+tracing = "0.1"
+
+tedge_api = { path = "../../crates/core/tedge_api" }
+tedge_lib = { path = "../../crates/core/tedge_lib" }
+
+plugin_mqtt = { path = "../../plugins/plugin_mqtt" }
+moneo-mqtt = { git = "https://github.com/matthiasbeyer/moneo-mqtt", branch = "master" }
+
diff --git a/plugins/plugin_moneo_mapper/src/builder.rs b/plugins/plugin_moneo_mapper/src/builder.rs
new file mode 100644
index 00000000..7e934717
--- /dev/null
+++ b/plugins/plugin_moneo_mapper/src/builder.rs
@@ -0,0 +1,62 @@
+use async_trait::async_trait;
+use tokio_util::sync::CancellationToken;
+
+use tedge_api::plugin::BuiltPlugin;
+use tedge_api::plugin::HandleTypes;
+use tedge_api::plugin::PluginExt;
+use tedge_api::PluginBuilder;
+use tedge_api::PluginConfiguration;
+use tedge_api::PluginDirectory;
+use tedge_api::PluginError;
+use tedge_lib::measurement::Measurement;
+
+use crate::config::MoneoMapperConfig;
+use crate::plugin::MoneoMapperPlugin;
+
+tedge_api::make_receiver_bundle!(pub struct MeasurementReceiver(Measurement));
+
+pub struct MoneoMapperPluginBuilder;
+
+#[async_trait]
+impl<PD> PluginBuilder<PD> for MoneoMapperPluginBuilder
+where
+ PD: PluginDirectory,
+{
+ fn kind_name() -> &'static str {
+ "moneo_mapper"
+ }
+
+ fn kind_message_types() -> HandleTypes
+ where
+ Self: Sized,
+ {
+ MoneoMapperPlugin::get_handled_types()
+ }
+
+ async fn verify_configuration(
+ &self,
+ config: &PluginConfiguration,
+ ) -> Result<(), tedge_api::error::PluginError> {
+ config
+ .clone()
+ .try_into()
+ .map(|_: MoneoMapperConfig| ())
+ .map_err(|_| miette::miette!("Failed to parse moneo mapper configuration"))
+ .map_err(PluginError::from)
+ }
+
+ async fn instantiate(
+ &self,
+ config: PluginConfiguration,
+ _cancellation_token: CancellationToken,
+ plugin_dir: &PD,
+ ) -> Result<BuiltPlugin, PluginError> {
+ let config = config
+ .try_into::<MoneoMapperConfig>()
+ .map_err(|_| miette::miette!("Failed to parse moneo mapper configuration"))?;
+
+ let target = plugin_dir.get_address_for(&config.target)?;
+ Ok(MoneoMapperPlugin::new(target).finish())
+ }
+}
+
diff --git a/plugins/plugin_moneo_mapper/src/config.rs b/plugins/plugin_moneo_mapper/src/config.rs
new file mode 100644
index 00000000..1ba464c3
--- /dev/null
+++ b/plugins/plugin_moneo_mapper/src/config.rs
@@ -0,0 +1,5 @@
+#[derive(Debug, serde::Deserialize)]
+pub struct MoneoMapperConfig {
+ pub(crate) target: String,
+}
+
diff --git a/plugins/plugin_moneo_mapper/src/error.rs b/plugins/plugin_moneo_mapper/src/error.rs
new file mode 100644
index 00000000..33b252f1
--- /dev/null
+++ b/plugins/plugin_moneo_mapper/src/error.rs
@@ -0,0 +1,9 @@
+#[derive(Debug, miette::Diagnostic, thiserror::Error)]
+#[diagnostic()]
+pub enum Error {
+ #[error("Failed to deserialize")]
+ Deserialize(#[from] serde_json::Error),
+
+ #[error("Failed to send message")]
+ SendingFailed,
+}
diff --git a/plugins/plugin_moneo_mapper/src/lib.rs b/plugins/plugin_moneo_mapper/src/lib.rs
new file mode 100644
index 00000000..613448df
--- /dev/null
+++ b/plugins/plugin_moneo_mapper/src/lib.rs
@@ -0,0 +1,8 @@
+mod builder;
+mod config;
+mod error;
+mod plugin;
+
+pub use crate::builder::MoneoMapperPluginBuilder;
+pub use crate::plugin::MoneoMapperPlugin;
+
diff --git a/plugins/plugin_moneo_mapper/src/plugin.rs b/plugins/plugin_moneo_mapper/src/plugin.rs
new file mode 100644
index 00000000..3e9c32bc
--- /dev/null
+++ b/plugins/plugin_moneo_mapper/src/plugin.rs
@@ -0,0 +1,118 @@
+use async_trait::async_trait;
+use futures::stream::StreamExt;
+use tedge_api::address::Address;
+use tedge_api::address::ReplySender;
+use tedge_api::error::PluginError;
+use tedge_api::plugin::Handle;
+use tedge_api::plugin::Message;
+use tedge_api::plugin::Plugin;
+use tedge_lib::measurement::Measurement;
+use tedge_lib::measurement::MeasurementValue;
+use tracing::debug;
+
+use crate::error::Error;
+
+tedge_api::make_receiver_bundle!(pub struct MeasurementReceiver(Measurement));
+
+pub struct MoneoMapperPlugin {
+ target: Address<MeasurementReceiver>,
+}
+
+impl MoneoMapperPlugin {
+ pub fn new(target: Address<MeasurementReceiver>) -> Self {
+ Self { target }
+ }
+}
+
+impl tedge_api::plugin::PluginDeclaration for MoneoMapperPlugin {
+ type HandledMessages = (plugin_mqtt::IncomingMessage,);
+}
+
+#[async_trait]
+impl Plugin for MoneoMapperPlugin {
+ async fn start(&mut self) -> Result<(), PluginError> {
+ debug!("Setting up moneo mapper plugin");
+ Ok(())
+ }
+
+ async fn shutdown(&mut self) -> Result<(), PluginError> {
+ debug!("Shutting down moneo mapper plugin!");
+ Ok(())
+ }
+}
+
+#[async_trait]
+impl Handle<plugin_mqtt::IncomingMessage> for MoneoMapperPlugin {
+ async fn handle_message(
+ &self,
+ message: plugin_mqtt::IncomingMessage,
+ _sender: ReplySender<<plugin_mqtt::IncomingMessage as Message>::Reply>,
+ ) -> Result<(), PluginError> {
+ let payloads: Vec<moneo_mqtt::payload::Payload> =
+ serde_json::from_value(message.into_payload()).map_err(Error::Deserialize)?;
+
+ for payload in payloads {
+ let measurement_name = payload.device_path().clone().into_string();
+ payload
+ .values()
+ .iter()
+ .map(moneo_mqtt::payload::Value::clone)
+ .map(moneo_mqtt::payload::Value::into_inner)
+ .map(map_serde_value_to_measurement)
+ .collect::<Result<Vec<_>, Error>>()?
+ .into_iter()
+ .filter_map(|o| o)
+ .map(|msmv| {
+ let measurement = Measurement::new(measurement_name.clone(), msmv);
+ async {
+ self.target
+ .send(measurement)
+ .await
+ .map_err(|_| Error::SendingFailed)
+ }
+ })
+ .collect::<futures::stream::FuturesUnordered<_>>()
+ .collect::<Vec<_>>()
+ .await;
+ }
+
+ Ok(())
+ }
+}
+
+fn map_serde_value_to_measurement(v: serde_json::Value) -> Result<Option<MeasurementValue>, Error> {
+ match v {
+ serde_json::Value::Null => Ok(None),
+ serde_json::Value::Bool(b) => Ok(Some(MeasurementValue::Bool(b))),
+ serde_json::Value::Number(num) => {
+ let f = num
+ .as_f64()
+ .or_else(|| num.as_i64().map(|i| i as f64))
+ .or_else(|| num.as_u64().map(|u| u as f64))
+ .unwrap(); // TODO
+ Ok(Some(MeasurementValue::Float(f)))
+ }
+
+ serde_json::Value::String(s) => Ok(Some(MeasurementValue::Text(s))),
+ serde_json::Value::Array(v) => {
+ let v = v
+ .into_iter()
+ .map(map_serde_value_to_measurement)
+ .collect::<Result<Vec<_>, Error>>()?
+ .into_iter()
+ .filter_map(|o| o)
+ .collect();
+ Ok(Some(MeasurementValue::List(v)))
+ }
+ serde_json::Value::Object(m) => {
+ let m = m
+ .into_iter()
+ .map(|(k, v)| map_serde_value_to_measurement(v).map(|v| v.map(|v| (k, v))))
+ .collect::<Result<Vec<Option<(String, MeasurementValue)>>, Error>>()?
+ .into_iter()
+ .filter_map(|o| o)
+ .collect();
+ Ok(Some(MeasurementValue::Map(m)))
+ }
+ }
+}