summaryrefslogtreecommitdiffstats
path: root/plugins/c8y_log_plugin/src/main.rs
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/c8y_log_plugin/src/main.rs')
-rw-r--r--plugins/c8y_log_plugin/src/main.rs94
1 files changed, 55 insertions, 39 deletions
diff --git a/plugins/c8y_log_plugin/src/main.rs b/plugins/c8y_log_plugin/src/main.rs
index 9ca5066d..5e7dcc4b 100644
--- a/plugins/c8y_log_plugin/src/main.rs
+++ b/plugins/c8y_log_plugin/src/main.rs
@@ -12,13 +12,14 @@ use clap::Parser;
use config::LogPluginConfig;
use inotify::{EventMask, EventStream};
use inotify::{Inotify, WatchMask};
-use mqtt_channel::{Connection, StreamExt};
+use mqtt_channel::{Connection, Message, StreamExt, TopicFilter};
use std::path::{Path, PathBuf};
use tedge_config::{
ConfigRepository, ConfigSettingAccessor, LogPathSetting, MqttPortSetting, TEdgeConfig,
DEFAULT_TEDGE_CONFIG_PATH,
};
use tedge_utils::file::{create_directory_with_user_group, create_file_with_user_group};
+use thin_edge_json::health::{health_check_topics, send_health_status};
use tracing::{error, info};
use crate::logfile_request::{
@@ -59,7 +60,9 @@ async fn create_mqtt_client(
tedge_config: &TEdgeConfig,
) -> Result<mqtt_channel::Connection, anyhow::Error> {
let mqtt_port = tedge_config.query(MqttPortSetting)?.into();
- let mut topics = mqtt_channel::TopicFilter::new_unchecked(C8yTopic::SmartRestRequest.as_str());
+ let mut topics: TopicFilter = health_check_topics("c8y-log-plugin");
+
+ topics.add_unchecked(C8yTopic::SmartRestRequest.as_str());
// subscribing also to c8y bridge health topic to know when the bridge is up
topics.add(C8Y_BRIDGE_HEALTH_TOPIC)?;
@@ -99,49 +102,16 @@ async fn run(
) -> Result<(), anyhow::Error> {
let mut plugin_config = LogPluginConfig::default();
let mut inotify_stream = create_inofity_file_watch_stream(config_file)?;
+ let health_check_topics = health_check_topics("c8y-log-plugin");
loop {
tokio::select! {
message = mqtt_client.received.next() => {
if let Some(message) = message {
- if is_c8y_bridge_up(&message) {
- plugin_config = read_log_config(config_file);
- let () = handle_dynamic_log_type_update(&plugin_config, mqtt_client).await?;
- }
- if let Ok(payload) = message.payload_str() {
- let result = match payload.split(',').next().unwrap_or_default() {
- "522" => {
- info!("Log request received: {payload}");
- // retrieve smartrest object from payload
- let maybe_smartrest_obj = SmartRestLogRequest::from_smartrest(payload);
- if let Ok(smartrest_obj) = maybe_smartrest_obj {
- handle_logfile_request_operation(
- &smartrest_obj,
- &plugin_config,
- mqtt_client,
- http_client,
- )
- .await
- } else {
- error!("Incorrect SmartREST payload: {}", payload);
- Ok(())
- }
- }
- _ => {
- // Ignore operation messages not meant for this plugin
- Ok(())
- }
- };
-
- if let Err(err) = result {
- let error_message = format!("Handling of operation: '{}' failed with {}", payload, err);
- error!("{}", error_message);
- }
- }
- }
- else {
+ process_mqtt_message(message, &plugin_config, mqtt_client, http_client, config_file, health_check_topics.clone()).await?;
+ } else {
// message is None and the connection has been closed
- return Ok(());
+ return Ok(())
}
}
Some(Ok(event)) = inotify_stream.next() => {
@@ -154,6 +124,52 @@ async fn run(
}
}
+pub async fn process_mqtt_message(
+ message: Message,
+ plugin_config: &LogPluginConfig,
+ mqtt_client: &mut Connection,
+ http_client: &mut JwtAuthHttpProxy,
+ config_file: &Path,
+ health_check_topics: TopicFilter,
+) -> Result<(), anyhow::Error> {
+ if is_c8y_bridge_up(&message) {
+ let plugin_config = read_log_config(config_file);
+ let () = handle_dynamic_log_type_update(&plugin_config, mqtt_client).await?;
+ } else if health_check_topics.accept(&message) {
+ send_health_status(&mut mqtt_client.published, "c8y-log-plugin").await;
+ } else if let Ok(payload) = message.payload_str() {
+ let result = match payload.split(',').next().unwrap_or_default() {
+ "522" => {
+ info!("Log request received: {payload}");
+ // retrieve smartrest object from payload
+ let maybe_smartrest_obj = SmartRestLogRequest::from_smartrest(payload);
+ if let Ok(smartrest_obj) = maybe_smartrest_obj {
+ handle_logfile_request_operation(
+ &smartrest_obj,
+ plugin_config,
+ mqtt_client,
+ http_client,
+ )
+ .await
+ } else {
+ error!("Incorrect SmartREST payload: {}", payload);
+ Ok(())
+ }
+ }
+ _ => {
+ // Ignore operation messages not meant for this plugin
+ Ok(())
+ }
+ };
+
+ if let Err(err) = result {
+ let error_message = format!("Handling of operation: '{}' failed with {}", payload, err);
+ error!("{}", error_message);
+ }
+ }
+ Ok(())
+}
+
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
let config_plugin_opt = LogfileRequestPluginOpt::parse();