summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <matthias.beyer@ifm.com>2022-05-14 16:46:54 +0200
committerMatthias Beyer <matthias.beyer@ifm.com>2022-05-17 11:11:36 +0200
commitcf15b5346c5b1ea5a7ea5085639a7cc536e656bb (patch)
treeebe952696db4068e9d678f2981d706005dd8d8de
parent84c718da13add723a04d8e78bc0442ef7fa9090a (diff)
Add plugin for mapping thin_egde_json to Measurement
Signed-off-by: Matthias Beyer <matthias.beyer@ifm.com>
-rw-r--r--Cargo.lock19
-rw-r--r--Cargo.toml1
-rw-r--r--plugins/plugin_thin_edge_json_to_measurement_mapper/Cargo.toml21
-rw-r--r--plugins/plugin_thin_edge_json_to_measurement_mapper/src/builder.rs61
-rw-r--r--plugins/plugin_thin_edge_json_to_measurement_mapper/src/config.rs11
-rw-r--r--plugins/plugin_thin_edge_json_to_measurement_mapper/src/error.rs8
-rw-r--r--plugins/plugin_thin_edge_json_to_measurement_mapper/src/lib.rs7
-rw-r--r--plugins/plugin_thin_edge_json_to_measurement_mapper/src/plugin.rs78
8 files changed, 202 insertions, 4 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 4efa0c32..043aabd7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2376,17 +2376,28 @@ name = "plugin_thin_edge_json"
version = "0.1.0"
dependencies = [
"async-trait",
- "futures",
"miette",
"plugin_mqtt",
"serde",
- "sysinfo",
+ "tedge_api",
+ "thin_edge_json",
+ "thiserror",
+ "toml",
+]
+
+[[package]]
+name = "plugin_thin_edge_json_to_measurement_mapper"
+version = "0.1.0"
+dependencies = [
+ "async-trait",
+ "futures",
+ "miette",
+ "plugin_thin_edge_json",
+ "serde",
"tedge_api",
"tedge_lib",
"thin_edge_json",
"thiserror",
- "tokio",
- "tokio-util 0.7.0",
"toml",
"tracing",
]
diff --git a/Cargo.toml b/Cargo.toml
index 25cf88ad..22c4f92b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -15,6 +15,7 @@ members = [
"plugins/plugin_notification",
"plugins/plugin_sysstat",
"plugins/plugin_thin_edge_json",
+ "plugins/plugin_thin_edge_json_to_measurement_mapper",
"plugins/tedge_apt_plugin",
"plugins/tedge_dummy_plugin",
"plugins/tedge_apama_plugin",
diff --git a/plugins/plugin_thin_edge_json_to_measurement_mapper/Cargo.toml b/plugins/plugin_thin_edge_json_to_measurement_mapper/Cargo.toml
new file mode 100644
index 00000000..247f8cb5
--- /dev/null
+++ b/plugins/plugin_thin_edge_json_to_measurement_mapper/Cargo.toml
@@ -0,0 +1,21 @@
+[package]
+name = "plugin_thin_edge_json_to_measurement_mapper"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+async-trait = "0.1"
+futures = "0.3"
+miette = "4.4"
+serde = { version = "1.0.136", features = ["derive"] }
+thiserror = "1"
+toml = "0.5"
+tracing = "0.1"
+
+tedge_api = { path = "../../crates/core/tedge_api" }
+tedge_lib = { path = "../../crates/core/tedge_lib" }
+
+thin_edge_json = { path = "../../crates/core/thin_edge_json" }
+plugin_thin_edge_json = { path = "../../plugins/plugin_thin_edge_json" }
diff --git a/plugins/plugin_thin_edge_json_to_measurement_mapper/src/builder.rs b/plugins/plugin_thin_edge_json_to_measurement_mapper/src/builder.rs
new file mode 100644
index 00000000..160635e8
--- /dev/null
+++ b/plugins/plugin_thin_edge_json_to_measurement_mapper/src/builder.rs
@@ -0,0 +1,61 @@
+use async_trait::async_trait;
+
+use tedge_api::plugin::BuiltPlugin;
+use tedge_api::plugin::HandleTypes;
+use tedge_api::plugin::PluginExt;
+use tedge_api::CancellationToken;
+use tedge_api::PluginBuilder;
+use tedge_api::PluginConfiguration;
+use tedge_api::PluginDirectory;
+use tedge_api::PluginError;
+
+use crate::config::ThinEdgeJsonToMeasurementMapperConfig;
+use crate::plugin::ThinEdgeJsonToMeasurementMapperPlugin;
+
+pub struct ThinEdgeJsonToMeasurementMapperPluginBuilder;
+
+#[async_trait]
+impl<PD: PluginDirectory> PluginBuilder<PD> for ThinEdgeJsonToMeasurementMapperPluginBuilder {
+ fn kind_name() -> &'static str {
+ "thin_edge_json_to_measurement_mapper"
+ }
+
+ fn kind_configuration() -> Option<tedge_api::ConfigDescription> {
+ Some(<ThinEdgeJsonToMeasurementMapperConfig as tedge_api::AsConfig>::as_config())
+ }
+
+ fn kind_message_types() -> HandleTypes
+ where
+ Self: Sized,
+ {
+ ThinEdgeJsonToMeasurementMapperPlugin::get_handled_types()
+ }
+
+ async fn verify_configuration(
+ &self,
+ config: &PluginConfiguration,
+ ) -> Result<(), tedge_api::error::PluginError> {
+ config
+ .clone()
+ .try_into()
+ .map(|_: ThinEdgeJsonToMeasurementMapperConfig| ())
+ .map_err(crate::error::Error::ConfigParseFailed)
+ .map_err(PluginError::from)
+ }
+
+ async fn instantiate(
+ &self,
+ config: PluginConfiguration,
+ _cancellation_token: CancellationToken,
+ plugin_dir: &PD,
+ ) -> Result<BuiltPlugin, PluginError> {
+ let config: ThinEdgeJsonToMeasurementMapperConfig = config
+ .try_into()
+ .map_err(crate::error::Error::ConfigParseFailed)?;
+
+ let target_addr =
+ plugin_dir.get_address_for::<crate::plugin::MeasurementReceiver>(config.target())?;
+
+ Ok(ThinEdgeJsonToMeasurementMapperPlugin::new(target_addr).finish())
+ }
+}
diff --git a/plugins/plugin_thin_edge_json_to_measurement_mapper/src/config.rs b/plugins/plugin_thin_edge_json_to_measurement_mapper/src/config.rs
new file mode 100644
index 00000000..600cdaa5
--- /dev/null
+++ b/plugins/plugin_thin_edge_json_to_measurement_mapper/src/config.rs
@@ -0,0 +1,11 @@
+#[derive(Debug, serde::Deserialize, tedge_api::Config)]
+pub struct ThinEdgeJsonToMeasurementMapperConfig {
+ /// The name of the plugin to send the parsed ThinEdgeJsonToMeasurementMapperMessage to
+ target: String,
+}
+
+impl ThinEdgeJsonToMeasurementMapperConfig {
+ pub(crate) fn target(&self) -> &str {
+ &self.target
+ }
+}
diff --git a/plugins/plugin_thin_edge_json_to_measurement_mapper/src/error.rs b/plugins/plugin_thin_edge_json_to_measurement_mapper/src/error.rs
new file mode 100644
index 00000000..38e9beb1
--- /dev/null
+++ b/plugins/plugin_thin_edge_json_to_measurement_mapper/src/error.rs
@@ -0,0 +1,8 @@
+#[derive(Debug, miette::Diagnostic, thiserror::Error)]
+pub(crate) enum Error {
+ #[error("Failed to parse configuration")]
+ ConfigParseFailed(toml::de::Error),
+
+ #[error("Failed to send ThinEdgeJson")]
+ FailedToSend,
+}
diff --git a/plugins/plugin_thin_edge_json_to_measurement_mapper/src/lib.rs b/plugins/plugin_thin_edge_json_to_measurement_mapper/src/lib.rs
new file mode 100644
index 00000000..7c03fbf5
--- /dev/null
+++ b/plugins/plugin_thin_edge_json_to_measurement_mapper/src/lib.rs
@@ -0,0 +1,7 @@
+mod builder;
+mod config;
+mod error;
+mod plugin;
+
+pub use crate::builder::ThinEdgeJsonToMeasurementMapperPluginBuilder;
+pub use crate::plugin::ThinEdgeJsonToMeasurementMapperPlugin;
diff --git a/plugins/plugin_thin_edge_json_to_measurement_mapper/src/plugin.rs b/plugins/plugin_thin_edge_json_to_measurement_mapper/src/plugin.rs
new file mode 100644
index 00000000..2c9d66de
--- /dev/null
+++ b/plugins/plugin_thin_edge_json_to_measurement_mapper/src/plugin.rs
@@ -0,0 +1,78 @@
+use async_trait::async_trait;
+
+use futures::stream::StreamExt;
+use plugin_thin_edge_json::ThinEdgeJsonMessage;
+use tedge_api::address::ReplySenderFor;
+use tedge_api::plugin::Handle;
+use tedge_api::Address;
+use tedge_api::Plugin;
+use tedge_api::PluginError;
+use tedge_lib::iter::IntoSendAll;
+use tedge_lib::measurement::Measurement;
+use tedge_lib::measurement::MeasurementValue;
+use thin_edge_json::data::ThinEdgeValue;
+use tracing::Instrument;
+
+use crate::error::Error;
+
+tedge_api::make_receiver_bundle!(pub struct MeasurementReceiver(Measurement));
+
+#[derive(Debug)]
+pub struct ThinEdgeJsonToMeasurementMapperPlugin {
+ target_addr: Address<MeasurementReceiver>,
+}
+
+impl tedge_api::plugin::PluginDeclaration for ThinEdgeJsonToMeasurementMapperPlugin {
+ type HandledMessages = (ThinEdgeJsonMessage,);
+}
+
+impl ThinEdgeJsonToMeasurementMapperPlugin {
+ pub(crate) fn new(target_addr: Address<MeasurementReceiver>) -> Self {
+ Self { target_addr }
+ }
+}
+
+#[async_trait]
+impl Plugin for ThinEdgeJsonToMeasurementMapperPlugin {
+ async fn start(&mut self) -> Result<(), PluginError> {
+ Ok(())
+ }
+
+ async fn shutdown(&mut self) -> Result<(), PluginError> {
+ Ok(())
+ }
+}
+
+#[async_trait]
+impl Handle<ThinEdgeJsonMessage> for ThinEdgeJsonToMeasurementMapperPlugin {
+ async fn handle_message(
+ &self,
+ message: ThinEdgeJsonMessage,
+ _sender: ReplySenderFor<ThinEdgeJsonMessage>,
+ ) -> Result<(), PluginError> {
+ message
+ .into_inner()
+ .values
+ .into_iter()
+ .map(|value| match value {
+ ThinEdgeValue::Single(s) => vec![s],
+ ThinEdgeValue::Multi(m) => m.values, // TODO: We ignore the `MultiValueMeasurement::name` here
+ })
+ .map(Vec::into_iter)
+ .flatten()
+ .map(|msmt| Measurement::new(msmt.name, MeasurementValue::Float(msmt.value)))
+ .map(|msmt| (msmt, &self.target_addr))
+ .send_all()
+ .collect::<futures::stream::FuturesUnordered<_>>()
+ .collect::<Vec<Result<_, _>>>()
+ .instrument(tracing::debug_span!(
+ "plugin.plugin_thin_edge_json_to_measurement_mapper.handle.send_all"
+ ))
+ .await
+ .into_iter()
+ .map(|r| r.map(|_| ()).map_err(|_| Error::FailedToSend)) // Ignore result, turn error into Error::FailedToSend
+ .collect::<Result<Vec<()>, Error>>()
+ .map(|_| ())
+ .map_err(PluginError::from)
+ }
+}