diff options
author | Matthias Beyer <matthias.beyer@ifm.com> | 2022-04-25 11:46:10 +0200 |
---|---|---|
committer | Matthias Beyer <matthias.beyer@ifm.com> | 2022-04-25 11:46:10 +0200 |
commit | 913ae12ea83c8a91e489779213178e335e8e2bc7 (patch) | |
tree | c9c4eef0ef035f3dd3d516cc21ed3b641f5fe3d4 | |
parent | ce9c9228832d6040f2dad1d2d37c37f75a864dae (diff) | |
parent | d0965ddabfddece53653e8cf833511560cb73c8f (diff) |
Merge branch 'showcase-plugin' into showcaseshowcase
-rw-r--r-- | Cargo.lock | 21 | ||||
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | plugins/plugin_random_measurements/Cargo.toml | 19 | ||||
-rw-r--r-- | plugins/plugin_random_measurements/src/lib.rs | 132 | ||||
-rw-r--r-- | tedge/Cargo.toml | 2 | ||||
-rw-r--r-- | tedge/example-random.toml | 25 | ||||
-rw-r--r-- | tedge/src/main.rs | 7 |
7 files changed, 205 insertions, 2 deletions
@@ -1183,7 +1183,7 @@ dependencies = [ "proc-macro-error", "proc-macro2 1.0.32", "quote 1.0.10", - "syn 1.0.82", + "syn 1.0.91", ] [[package]] @@ -2392,7 +2392,7 @@ dependencies = [ "tedge_lib", "thiserror", "tokio", - "tokio-util 0.7.0", + "tokio-util 0.7.1", "tracing", ] @@ -2437,6 +2437,22 @@ dependencies = [ ] [[package]] +name = "plugin_random_measurements" +version = "0.1.0" +dependencies = [ + "async-trait", + "humantime 2.1.0", + "humantime-serde", + "miette", + "rand", + "serde", + "tedge_api", + "tedge_lib", + "tokio", + "tracing", +] + +[[package]] name = "plugin_sm" version = "0.5.2" dependencies = [ @@ -3511,6 +3527,7 @@ dependencies = [ "plugin_moneo_mapper", "plugin_mqtt", "plugin_mqtt_measurement_bridge", + "plugin_random_measurements", "plugin_sysstat", "tedge_api", "tedge_core", @@ -13,6 +13,7 @@ members = [ "plugins/plugin_moneo_mapper", "plugins/plugin_mqtt", "plugins/plugin_mqtt_measurement_bridge", + "plugins/plugin_random_measurements", "plugins/plugin_sysstat", "plugins/tedge_apt_plugin", "plugins/tedge_dummy_plugin", diff --git a/plugins/plugin_random_measurements/Cargo.toml b/plugins/plugin_random_measurements/Cargo.toml new file mode 100644 index 00000000..399564be --- /dev/null +++ b/plugins/plugin_random_measurements/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "plugin_random_measurements" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-trait = "0.1" +humantime = "2.1" +humantime-serde = "1.1" +miette = "4.4" +rand = "0.8" +serde = { version = "1", features = ["derive"] } +tokio = { version = "1", features = ["rt"] } +tracing = "0.1" + +tedge_api = { path = "../../crates/core/tedge_api" } +tedge_lib = { path = "../../crates/core/tedge_lib" } diff --git a/plugins/plugin_random_measurements/src/lib.rs b/plugins/plugin_random_measurements/src/lib.rs new file mode 100644 index 00000000..168e1db7 --- /dev/null +++ b/plugins/plugin_random_measurements/src/lib.rs @@ -0,0 +1,132 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use miette::Context; +use miette::IntoDiagnostic; +use tedge_api::plugin::PluginExt; +use tedge_api::CancellationToken; + +use tedge_api::Address; +use tedge_api::Plugin; +use tedge_api::PluginBuilder; +use tedge_api::PluginConfiguration; +use tedge_api::PluginDirectory; +use tedge_api::PluginError; +use tedge_lib::mainloop::MainloopStopper; +use tedge_lib::measurement::Measurement; +use tedge_lib::measurement::MeasurementValue; + +pub struct RandomMeasurementsPluginBuilder; + +#[derive(serde::Deserialize, Debug)] +struct RandomMeasurementsPluginConfig { + #[serde(with = "humantime_serde")] + interval: std::time::Duration, + target: String, +} + +#[async_trait] +impl<PD: PluginDirectory> PluginBuilder<PD> for RandomMeasurementsPluginBuilder { + fn kind_name() -> &'static str { + "random_measurements" + } + + async fn verify_configuration( + &self, + config: &PluginConfiguration, + ) -> Result<(), tedge_api::error::PluginError> { + config + .clone() + .try_into() + .map(|_: RandomMeasurementsPluginConfig| ()) + .into_diagnostic() + .context("Failed to parse random-measurements-plugin configuration") + .map_err(PluginError::from) + } + + async fn instantiate( + &self, + config: PluginConfiguration, + _cancellation_token: CancellationToken, + plugin_dir: &PD, + ) -> Result<tedge_api::plugin::BuiltPlugin, PluginError> { + let config = config + .try_into::<RandomMeasurementsPluginConfig>() + .into_diagnostic() + .context("Failed to parse random-measurements-plugin configuration")?; + + let address = plugin_dir.get_address_for::<MeasurementReceiver>(&config.target)?; + Ok(RandomMeasurementsPlugin::new(address, config.interval).finish()) + } + + fn kind_message_types() -> tedge_api::plugin::HandleTypes + where + Self: Sized, + { + RandomMeasurementsPlugin::get_handled_types() + } +} + +tedge_api::make_receiver_bundle!(struct MeasurementReceiver(Measurement)); + +struct RandomMeasurementsPlugin { + addr: Address<MeasurementReceiver>, + interval: std::time::Duration, + + stopper: Option<MainloopStopper>, +} + +impl tedge_api::plugin::PluginDeclaration for RandomMeasurementsPlugin { + type HandledMessages = (); +} + +impl RandomMeasurementsPlugin { + fn new(addr: Address<MeasurementReceiver>, interval: std::time::Duration) -> Self { + Self { + addr, + interval, + + stopper: None, + } + } +} + +struct State { + target: Address<MeasurementReceiver>, +} + +#[async_trait] +impl Plugin for RandomMeasurementsPlugin { + async fn start(&mut self) -> Result<(), PluginError> { + let state = State { + target: self.addr.clone(), + }; + let (stopper, mainloop) = + tedge_lib::mainloop::Mainloop::ticking_every(self.interval, state); + + self.stopper = Some(stopper); + + let _ = tokio::spawn(mainloop.run(main_random_measurements)); + Ok(()) + } + + async fn shutdown(&mut self) -> Result<(), PluginError> { + if let Some(stopper) = self.stopper.take() { + stopper + .stop() + .map_err(|()| miette::miette!("Failed to stop mainloop"))? + } + Ok(()) + } +} + +async fn main_random_measurements(state: Arc<State>) -> Result<(), PluginError> { + let f: f64 = rand::random(); // value in [0, 1) + let msmt = Measurement::new("random".to_string(), MeasurementValue::Float(f)); + state + .target + .send(msmt) + .await + .map_err(|_| miette::miette!("Failed to send random measurement"))?; + Ok(()) +} diff --git a/tedge/Cargo.toml b/tedge/Cargo.toml index a61c3774..eeaaab2b 100644 --- a/tedge/Cargo.toml +++ b/tedge/Cargo.toml @@ -32,6 +32,7 @@ plugin_mqtt = { path = "../plugins/plugin_mqtt", optional = true } plugin_mqtt_measurement_bridge = { path = "../plugins/plugin_mqtt_measurement_bridge", optional = true } plugin_moneo_mapper = { path = "../plugins/plugin_moneo_mapper", optional = true } plugin_azure_bridge = { path = "../plugins/plugin_azure_bridge", optional = true } +plugin_random_measurements = { path = "../plugins/plugin_random_measurements", optional = true } [features] default = [ @@ -55,6 +56,7 @@ builtin_plugin_httpstop = ["plugin_httpstop"] builtin_plugin_measurement_filter = ["plugin_measurement_filter"] builtin_plugin_moneo_mapper = ["plugin_moneo_mapper"] builtin_plugin_azure_bridge = ["plugin_azure_bridge"] +builtin_plugin_random_measurements = ["plugin_random_measurements"] mqtt = ["plugin_mqtt", "plugin_mqtt_measurement_bridge"] moneo = ["plugin_moneo_mapper"] diff --git a/tedge/example-random.toml b/tedge/example-random.toml new file mode 100644 index 00000000..9c9cc76e --- /dev/null +++ b/tedge/example-random.toml @@ -0,0 +1,25 @@ +communication_buffer_size = 10 + +plugin_shutdown_timeout_ms = 10000 + +[plugins] + + +[plugins.rand] +kind = "random_measurements" + +[plugins.rand.configuration] +target = "logger" +interval = "1sec" + + +[plugins.logger] +kind = "log" + +[plugins.logger.configuration] +level = "info" + + + +acknowledge = false + diff --git a/tedge/src/main.rs b/tedge/src/main.rs index fde29134..8fc62dac 100644 --- a/tedge/src/main.rs +++ b/tedge/src/main.rs @@ -127,6 +127,13 @@ async fn main() -> miette::Result<()> { let application = register_plugin!( application, + "builtin_plugin_random_measurements", + plugin_random_measurements::RandomMeasurementsPluginBuilder, + plugin_random_measurements::RandomMeasurementsPluginBuilder + ); + + let application = register_plugin!( + application, "moneo", plugin_moneo_mapper::MoneoMapperPluginBuilder, plugin_moneo_mapper::MoneoMapperPluginBuilder |