diff options
Diffstat (limited to 'plugins/c8y_log_plugin/src/main.rs')
-rw-r--r-- | plugins/c8y_log_plugin/src/main.rs | 94 |
1 files changed, 55 insertions, 39 deletions
diff --git a/plugins/c8y_log_plugin/src/main.rs b/plugins/c8y_log_plugin/src/main.rs index 9ca5066d..5e7dcc4b 100644 --- a/plugins/c8y_log_plugin/src/main.rs +++ b/plugins/c8y_log_plugin/src/main.rs @@ -12,13 +12,14 @@ use clap::Parser; use config::LogPluginConfig; use inotify::{EventMask, EventStream}; use inotify::{Inotify, WatchMask}; -use mqtt_channel::{Connection, StreamExt}; +use mqtt_channel::{Connection, Message, StreamExt, TopicFilter}; use std::path::{Path, PathBuf}; use tedge_config::{ ConfigRepository, ConfigSettingAccessor, LogPathSetting, MqttPortSetting, TEdgeConfig, DEFAULT_TEDGE_CONFIG_PATH, }; use tedge_utils::file::{create_directory_with_user_group, create_file_with_user_group}; +use thin_edge_json::health::{health_check_topics, send_health_status}; use tracing::{error, info}; use crate::logfile_request::{ @@ -59,7 +60,9 @@ async fn create_mqtt_client( tedge_config: &TEdgeConfig, ) -> Result<mqtt_channel::Connection, anyhow::Error> { let mqtt_port = tedge_config.query(MqttPortSetting)?.into(); - let mut topics = mqtt_channel::TopicFilter::new_unchecked(C8yTopic::SmartRestRequest.as_str()); + let mut topics: TopicFilter = health_check_topics("c8y-log-plugin"); + + topics.add_unchecked(C8yTopic::SmartRestRequest.as_str()); // subscribing also to c8y bridge health topic to know when the bridge is up topics.add(C8Y_BRIDGE_HEALTH_TOPIC)?; @@ -99,49 +102,16 @@ async fn run( ) -> Result<(), anyhow::Error> { let mut plugin_config = LogPluginConfig::default(); let mut inotify_stream = create_inofity_file_watch_stream(config_file)?; + let health_check_topics = health_check_topics("c8y-log-plugin"); loop { tokio::select! { message = mqtt_client.received.next() => { if let Some(message) = message { - if is_c8y_bridge_up(&message) { - plugin_config = read_log_config(config_file); - let () = handle_dynamic_log_type_update(&plugin_config, mqtt_client).await?; - } - if let Ok(payload) = message.payload_str() { - let result = match payload.split(',').next().unwrap_or_default() { - "522" => { - info!("Log request received: {payload}"); - // retrieve smartrest object from payload - let maybe_smartrest_obj = SmartRestLogRequest::from_smartrest(payload); - if let Ok(smartrest_obj) = maybe_smartrest_obj { - handle_logfile_request_operation( - &smartrest_obj, - &plugin_config, - mqtt_client, - http_client, - ) - .await - } else { - error!("Incorrect SmartREST payload: {}", payload); - Ok(()) - } - } - _ => { - // Ignore operation messages not meant for this plugin - Ok(()) - } - }; - - if let Err(err) = result { - let error_message = format!("Handling of operation: '{}' failed with {}", payload, err); - error!("{}", error_message); - } - } - } - else { + process_mqtt_message(message, &plugin_config, mqtt_client, http_client, config_file, health_check_topics.clone()).await?; + } else { // message is None and the connection has been closed - return Ok(()); + return Ok(()) } } Some(Ok(event)) = inotify_stream.next() => { @@ -154,6 +124,52 @@ async fn run( } } +pub async fn process_mqtt_message( + message: Message, + plugin_config: &LogPluginConfig, + mqtt_client: &mut Connection, + http_client: &mut JwtAuthHttpProxy, + config_file: &Path, + health_check_topics: TopicFilter, +) -> Result<(), anyhow::Error> { + if is_c8y_bridge_up(&message) { + let plugin_config = read_log_config(config_file); + let () = handle_dynamic_log_type_update(&plugin_config, mqtt_client).await?; + } else if health_check_topics.accept(&message) { + send_health_status(&mut mqtt_client.published, "c8y-log-plugin").await; + } else if let Ok(payload) = message.payload_str() { + let result = match payload.split(',').next().unwrap_or_default() { + "522" => { + info!("Log request received: {payload}"); + // retrieve smartrest object from payload + let maybe_smartrest_obj = SmartRestLogRequest::from_smartrest(payload); + if let Ok(smartrest_obj) = maybe_smartrest_obj { + handle_logfile_request_operation( + &smartrest_obj, + plugin_config, + mqtt_client, + http_client, + ) + .await + } else { + error!("Incorrect SmartREST payload: {}", payload); + Ok(()) + } + } + _ => { + // Ignore operation messages not meant for this plugin + Ok(()) + } + }; + + if let Err(err) = result { + let error_message = format!("Handling of operation: '{}' failed with {}", payload, err); + error!("{}", error_message); + } + } + Ok(()) +} + #[tokio::main] async fn main() -> Result<(), anyhow::Error> { let config_plugin_opt = LogfileRequestPluginOpt::parse(); |