diff options
author | initard <alex.solomes@softwareag.com> | 2022-07-29 12:20:57 +0100 |
---|---|---|
committer | initard <alex.solomes@softwareag.com> | 2022-07-29 13:49:47 +0100 |
commit | 791512e25329873b394fa3562247d41eed575303 (patch) | |
tree | ba818343465daeb92db0f5051f56f21c8738a8b2 /plugins/c8y_log_plugin | |
parent | 7e29063c59ab37d75333c68ead11cd369562d343 (diff) |
implementing inotify crate for c8y_log_plugin
Signed-off-by: initard <solo@softwareag.com>
Diffstat (limited to 'plugins/c8y_log_plugin')
-rw-r--r-- | plugins/c8y_log_plugin/Cargo.toml | 3 | ||||
-rw-r--r-- | plugins/c8y_log_plugin/src/main.rs | 63 |
2 files changed, 35 insertions, 31 deletions
diff --git a/plugins/c8y_log_plugin/Cargo.toml b/plugins/c8y_log_plugin/Cargo.toml index 8a35e792..93427810 100644 --- a/plugins/c8y_log_plugin/Cargo.toml +++ b/plugins/c8y_log_plugin/Cargo.toml @@ -23,12 +23,11 @@ clap = { version = "3.2", features = ["cargo", "derive"] } csv = "1.1" easy_reader = "0.5" glob = "0.3" -inotify = "0.10" mqtt_channel = { path = "../../crates/common/mqtt_channel" } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" tedge_config = { path = "../../crates/common/tedge_config" } -tedge_utils = { path = "../../crates/common/tedge_utils", features = ["logging"] } +tedge_utils = { path = "../../crates/common/tedge_utils", features = ["logging", "fs-notify"] } time = { version = "0.3" } thin_edge_json = { path = "../../crates/core/thin_edge_json" } thiserror = "1.0" diff --git a/plugins/c8y_log_plugin/src/main.rs b/plugins/c8y_log_plugin/src/main.rs index 5e7dcc4b..abcfa3a9 100644 --- a/plugins/c8y_log_plugin/src/main.rs +++ b/plugins/c8y_log_plugin/src/main.rs @@ -9,19 +9,20 @@ use c8y_smartrest::smartrest_deserializer::{SmartRestLogRequest, SmartRestReques use c8y_smartrest::topic::C8yTopic; use clap::Parser; -use config::LogPluginConfig; -use inotify::{EventMask, EventStream}; -use inotify::{Inotify, WatchMask}; use mqtt_channel::{Connection, Message, StreamExt, TopicFilter}; use std::path::{Path, PathBuf}; use tedge_config::{ ConfigRepository, ConfigSettingAccessor, LogPathSetting, MqttPortSetting, TEdgeConfig, DEFAULT_TEDGE_CONFIG_PATH, }; -use tedge_utils::file::{create_directory_with_user_group, create_file_with_user_group}; +use tedge_utils::{ + file::{create_directory_with_user_group, create_file_with_user_group}, + fs_notify::{fs_notify_stream, pin_mut, FileEvent}, +}; use thin_edge_json::health::{health_check_topics, send_health_status}; use tracing::{error, info}; +use crate::config::LogPluginConfig; use crate::logfile_request::{ handle_dynamic_log_type_update, handle_logfile_request_operation, read_log_config, }; @@ -82,42 +83,41 @@ pub async fn create_http_client( Ok(http_proxy) } -fn create_inofity_file_watch_stream( - config_file: &Path, -) -> Result<EventStream<[u8; 1024]>, anyhow::Error> { - let buffer = [0; 1024]; - let mut inotify = Inotify::init().expect("Error while initializing inotify instance"); - - inotify - .add_watch(&config_file, WatchMask::CLOSE_WRITE) - .expect("Failed to add file watch"); - - Ok(inotify.event_stream(buffer)?) -} - async fn run( - config_file: &Path, + config_dir: &Path, + config_file_name: &str, mqtt_client: &mut Connection, http_client: &mut JwtAuthHttpProxy, ) -> Result<(), anyhow::Error> { let mut plugin_config = LogPluginConfig::default(); - let mut inotify_stream = create_inofity_file_watch_stream(config_file)?; + let health_check_topics = health_check_topics("c8y-log-plugin"); + let config_file_path = config_dir.join(config_file_name); + let () = handle_dynamic_log_type_update(&plugin_config, mqtt_client).await?; + + let fs_notification_stream = fs_notify_stream(&[( + config_dir, + Some(config_file_name.to_string()), + &[FileEvent::Modified, FileEvent::Deleted, FileEvent::Created], + )])?; + pin_mut!(fs_notification_stream); loop { tokio::select! { message = mqtt_client.received.next() => { if let Some(message) = message { - process_mqtt_message(message, &plugin_config, mqtt_client, http_client, config_file, health_check_topics.clone()).await?; + process_mqtt_message(message, &plugin_config, mqtt_client, http_client, &config_file_path, health_check_topics.clone()).await?; } else { // message is None and the connection has been closed return Ok(()) } } - Some(Ok(event)) = inotify_stream.next() => { - if event.mask == EventMask::CLOSE_WRITE { - plugin_config = read_log_config(config_file); - let () = handle_dynamic_log_type_update(&plugin_config, mqtt_client).await?; + Some(Ok((_path, mask))) = fs_notification_stream.next() => { + match mask { + FileEvent::Created | FileEvent::Deleted | FileEvent::Modified => { + plugin_config = read_log_config(&config_file_path); + let () = handle_dynamic_log_type_update(&plugin_config, mqtt_client).await?; + } } } } @@ -173,13 +173,12 @@ pub async fn process_mqtt_message( #[tokio::main] async fn main() -> Result<(), anyhow::Error> { let config_plugin_opt = LogfileRequestPluginOpt::parse(); - let config_file = PathBuf::from(&format!( - "{}/{DEFAULT_PLUGIN_CONFIG_FILE}", + let config_dir = PathBuf::from( &config_plugin_opt .config_dir .to_str() - .unwrap_or(DEFAULT_TEDGE_CONFIG_PATH) - )); + .unwrap_or(DEFAULT_TEDGE_CONFIG_PATH), + ); tedge_utils::logging::initialise_tracing_subscriber(config_plugin_opt.debug); @@ -201,7 +200,13 @@ async fn main() -> Result<(), anyhow::Error> { let mut mqtt_client = create_mqtt_client(&tedge_config).await?; let mut http_client = create_http_client(&tedge_config).await?; - let () = run(&config_file, &mut mqtt_client, &mut http_client).await?; + let () = run( + &config_dir, + DEFAULT_PLUGIN_CONFIG_FILE, + &mut mqtt_client, + &mut http_client, + ) + .await?; Ok(()) } |