summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <matthias.beyer@ifm.com>2022-05-14 16:12:58 +0200
committerMatthias Beyer <matthias.beyer@ifm.com>2022-05-17 11:11:36 +0200
commit84c718da13add723a04d8e78bc0442ef7fa9090a (patch)
treeff4ca601db7f5c9242340cfd22d80bde7c87fabc
parent5e358be9aeebd6ffa23dbb2f782049906880a231 (diff)
Add thin_egde_json plugin
Signed-off-by: Matthias Beyer <matthias.beyer@ifm.com>
-rw-r--r--Cargo.lock20
-rw-r--r--Cargo.toml1
-rw-r--r--plugins/plugin_thin_edge_json/Cargo.toml19
-rw-r--r--plugins/plugin_thin_edge_json/src/builder.rs61
-rw-r--r--plugins/plugin_thin_edge_json/src/config.rs11
-rw-r--r--plugins/plugin_thin_edge_json/src/error.rs17
-rw-r--r--plugins/plugin_thin_edge_json/src/lib.rs9
-rw-r--r--plugins/plugin_thin_edge_json/src/message.rs22
-rw-r--r--plugins/plugin_thin_edge_json/src/plugin.rs67
9 files changed, 227 insertions, 0 deletions
diff --git a/Cargo.lock b/Cargo.lock
index cd02f8da..4efa0c32 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2372,6 +2372,26 @@ dependencies = [
]
[[package]]
+name = "plugin_thin_edge_json"
+version = "0.1.0"
+dependencies = [
+ "async-trait",
+ "futures",
+ "miette",
+ "plugin_mqtt",
+ "serde",
+ "sysinfo",
+ "tedge_api",
+ "tedge_lib",
+ "thin_edge_json",
+ "thiserror",
+ "tokio",
+ "tokio-util 0.7.0",
+ "toml",
+ "tracing",
+]
+
+[[package]]
name = "pollster"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/Cargo.toml b/Cargo.toml
index 3285d1b7..25cf88ad 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -14,6 +14,7 @@ members = [
"plugins/plugin_mqtt_measurement_bridge",
"plugins/plugin_notification",
"plugins/plugin_sysstat",
+ "plugins/plugin_thin_edge_json",
"plugins/tedge_apt_plugin",
"plugins/tedge_dummy_plugin",
"plugins/tedge_apama_plugin",
diff --git a/plugins/plugin_thin_edge_json/Cargo.toml b/plugins/plugin_thin_edge_json/Cargo.toml
new file mode 100644
index 00000000..8ecaa323
--- /dev/null
+++ b/plugins/plugin_thin_edge_json/Cargo.toml
@@ -0,0 +1,19 @@
+[package]
+name = "plugin_thin_edge_json"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+async-trait = "0.1"
+miette = "4.4"
+serde = { version = "1.0.136", features = ["derive"] }
+thiserror = "1"
+toml = "0.5"
+
+tedge_api = { path = "../../crates/core/tedge_api" }
+plugin_mqtt = { path = "../../plugins/plugin_mqtt" }
+
+thin_edge_json = { path = "../../crates/core/thin_edge_json" }
+
diff --git a/plugins/plugin_thin_edge_json/src/builder.rs b/plugins/plugin_thin_edge_json/src/builder.rs
new file mode 100644
index 00000000..cc4961cc
--- /dev/null
+++ b/plugins/plugin_thin_edge_json/src/builder.rs
@@ -0,0 +1,61 @@
+use async_trait::async_trait;
+
+use tedge_api::plugin::BuiltPlugin;
+use tedge_api::plugin::HandleTypes;
+use tedge_api::plugin::PluginExt;
+use tedge_api::CancellationToken;
+use tedge_api::PluginBuilder;
+use tedge_api::PluginConfiguration;
+use tedge_api::PluginDirectory;
+use tedge_api::PluginError;
+
+use crate::config::ThinEdgeJsonConfig;
+use crate::plugin::ThinEdgeJsonPlugin;
+
+pub struct ThinEdgeJsonPluginBuilder;
+
+#[async_trait]
+impl<PD: PluginDirectory> PluginBuilder<PD> for ThinEdgeJsonPluginBuilder {
+ fn kind_name() -> &'static str {
+ "thin_edge_json"
+ }
+
+ fn kind_configuration() -> Option<tedge_api::ConfigDescription> {
+ Some(<ThinEdgeJsonConfig as tedge_api::AsConfig>::as_config())
+ }
+
+ fn kind_message_types() -> HandleTypes
+ where
+ Self: Sized,
+ {
+ ThinEdgeJsonPlugin::get_handled_types()
+ }
+
+ async fn verify_configuration(
+ &self,
+ config: &PluginConfiguration,
+ ) -> Result<(), tedge_api::error::PluginError> {
+ config
+ .clone()
+ .try_into()
+ .map(|_: ThinEdgeJsonConfig| ())
+ .map_err(crate::error::Error::ConfigParseFailed)
+ .map_err(PluginError::from)
+ }
+
+ async fn instantiate(
+ &self,
+ config: PluginConfiguration,
+ _cancellation_token: CancellationToken,
+ plugin_dir: &PD,
+ ) -> Result<BuiltPlugin, PluginError> {
+ let config: ThinEdgeJsonConfig = config
+ .try_into()
+ .map_err(crate::error::Error::ConfigParseFailed)?;
+
+ let target_addr = plugin_dir
+ .get_address_for::<crate::plugin::ThinEdgeJsonMessageReceiver>(config.target())?;
+
+ Ok(ThinEdgeJsonPlugin::new(target_addr).finish())
+ }
+}
diff --git a/plugins/plugin_thin_edge_json/src/config.rs b/plugins/plugin_thin_edge_json/src/config.rs
new file mode 100644
index 00000000..d924d797
--- /dev/null
+++ b/plugins/plugin_thin_edge_json/src/config.rs
@@ -0,0 +1,11 @@
+#[derive(Debug, serde::Deserialize, tedge_api::Config)]
+pub struct ThinEdgeJsonConfig {
+ /// The name of the plugin to send the parsed ThinEdgeJsonMessage to
+ target: String,
+}
+
+impl ThinEdgeJsonConfig {
+ pub(crate) fn target(&self) -> &str {
+ &self.target
+ }
+}
diff --git a/plugins/plugin_thin_edge_json/src/error.rs b/plugins/plugin_thin_edge_json/src/error.rs
new file mode 100644
index 00000000..12385c7d
--- /dev/null
+++ b/plugins/plugin_thin_edge_json/src/error.rs
@@ -0,0 +1,17 @@
+#[derive(Debug, miette::Diagnostic, thiserror::Error)]
+pub(crate) enum Error {
+ #[error("Failed to parse configuration")]
+ ConfigParseFailed(toml::de::Error),
+
+ #[error("Parsing bytes as UTF8 String failed")]
+ ParseUtf8Error(#[from] std::str::Utf8Error),
+
+ #[error("Failed to parse ThinEdgeJson")]
+ ThinEdgeJsonParserError(#[from] thin_edge_json::parser::ThinEdgeJsonParserError),
+
+ #[error("Failed to build ThinEdgeJson")]
+ ThinEdgeJsonBuilderError(#[from] thin_edge_json::builder::ThinEdgeJsonBuilderError),
+
+ #[error("Failed to send ThinEdgeJson")]
+ FailedToSend,
+}
diff --git a/plugins/plugin_thin_edge_json/src/lib.rs b/plugins/plugin_thin_edge_json/src/lib.rs
new file mode 100644
index 00000000..d7a5e530
--- /dev/null
+++ b/plugins/plugin_thin_edge_json/src/lib.rs
@@ -0,0 +1,9 @@
+mod builder;
+mod config;
+mod error;
+mod message;
+mod plugin;
+
+pub use crate::builder::ThinEdgeJsonPluginBuilder;
+pub use crate::message::ThinEdgeJsonMessage;
+pub use crate::plugin::ThinEdgeJsonPlugin;
diff --git a/plugins/plugin_thin_edge_json/src/message.rs b/plugins/plugin_thin_edge_json/src/message.rs
new file mode 100644
index 00000000..c203f6f5
--- /dev/null
+++ b/plugins/plugin_thin_edge_json/src/message.rs
@@ -0,0 +1,22 @@
+use thin_edge_json::data::ThinEdgeJson;
+
+#[derive(Debug)]
+pub struct ThinEdgeJsonMessage(ThinEdgeJson);
+
+impl From<ThinEdgeJson> for ThinEdgeJsonMessage {
+ fn from(tejson: ThinEdgeJson) -> Self {
+ Self(tejson)
+ }
+}
+
+impl ThinEdgeJsonMessage {
+ pub fn inner(&self) -> &ThinEdgeJson {
+ &self.0
+ }
+
+ pub fn into_inner(self) -> ThinEdgeJson {
+ self.0
+ }
+}
+
+impl tedge_api::plugin::Message for ThinEdgeJsonMessage {}
diff --git a/plugins/plugin_thin_edge_json/src/plugin.rs b/plugins/plugin_thin_edge_json/src/plugin.rs
new file mode 100644
index 00000000..be09acf0
--- /dev/null
+++ b/plugins/plugin_thin_edge_json/src/plugin.rs
@@ -0,0 +1,67 @@
+use async_trait::async_trait;
+
+use tedge_api::address::ReplySenderFor;
+use tedge_api::plugin::Handle;
+use tedge_api::Address;
+use tedge_api::Plugin;
+use tedge_api::PluginError;
+
+use plugin_mqtt::IncomingMessage;
+
+use crate::error::Error;
+use crate::message::ThinEdgeJsonMessage;
+
+tedge_api::make_receiver_bundle!(pub struct ThinEdgeJsonMessageReceiver(ThinEdgeJsonMessage));
+
+#[derive(Debug)]
+pub struct ThinEdgeJsonPlugin {
+ target_addr: Address<ThinEdgeJsonMessageReceiver>,
+}
+
+impl tedge_api::plugin::PluginDeclaration for ThinEdgeJsonPlugin {
+ type HandledMessages = (IncomingMessage,);
+}
+
+impl ThinEdgeJsonPlugin {
+ pub(crate) fn new(target_addr: Address<ThinEdgeJsonMessageReceiver>) -> Self {
+ Self { target_addr }
+ }
+}
+
+#[async_trait]
+impl Plugin for ThinEdgeJsonPlugin {
+ async fn start(&mut self) -> Result<(), PluginError> {
+ Ok(())
+ }
+
+ async fn shutdown(&mut self) -> Result<(), PluginError> {
+ Ok(())
+ }
+}
+
+#[async_trait]
+impl Handle<IncomingMessage> for ThinEdgeJsonPlugin {
+ async fn handle_message(
+ &self,
+ message: IncomingMessage,
+ _sender: ReplySenderFor<IncomingMessage>,
+ ) -> Result<(), PluginError> {
+ let payload = std::str::from_utf8(message.payload()).map_err(Error::from)?;
+
+ let payload = {
+ let mut visitor = thin_edge_json::builder::ThinEdgeJsonBuilder::new();
+ thin_edge_json::parser::parse_str(payload, &mut visitor).map_err(Error::from)?;
+
+ visitor.done().map_err(Error::from)?
+ };
+
+ let message = ThinEdgeJsonMessage::from(payload);
+ let _ = self
+ .target_addr
+ .send_and_wait(message)
+ .await
+ .map_err(|_| Error::FailedToSend)?;
+
+ Ok(())
+ }
+}