summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorinitard <alex.solomes@softwareag.com>2022-07-29 12:20:57 +0100
committerinitard <alex.solomes@softwareag.com>2022-07-29 13:49:47 +0100
commit791512e25329873b394fa3562247d41eed575303 (patch)
treeba818343465daeb92db0f5051f56f21c8738a8b2
parent7e29063c59ab37d75333c68ead11cd369562d343 (diff)
implementing inotify crate for c8y_log_plugin
Signed-off-by: initard <solo@softwareag.com>
-rw-r--r--Cargo.lock40
-rw-r--r--plugins/c8y_log_plugin/Cargo.toml3
-rw-r--r--plugins/c8y_log_plugin/src/main.rs63
3 files changed, 56 insertions, 50 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 042a2c4f..6cfb3a7f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -279,9 +279,9 @@ dependencies = [
[[package]]
name = "bit-set"
-version = "0.5.2"
+version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6e11e16035ea35e4e5997b393eacbf6f63983188f7a2ad25bfb13465f5ad59de"
+checksum = "0700ddab506f33b20a03b13996eccd309a48e5ff77d0d95926aa0210fb4e95f1"
dependencies = [
"bit-vec",
]
@@ -352,9 +352,9 @@ checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]]
name = "bytes"
-version = "1.1.0"
+version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8"
+checksum = "f0b3de4a0c5e67e16066a0715723abd91edc2f9001d09c46e1dca929351e130e"
[[package]]
name = "c8y_api"
@@ -429,7 +429,6 @@ dependencies = [
"easy_reader",
"filetime",
"glob",
- "inotify",
"mockall",
"mqtt_channel",
"serde",
@@ -608,9 +607,9 @@ dependencies = [
[[package]]
name = "concurrent-queue"
-version = "1.2.3"
+version = "1.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "83827793632c72fa4f73c2edb31e7a997527dd8ffe7077344621fc62c5478157"
+checksum = "30ed07550be01594c6026cff2a1d7fe9c8f683caa798e12b68694ac9e88286a3"
dependencies = [
"cache-padded",
]
@@ -682,9 +681,9 @@ dependencies = [
[[package]]
name = "crossbeam-channel"
-version = "0.5.5"
+version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4c02a4d71819009c192cf4872265391563fd6a84c81ff2c0f2a7026ca4c1d85c"
+checksum = "c2dd04ddaf88237dc3b8d8f9a3c1004b506b54b3313403944054d23c0870c521"
dependencies = [
"cfg-if 1.0.0",
"crossbeam-utils",
@@ -692,9 +691,9 @@ dependencies = [
[[package]]
name = "crossbeam-deque"
-version = "0.8.1"
+version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6455c0ca19f0d2fbf751b908d5c55c1f5cbc65e03c4225427254b46890bdde1e"
+checksum = "715e8152b692bba2d374b53d4875445368fdf21a94751410af607a5ac677d1fc"
dependencies = [
"cfg-if 1.0.0",
"crossbeam-epoch",
@@ -703,9 +702,9 @@ dependencies = [
[[package]]
name = "crossbeam-epoch"
-version = "0.9.9"
+version = "0.9.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "07db9d94cbd326813772c968ccd25999e5f8ae22f4f8d1b11effa37ef6ce281d"
+checksum = "045ebe27666471bb549370b4b0b3e51b07f56325befa4284db65fc89c02511b1"
dependencies = [
"autocfg",
"cfg-if 1.0.0",
@@ -717,9 +716,9 @@ dependencies = [
[[package]]
name = "crossbeam-utils"
-version = "0.8.10"
+version = "0.8.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7d82ee10ce34d7bc12c2122495e7593a9c41347ecdd64185af4ecf72cb1a7f83"
+checksum = "51887d4adc7b564537b15adcfb307936f8075dfcd5f00dde9a9f1d29383682bc"
dependencies = [
"cfg-if 1.0.0",
"once_cell",
@@ -2238,9 +2237,9 @@ dependencies = [
[[package]]
name = "redox_syscall"
-version = "0.2.13"
+version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "62f25bc4c7e55e0b0b7a1d43fb893f4fa1361d0abe38b9ce4f323c2adfe6ef42"
+checksum = "534cfe58d6a18cc17120fbf4635d53d14691c1fe4d951064df9bd326178d7d5a"
dependencies = [
"bitflags",
]
@@ -2686,9 +2685,12 @@ checksum = "2e24979f63a11545f5f2c60141afe249d4f19f84581ea2138065e400941d83d3"
[[package]]
name = "slab"
-version = "0.4.6"
+version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "eb703cfe953bccee95685111adeedb76fabe4e97549a58d16f03ea7b9367bb32"
+checksum = "4614a76b2a8be0058caa9dbbaf66d988527d86d003c11a94fbd335d7661edcef"
+dependencies = [
+ "autocfg",
+]
[[package]]
name = "smallvec"
diff --git a/plugins/c8y_log_plugin/Cargo.toml b/plugins/c8y_log_plugin/Cargo.toml
index 8a35e792..93427810 100644
--- a/plugins/c8y_log_plugin/Cargo.toml
+++ b/plugins/c8y_log_plugin/Cargo.toml
@@ -23,12 +23,11 @@ clap = { version = "3.2", features = ["cargo", "derive"] }
csv = "1.1"
easy_reader = "0.5"
glob = "0.3"
-inotify = "0.10"
mqtt_channel = { path = "../../crates/common/mqtt_channel" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tedge_config = { path = "../../crates/common/tedge_config" }
-tedge_utils = { path = "../../crates/common/tedge_utils", features = ["logging"] }
+tedge_utils = { path = "../../crates/common/tedge_utils", features = ["logging", "fs-notify"] }
time = { version = "0.3" }
thin_edge_json = { path = "../../crates/core/thin_edge_json" }
thiserror = "1.0"
diff --git a/plugins/c8y_log_plugin/src/main.rs b/plugins/c8y_log_plugin/src/main.rs
index 5e7dcc4b..abcfa3a9 100644
--- a/plugins/c8y_log_plugin/src/main.rs
+++ b/plugins/c8y_log_plugin/src/main.rs
@@ -9,19 +9,20 @@ use c8y_smartrest::smartrest_deserializer::{SmartRestLogRequest, SmartRestReques
use c8y_smartrest::topic::C8yTopic;
use clap::Parser;
-use config::LogPluginConfig;
-use inotify::{EventMask, EventStream};
-use inotify::{Inotify, WatchMask};
use mqtt_channel::{Connection, Message, StreamExt, TopicFilter};
use std::path::{Path, PathBuf};
use tedge_config::{
ConfigRepository, ConfigSettingAccessor, LogPathSetting, MqttPortSetting, TEdgeConfig,
DEFAULT_TEDGE_CONFIG_PATH,
};
-use tedge_utils::file::{create_directory_with_user_group, create_file_with_user_group};
+use tedge_utils::{
+ file::{create_directory_with_user_group, create_file_with_user_group},
+ fs_notify::{fs_notify_stream, pin_mut, FileEvent},
+};
use thin_edge_json::health::{health_check_topics, send_health_status};
use tracing::{error, info};
+use crate::config::LogPluginConfig;
use crate::logfile_request::{
handle_dynamic_log_type_update, handle_logfile_request_operation, read_log_config,
};
@@ -82,42 +83,41 @@ pub async fn create_http_client(
Ok(http_proxy)
}
-fn create_inofity_file_watch_stream(
- config_file: &Path,
-) -> Result<EventStream<[u8; 1024]>, anyhow::Error> {
- let buffer = [0; 1024];
- let mut inotify = Inotify::init().expect("Error while initializing inotify instance");
-
- inotify
- .add_watch(&config_file, WatchMask::CLOSE_WRITE)
- .expect("Failed to add file watch");
-
- Ok(inotify.event_stream(buffer)?)
-}
-
async fn run(
- config_file: &Path,
+ config_dir: &Path,
+ config_file_name: &str,
mqtt_client: &mut Connection,
http_client: &mut JwtAuthHttpProxy,
) -> Result<(), anyhow::Error> {
let mut plugin_config = LogPluginConfig::default();
- let mut inotify_stream = create_inofity_file_watch_stream(config_file)?;
+
let health_check_topics = health_check_topics("c8y-log-plugin");
+ let config_file_path = config_dir.join(config_file_name);
+ let () = handle_dynamic_log_type_update(&plugin_config, mqtt_client).await?;
+
+ let fs_notification_stream = fs_notify_stream(&[(
+ config_dir,
+ Some(config_file_name.to_string()),
+ &[FileEvent::Modified, FileEvent::Deleted, FileEvent::Created],
+ )])?;
+ pin_mut!(fs_notification_stream);
loop {
tokio::select! {
message = mqtt_client.received.next() => {
if let Some(message) = message {
- process_mqtt_message(message, &plugin_config, mqtt_client, http_client, config_file, health_check_topics.clone()).await?;
+ process_mqtt_message(message, &plugin_config, mqtt_client, http_client, &config_file_path, health_check_topics.clone()).await?;
} else {
// message is None and the connection has been closed
return Ok(())
}
}
- Some(Ok(event)) = inotify_stream.next() => {
- if event.mask == EventMask::CLOSE_WRITE {
- plugin_config = read_log_config(config_file);
- let () = handle_dynamic_log_type_update(&plugin_config, mqtt_client).await?;
+ Some(Ok((_path, mask))) = fs_notification_stream.next() => {
+ match mask {
+ FileEvent::Created | FileEvent::Deleted | FileEvent::Modified => {
+ plugin_config = read_log_config(&config_file_path);
+ let () = handle_dynamic_log_type_update(&plugin_config, mqtt_client).await?;
+ }
}
}
}
@@ -173,13 +173,12 @@ pub async fn process_mqtt_message(
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
let config_plugin_opt = LogfileRequestPluginOpt::parse();
- let config_file = PathBuf::from(&format!(
- "{}/{DEFAULT_PLUGIN_CONFIG_FILE}",
+ let config_dir = PathBuf::from(
&config_plugin_opt
.config_dir
.to_str()
- .unwrap_or(DEFAULT_TEDGE_CONFIG_PATH)
- ));
+ .unwrap_or(DEFAULT_TEDGE_CONFIG_PATH),
+ );
tedge_utils::logging::initialise_tracing_subscriber(config_plugin_opt.debug);
@@ -201,7 +200,13 @@ async fn main() -> Result<(), anyhow::Error> {
let mut mqtt_client = create_mqtt_client(&tedge_config).await?;
let mut http_client = create_http_client(&tedge_config).await?;
- let () = run(&config_file, &mut mqtt_client, &mut http_client).await?;
+ let () = run(
+ &config_dir,
+ DEFAULT_PLUGIN_CONFIG_FILE,
+ &mut mqtt_client,
+ &mut http_client,
+ )
+ .await?;
Ok(())
}