diff options
author | PradeepKiruvale <pradeepkumar.kj@softwareag.com> | 2022-07-29 12:55:03 +0530 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-07-29 12:55:03 +0530 |
commit | 64d300de6b872821e9f435908b941bdd5f191df1 (patch) | |
tree | 88b7925f743337cf8f05b1d90f92bfae51f6d0e9 /plugins | |
parent | 7305240fbc3578441aaec1a41e7a3faa264c9efb (diff) |
MQTT health endpoints for tedge plugin extensions (#1299)
* tedge watchdog for c8y-log-plugin and c8y-config-plugin
This PR also refactors the health check by removing the duplicate code. Pushed the duplicate code to one place and
reused it across all the thin-edge services.
Signed-off-by: Pradeep Kumar K J <pradeepkumar.kj@softwareag.com>
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/c8y_configuration_plugin/Cargo.toml | 1 | ||||
-rw-r--r-- | plugins/c8y_configuration_plugin/src/main.rs | 129 | ||||
-rw-r--r-- | plugins/c8y_log_plugin/Cargo.toml | 1 | ||||
-rw-r--r-- | plugins/c8y_log_plugin/src/main.rs | 94 |
4 files changed, 134 insertions, 91 deletions
diff --git a/plugins/c8y_configuration_plugin/Cargo.toml b/plugins/c8y_configuration_plugin/Cargo.toml index 8b0eeb50..b54ff0f0 100644 --- a/plugins/c8y_configuration_plugin/Cargo.toml +++ b/plugins/c8y_configuration_plugin/Cargo.toml @@ -26,6 +26,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" tedge_config = { path = "../../crates/common/tedge_config" } tedge_utils = { path = "../../crates/common/tedge_utils", features = ["logging"] } +thin_edge_json = { path = "../../crates/core/thin_edge_json" } thiserror = "1.0" tokio = { version = "1.9", default_features = false, features = [ "fs", "io-util", "macros", "rt-multi-thread","signal"] } toml = "0.5" diff --git a/plugins/c8y_configuration_plugin/src/main.rs b/plugins/c8y_configuration_plugin/src/main.rs index 173ebd3c..665d9f1c 100644 --- a/plugins/c8y_configuration_plugin/src/main.rs +++ b/plugins/c8y_configuration_plugin/src/main.rs @@ -13,13 +13,15 @@ use c8y_smartrest::smartrest_deserializer::{ }; use c8y_smartrest::topic::C8yTopic; use clap::Parser; -use mqtt_channel::{Message, SinkExt, StreamExt, Topic}; +use mqtt_channel::{Connection, Message, SinkExt, StreamExt, Topic}; use std::path::{Path, PathBuf}; use tedge_config::{ ConfigRepository, ConfigSettingAccessor, MqttPortSetting, TEdgeConfig, TmpPathSetting, DEFAULT_TEDGE_CONFIG_PATH, }; use tedge_utils::file::{create_directory_with_user_group, create_file_with_user_group}; +use thin_edge_json::health::{health_check_topics, send_health_status}; + use tracing::{debug, error, info}; pub const DEFAULT_PLUGIN_CONFIG_FILE_PATH: &str = "/etc/tedge/c8y/c8y-configuration-plugin.toml"; @@ -66,6 +68,7 @@ async fn create_mqtt_client(mqtt_port: u16) -> Result<mqtt_channel::Connection, mqtt_channel::TopicFilter::new_unchecked(C8yTopic::SmartRestRequest.as_str()); let _ = topic_filter .add_unchecked(format!("{CONFIG_CHANGE_TOPIC}/{DEFAULT_PLUGIN_CONFIG_TYPE}").as_str()); + let _ = topic_filter.add_all(health_check_topics("c8y-configuration-plugin")); let mqtt_config = mqtt_channel::Config::default() .with_port(mqtt_port) @@ -135,60 +138,85 @@ async fn run( let () = mqtt_client.published.send(msg).await?; // Mqtt message loop + process_mqtt_message( + &mut plugin_config, + &mut mqtt_client, + config_file_path, + http_client, + tmp_dir, + ) + .await?; + + mqtt_client.close().await; + + Ok(()) +} + +async fn process_mqtt_message( + plugin_config: &mut PluginConfig, + mqtt_client: &mut Connection, + config_file_path: &Path, + http_client: &mut impl C8YHttpProxy, + tmp_dir: PathBuf, +) -> Result<(), anyhow::Error> { + let health_check_topics = health_check_topics("c8y-configuration-plugin"); while let Some(message) = mqtt_client.received.next().await { debug!("Received {:?}", message); - if let Ok(payload) = message.payload_str() { - let result = if let "tedge/configuration_change/c8y-configuration-plugin" = - message.topic.name.as_str() - { - // Reload the plugin config file - plugin_config = PluginConfig::new(config_file_path); - // Resend the supported config types - let msg = plugin_config.to_supported_config_types_message()?; - mqtt_client.published.send(msg).await?; - Ok(()) - } else { - match payload.split(',').next().unwrap_or_default() { - "524" => { - let maybe_config_download_request = - SmartRestConfigDownloadRequest::from_smartrest(payload); - if let Ok(config_download_request) = maybe_config_download_request { - handle_config_download_request( - &plugin_config, - config_download_request, - tmp_dir.clone(), - &mut mqtt_client, - http_client, - ) - .await - } else { - error!("Incorrect Download SmartREST payload: {}", payload); - Ok(()) + if health_check_topics.accept(&message) { + send_health_status(&mut mqtt_client.published, "c8y-configuration-plugin").await; + } else if let Ok(payload) = message.payload_str() { + let result = match message.topic.name.as_str() { + "tedge/configuration_change/c8y-configuration-plugin" => { + // Reload the plugin config file + let plugin_config = PluginConfig::new(config_file_path); + // Resend the supported config types + let msg = plugin_config.to_supported_config_types_message()?; + mqtt_client.published.send(msg).await?; + Ok(()) + } + _ => { + match payload.split(',').next().unwrap_or_default() { + "524" => { + let maybe_config_download_request = + SmartRestConfigDownloadRequest::from_smartrest(payload); + if let Ok(config_download_request) = maybe_config_download_request { + handle_config_download_request( + plugin_config, + config_download_request, + tmp_dir.clone(), + mqtt_client, + http_client, + ) + .await + } else { + error!("Incorrect Download SmartREST payload: {}", payload); + Ok(()) + } } - } - "526" => { - // retrieve config file upload smartrest request from payload - let maybe_config_upload_request = - SmartRestConfigUploadRequest::from_smartrest(payload); - - if let Ok(config_upload_request) = maybe_config_upload_request { - // handle the config file upload request - handle_config_upload_request( - &plugin_config, - config_upload_request, - &mut mqtt_client, - http_client, - ) - .await - } else { - error!("Incorrect Upload SmartREST payload: {}", payload); + "526" => { + // retrieve config file upload smartrest request from payload + let maybe_config_upload_request = + SmartRestConfigUploadRequest::from_smartrest(payload); + + if let Ok(config_upload_request) = maybe_config_upload_request { + // handle the config file upload request + handle_config_upload_request( + plugin_config, + config_upload_request, + mqtt_client, + http_client, + ) + .await + } else { + error!("Incorrect Upload SmartREST payload: {}", payload); + Ok(()) + } + } + _ => { + // Ignore operation messages not meant for this plugin Ok(()) } } - _ => { - // Ignore operation messages not meant for this plugin - Ok(()) - } } }; @@ -197,9 +225,6 @@ async fn run( } } } - - mqtt_client.close().await; - Ok(()) } diff --git a/plugins/c8y_log_plugin/Cargo.toml b/plugins/c8y_log_plugin/Cargo.toml index 884add27..8a35e792 100644 --- a/plugins/c8y_log_plugin/Cargo.toml +++ b/plugins/c8y_log_plugin/Cargo.toml @@ -30,6 +30,7 @@ serde_json = "1.0" tedge_config = { path = "../../crates/common/tedge_config" } tedge_utils = { path = "../../crates/common/tedge_utils", features = ["logging"] } time = { version = "0.3" } +thin_edge_json = { path = "../../crates/core/thin_edge_json" } thiserror = "1.0" tokio = { version = "1.9", default_features = false, features = [ "fs", "io-util", "macros", "rt-multi-thread","signal"] } toml = "0.5" diff --git a/plugins/c8y_log_plugin/src/main.rs b/plugins/c8y_log_plugin/src/main.rs index 9ca5066d..5e7dcc4b 100644 --- a/plugins/c8y_log_plugin/src/main.rs +++ b/plugins/c8y_log_plugin/src/main.rs @@ -12,13 +12,14 @@ use clap::Parser; use config::LogPluginConfig; use inotify::{EventMask, EventStream}; use inotify::{Inotify, WatchMask}; -use mqtt_channel::{Connection, StreamExt}; +use mqtt_channel::{Connection, Message, StreamExt, TopicFilter}; use std::path::{Path, PathBuf}; use tedge_config::{ ConfigRepository, ConfigSettingAccessor, LogPathSetting, MqttPortSetting, TEdgeConfig, DEFAULT_TEDGE_CONFIG_PATH, }; use tedge_utils::file::{create_directory_with_user_group, create_file_with_user_group}; +use thin_edge_json::health::{health_check_topics, send_health_status}; use tracing::{error, info}; use crate::logfile_request::{ @@ -59,7 +60,9 @@ async fn create_mqtt_client( tedge_config: &TEdgeConfig, ) -> Result<mqtt_channel::Connection, anyhow::Error> { let mqtt_port = tedge_config.query(MqttPortSetting)?.into(); - let mut topics = mqtt_channel::TopicFilter::new_unchecked(C8yTopic::SmartRestRequest.as_str()); + let mut topics: TopicFilter = health_check_topics("c8y-log-plugin"); + + topics.add_unchecked(C8yTopic::SmartRestRequest.as_str()); // subscribing also to c8y bridge health topic to know when the bridge is up topics.add(C8Y_BRIDGE_HEALTH_TOPIC)?; @@ -99,49 +102,16 @@ async fn run( ) -> Result<(), anyhow::Error> { let mut plugin_config = LogPluginConfig::default(); let mut inotify_stream = create_inofity_file_watch_stream(config_file)?; + let health_check_topics = health_check_topics("c8y-log-plugin"); loop { tokio::select! { message = mqtt_client.received.next() => { if let Some(message) = message { - if is_c8y_bridge_up(&message) { - plugin_config = read_log_config(config_file); - let () = handle_dynamic_log_type_update(&plugin_config, mqtt_client).await?; - } - if let Ok(payload) = message.payload_str() { - let result = match payload.split(',').next().unwrap_or_default() { - "522" => { - info!("Log request received: {payload}"); - // retrieve smartrest object from payload - let maybe_smartrest_obj = SmartRestLogRequest::from_smartrest(payload); - if let Ok(smartrest_obj) = maybe_smartrest_obj { - handle_logfile_request_operation( - &smartrest_obj, - &plugin_config, - mqtt_client, - http_client, - ) - .await - } else { - error!("Incorrect SmartREST payload: {}", payload); - Ok(()) - } - } - _ => { - // Ignore operation messages not meant for this plugin - Ok(()) - } - }; - - if let Err(err) = result { - let error_message = format!("Handling of operation: '{}' failed with {}", payload, err); - error!("{}", error_message); - } - } - } - else { + process_mqtt_message(message, &plugin_config, mqtt_client, http_client, config_file, health_check_topics.clone()).await?; + } else { // message is None and the connection has been closed - return Ok(()); + return Ok(()) } } Some(Ok(event)) = inotify_stream.next() => { @@ -154,6 +124,52 @@ async fn run( } } +pub async fn process_mqtt_message( + message: Message, + plugin_config: &LogPluginConfig, + mqtt_client: &mut Connection, + http_client: &mut JwtAuthHttpProxy, + config_file: &Path, + health_check_topics: TopicFilter, +) -> Result<(), anyhow::Error> { + if is_c8y_bridge_up(&message) { + let plugin_config = read_log_config(config_file); + let () = handle_dynamic_log_type_update(&plugin_config, mqtt_client).await?; + } else if health_check_topics.accept(&message) { + send_health_status(&mut mqtt_client.published, "c8y-log-plugin").await; + } else if let Ok(payload) = message.payload_str() { + let result = match payload.split(',').next().unwrap_or_default() { + "522" => { + info!("Log request received: {payload}"); + // retrieve smartrest object from payload + let maybe_smartrest_obj = SmartRestLogRequest::from_smartrest(payload); + if let Ok(smartrest_obj) = maybe_smartrest_obj { + handle_logfile_request_operation( + &smartrest_obj, + plugin_config, + mqtt_client, + http_client, + ) + .await + } else { + error!("Incorrect SmartREST payload: {}", payload); + Ok(()) + } + } + _ => { + // Ignore operation messages not meant for this plugin + Ok(()) + } + }; + + if let Err(err) = result { + let error_message = format!("Handling of operation: '{}' failed with {}", payload, err); + error!("{}", error_message); + } + } + Ok(()) +} + #[tokio::main] async fn main() -> Result<(), anyhow::Error> { let config_plugin_opt = LogfileRequestPluginOpt::parse(); |