diff options
author | Matthias Beyer <matthias.beyer@ifm.com> | 2022-05-14 16:46:54 +0200 |
---|---|---|
committer | Matthias Beyer <matthias.beyer@ifm.com> | 2022-05-17 11:11:36 +0200 |
commit | cf15b5346c5b1ea5a7ea5085639a7cc536e656bb (patch) | |
tree | ebe952696db4068e9d678f2981d706005dd8d8de | |
parent | 84c718da13add723a04d8e78bc0442ef7fa9090a (diff) |
Add plugin for mapping thin_egde_json to Measurement
Signed-off-by: Matthias Beyer <matthias.beyer@ifm.com>
8 files changed, 202 insertions, 4 deletions
@@ -2376,17 +2376,28 @@ name = "plugin_thin_edge_json" version = "0.1.0" dependencies = [ "async-trait", - "futures", "miette", "plugin_mqtt", "serde", - "sysinfo", + "tedge_api", + "thin_edge_json", + "thiserror", + "toml", +] + +[[package]] +name = "plugin_thin_edge_json_to_measurement_mapper" +version = "0.1.0" +dependencies = [ + "async-trait", + "futures", + "miette", + "plugin_thin_edge_json", + "serde", "tedge_api", "tedge_lib", "thin_edge_json", "thiserror", - "tokio", - "tokio-util 0.7.0", "toml", "tracing", ] @@ -15,6 +15,7 @@ members = [ "plugins/plugin_notification", "plugins/plugin_sysstat", "plugins/plugin_thin_edge_json", + "plugins/plugin_thin_edge_json_to_measurement_mapper", "plugins/tedge_apt_plugin", "plugins/tedge_dummy_plugin", "plugins/tedge_apama_plugin", diff --git a/plugins/plugin_thin_edge_json_to_measurement_mapper/Cargo.toml b/plugins/plugin_thin_edge_json_to_measurement_mapper/Cargo.toml new file mode 100644 index 00000000..247f8cb5 --- /dev/null +++ b/plugins/plugin_thin_edge_json_to_measurement_mapper/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "plugin_thin_edge_json_to_measurement_mapper" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-trait = "0.1" +futures = "0.3" +miette = "4.4" +serde = { version = "1.0.136", features = ["derive"] } +thiserror = "1" +toml = "0.5" +tracing = "0.1" + +tedge_api = { path = "../../crates/core/tedge_api" } +tedge_lib = { path = "../../crates/core/tedge_lib" } + +thin_edge_json = { path = "../../crates/core/thin_edge_json" } +plugin_thin_edge_json = { path = "../../plugins/plugin_thin_edge_json" } diff --git a/plugins/plugin_thin_edge_json_to_measurement_mapper/src/builder.rs b/plugins/plugin_thin_edge_json_to_measurement_mapper/src/builder.rs new file mode 100644 index 00000000..160635e8 --- /dev/null +++ b/plugins/plugin_thin_edge_json_to_measurement_mapper/src/builder.rs @@ -0,0 +1,61 @@ +use async_trait::async_trait; + +use tedge_api::plugin::BuiltPlugin; +use tedge_api::plugin::HandleTypes; +use tedge_api::plugin::PluginExt; +use tedge_api::CancellationToken; +use tedge_api::PluginBuilder; +use tedge_api::PluginConfiguration; +use tedge_api::PluginDirectory; +use tedge_api::PluginError; + +use crate::config::ThinEdgeJsonToMeasurementMapperConfig; +use crate::plugin::ThinEdgeJsonToMeasurementMapperPlugin; + +pub struct ThinEdgeJsonToMeasurementMapperPluginBuilder; + +#[async_trait] +impl<PD: PluginDirectory> PluginBuilder<PD> for ThinEdgeJsonToMeasurementMapperPluginBuilder { + fn kind_name() -> &'static str { + "thin_edge_json_to_measurement_mapper" + } + + fn kind_configuration() -> Option<tedge_api::ConfigDescription> { + Some(<ThinEdgeJsonToMeasurementMapperConfig as tedge_api::AsConfig>::as_config()) + } + + fn kind_message_types() -> HandleTypes + where + Self: Sized, + { + ThinEdgeJsonToMeasurementMapperPlugin::get_handled_types() + } + + async fn verify_configuration( + &self, + config: &PluginConfiguration, + ) -> Result<(), tedge_api::error::PluginError> { + config + .clone() + .try_into() + .map(|_: ThinEdgeJsonToMeasurementMapperConfig| ()) + .map_err(crate::error::Error::ConfigParseFailed) + .map_err(PluginError::from) + } + + async fn instantiate( + &self, + config: PluginConfiguration, + _cancellation_token: CancellationToken, + plugin_dir: &PD, + ) -> Result<BuiltPlugin, PluginError> { + let config: ThinEdgeJsonToMeasurementMapperConfig = config + .try_into() + .map_err(crate::error::Error::ConfigParseFailed)?; + + let target_addr = + plugin_dir.get_address_for::<crate::plugin::MeasurementReceiver>(config.target())?; + + Ok(ThinEdgeJsonToMeasurementMapperPlugin::new(target_addr).finish()) + } +} diff --git a/plugins/plugin_thin_edge_json_to_measurement_mapper/src/config.rs b/plugins/plugin_thin_edge_json_to_measurement_mapper/src/config.rs new file mode 100644 index 00000000..600cdaa5 --- /dev/null +++ b/plugins/plugin_thin_edge_json_to_measurement_mapper/src/config.rs @@ -0,0 +1,11 @@ +#[derive(Debug, serde::Deserialize, tedge_api::Config)] +pub struct ThinEdgeJsonToMeasurementMapperConfig { + /// The name of the plugin to send the parsed ThinEdgeJsonToMeasurementMapperMessage to + target: String, +} + +impl ThinEdgeJsonToMeasurementMapperConfig { + pub(crate) fn target(&self) -> &str { + &self.target + } +} diff --git a/plugins/plugin_thin_edge_json_to_measurement_mapper/src/error.rs b/plugins/plugin_thin_edge_json_to_measurement_mapper/src/error.rs new file mode 100644 index 00000000..38e9beb1 --- /dev/null +++ b/plugins/plugin_thin_edge_json_to_measurement_mapper/src/error.rs @@ -0,0 +1,8 @@ +#[derive(Debug, miette::Diagnostic, thiserror::Error)] +pub(crate) enum Error { + #[error("Failed to parse configuration")] + ConfigParseFailed(toml::de::Error), + + #[error("Failed to send ThinEdgeJson")] + FailedToSend, +} diff --git a/plugins/plugin_thin_edge_json_to_measurement_mapper/src/lib.rs b/plugins/plugin_thin_edge_json_to_measurement_mapper/src/lib.rs new file mode 100644 index 00000000..7c03fbf5 --- /dev/null +++ b/plugins/plugin_thin_edge_json_to_measurement_mapper/src/lib.rs @@ -0,0 +1,7 @@ +mod builder; +mod config; +mod error; +mod plugin; + +pub use crate::builder::ThinEdgeJsonToMeasurementMapperPluginBuilder; +pub use crate::plugin::ThinEdgeJsonToMeasurementMapperPlugin; diff --git a/plugins/plugin_thin_edge_json_to_measurement_mapper/src/plugin.rs b/plugins/plugin_thin_edge_json_to_measurement_mapper/src/plugin.rs new file mode 100644 index 00000000..2c9d66de --- /dev/null +++ b/plugins/plugin_thin_edge_json_to_measurement_mapper/src/plugin.rs @@ -0,0 +1,78 @@ +use async_trait::async_trait; + +use futures::stream::StreamExt; +use plugin_thin_edge_json::ThinEdgeJsonMessage; +use tedge_api::address::ReplySenderFor; +use tedge_api::plugin::Handle; +use tedge_api::Address; +use tedge_api::Plugin; +use tedge_api::PluginError; +use tedge_lib::iter::IntoSendAll; +use tedge_lib::measurement::Measurement; +use tedge_lib::measurement::MeasurementValue; +use thin_edge_json::data::ThinEdgeValue; +use tracing::Instrument; + +use crate::error::Error; + +tedge_api::make_receiver_bundle!(pub struct MeasurementReceiver(Measurement)); + +#[derive(Debug)] +pub struct ThinEdgeJsonToMeasurementMapperPlugin { + target_addr: Address<MeasurementReceiver>, +} + +impl tedge_api::plugin::PluginDeclaration for ThinEdgeJsonToMeasurementMapperPlugin { + type HandledMessages = (ThinEdgeJsonMessage,); +} + +impl ThinEdgeJsonToMeasurementMapperPlugin { + pub(crate) fn new(target_addr: Address<MeasurementReceiver>) -> Self { + Self { target_addr } + } +} + +#[async_trait] +impl Plugin for ThinEdgeJsonToMeasurementMapperPlugin { + async fn start(&mut self) -> Result<(), PluginError> { + Ok(()) + } + + async fn shutdown(&mut self) -> Result<(), PluginError> { + Ok(()) + } +} + +#[async_trait] +impl Handle<ThinEdgeJsonMessage> for ThinEdgeJsonToMeasurementMapperPlugin { + async fn handle_message( + &self, + message: ThinEdgeJsonMessage, + _sender: ReplySenderFor<ThinEdgeJsonMessage>, + ) -> Result<(), PluginError> { + message + .into_inner() + .values + .into_iter() + .map(|value| match value { + ThinEdgeValue::Single(s) => vec![s], + ThinEdgeValue::Multi(m) => m.values, // TODO: We ignore the `MultiValueMeasurement::name` here + }) + .map(Vec::into_iter) + .flatten() + .map(|msmt| Measurement::new(msmt.name, MeasurementValue::Float(msmt.value))) + .map(|msmt| (msmt, &self.target_addr)) + .send_all() + .collect::<futures::stream::FuturesUnordered<_>>() + .collect::<Vec<Result<_, _>>>() + .instrument(tracing::debug_span!( + "plugin.plugin_thin_edge_json_to_measurement_mapper.handle.send_all" + )) + .await + .into_iter() + .map(|r| r.map(|_| ()).map_err(|_| Error::FailedToSend)) // Ignore result, turn error into Error::FailedToSend + .collect::<Result<Vec<()>, Error>>() + .map(|_| ()) + .map_err(PluginError::from) + } +} |