summaryrefslogtreecommitdiffstats
path: root/plugins
diff options
context:
space:
mode:
authorMatthias Beyer <matthias.beyer@ifm.com>2022-04-22 11:13:23 +0200
committerMatthias Beyer <matthias.beyer@ifm.com>2022-05-02 10:24:24 +0200
commit6c0af4b9e99b722b58afe0ea5ca64d4d1481b0e2 (patch)
treead9a9a2879667d01476e75090473cf93e761ae09 /plugins
parent1dbc2287b0c155b3c3375361e517137996158a5d (diff)
plugin_mqtt: Improve error reporting
This patch improves the error handling story of the plugin_mqtt by introducing a dedicated error type. Signed-off-by: Matthias Beyer <matthias.beyer@ifm.com>
Diffstat (limited to 'plugins')
-rw-r--r--plugins/plugin_mqtt/Cargo.toml1
-rw-r--r--plugins/plugin_mqtt/src/builder.rs4
-rw-r--r--plugins/plugin_mqtt/src/error.rs37
-rw-r--r--plugins/plugin_mqtt/src/message.rs3
-rw-r--r--plugins/plugin_mqtt/src/plugin.rs27
5 files changed, 27 insertions, 45 deletions
diff --git a/plugins/plugin_mqtt/Cargo.toml b/plugins/plugin_mqtt/Cargo.toml
index 43cbdc2e..2ce68c03 100644
--- a/plugins/plugin_mqtt/Cargo.toml
+++ b/plugins/plugin_mqtt/Cargo.toml
@@ -17,6 +17,7 @@ serde = { version = "1", features = ["derive"] }
tokio = { version = "1", features = [] }
tokio-util = "0.7.0"
tracing = "0.1"
+toml = "0.5"
tedge_api = { path = "../../crates/core/tedge_api" }
tedge_lib = { path = "../../crates/core/tedge_lib" }
diff --git a/plugins/plugin_mqtt/src/builder.rs b/plugins/plugin_mqtt/src/builder.rs
index 748a1efa..05c5c4c3 100644
--- a/plugins/plugin_mqtt/src/builder.rs
+++ b/plugins/plugin_mqtt/src/builder.rs
@@ -43,7 +43,7 @@ where
.clone()
.try_into()
.map(|_: MqttConfig| ())
- .map_err(|_| miette::miette!("Failed to parse mqtt configuration"))
+ .map_err(crate::error::Error::ConfigParseFailed)
.map_err(PluginError::from)
}
@@ -55,7 +55,7 @@ where
) -> Result<BuiltPlugin, PluginError> {
let config = config
.try_into::<MqttConfig>()
- .map_err(|_| miette::miette!("Failed to parse mqtt configuration"))?;
+ .map_err(crate::error::Error::ConfigParseFailed)?;
let addr = plugin_dir.get_address_for(&config.target)?;
Ok(MqttPlugin::new(config, addr).finish())
diff --git a/plugins/plugin_mqtt/src/error.rs b/plugins/plugin_mqtt/src/error.rs
index e5ef6a3a..b7bb8916 100644
--- a/plugins/plugin_mqtt/src/error.rs
+++ b/plugins/plugin_mqtt/src/error.rs
@@ -1,29 +1,18 @@
-use miette::Result;
-
#[derive(Debug, miette::Diagnostic, thiserror::Error)]
-#[error("Error while shutting down MQTT Plugin")]
-pub struct MqttShutdownError {
- #[related]
- errs: Vec<miette::Error>
-}
+pub(crate) enum Error {
+ #[error("Failed to parse configuration")]
+ ConfigParseFailed(toml::de::Error),
-impl MqttShutdownError {
- pub fn build_for(clientres: Result<()>, stopres: Result<()>) -> std::result::Result<(), Self> {
- let mut errs = Vec::with_capacity(2);
- if let Err(e) = clientres {
- errs.push(e);
- }
+ #[error("Failed to publish message")]
+ FailedToPublish(paho_mqtt::errors::Error),
- if let Err(e) = stopres {
- errs.push(e);
- }
+ #[error("No client, cannot send messages")]
+ NoClient,
- if errs.is_empty() {
- Ok(())
- } else {
- Err(Self {
- errs
- })
- }
- }
+ #[error("Failed to stop MQTT mainloop")]
+ FailedToStopMqttMainloop,
+
+ #[error("Failed to disconnect MQTT client")]
+ FailedToDisconnectMqttClient(paho_mqtt::errors::Error),
}
+
diff --git a/plugins/plugin_mqtt/src/message.rs b/plugins/plugin_mqtt/src/message.rs
index 17af6e68..f37de48b 100644
--- a/plugins/plugin_mqtt/src/message.rs
+++ b/plugins/plugin_mqtt/src/message.rs
@@ -39,7 +39,6 @@ impl tedge_api::plugin::Message for IncomingMessage {
tedge_api::make_receiver_bundle!(pub struct MqttMessageReceiver(IncomingMessage));
-
#[derive(Debug)]
pub struct OutgoingMessage {
pub(crate) payload: Vec<u8>,
@@ -65,5 +64,3 @@ impl OutgoingMessage {
impl tedge_api::plugin::Message for OutgoingMessage {
type Reply = tedge_api::message::NoReply; // for now
}
-
-
diff --git a/plugins/plugin_mqtt/src/plugin.rs b/plugins/plugin_mqtt/src/plugin.rs
index cf3e480e..c54c1fdc 100644
--- a/plugins/plugin_mqtt/src/plugin.rs
+++ b/plugins/plugin_mqtt/src/plugin.rs
@@ -1,6 +1,5 @@
use async_trait::async_trait;
-use miette::Context;
use miette::IntoDiagnostic;
use tedge_api::address::Address;
use tedge_api::address::ReplySender;
@@ -80,26 +79,21 @@ impl Plugin for MqttPlugin {
debug!("Shutting down mqtt plugin!");
// try to shutdown internal mainloop
- let stop_err = if let Some(stopper) = self.stopper.take() {
+ if let Some(stopper) = self.stopper.take() {
stopper
.stop()
- .map_err(|e| miette::miette!("Failed to stop MQTT mainloop: {:?}", e))
- } else {
- Ok(())
- };
+ .map_err(|_| crate::error::Error::FailedToStopMqttMainloop)?;
+ }
// try to shutdown mqtt client
- let client_shutdown_err = if let Some(client) = self.client.take() {
+ if let Some(client) = self.client.take() {
client
.disconnect(None)
.await
- .map_err(|e| miette::miette!("Failed to disconnect MQTT client: {:?}", e))
- .map(|_| ())
- } else {
- Ok(())
- };
+ .map_err(|e| crate::error::Error::FailedToDisconnectMqttClient(e))?;
+ }
- crate::error::MqttShutdownError::build_for(client_shutdown_err, stop_err).into_diagnostic()
+ Ok(())
}
}
@@ -193,12 +187,13 @@ impl Handle<OutgoingMessage> for MqttPlugin {
client
.publish(msg)
.await
- .into_diagnostic()
- .context("Failed to publish message")?;
+ .map_err(crate::error::Error::FailedToPublish)
+ .into_diagnostic()?;
+
debug!("Publishing message succeeded");
Ok(())
} else {
- Err(miette::miette!("No client, cannot send messages"))?
+ return Err(crate::error::Error::NoClient).into_diagnostic();
}
}
}