diff options
author | initard <alex.solomes@softwareag.com> | 2022-07-29 12:20:57 +0100 |
---|---|---|
committer | initard <alex.solomes@softwareag.com> | 2022-07-29 13:49:47 +0100 |
commit | 791512e25329873b394fa3562247d41eed575303 (patch) | |
tree | ba818343465daeb92db0f5051f56f21c8738a8b2 | |
parent | 7e29063c59ab37d75333c68ead11cd369562d343 (diff) |
implementing inotify crate for c8y_log_plugin
Signed-off-by: initard <solo@softwareag.com>
-rw-r--r-- | Cargo.lock | 40 | ||||
-rw-r--r-- | plugins/c8y_log_plugin/Cargo.toml | 3 | ||||
-rw-r--r-- | plugins/c8y_log_plugin/src/main.rs | 63 |
3 files changed, 56 insertions, 50 deletions
@@ -279,9 +279,9 @@ dependencies = [ [[package]] name = "bit-set" -version = "0.5.2" +version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e11e16035ea35e4e5997b393eacbf6f63983188f7a2ad25bfb13465f5ad59de" +checksum = "0700ddab506f33b20a03b13996eccd309a48e5ff77d0d95926aa0210fb4e95f1" dependencies = [ "bit-vec", ] @@ -352,9 +352,9 @@ checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" [[package]] name = "bytes" -version = "1.1.0" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" +checksum = "f0b3de4a0c5e67e16066a0715723abd91edc2f9001d09c46e1dca929351e130e" [[package]] name = "c8y_api" @@ -429,7 +429,6 @@ dependencies = [ "easy_reader", "filetime", "glob", - "inotify", "mockall", "mqtt_channel", "serde", @@ -608,9 +607,9 @@ dependencies = [ [[package]] name = "concurrent-queue" -version = "1.2.3" +version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83827793632c72fa4f73c2edb31e7a997527dd8ffe7077344621fc62c5478157" +checksum = "30ed07550be01594c6026cff2a1d7fe9c8f683caa798e12b68694ac9e88286a3" dependencies = [ "cache-padded", ] @@ -682,9 +681,9 @@ dependencies = [ [[package]] name = "crossbeam-channel" -version = "0.5.5" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c02a4d71819009c192cf4872265391563fd6a84c81ff2c0f2a7026ca4c1d85c" +checksum = "c2dd04ddaf88237dc3b8d8f9a3c1004b506b54b3313403944054d23c0870c521" dependencies = [ "cfg-if 1.0.0", "crossbeam-utils", @@ -692,9 +691,9 @@ dependencies = [ [[package]] name = "crossbeam-deque" -version = "0.8.1" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6455c0ca19f0d2fbf751b908d5c55c1f5cbc65e03c4225427254b46890bdde1e" +checksum = "715e8152b692bba2d374b53d4875445368fdf21a94751410af607a5ac677d1fc" dependencies = [ "cfg-if 1.0.0", "crossbeam-epoch", @@ -703,9 +702,9 @@ dependencies = [ [[package]] name = "crossbeam-epoch" -version = "0.9.9" +version = "0.9.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07db9d94cbd326813772c968ccd25999e5f8ae22f4f8d1b11effa37ef6ce281d" +checksum = "045ebe27666471bb549370b4b0b3e51b07f56325befa4284db65fc89c02511b1" dependencies = [ "autocfg", "cfg-if 1.0.0", @@ -717,9 +716,9 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.10" +version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d82ee10ce34d7bc12c2122495e7593a9c41347ecdd64185af4ecf72cb1a7f83" +checksum = "51887d4adc7b564537b15adcfb307936f8075dfcd5f00dde9a9f1d29383682bc" dependencies = [ "cfg-if 1.0.0", "once_cell", @@ -2238,9 +2237,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.2.13" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62f25bc4c7e55e0b0b7a1d43fb893f4fa1361d0abe38b9ce4f323c2adfe6ef42" +checksum = "534cfe58d6a18cc17120fbf4635d53d14691c1fe4d951064df9bd326178d7d5a" dependencies = [ "bitflags", ] @@ -2686,9 +2685,12 @@ checksum = "2e24979f63a11545f5f2c60141afe249d4f19f84581ea2138065e400941d83d3" [[package]] name = "slab" -version = "0.4.6" +version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb703cfe953bccee95685111adeedb76fabe4e97549a58d16f03ea7b9367bb32" +checksum = "4614a76b2a8be0058caa9dbbaf66d988527d86d003c11a94fbd335d7661edcef" +dependencies = [ + "autocfg", +] [[package]] name = "smallvec" diff --git a/plugins/c8y_log_plugin/Cargo.toml b/plugins/c8y_log_plugin/Cargo.toml index 8a35e792..93427810 100644 --- a/plugins/c8y_log_plugin/Cargo.toml +++ b/plugins/c8y_log_plugin/Cargo.toml @@ -23,12 +23,11 @@ clap = { version = "3.2", features = ["cargo", "derive"] } csv = "1.1" easy_reader = "0.5" glob = "0.3" -inotify = "0.10" mqtt_channel = { path = "../../crates/common/mqtt_channel" } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" tedge_config = { path = "../../crates/common/tedge_config" } -tedge_utils = { path = "../../crates/common/tedge_utils", features = ["logging"] } +tedge_utils = { path = "../../crates/common/tedge_utils", features = ["logging", "fs-notify"] } time = { version = "0.3" } thin_edge_json = { path = "../../crates/core/thin_edge_json" } thiserror = "1.0" diff --git a/plugins/c8y_log_plugin/src/main.rs b/plugins/c8y_log_plugin/src/main.rs index 5e7dcc4b..abcfa3a9 100644 --- a/plugins/c8y_log_plugin/src/main.rs +++ b/plugins/c8y_log_plugin/src/main.rs @@ -9,19 +9,20 @@ use c8y_smartrest::smartrest_deserializer::{SmartRestLogRequest, SmartRestReques use c8y_smartrest::topic::C8yTopic; use clap::Parser; -use config::LogPluginConfig; -use inotify::{EventMask, EventStream}; -use inotify::{Inotify, WatchMask}; use mqtt_channel::{Connection, Message, StreamExt, TopicFilter}; use std::path::{Path, PathBuf}; use tedge_config::{ ConfigRepository, ConfigSettingAccessor, LogPathSetting, MqttPortSetting, TEdgeConfig, DEFAULT_TEDGE_CONFIG_PATH, }; -use tedge_utils::file::{create_directory_with_user_group, create_file_with_user_group}; +use tedge_utils::{ + file::{create_directory_with_user_group, create_file_with_user_group}, + fs_notify::{fs_notify_stream, pin_mut, FileEvent}, +}; use thin_edge_json::health::{health_check_topics, send_health_status}; use tracing::{error, info}; +use crate::config::LogPluginConfig; use crate::logfile_request::{ handle_dynamic_log_type_update, handle_logfile_request_operation, read_log_config, }; @@ -82,42 +83,41 @@ pub async fn create_http_client( Ok(http_proxy) } -fn create_inofity_file_watch_stream( - config_file: &Path, -) -> Result<EventStream<[u8; 1024]>, anyhow::Error> { - let buffer = [0; 1024]; - let mut inotify = Inotify::init().expect("Error while initializing inotify instance"); - - inotify - .add_watch(&config_file, WatchMask::CLOSE_WRITE) - .expect("Failed to add file watch"); - - Ok(inotify.event_stream(buffer)?) -} - async fn run( - config_file: &Path, + config_dir: &Path, + config_file_name: &str, mqtt_client: &mut Connection, http_client: &mut JwtAuthHttpProxy, ) -> Result<(), anyhow::Error> { let mut plugin_config = LogPluginConfig::default(); - let mut inotify_stream = create_inofity_file_watch_stream(config_file)?; + let health_check_topics = health_check_topics("c8y-log-plugin"); + let config_file_path = config_dir.join(config_file_name); + let () = handle_dynamic_log_type_update(&plugin_config, mqtt_client).await?; + + let fs_notification_stream = fs_notify_stream(&[( + config_dir, + Some(config_file_name.to_string()), + &[FileEvent::Modified, FileEvent::Deleted, FileEvent::Created], + )])?; + pin_mut!(fs_notification_stream); loop { tokio::select! { message = mqtt_client.received.next() => { if let Some(message) = message { - process_mqtt_message(message, &plugin_config, mqtt_client, http_client, config_file, health_check_topics.clone()).await?; + process_mqtt_message(message, &plugin_config, mqtt_client, http_client, &config_file_path, health_check_topics.clone()).await?; } else { // message is None and the connection has been closed return Ok(()) } } - Some(Ok(event)) = inotify_stream.next() => { - if event.mask == EventMask::CLOSE_WRITE { - plugin_config = read_log_config(config_file); - let () = handle_dynamic_log_type_update(&plugin_config, mqtt_client).await?; + Some(Ok((_path, mask))) = fs_notification_stream.next() => { + match mask { + FileEvent::Created | FileEvent::Deleted | FileEvent::Modified => { + plugin_config = read_log_config(&config_file_path); + let () = handle_dynamic_log_type_update(&plugin_config, mqtt_client).await?; + } } } } @@ -173,13 +173,12 @@ pub async fn process_mqtt_message( #[tokio::main] async fn main() -> Result<(), anyhow::Error> { let config_plugin_opt = LogfileRequestPluginOpt::parse(); - let config_file = PathBuf::from(&format!( - "{}/{DEFAULT_PLUGIN_CONFIG_FILE}", + let config_dir = PathBuf::from( &config_plugin_opt .config_dir .to_str() - .unwrap_or(DEFAULT_TEDGE_CONFIG_PATH) - )); + .unwrap_or(DEFAULT_TEDGE_CONFIG_PATH), + ); tedge_utils::logging::initialise_tracing_subscriber(config_plugin_opt.debug); @@ -201,7 +200,13 @@ async fn main() -> Result<(), anyhow::Error> { let mut mqtt_client = create_mqtt_client(&tedge_config).await?; let mut http_client = create_http_client(&tedge_config).await?; - let () = run(&config_file, &mut mqtt_client, &mut http_client).await?; + let () = run( + &config_dir, + DEFAULT_PLUGIN_CONFIG_FILE, + &mut mqtt_client, + &mut http_client, + ) + .await?; Ok(()) } |