summaryrefslogtreecommitdiffstats
path: root/plugins/c8y_log_plugin
diff options
context:
space:
mode:
authorinitard <alex.solomes@softwareag.com>2022-07-29 12:20:57 +0100
committerinitard <alex.solomes@softwareag.com>2022-07-29 13:49:47 +0100
commit791512e25329873b394fa3562247d41eed575303 (patch)
treeba818343465daeb92db0f5051f56f21c8738a8b2 /plugins/c8y_log_plugin
parent7e29063c59ab37d75333c68ead11cd369562d343 (diff)
implementing inotify crate for c8y_log_plugin
Signed-off-by: initard <solo@softwareag.com>
Diffstat (limited to 'plugins/c8y_log_plugin')
-rw-r--r--plugins/c8y_log_plugin/Cargo.toml3
-rw-r--r--plugins/c8y_log_plugin/src/main.rs63
2 files changed, 35 insertions, 31 deletions
diff --git a/plugins/c8y_log_plugin/Cargo.toml b/plugins/c8y_log_plugin/Cargo.toml
index 8a35e792..93427810 100644
--- a/plugins/c8y_log_plugin/Cargo.toml
+++ b/plugins/c8y_log_plugin/Cargo.toml
@@ -23,12 +23,11 @@ clap = { version = "3.2", features = ["cargo", "derive"] }
csv = "1.1"
easy_reader = "0.5"
glob = "0.3"
-inotify = "0.10"
mqtt_channel = { path = "../../crates/common/mqtt_channel" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tedge_config = { path = "../../crates/common/tedge_config" }
-tedge_utils = { path = "../../crates/common/tedge_utils", features = ["logging"] }
+tedge_utils = { path = "../../crates/common/tedge_utils", features = ["logging", "fs-notify"] }
time = { version = "0.3" }
thin_edge_json = { path = "../../crates/core/thin_edge_json" }
thiserror = "1.0"
diff --git a/plugins/c8y_log_plugin/src/main.rs b/plugins/c8y_log_plugin/src/main.rs
index 5e7dcc4b..abcfa3a9 100644
--- a/plugins/c8y_log_plugin/src/main.rs
+++ b/plugins/c8y_log_plugin/src/main.rs
@@ -9,19 +9,20 @@ use c8y_smartrest::smartrest_deserializer::{SmartRestLogRequest, SmartRestReques
use c8y_smartrest::topic::C8yTopic;
use clap::Parser;
-use config::LogPluginConfig;
-use inotify::{EventMask, EventStream};
-use inotify::{Inotify, WatchMask};
use mqtt_channel::{Connection, Message, StreamExt, TopicFilter};
use std::path::{Path, PathBuf};
use tedge_config::{
ConfigRepository, ConfigSettingAccessor, LogPathSetting, MqttPortSetting, TEdgeConfig,
DEFAULT_TEDGE_CONFIG_PATH,
};
-use tedge_utils::file::{create_directory_with_user_group, create_file_with_user_group};
+use tedge_utils::{
+ file::{create_directory_with_user_group, create_file_with_user_group},
+ fs_notify::{fs_notify_stream, pin_mut, FileEvent},
+};
use thin_edge_json::health::{health_check_topics, send_health_status};
use tracing::{error, info};
+use crate::config::LogPluginConfig;
use crate::logfile_request::{
handle_dynamic_log_type_update, handle_logfile_request_operation, read_log_config,
};
@@ -82,42 +83,41 @@ pub async fn create_http_client(
Ok(http_proxy)
}
-fn create_inofity_file_watch_stream(
- config_file: &Path,
-) -> Result<EventStream<[u8; 1024]>, anyhow::Error> {
- let buffer = [0; 1024];
- let mut inotify = Inotify::init().expect("Error while initializing inotify instance");
-
- inotify
- .add_watch(&config_file, WatchMask::CLOSE_WRITE)
- .expect("Failed to add file watch");
-
- Ok(inotify.event_stream(buffer)?)
-}
-
async fn run(
- config_file: &Path,
+ config_dir: &Path,
+ config_file_name: &str,
mqtt_client: &mut Connection,
http_client: &mut JwtAuthHttpProxy,
) -> Result<(), anyhow::Error> {
let mut plugin_config = LogPluginConfig::default();
- let mut inotify_stream = create_inofity_file_watch_stream(config_file)?;
+
let health_check_topics = health_check_topics("c8y-log-plugin");
+ let config_file_path = config_dir.join(config_file_name);
+ let () = handle_dynamic_log_type_update(&plugin_config, mqtt_client).await?;
+
+ let fs_notification_stream = fs_notify_stream(&[(
+ config_dir,
+ Some(config_file_name.to_string()),
+ &[FileEvent::Modified, FileEvent::Deleted, FileEvent::Created],
+ )])?;
+ pin_mut!(fs_notification_stream);
loop {
tokio::select! {
message = mqtt_client.received.next() => {
if let Some(message) = message {
- process_mqtt_message(message, &plugin_config, mqtt_client, http_client, config_file, health_check_topics.clone()).await?;
+ process_mqtt_message(message, &plugin_config, mqtt_client, http_client, &config_file_path, health_check_topics.clone()).await?;
} else {
// message is None and the connection has been closed
return Ok(())
}
}
- Some(Ok(event)) = inotify_stream.next() => {
- if event.mask == EventMask::CLOSE_WRITE {
- plugin_config = read_log_config(config_file);
- let () = handle_dynamic_log_type_update(&plugin_config, mqtt_client).await?;
+ Some(Ok((_path, mask))) = fs_notification_stream.next() => {
+ match mask {
+ FileEvent::Created | FileEvent::Deleted | FileEvent::Modified => {
+ plugin_config = read_log_config(&config_file_path);
+ let () = handle_dynamic_log_type_update(&plugin_config, mqtt_client).await?;
+ }
}
}
}
@@ -173,13 +173,12 @@ pub async fn process_mqtt_message(
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
let config_plugin_opt = LogfileRequestPluginOpt::parse();
- let config_file = PathBuf::from(&format!(
- "{}/{DEFAULT_PLUGIN_CONFIG_FILE}",
+ let config_dir = PathBuf::from(
&config_plugin_opt
.config_dir
.to_str()
- .unwrap_or(DEFAULT_TEDGE_CONFIG_PATH)
- ));
+ .unwrap_or(DEFAULT_TEDGE_CONFIG_PATH),
+ );
tedge_utils::logging::initialise_tracing_subscriber(config_plugin_opt.debug);
@@ -201,7 +200,13 @@ async fn main() -> Result<(), anyhow::Error> {
let mut mqtt_client = create_mqtt_client(&tedge_config).await?;
let mut http_client = create_http_client(&tedge_config).await?;
- let () = run(&config_file, &mut mqtt_client, &mut http_client).await?;
+ let () = run(
+ &config_dir,
+ DEFAULT_PLUGIN_CONFIG_FILE,
+ &mut mqtt_client,
+ &mut http_client,
+ )
+ .await?;
Ok(())
}