summaryrefslogtreecommitdiffstats
path: root/plugins
diff options
context:
space:
mode:
authorinitard <solo@softwareag.com>2022-06-24 12:14:58 +0100
committerinitard <alex.solomes@softwareag.com>2022-07-20 09:17:48 +0100
commit589081115f943a1275464682a027b97d20e2d5f0 (patch)
tree0fdb36a64fb4079f5921a28432e1702f411519f4 /plugins
parentae947b3e64d0cb5684c171b9e15c522d16749d48 (diff)
publishing supported log types if c8y bridge is up
Signed-off-by: initard <solo@softwareag.com>
Diffstat (limited to 'plugins')
-rw-r--r--plugins/c8y_log_plugin/src/logfile_request.rs12
-rw-r--r--plugins/c8y_log_plugin/src/main.rs24
2 files changed, 24 insertions, 12 deletions
diff --git a/plugins/c8y_log_plugin/src/logfile_request.rs b/plugins/c8y_log_plugin/src/logfile_request.rs
index f88afec5..750f2acd 100644
--- a/plugins/c8y_log_plugin/src/logfile_request.rs
+++ b/plugins/c8y_log_plugin/src/logfile_request.rs
@@ -255,16 +255,18 @@ pub async fn handle_logfile_request_operation(
}
}
+pub(crate) fn read_log_config(config_dir: &Path) -> LogPluginConfig {
+ LogPluginConfig::new(config_dir)
+}
+
/// updates the log types on Cumulocity
/// sends 118,typeA,typeB,... on mqtt
pub async fn handle_dynamic_log_type_update(
+ plugin_config: &LogPluginConfig,
mqtt_client: &mut Connection,
- config_dir: &Path,
-) -> Result<LogPluginConfig, anyhow::Error> {
- let plugin_config = LogPluginConfig::new(config_dir);
+) -> Result<(), anyhow::Error> {
let msg = plugin_config.to_supported_config_types_message()?;
- let () = mqtt_client.published.send(msg).await?;
- Ok(plugin_config)
+ Ok(mqtt_client.published.send(msg).await?)
}
#[cfg(test)]
diff --git a/plugins/c8y_log_plugin/src/main.rs b/plugins/c8y_log_plugin/src/main.rs
index febd442f..9ca5066d 100644
--- a/plugins/c8y_log_plugin/src/main.rs
+++ b/plugins/c8y_log_plugin/src/main.rs
@@ -4,10 +4,12 @@ mod logfile_request;
use anyhow::Result;
use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy};
+use c8y_api::utils::bridge::{is_c8y_bridge_up, C8Y_BRIDGE_HEALTH_TOPIC};
use c8y_smartrest::smartrest_deserializer::{SmartRestLogRequest, SmartRestRequestGeneric};
use c8y_smartrest::topic::C8yTopic;
use clap::Parser;
+use config::LogPluginConfig;
use inotify::{EventMask, EventStream};
use inotify::{Inotify, WatchMask};
use mqtt_channel::{Connection, StreamExt};
@@ -19,7 +21,9 @@ use tedge_config::{
use tedge_utils::file::{create_directory_with_user_group, create_file_with_user_group};
use tracing::{error, info};
-use crate::logfile_request::{handle_dynamic_log_type_update, handle_logfile_request_operation};
+use crate::logfile_request::{
+ handle_dynamic_log_type_update, handle_logfile_request_operation, read_log_config,
+};
const DEFAULT_PLUGIN_CONFIG_FILE: &str = "c8y/c8y-log-plugin.toml";
const AFTER_HELP_TEXT: &str = r#"On start, `c8y_log_plugin` notifies the cloud tenant of the log types listed in the `CONFIG_FILE`, sending this list with a `118` on `c8y/s/us`.
@@ -55,11 +59,13 @@ async fn create_mqtt_client(
tedge_config: &TEdgeConfig,
) -> Result<mqtt_channel::Connection, anyhow::Error> {
let mqtt_port = tedge_config.query(MqttPortSetting)?.into();
+ let mut topics = mqtt_channel::TopicFilter::new_unchecked(C8yTopic::SmartRestRequest.as_str());
+ // subscribing also to c8y bridge health topic to know when the bridge is up
+ topics.add(C8Y_BRIDGE_HEALTH_TOPIC)?;
+
let mqtt_config = mqtt_channel::Config::default()
.with_port(mqtt_port)
- .with_subscriptions(mqtt_channel::TopicFilter::new_unchecked(
- C8yTopic::SmartRestRequest.as_str(),
- ));
+ .with_subscriptions(topics);
let mqtt_client = mqtt_channel::Connection::new(&mqtt_config).await?;
Ok(mqtt_client)
@@ -91,14 +97,17 @@ async fn run(
mqtt_client: &mut Connection,
http_client: &mut JwtAuthHttpProxy,
) -> Result<(), anyhow::Error> {
- let mut plugin_config = handle_dynamic_log_type_update(mqtt_client, config_file).await?;
-
+ let mut plugin_config = LogPluginConfig::default();
let mut inotify_stream = create_inofity_file_watch_stream(config_file)?;
loop {
tokio::select! {
message = mqtt_client.received.next() => {
if let Some(message) = message {
+ if is_c8y_bridge_up(&message) {
+ plugin_config = read_log_config(config_file);
+ let () = handle_dynamic_log_type_update(&plugin_config, mqtt_client).await?;
+ }
if let Ok(payload) = message.payload_str() {
let result = match payload.split(',').next().unwrap_or_default() {
"522" => {
@@ -137,7 +146,8 @@ async fn run(
}
Some(Ok(event)) = inotify_stream.next() => {
if event.mask == EventMask::CLOSE_WRITE {
- plugin_config = handle_dynamic_log_type_update(mqtt_client, config_file).await?;
+ plugin_config = read_log_config(config_file);
+ let () = handle_dynamic_log_type_update(&plugin_config, mqtt_client).await?;
}
}
}