diff options
author | initard <solo@softwareag.com> | 2022-06-24 12:14:58 +0100 |
---|---|---|
committer | initard <alex.solomes@softwareag.com> | 2022-07-20 09:17:48 +0100 |
commit | 589081115f943a1275464682a027b97d20e2d5f0 (patch) | |
tree | 0fdb36a64fb4079f5921a28432e1702f411519f4 /plugins | |
parent | ae947b3e64d0cb5684c171b9e15c522d16749d48 (diff) |
publishing supported log types if c8y bridge is up
Signed-off-by: initard <solo@softwareag.com>
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/c8y_log_plugin/src/logfile_request.rs | 12 | ||||
-rw-r--r-- | plugins/c8y_log_plugin/src/main.rs | 24 |
2 files changed, 24 insertions, 12 deletions
diff --git a/plugins/c8y_log_plugin/src/logfile_request.rs b/plugins/c8y_log_plugin/src/logfile_request.rs index f88afec5..750f2acd 100644 --- a/plugins/c8y_log_plugin/src/logfile_request.rs +++ b/plugins/c8y_log_plugin/src/logfile_request.rs @@ -255,16 +255,18 @@ pub async fn handle_logfile_request_operation( } } +pub(crate) fn read_log_config(config_dir: &Path) -> LogPluginConfig { + LogPluginConfig::new(config_dir) +} + /// updates the log types on Cumulocity /// sends 118,typeA,typeB,... on mqtt pub async fn handle_dynamic_log_type_update( + plugin_config: &LogPluginConfig, mqtt_client: &mut Connection, - config_dir: &Path, -) -> Result<LogPluginConfig, anyhow::Error> { - let plugin_config = LogPluginConfig::new(config_dir); +) -> Result<(), anyhow::Error> { let msg = plugin_config.to_supported_config_types_message()?; - let () = mqtt_client.published.send(msg).await?; - Ok(plugin_config) + Ok(mqtt_client.published.send(msg).await?) } #[cfg(test)] diff --git a/plugins/c8y_log_plugin/src/main.rs b/plugins/c8y_log_plugin/src/main.rs index febd442f..9ca5066d 100644 --- a/plugins/c8y_log_plugin/src/main.rs +++ b/plugins/c8y_log_plugin/src/main.rs @@ -4,10 +4,12 @@ mod logfile_request; use anyhow::Result; use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy}; +use c8y_api::utils::bridge::{is_c8y_bridge_up, C8Y_BRIDGE_HEALTH_TOPIC}; use c8y_smartrest::smartrest_deserializer::{SmartRestLogRequest, SmartRestRequestGeneric}; use c8y_smartrest::topic::C8yTopic; use clap::Parser; +use config::LogPluginConfig; use inotify::{EventMask, EventStream}; use inotify::{Inotify, WatchMask}; use mqtt_channel::{Connection, StreamExt}; @@ -19,7 +21,9 @@ use tedge_config::{ use tedge_utils::file::{create_directory_with_user_group, create_file_with_user_group}; use tracing::{error, info}; -use crate::logfile_request::{handle_dynamic_log_type_update, handle_logfile_request_operation}; +use crate::logfile_request::{ + handle_dynamic_log_type_update, handle_logfile_request_operation, read_log_config, +}; const DEFAULT_PLUGIN_CONFIG_FILE: &str = "c8y/c8y-log-plugin.toml"; const AFTER_HELP_TEXT: &str = r#"On start, `c8y_log_plugin` notifies the cloud tenant of the log types listed in the `CONFIG_FILE`, sending this list with a `118` on `c8y/s/us`. @@ -55,11 +59,13 @@ async fn create_mqtt_client( tedge_config: &TEdgeConfig, ) -> Result<mqtt_channel::Connection, anyhow::Error> { let mqtt_port = tedge_config.query(MqttPortSetting)?.into(); + let mut topics = mqtt_channel::TopicFilter::new_unchecked(C8yTopic::SmartRestRequest.as_str()); + // subscribing also to c8y bridge health topic to know when the bridge is up + topics.add(C8Y_BRIDGE_HEALTH_TOPIC)?; + let mqtt_config = mqtt_channel::Config::default() .with_port(mqtt_port) - .with_subscriptions(mqtt_channel::TopicFilter::new_unchecked( - C8yTopic::SmartRestRequest.as_str(), - )); + .with_subscriptions(topics); let mqtt_client = mqtt_channel::Connection::new(&mqtt_config).await?; Ok(mqtt_client) @@ -91,14 +97,17 @@ async fn run( mqtt_client: &mut Connection, http_client: &mut JwtAuthHttpProxy, ) -> Result<(), anyhow::Error> { - let mut plugin_config = handle_dynamic_log_type_update(mqtt_client, config_file).await?; - + let mut plugin_config = LogPluginConfig::default(); let mut inotify_stream = create_inofity_file_watch_stream(config_file)?; loop { tokio::select! { message = mqtt_client.received.next() => { if let Some(message) = message { + if is_c8y_bridge_up(&message) { + plugin_config = read_log_config(config_file); + let () = handle_dynamic_log_type_update(&plugin_config, mqtt_client).await?; + } if let Ok(payload) = message.payload_str() { let result = match payload.split(',').next().unwrap_or_default() { "522" => { @@ -137,7 +146,8 @@ async fn run( } Some(Ok(event)) = inotify_stream.next() => { if event.mask == EventMask::CLOSE_WRITE { - plugin_config = handle_dynamic_log_type_update(mqtt_client, config_file).await?; + plugin_config = read_log_config(config_file); + let () = handle_dynamic_log_type_update(&plugin_config, mqtt_client).await?; } } } |