diff options
author | Matthias Beyer <matthias.beyer@ifm.com> | 2022-05-14 16:12:58 +0200 |
---|---|---|
committer | Matthias Beyer <matthias.beyer@ifm.com> | 2022-05-17 11:11:36 +0200 |
commit | 84c718da13add723a04d8e78bc0442ef7fa9090a (patch) | |
tree | ff4ca601db7f5c9242340cfd22d80bde7c87fabc | |
parent | 5e358be9aeebd6ffa23dbb2f782049906880a231 (diff) |
Add thin_egde_json plugin
Signed-off-by: Matthias Beyer <matthias.beyer@ifm.com>
-rw-r--r-- | Cargo.lock | 20 | ||||
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | plugins/plugin_thin_edge_json/Cargo.toml | 19 | ||||
-rw-r--r-- | plugins/plugin_thin_edge_json/src/builder.rs | 61 | ||||
-rw-r--r-- | plugins/plugin_thin_edge_json/src/config.rs | 11 | ||||
-rw-r--r-- | plugins/plugin_thin_edge_json/src/error.rs | 17 | ||||
-rw-r--r-- | plugins/plugin_thin_edge_json/src/lib.rs | 9 | ||||
-rw-r--r-- | plugins/plugin_thin_edge_json/src/message.rs | 22 | ||||
-rw-r--r-- | plugins/plugin_thin_edge_json/src/plugin.rs | 67 |
9 files changed, 227 insertions, 0 deletions
@@ -2372,6 +2372,26 @@ dependencies = [ ] [[package]] +name = "plugin_thin_edge_json" +version = "0.1.0" +dependencies = [ + "async-trait", + "futures", + "miette", + "plugin_mqtt", + "serde", + "sysinfo", + "tedge_api", + "tedge_lib", + "thin_edge_json", + "thiserror", + "tokio", + "tokio-util 0.7.0", + "toml", + "tracing", +] + +[[package]] name = "pollster" version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -14,6 +14,7 @@ members = [ "plugins/plugin_mqtt_measurement_bridge", "plugins/plugin_notification", "plugins/plugin_sysstat", + "plugins/plugin_thin_edge_json", "plugins/tedge_apt_plugin", "plugins/tedge_dummy_plugin", "plugins/tedge_apama_plugin", diff --git a/plugins/plugin_thin_edge_json/Cargo.toml b/plugins/plugin_thin_edge_json/Cargo.toml new file mode 100644 index 00000000..8ecaa323 --- /dev/null +++ b/plugins/plugin_thin_edge_json/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "plugin_thin_edge_json" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-trait = "0.1" +miette = "4.4" +serde = { version = "1.0.136", features = ["derive"] } +thiserror = "1" +toml = "0.5" + +tedge_api = { path = "../../crates/core/tedge_api" } +plugin_mqtt = { path = "../../plugins/plugin_mqtt" } + +thin_edge_json = { path = "../../crates/core/thin_edge_json" } + diff --git a/plugins/plugin_thin_edge_json/src/builder.rs b/plugins/plugin_thin_edge_json/src/builder.rs new file mode 100644 index 00000000..cc4961cc --- /dev/null +++ b/plugins/plugin_thin_edge_json/src/builder.rs @@ -0,0 +1,61 @@ +use async_trait::async_trait; + +use tedge_api::plugin::BuiltPlugin; +use tedge_api::plugin::HandleTypes; +use tedge_api::plugin::PluginExt; +use tedge_api::CancellationToken; +use tedge_api::PluginBuilder; +use tedge_api::PluginConfiguration; +use tedge_api::PluginDirectory; +use tedge_api::PluginError; + +use crate::config::ThinEdgeJsonConfig; +use crate::plugin::ThinEdgeJsonPlugin; + +pub struct ThinEdgeJsonPluginBuilder; + +#[async_trait] +impl<PD: PluginDirectory> PluginBuilder<PD> for ThinEdgeJsonPluginBuilder { + fn kind_name() -> &'static str { + "thin_edge_json" + } + + fn kind_configuration() -> Option<tedge_api::ConfigDescription> { + Some(<ThinEdgeJsonConfig as tedge_api::AsConfig>::as_config()) + } + + fn kind_message_types() -> HandleTypes + where + Self: Sized, + { + ThinEdgeJsonPlugin::get_handled_types() + } + + async fn verify_configuration( + &self, + config: &PluginConfiguration, + ) -> Result<(), tedge_api::error::PluginError> { + config + .clone() + .try_into() + .map(|_: ThinEdgeJsonConfig| ()) + .map_err(crate::error::Error::ConfigParseFailed) + .map_err(PluginError::from) + } + + async fn instantiate( + &self, + config: PluginConfiguration, + _cancellation_token: CancellationToken, + plugin_dir: &PD, + ) -> Result<BuiltPlugin, PluginError> { + let config: ThinEdgeJsonConfig = config + .try_into() + .map_err(crate::error::Error::ConfigParseFailed)?; + + let target_addr = plugin_dir + .get_address_for::<crate::plugin::ThinEdgeJsonMessageReceiver>(config.target())?; + + Ok(ThinEdgeJsonPlugin::new(target_addr).finish()) + } +} diff --git a/plugins/plugin_thin_edge_json/src/config.rs b/plugins/plugin_thin_edge_json/src/config.rs new file mode 100644 index 00000000..d924d797 --- /dev/null +++ b/plugins/plugin_thin_edge_json/src/config.rs @@ -0,0 +1,11 @@ +#[derive(Debug, serde::Deserialize, tedge_api::Config)] +pub struct ThinEdgeJsonConfig { + /// The name of the plugin to send the parsed ThinEdgeJsonMessage to + target: String, +} + +impl ThinEdgeJsonConfig { + pub(crate) fn target(&self) -> &str { + &self.target + } +} diff --git a/plugins/plugin_thin_edge_json/src/error.rs b/plugins/plugin_thin_edge_json/src/error.rs new file mode 100644 index 00000000..12385c7d --- /dev/null +++ b/plugins/plugin_thin_edge_json/src/error.rs @@ -0,0 +1,17 @@ +#[derive(Debug, miette::Diagnostic, thiserror::Error)] +pub(crate) enum Error { + #[error("Failed to parse configuration")] + ConfigParseFailed(toml::de::Error), + + #[error("Parsing bytes as UTF8 String failed")] + ParseUtf8Error(#[from] std::str::Utf8Error), + + #[error("Failed to parse ThinEdgeJson")] + ThinEdgeJsonParserError(#[from] thin_edge_json::parser::ThinEdgeJsonParserError), + + #[error("Failed to build ThinEdgeJson")] + ThinEdgeJsonBuilderError(#[from] thin_edge_json::builder::ThinEdgeJsonBuilderError), + + #[error("Failed to send ThinEdgeJson")] + FailedToSend, +} diff --git a/plugins/plugin_thin_edge_json/src/lib.rs b/plugins/plugin_thin_edge_json/src/lib.rs new file mode 100644 index 00000000..d7a5e530 --- /dev/null +++ b/plugins/plugin_thin_edge_json/src/lib.rs @@ -0,0 +1,9 @@ +mod builder; +mod config; +mod error; +mod message; +mod plugin; + +pub use crate::builder::ThinEdgeJsonPluginBuilder; +pub use crate::message::ThinEdgeJsonMessage; +pub use crate::plugin::ThinEdgeJsonPlugin; diff --git a/plugins/plugin_thin_edge_json/src/message.rs b/plugins/plugin_thin_edge_json/src/message.rs new file mode 100644 index 00000000..c203f6f5 --- /dev/null +++ b/plugins/plugin_thin_edge_json/src/message.rs @@ -0,0 +1,22 @@ +use thin_edge_json::data::ThinEdgeJson; + +#[derive(Debug)] +pub struct ThinEdgeJsonMessage(ThinEdgeJson); + +impl From<ThinEdgeJson> for ThinEdgeJsonMessage { + fn from(tejson: ThinEdgeJson) -> Self { + Self(tejson) + } +} + +impl ThinEdgeJsonMessage { + pub fn inner(&self) -> &ThinEdgeJson { + &self.0 + } + + pub fn into_inner(self) -> ThinEdgeJson { + self.0 + } +} + +impl tedge_api::plugin::Message for ThinEdgeJsonMessage {} diff --git a/plugins/plugin_thin_edge_json/src/plugin.rs b/plugins/plugin_thin_edge_json/src/plugin.rs new file mode 100644 index 00000000..be09acf0 --- /dev/null +++ b/plugins/plugin_thin_edge_json/src/plugin.rs @@ -0,0 +1,67 @@ +use async_trait::async_trait; + +use tedge_api::address::ReplySenderFor; +use tedge_api::plugin::Handle; +use tedge_api::Address; +use tedge_api::Plugin; +use tedge_api::PluginError; + +use plugin_mqtt::IncomingMessage; + +use crate::error::Error; +use crate::message::ThinEdgeJsonMessage; + +tedge_api::make_receiver_bundle!(pub struct ThinEdgeJsonMessageReceiver(ThinEdgeJsonMessage)); + +#[derive(Debug)] +pub struct ThinEdgeJsonPlugin { + target_addr: Address<ThinEdgeJsonMessageReceiver>, +} + +impl tedge_api::plugin::PluginDeclaration for ThinEdgeJsonPlugin { + type HandledMessages = (IncomingMessage,); +} + +impl ThinEdgeJsonPlugin { + pub(crate) fn new(target_addr: Address<ThinEdgeJsonMessageReceiver>) -> Self { + Self { target_addr } + } +} + +#[async_trait] +impl Plugin for ThinEdgeJsonPlugin { + async fn start(&mut self) -> Result<(), PluginError> { + Ok(()) + } + + async fn shutdown(&mut self) -> Result<(), PluginError> { + Ok(()) + } +} + +#[async_trait] +impl Handle<IncomingMessage> for ThinEdgeJsonPlugin { + async fn handle_message( + &self, + message: IncomingMessage, + _sender: ReplySenderFor<IncomingMessage>, + ) -> Result<(), PluginError> { + let payload = std::str::from_utf8(message.payload()).map_err(Error::from)?; + + let payload = { + let mut visitor = thin_edge_json::builder::ThinEdgeJsonBuilder::new(); + thin_edge_json::parser::parse_str(payload, &mut visitor).map_err(Error::from)?; + + visitor.done().map_err(Error::from)? + }; + + let message = ThinEdgeJsonMessage::from(payload); + let _ = self + .target_addr + .send_and_wait(message) + .await + .map_err(|_| Error::FailedToSend)?; + + Ok(()) + } +} |