summaryrefslogtreecommitdiffstats
path: root/plugins
diff options
context:
space:
mode:
authorPradeepKiruvale <pradeepkumar.kj@softwareag.com>2022-07-29 12:55:03 +0530
committerGitHub <noreply@github.com>2022-07-29 12:55:03 +0530
commit64d300de6b872821e9f435908b941bdd5f191df1 (patch)
tree88b7925f743337cf8f05b1d90f92bfae51f6d0e9 /plugins
parent7305240fbc3578441aaec1a41e7a3faa264c9efb (diff)
MQTT health endpoints for tedge plugin extensions (#1299)
* tedge watchdog for c8y-log-plugin and c8y-config-plugin This PR also refactors the health check by removing the duplicate code. Pushed the duplicate code to one place and reused it across all the thin-edge services. Signed-off-by: Pradeep Kumar K J <pradeepkumar.kj@softwareag.com>
Diffstat (limited to 'plugins')
-rw-r--r--plugins/c8y_configuration_plugin/Cargo.toml1
-rw-r--r--plugins/c8y_configuration_plugin/src/main.rs129
-rw-r--r--plugins/c8y_log_plugin/Cargo.toml1
-rw-r--r--plugins/c8y_log_plugin/src/main.rs94
4 files changed, 134 insertions, 91 deletions
diff --git a/plugins/c8y_configuration_plugin/Cargo.toml b/plugins/c8y_configuration_plugin/Cargo.toml
index 8b0eeb50..b54ff0f0 100644
--- a/plugins/c8y_configuration_plugin/Cargo.toml
+++ b/plugins/c8y_configuration_plugin/Cargo.toml
@@ -26,6 +26,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tedge_config = { path = "../../crates/common/tedge_config" }
tedge_utils = { path = "../../crates/common/tedge_utils", features = ["logging"] }
+thin_edge_json = { path = "../../crates/core/thin_edge_json" }
thiserror = "1.0"
tokio = { version = "1.9", default_features = false, features = [ "fs", "io-util", "macros", "rt-multi-thread","signal"] }
toml = "0.5"
diff --git a/plugins/c8y_configuration_plugin/src/main.rs b/plugins/c8y_configuration_plugin/src/main.rs
index 173ebd3c..665d9f1c 100644
--- a/plugins/c8y_configuration_plugin/src/main.rs
+++ b/plugins/c8y_configuration_plugin/src/main.rs
@@ -13,13 +13,15 @@ use c8y_smartrest::smartrest_deserializer::{
};
use c8y_smartrest::topic::C8yTopic;
use clap::Parser;
-use mqtt_channel::{Message, SinkExt, StreamExt, Topic};
+use mqtt_channel::{Connection, Message, SinkExt, StreamExt, Topic};
use std::path::{Path, PathBuf};
use tedge_config::{
ConfigRepository, ConfigSettingAccessor, MqttPortSetting, TEdgeConfig, TmpPathSetting,
DEFAULT_TEDGE_CONFIG_PATH,
};
use tedge_utils::file::{create_directory_with_user_group, create_file_with_user_group};
+use thin_edge_json::health::{health_check_topics, send_health_status};
+
use tracing::{debug, error, info};
pub const DEFAULT_PLUGIN_CONFIG_FILE_PATH: &str = "/etc/tedge/c8y/c8y-configuration-plugin.toml";
@@ -66,6 +68,7 @@ async fn create_mqtt_client(mqtt_port: u16) -> Result<mqtt_channel::Connection,
mqtt_channel::TopicFilter::new_unchecked(C8yTopic::SmartRestRequest.as_str());
let _ = topic_filter
.add_unchecked(format!("{CONFIG_CHANGE_TOPIC}/{DEFAULT_PLUGIN_CONFIG_TYPE}").as_str());
+ let _ = topic_filter.add_all(health_check_topics("c8y-configuration-plugin"));
let mqtt_config = mqtt_channel::Config::default()
.with_port(mqtt_port)
@@ -135,60 +138,85 @@ async fn run(
let () = mqtt_client.published.send(msg).await?;
// Mqtt message loop
+ process_mqtt_message(
+ &mut plugin_config,
+ &mut mqtt_client,
+ config_file_path,
+ http_client,
+ tmp_dir,
+ )
+ .await?;
+
+ mqtt_client.close().await;
+
+ Ok(())
+}
+
+async fn process_mqtt_message(
+ plugin_config: &mut PluginConfig,
+ mqtt_client: &mut Connection,
+ config_file_path: &Path,
+ http_client: &mut impl C8YHttpProxy,
+ tmp_dir: PathBuf,
+) -> Result<(), anyhow::Error> {
+ let health_check_topics = health_check_topics("c8y-configuration-plugin");
while let Some(message) = mqtt_client.received.next().await {
debug!("Received {:?}", message);
- if let Ok(payload) = message.payload_str() {
- let result = if let "tedge/configuration_change/c8y-configuration-plugin" =
- message.topic.name.as_str()
- {
- // Reload the plugin config file
- plugin_config = PluginConfig::new(config_file_path);
- // Resend the supported config types
- let msg = plugin_config.to_supported_config_types_message()?;
- mqtt_client.published.send(msg).await?;
- Ok(())
- } else {
- match payload.split(',').next().unwrap_or_default() {
- "524" => {
- let maybe_config_download_request =
- SmartRestConfigDownloadRequest::from_smartrest(payload);
- if let Ok(config_download_request) = maybe_config_download_request {
- handle_config_download_request(
- &plugin_config,
- config_download_request,
- tmp_dir.clone(),
- &mut mqtt_client,
- http_client,
- )
- .await
- } else {
- error!("Incorrect Download SmartREST payload: {}", payload);
- Ok(())
+ if health_check_topics.accept(&message) {
+ send_health_status(&mut mqtt_client.published, "c8y-configuration-plugin").await;
+ } else if let Ok(payload) = message.payload_str() {
+ let result = match message.topic.name.as_str() {
+ "tedge/configuration_change/c8y-configuration-plugin" => {
+ // Reload the plugin config file
+ let plugin_config = PluginConfig::new(config_file_path);
+ // Resend the supported config types
+ let msg = plugin_config.to_supported_config_types_message()?;
+ mqtt_client.published.send(msg).await?;
+ Ok(())
+ }
+ _ => {
+ match payload.split(',').next().unwrap_or_default() {
+ "524" => {
+ let maybe_config_download_request =
+ SmartRestConfigDownloadRequest::from_smartrest(payload);
+ if let Ok(config_download_request) = maybe_config_download_request {
+ handle_config_download_request(
+ plugin_config,
+ config_download_request,
+ tmp_dir.clone(),
+ mqtt_client,
+ http_client,
+ )
+ .await
+ } else {
+ error!("Incorrect Download SmartREST payload: {}", payload);
+ Ok(())
+ }
}
- }
- "526" => {
- // retrieve config file upload smartrest request from payload
- let maybe_config_upload_request =
- SmartRestConfigUploadRequest::from_smartrest(payload);
-
- if let Ok(config_upload_request) = maybe_config_upload_request {
- // handle the config file upload request
- handle_config_upload_request(
- &plugin_config,
- config_upload_request,
- &mut mqtt_client,
- http_client,
- )
- .await
- } else {
- error!("Incorrect Upload SmartREST payload: {}", payload);
+ "526" => {
+ // retrieve config file upload smartrest request from payload
+ let maybe_config_upload_request =
+ SmartRestConfigUploadRequest::from_smartrest(payload);
+
+ if let Ok(config_upload_request) = maybe_config_upload_request {
+ // handle the config file upload request
+ handle_config_upload_request(
+ plugin_config,
+ config_upload_request,
+ mqtt_client,
+ http_client,
+ )
+ .await
+ } else {
+ error!("Incorrect Upload SmartREST payload: {}", payload);
+ Ok(())
+ }
+ }
+ _ => {
+ // Ignore operation messages not meant for this plugin
Ok(())
}
}
- _ => {
- // Ignore operation messages not meant for this plugin
- Ok(())
- }
}
};
@@ -197,9 +225,6 @@ async fn run(
}
}
}
-
- mqtt_client.close().await;
-
Ok(())
}
diff --git a/plugins/c8y_log_plugin/Cargo.toml b/plugins/c8y_log_plugin/Cargo.toml
index 884add27..8a35e792 100644
--- a/plugins/c8y_log_plugin/Cargo.toml
+++ b/plugins/c8y_log_plugin/Cargo.toml
@@ -30,6 +30,7 @@ serde_json = "1.0"
tedge_config = { path = "../../crates/common/tedge_config" }
tedge_utils = { path = "../../crates/common/tedge_utils", features = ["logging"] }
time = { version = "0.3" }
+thin_edge_json = { path = "../../crates/core/thin_edge_json" }
thiserror = "1.0"
tokio = { version = "1.9", default_features = false, features = [ "fs", "io-util", "macros", "rt-multi-thread","signal"] }
toml = "0.5"
diff --git a/plugins/c8y_log_plugin/src/main.rs b/plugins/c8y_log_plugin/src/main.rs
index 9ca5066d..5e7dcc4b 100644
--- a/plugins/c8y_log_plugin/src/main.rs
+++ b/plugins/c8y_log_plugin/src/main.rs
@@ -12,13 +12,14 @@ use clap::Parser;
use config::LogPluginConfig;
use inotify::{EventMask, EventStream};
use inotify::{Inotify, WatchMask};
-use mqtt_channel::{Connection, StreamExt};
+use mqtt_channel::{Connection, Message, StreamExt, TopicFilter};
use std::path::{Path, PathBuf};
use tedge_config::{
ConfigRepository, ConfigSettingAccessor, LogPathSetting, MqttPortSetting, TEdgeConfig,
DEFAULT_TEDGE_CONFIG_PATH,
};
use tedge_utils::file::{create_directory_with_user_group, create_file_with_user_group};
+use thin_edge_json::health::{health_check_topics, send_health_status};
use tracing::{error, info};
use crate::logfile_request::{
@@ -59,7 +60,9 @@ async fn create_mqtt_client(
tedge_config: &TEdgeConfig,
) -> Result<mqtt_channel::Connection, anyhow::Error> {
let mqtt_port = tedge_config.query(MqttPortSetting)?.into();
- let mut topics = mqtt_channel::TopicFilter::new_unchecked(C8yTopic::SmartRestRequest.as_str());
+ let mut topics: TopicFilter = health_check_topics("c8y-log-plugin");
+
+ topics.add_unchecked(C8yTopic::SmartRestRequest.as_str());
// subscribing also to c8y bridge health topic to know when the bridge is up
topics.add(C8Y_BRIDGE_HEALTH_TOPIC)?;
@@ -99,49 +102,16 @@ async fn run(
) -> Result<(), anyhow::Error> {
let mut plugin_config = LogPluginConfig::default();
let mut inotify_stream = create_inofity_file_watch_stream(config_file)?;
+ let health_check_topics = health_check_topics("c8y-log-plugin");
loop {
tokio::select! {
message = mqtt_client.received.next() => {
if let Some(message) = message {
- if is_c8y_bridge_up(&message) {
- plugin_config = read_log_config(config_file);
- let () = handle_dynamic_log_type_update(&plugin_config, mqtt_client).await?;
- }
- if let Ok(payload) = message.payload_str() {
- let result = match payload.split(',').next().unwrap_or_default() {
- "522" => {
- info!("Log request received: {payload}");
- // retrieve smartrest object from payload
- let maybe_smartrest_obj = SmartRestLogRequest::from_smartrest(payload);
- if let Ok(smartrest_obj) = maybe_smartrest_obj {
- handle_logfile_request_operation(
- &smartrest_obj,
- &plugin_config,
- mqtt_client,
- http_client,
- )
- .await
- } else {
- error!("Incorrect SmartREST payload: {}", payload);
- Ok(())
- }
- }
- _ => {
- // Ignore operation messages not meant for this plugin
- Ok(())
- }
- };
-
- if let Err(err) = result {
- let error_message = format!("Handling of operation: '{}' failed with {}", payload, err);
- error!("{}", error_message);
- }
- }
- }
- else {
+ process_mqtt_message(message, &plugin_config, mqtt_client, http_client, config_file, health_check_topics.clone()).await?;
+ } else {
// message is None and the connection has been closed
- return Ok(());
+ return Ok(())
}
}
Some(Ok(event)) = inotify_stream.next() => {
@@ -154,6 +124,52 @@ async fn run(
}
}
+pub async fn process_mqtt_message(
+ message: Message,
+ plugin_config: &LogPluginConfig,
+ mqtt_client: &mut Connection,
+ http_client: &mut JwtAuthHttpProxy,
+ config_file: &Path,
+ health_check_topics: TopicFilter,
+) -> Result<(), anyhow::Error> {
+ if is_c8y_bridge_up(&message) {
+ let plugin_config = read_log_config(config_file);
+ let () = handle_dynamic_log_type_update(&plugin_config, mqtt_client).await?;
+ } else if health_check_topics.accept(&message) {
+ send_health_status(&mut mqtt_client.published, "c8y-log-plugin").await;
+ } else if let Ok(payload) = message.payload_str() {
+ let result = match payload.split(',').next().unwrap_or_default() {
+ "522" => {
+ info!("Log request received: {payload}");
+ // retrieve smartrest object from payload
+ let maybe_smartrest_obj = SmartRestLogRequest::from_smartrest(payload);
+ if let Ok(smartrest_obj) = maybe_smartrest_obj {
+ handle_logfile_request_operation(
+ &smartrest_obj,
+ plugin_config,
+ mqtt_client,
+ http_client,
+ )
+ .await
+ } else {
+ error!("Incorrect SmartREST payload: {}", payload);
+ Ok(())
+ }
+ }
+ _ => {
+ // Ignore operation messages not meant for this plugin
+ Ok(())
+ }
+ };
+
+ if let Err(err) = result {
+ let error_message = format!("Handling of operation: '{}' failed with {}", payload, err);
+ error!("{}", error_message);
+ }
+ }
+ Ok(())
+}
+
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
let config_plugin_opt = LogfileRequestPluginOpt::parse();