diff options
author | initard <solo@softwareag.com> | 2022-05-20 15:51:19 +0100 |
---|---|---|
committer | initard <solo@softwareag.com> | 2022-05-20 16:09:23 +0100 |
commit | c325c533aa6c05f73b26f8d8569b199b81b84edf (patch) | |
tree | 083137ff8fc03e6387e1a6914b753b984bf36ebb /plugins | |
parent | bcf887cb468c04271bf38e06a904da5e615f382e (diff) |
logs are read in reverse and use modified date
- logs are now read in reverse order into a VecDeque
- log files now use metadata for modified time
Signed-off-by: initard <solo@softwareag.com>
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/c8y_log_plugin/Cargo.toml | 2 | ||||
-rw-r--r-- | plugins/c8y_log_plugin/src/logfile_request.rs | 287 | ||||
-rw-r--r-- | plugins/c8y_log_plugin/src/main.rs | 6 |
3 files changed, 219 insertions, 76 deletions
diff --git a/plugins/c8y_log_plugin/Cargo.toml b/plugins/c8y_log_plugin/Cargo.toml index d8158abe..ab98265d 100644 --- a/plugins/c8y_log_plugin/Cargo.toml +++ b/plugins/c8y_log_plugin/Cargo.toml @@ -21,6 +21,7 @@ c8y_api = { path = "../../crates/core/c8y_api" } c8y_smartrest = { path = "../../crates/core/c8y_smartrest" } clap = { version = "3.0", features = ["cargo", "derive"] } csv = "1.1" +easy_reader = "0.5" glob = "0.3" inotify = "0.10" mqtt_channel = { path = "../../crates/common/mqtt_channel" } @@ -28,6 +29,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" tedge_config = { path = "../../crates/common/tedge_config" } tedge_utils = { path = "../../crates/common/tedge_utils", features = ["logging"] } +time = { version = "0.3" } thiserror = "1.0" tokio = { version = "1.9", default_features = false, features = [ "fs", "io-util", "macros", "rt-multi-thread","signal"] } toml = "0.5" diff --git a/plugins/c8y_log_plugin/src/logfile_request.rs b/plugins/c8y_log_plugin/src/logfile_request.rs index 325fb7a5..aa889058 100644 --- a/plugins/c8y_log_plugin/src/logfile_request.rs +++ b/plugins/c8y_log_plugin/src/logfile_request.rs @@ -1,11 +1,16 @@ -use std::path::Path; +use std::{ + collections::VecDeque, + path::{Path, PathBuf}, +}; +use easy_reader::EasyReader; use glob::glob; +use time::OffsetDateTime; use crate::config::LogPluginConfig; use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy}; use c8y_smartrest::{ - smartrest_deserializer::{get_datetime_from_file_path, SmartRestLogRequest}, + smartrest_deserializer::SmartRestLogRequest, smartrest_serializer::{ CumulocitySupportedOperations, SmartRestSerializer, SmartRestSetOperationToExecuting, SmartRestSetOperationToFailed, SmartRestSetOperationToSuccessful, @@ -52,29 +57,90 @@ impl TryIntoOperationStatusMessage for LogfileRequest { .to_smartrest() } } -/// Reads tedge logs according to `SmartRestLogRequest`. -/// -/// If needed, logs are concatenated. -/// -/// Logs are sorted alphanumerically from oldest to newest. -/// -/// # Examples -/// -/// ``` -/// let smartrest_obj = SmartRestLogRequest::from_smartrest( -/// "522,DeviceSerial,syslog,2021-01-01T00:00:00+0200,2021-01-10T00:00:00+0200,,1000", -/// ) -/// .unwrap(); -/// -/// let log = read_tedge_system_logs(&smartrest_obj, "/var/log/tedge").unwrap(); -/// ``` -pub fn read_tedge_logs( + +fn read_log_content( + logfile: &Path, + mut line_counter: usize, + max_lines: usize, + filter_text: &Option<String>, +) -> Result<(usize, String), anyhow::Error> { + if line_counter >= max_lines { + Err(anyhow::anyhow!( + "`max_lines` filled. No more logs to return." + )) + } else { + let mut file_content_as_vec = VecDeque::new(); + let file = std::fs::File::open(&logfile)?; + let file_name = format!( + "filename: {}\n", + logfile.file_name().unwrap().to_str().unwrap() + ); + let mut reader = EasyReader::new(file)?; + reader.eof(); + + while line_counter < max_lines { + if let Some(haystack) = reader.prev_line()? { + if let Some(needle) = &filter_text { + if haystack.contains(needle) { + file_content_as_vec.push_front(format!("{}\n", haystack)); + line_counter += 1; + } + } else { + file_content_as_vec.push_front(format!("{}\n", haystack)); + line_counter += 1; + } + } else { + // there are no more lines.prev_line() + break; + } + } + + file_content_as_vec.push_front(file_name); + + let file_content = file_content_as_vec + .iter() + .map(|x| x.to_string()) + .collect::<String>(); + Ok((line_counter, file_content)) + } +} + +/// read any log file comming from `smartrest_obj.log_type` +pub fn new_read_logs( smartrest_obj: &SmartRestLogRequest, - plugin_config_path: &Path, + plugin_config: &LogPluginConfig, ) -> Result<String, anyhow::Error> { - let plugin_config = LogPluginConfig::new(&plugin_config_path); let mut output = String::new(); + // first filter logs on type + let mut logfiles_to_read = filter_logs_on_type(&smartrest_obj, &plugin_config)?; + logfiles_to_read = filter_logs_path_on_metadata(&smartrest_obj, logfiles_to_read)?; + + let mut line_counter = 0usize; + for logfile in logfiles_to_read { + match read_log_content( + logfile.as_path(), + line_counter, + smartrest_obj.lines, + &smartrest_obj.needle, + ) { + Ok((lines, file_content)) => { + line_counter = lines; + output.push_str(&file_content); + } + Err(_e) => { + // TODO filter this error for `max_lines` error only + break; + } + }; + } + + Ok(output) +} +fn filter_logs_on_type( + smartrest_obj: &SmartRestLogRequest, + plugin_config: &LogPluginConfig, +) -> Result<Vec<PathBuf>, anyhow::Error> { let mut files_to_send = Vec::new(); for files in &plugin_config.files { let maybe_file_path = files.path.as_str(); // because it can be a glob pattern @@ -82,60 +148,30 @@ pub fn read_tedge_logs( if !file_type.eq(&smartrest_obj.log_type) { continue; - } - - // NOTE: According to the glob documentation paths are yielded in alphabetical order hence re-ordering is no longer required see: - // https://github.com/thin-edge/thin-edge.io/blob/0320741b109f50d1b0f7cda44e33dc31ba04902d/plugins/log_request_plugin/src/smartrest.rs#L24 - for entry in glob(maybe_file_path)? { - let file_path = entry?; - if let Some(dt_from_file) = get_datetime_from_file_path(&file_path) { - if !(dt_from_file < smartrest_obj.date_from || dt_from_file > smartrest_obj.date_to) - { - files_to_send.push(file_path); - } - } else { - files_to_send.push(file_path); + } else { + for entry in glob(maybe_file_path)? { + let file_path = entry?; + files_to_send.push(file_path) } } } + Ok(files_to_send) +} - // loop sorted vector and push store log file to `output` - let mut line_counter: usize = 0; - for entry in files_to_send { - dbg!("files to read:", &entry); - let file_content = std::fs::read_to_string(&entry)?; - if file_content.is_empty() { - continue; - } - - // adding file header only if line_counter permits more lines to be added - match &entry.file_stem().and_then(|f| f.to_str()) { - Some(file_name) if line_counter < smartrest_obj.lines => { - output.push_str(&format!("filename: {}\n", file_name)); - } - _ => {} - } - - // split at new line delimiter ("\n") - let mut lines = file_content.lines().rev(); - while line_counter < smartrest_obj.lines { - if let Some(haystack) = lines.next() { - if let Some(needle) = &smartrest_obj.needle { - if haystack.contains(needle) { - output.push_str(&format!("{}\n", haystack)); - line_counter += 1; - } - } else { - output.push_str(&format!("{}\n", haystack)); - line_counter += 1; - } - } else { - // there are no lines.next() - break; - } +/// filter a vector of pathbufs according to `smartrest_obj.date_from` and `smartrest_obj.date_to` +fn filter_logs_path_on_metadata( + smartrest_obj: &SmartRestLogRequest, + logs_path_vec: Vec<PathBuf>, +) -> Result<Vec<PathBuf>, anyhow::Error> { + let mut out = vec![]; + for file_pathbuf in logs_path_vec { + let metadata = std::fs::metadata(&file_pathbuf)?; + let datetime_modified = OffsetDateTime::from(metadata.modified()?); + if datetime_modified >= smartrest_obj.date_from { + out.push(file_pathbuf); } } - Ok(output) + Ok(out) } /// executes the log file request @@ -145,14 +181,14 @@ pub fn read_tedge_logs( /// - sends request successful (mqtt) pub async fn handle_logfile_request_operation( smartrest_request: &SmartRestLogRequest, - plugin_config_path: &Path, + plugin_config: &LogPluginConfig, mqtt_client: &mut Connection, http_client: &mut JwtAuthHttpProxy, ) -> Result<(), anyhow::Error> { let executing = LogfileRequest::executing()?; let () = mqtt_client.published.send(executing).await?; - let log_content = read_tedge_logs(&smartrest_request, &plugin_config_path)?; + let log_content = new_read_logs(&smartrest_request, &plugin_config)?; let upload_event_url = http_client .upload_log_binary(&smartrest_request.log_type, &log_content) @@ -169,9 +205,114 @@ pub async fn handle_logfile_request_operation( pub async fn handle_dynamic_log_type_update( mqtt_client: &mut Connection, config_dir: &Path, -) -> Result<(), anyhow::Error> { +) -> Result<LogPluginConfig, anyhow::Error> { let plugin_config = LogPluginConfig::new(config_dir); let msg = plugin_config.to_supported_config_types_message()?; let () = mqtt_client.published.send(msg).await?; - Ok(()) + Ok(plugin_config) +} + +#[cfg(test)] +mod tests { + use std::{ + io::Write, + path::{Path, PathBuf}, + }; + + use c8y_smartrest::smartrest_deserializer::{SmartRestLogRequest, SmartRestRequestGeneric}; + use tempfile::TempDir; + + use crate::config::{FileEntry, LogPluginConfig}; + + use super::{filter_logs_on_type, filter_logs_path_on_metadata, read_log_content}; + + fn get_filter_on_logs_type() -> Result<(TempDir, Vec<PathBuf>), anyhow::Error> { + let tempdir = TempDir::new()?; + let tempdir_path = tempdir + .path() + .to_str() + .ok_or_else(|| anyhow::anyhow!("temp dir not created"))?; + + std::fs::File::create(&format!("{tempdir_path}/file_a"))?; + std::fs::File::create(&format!("{tempdir_path}/file_b"))?; + std::fs::File::create(&format!("{tempdir_path}/file_c"))?; + + let files = vec![ + FileEntry { + path: format!("{tempdir_path}/file_a"), + config_type: "type_one".to_string(), + }, + FileEntry { + path: format!("{tempdir_path}/file_b"), + config_type: "type_one".to_string(), + }, + FileEntry { + path: format!("{tempdir_path}/file_c"), + config_type: "type_two".to_string(), + }, + ]; + let logs_config = LogPluginConfig { files: files }; + + let smartrest_obj = SmartRestLogRequest::from_smartrest( + "522,DeviceSerial,type_one,2021-01-01T00:00:00+0200,2021-01-10T00:00:00+0200,,1000", + )?; + + let after_file = filter_logs_on_type(&smartrest_obj, &logs_config)?; + Ok((tempdir, after_file)) + } + + #[test] + fn test_filter_logs_on_type() { + let (tempdir, after_file) = get_filter_on_logs_type().unwrap(); + let tempdir_path = tempdir.path().to_str().unwrap(); + assert_eq!( + after_file, + vec![ + PathBuf::from(&format!("{tempdir_path}/file_a")), + PathBuf::from(&format!("{tempdir_path}/file_b")) + ] + ) + } + + #[test] + fn test_filter_logs_path_on_metadata() { + let smartrest_obj = SmartRestLogRequest::from_smartrest( + "522,DeviceSerial,type_one,2021-01-01T00:00:00+0200,2021-01-10T00:00:00+0200,,1000", + ) + .unwrap(); + let (_tempdir, logs) = get_filter_on_logs_type().unwrap(); + filter_logs_path_on_metadata(&smartrest_obj, logs).unwrap(); + } + + #[test] + fn test_read_log_content() { + let tempdir = TempDir::new().unwrap(); + let tempdir_path = tempdir + .path() + .to_str() + .ok_or_else(|| anyhow::anyhow!("temp dir not created")) + .unwrap(); + let file_path = &format!("{tempdir_path}/file_a.log"); + + let mut log_file = std::fs::OpenOptions::new() + .append(true) + .create(true) + .write(true) + .open(file_path) + .unwrap(); + + let data = "this is the first line.\nthis is the second line.\nthis is the third line.\nthis is the forth line.\nthis is the fifth line."; + + let () = log_file.write_all(data.as_bytes()).unwrap(); + + let line_counter = 0; + let max_lines = 4; + let filter_text = None; + + let (line_counter, result) = + read_log_content(Path::new(file_path), line_counter, max_lines, &filter_text).unwrap(); + + assert_eq!(line_counter, max_lines); + assert_eq!(result, "filename: file_a.log\nthis is the second line.\nthis is the third line.\nthis is the forth line.\nthis is the fifth line.\n"); + } } diff --git a/plugins/c8y_log_plugin/src/main.rs b/plugins/c8y_log_plugin/src/main.rs index e6c05693..90b16946 100644 --- a/plugins/c8y_log_plugin/src/main.rs +++ b/plugins/c8y_log_plugin/src/main.rs @@ -95,7 +95,7 @@ async fn run( mqtt_client: &mut Connection, http_client: &mut JwtAuthHttpProxy, ) -> Result<(), anyhow::Error> { - let () = handle_dynamic_log_type_update(mqtt_client, config_file).await?; + let mut plugin_config = handle_dynamic_log_type_update(mqtt_client, config_file).await?; let mut inotify_stream = create_inofity_file_watch_stream(config_file)?; @@ -110,7 +110,7 @@ async fn run( let smartrest_obj = SmartRestLogRequest::from_smartrest(&payload)?; handle_logfile_request_operation( &smartrest_obj, - &config_file, + &plugin_config, mqtt_client, http_client, ) @@ -136,7 +136,7 @@ async fn run( if let Ok(event) = event_or_error { match event.mask { EventMask::CLOSE_WRITE => { - let () = handle_dynamic_log_type_update(mqtt_client, config_file).await?; + plugin_config = handle_dynamic_log_type_update(mqtt_client, config_file).await?; } _ => {} } |