diff options
author | Alex Solomes <alex.solomes@softwareag.com> | 2022-05-25 11:26:25 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-05-25 11:26:25 +0100 |
commit | 355d5111fc0ba139473da4643df4dd21e708674e (patch) | |
tree | b39c9e3c577450c4c86850f452dea204c3ce0518 /plugins | |
parent | ae88bb1eb08631be648f84521d513f48b40f4631 (diff) |
Porting c8y log plugin hotfixes to main (#1163)
* log plugin bugfix, tests, doc and service update
- added logic to sort files by file metadata
- added tests for c8y log plugin
- updated documentation
- updated service file to enable service if connect to c8y
- log plugin handling failure case and info logs
- updating request as failed if there is an issue retrieving the log
- empty file error now returns an empty string
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/c8y_log_plugin/Cargo.toml | 1 | ||||
-rw-r--r-- | plugins/c8y_log_plugin/src/error.rs | 17 | ||||
-rw-r--r-- | plugins/c8y_log_plugin/src/logfile_request.rs | 313 | ||||
-rw-r--r-- | plugins/c8y_log_plugin/src/main.rs | 4 |
4 files changed, 270 insertions, 65 deletions
diff --git a/plugins/c8y_log_plugin/Cargo.toml b/plugins/c8y_log_plugin/Cargo.toml index 54b202c4..02d55ccd 100644 --- a/plugins/c8y_log_plugin/Cargo.toml +++ b/plugins/c8y_log_plugin/Cargo.toml @@ -37,6 +37,7 @@ tracing = { version = "0.1", features = ["attributes", "log"] } [dev-dependencies] assert_matches = "1.5" +filetime = "0.2" mockall = "0.11" tempfile = "3.3" test-case = "2.0" diff --git a/plugins/c8y_log_plugin/src/error.rs b/plugins/c8y_log_plugin/src/error.rs index 045d4bf3..b120684f 100644 --- a/plugins/c8y_log_plugin/src/error.rs +++ b/plugins/c8y_log_plugin/src/error.rs @@ -5,4 +5,21 @@ pub enum LogRetrievalError { #[error(transparent)] FromConfigSetting(#[from] tedge_config::ConfigSettingError), + + #[error(transparent)] + FromStdIo(#[from] std::io::Error), + + #[error(transparent)] + FromGlobPatternError(#[from] glob::PatternError), + + #[error(transparent)] + FromGlobError(#[from] glob::GlobError), + + // NOTE: `MaxLines` is not a client-facing error. It is used + // to break out of `read_log_content`. + #[error("Log file has maximum number of lines.")] + MaxLines, + + #[error("No logs available for log type: {log_type}. Hint: is your `path` key correct?")] + NoLogsAvailableForType { log_type: String }, } diff --git a/plugins/c8y_log_plugin/src/logfile_request.rs b/plugins/c8y_log_plugin/src/logfile_request.rs index aa889058..b538f714 100644 --- a/plugins/c8y_log_plugin/src/logfile_request.rs +++ b/plugins/c8y_log_plugin/src/logfile_request.rs @@ -6,8 +6,9 @@ use std::{ use easy_reader::EasyReader; use glob::glob; use time::OffsetDateTime; +use tracing::info; -use crate::config::LogPluginConfig; +use crate::{config::LogPluginConfig, error::LogRetrievalError}; use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy}; use c8y_smartrest::{ smartrest_deserializer::SmartRestLogRequest, @@ -63,45 +64,47 @@ fn read_log_content( mut line_counter: usize, max_lines: usize, filter_text: &Option<String>, -) -> Result<(usize, String), anyhow::Error> { +) -> Result<(usize, String), LogRetrievalError> { if line_counter >= max_lines { - Err(anyhow::anyhow!( - "`max_lines` filled. No more logs to return." - )) + Err(LogRetrievalError::MaxLines) } else { let mut file_content_as_vec = VecDeque::new(); let file = std::fs::File::open(&logfile)?; let file_name = format!( "filename: {}\n", - logfile.file_name().unwrap().to_str().unwrap() + logfile.file_name().unwrap().to_str().unwrap() // never fails because we check file exists ); - let mut reader = EasyReader::new(file)?; - reader.eof(); - - while line_counter < max_lines { - if let Some(haystack) = reader.prev_line()? { - if let Some(needle) = &filter_text { - if haystack.contains(needle) { - file_content_as_vec.push_front(format!("{}\n", haystack)); - line_counter += 1; + let reader = EasyReader::new(file); + match reader { + Ok(mut reader) => { + reader.eof(); + while line_counter < max_lines { + if let Some(haystack) = reader.prev_line()? { + if let Some(needle) = &filter_text { + if haystack.contains(needle) { + file_content_as_vec.push_front(format!("{}\n", haystack)); + line_counter += 1; + } + } else { + file_content_as_vec.push_front(format!("{}\n", haystack)); + line_counter += 1; + } + } else { + // there are no more lines.prev_line() + break; } - } else { - file_content_as_vec.push_front(format!("{}\n", haystack)); - line_counter += 1; } - } else { - // there are no more lines.prev_line() - break; - } - } - file_content_as_vec.push_front(file_name); + file_content_as_vec.push_front(file_name); - let file_content = file_content_as_vec - .iter() - .map(|x| x.to_string()) - .collect::<String>(); - Ok((line_counter, file_content)) + let file_content = file_content_as_vec + .iter() + .map(|x| x.to_string()) + .collect::<String>(); + Ok((line_counter, file_content)) + } + Err(_err) => Ok((line_counter, String::new())), + } } } @@ -127,10 +130,12 @@ pub fn new_read_logs( line_counter = lines; output.push_str(&file_content); } - Err(_e) => { - // TODO filter this error for `max_lines` error only + Err(_error @ LogRetrievalError::MaxLines) => { break; } + Err(error) => { + return Err(error.into()); + } }; } @@ -140,7 +145,7 @@ pub fn new_read_logs( fn filter_logs_on_type( smartrest_obj: &SmartRestLogRequest, plugin_config: &LogPluginConfig, -) -> Result<Vec<PathBuf>, anyhow::Error> { +) -> Result<Vec<PathBuf>, LogRetrievalError> { let mut files_to_send = Vec::new(); for files in &plugin_config.files { let maybe_file_path = files.path.as_str(); // because it can be a glob pattern @@ -155,15 +160,34 @@ fn filter_logs_on_type( } } } - Ok(files_to_send) + if files_to_send.is_empty() { + Err(LogRetrievalError::NoLogsAvailableForType { + log_type: smartrest_obj.log_type.to_string(), + }) + } else { + Ok(files_to_send) + } } /// filter a vector of pathbufs according to `smartrest_obj.date_from` and `smartrest_obj.date_to` fn filter_logs_path_on_metadata( smartrest_obj: &SmartRestLogRequest, - logs_path_vec: Vec<PathBuf>, -) -> Result<Vec<PathBuf>, anyhow::Error> { + mut logs_path_vec: Vec<PathBuf>, +) -> Result<Vec<PathBuf>, LogRetrievalError> { let mut out = vec![]; + + logs_path_vec.sort_by_key(|pathbuf| { + if let Ok(metadata) = std::fs::metadata(&pathbuf) { + if let Ok(file_modified_time) = metadata.modified() { + return OffsetDateTime::from(file_modified_time); + } + }; + // if the file metadata can not be read, we set the file's metadata + // to UNIX_EPOCH (Jan 1st 1970) + return OffsetDateTime::UNIX_EPOCH; + }); + logs_path_vec.reverse(); // to get most recent + for file_pathbuf in logs_path_vec { let metadata = std::fs::metadata(&file_pathbuf)?; let datetime_modified = OffsetDateTime::from(metadata.modified()?); @@ -171,7 +195,14 @@ fn filter_logs_path_on_metadata( out.push(file_pathbuf); } } - Ok(out) + + if out.is_empty() { + Err(LogRetrievalError::NoLogsAvailableForType { + log_type: smartrest_obj.log_type.to_string(), + }) + } else { + Ok(out) + } } /// executes the log file request @@ -179,7 +210,7 @@ fn filter_logs_path_on_metadata( /// - sends request executing (mqtt) /// - uploads log content (http) /// - sends request successful (mqtt) -pub async fn handle_logfile_request_operation( +async fn execute_logfile_request_operation( smartrest_request: &SmartRestLogRequest, plugin_config: &LogPluginConfig, mqtt_client: &mut Connection, @@ -197,8 +228,32 @@ pub async fn handle_logfile_request_operation( let successful = LogfileRequest::successful(Some(upload_event_url))?; let () = mqtt_client.published.send(successful).await?; + info!("Log request processed."); Ok(()) } +pub async fn handle_logfile_request_operation( + smartrest_request: &SmartRestLogRequest, + plugin_config: &LogPluginConfig, + mqtt_client: &mut Connection, + http_client: &mut JwtAuthHttpProxy, +) -> Result<(), anyhow::Error> { + match execute_logfile_request_operation( + smartrest_request, + plugin_config, + mqtt_client, + http_client, + ) + .await + { + Ok(()) => Ok(()), + Err(error) => { + let error_message = format!("Handling of operation failed with {}", error); + let failed_msg = LogfileRequest::failed(error_message)?; + let () = mqtt_client.published.send(failed_msg).await?; + Err(error) + } + } +} /// updates the log types on Cumulocity /// sends 118,typeA,typeB,... on mqtt @@ -219,14 +274,32 @@ mod tests { path::{Path, PathBuf}, }; - use c8y_smartrest::smartrest_deserializer::{SmartRestLogRequest, SmartRestRequestGeneric}; + use c8y_smartrest::smartrest_deserializer::SmartRestLogRequest; + use filetime::{set_file_mtime, FileTime}; use tempfile::TempDir; + use time::macros::datetime; - use crate::config::{FileEntry, LogPluginConfig}; + use crate::{ + config::{FileEntry, LogPluginConfig}, + logfile_request::new_read_logs, + }; use super::{filter_logs_on_type, filter_logs_path_on_metadata, read_log_content}; - fn get_filter_on_logs_type() -> Result<(TempDir, Vec<PathBuf>), anyhow::Error> { + /// Preparing a temp directory containing four files, with + /// two types { type_one, type_two }: + /// + /// file_a, type_one + /// file_b, type_one + /// file_c, type_two + /// file_d, type_one + /// + /// each file has the following modified "file update" timestamp: + /// file_a has timestamp: 1970/01/01 00:00:02 + /// file_b has timestamp: 1970/01/01 00:00:03 + /// file_c has timestamp: 1970/01/01 00:00:11 + /// file_d has timestamp: (current, not modified) + fn prepare() -> Result<(TempDir, LogPluginConfig), anyhow::Error> { let tempdir = TempDir::new()?; let tempdir_path = tempdir .path() @@ -236,6 +309,16 @@ mod tests { std::fs::File::create(&format!("{tempdir_path}/file_a"))?; std::fs::File::create(&format!("{tempdir_path}/file_b"))?; std::fs::File::create(&format!("{tempdir_path}/file_c"))?; + std::fs::File::create(&format!("{tempdir_path}/file_d"))?; + + let new_mtime = FileTime::from_unix_time(2, 0); + set_file_mtime(&format!("{tempdir_path}/file_a"), new_mtime).unwrap(); + + let new_mtime = FileTime::from_unix_time(3, 0); + set_file_mtime(&format!("{tempdir_path}/file_b"), new_mtime).unwrap(); + + let new_mtime = FileTime::from_unix_time(11, 0); + set_file_mtime(&format!("{tempdir_path}/file_c"), new_mtime).unwrap(); let files = vec![ FileEntry { @@ -250,53 +333,103 @@ mod tests { path: format!("{tempdir_path}/file_c"), config_type: "type_two".to_string(), }, + FileEntry { + path: format!("{tempdir_path}/file_d"), + config_type: "type_one".to_string(), + }, ]; let logs_config = LogPluginConfig { files: files }; + Ok((tempdir, logs_config)) + } - let smartrest_obj = SmartRestLogRequest::from_smartrest( - "522,DeviceSerial,type_one,2021-01-01T00:00:00+0200,2021-01-10T00:00:00+0200,,1000", - )?; - - let after_file = filter_logs_on_type(&smartrest_obj, &logs_config)?; - Ok((tempdir, after_file)) + fn build_smartrest_log_request_object( + log_type: String, + needle: Option<String>, + lines: usize, + ) -> SmartRestLogRequest { + SmartRestLogRequest { + message_id: "522".to_string(), + device: "device".to_string(), + log_type: log_type, + date_from: datetime!(1970-01-01 00:00:03 +00:00), + date_to: datetime!(1970-01-01 00:00:00 +00:00), // not used + needle: needle, + lines: lines, + } } #[test] + /// Filter on type = "type_one". + /// There are four logs created in tempdir { file_a, file_b, file_c, file_d } + /// Of which, { file_a, file_b, file_d } are "type_one" fn test_filter_logs_on_type() { - let (tempdir, after_file) = get_filter_on_logs_type().unwrap(); + let (tempdir, logs_config) = prepare().unwrap(); let tempdir_path = tempdir.path().to_str().unwrap(); + let smartrest_obj = build_smartrest_log_request_object("type_one".to_string(), None, 1000); + let logs = filter_logs_on_type(&smartrest_obj, &logs_config).unwrap(); assert_eq!( - after_file, + logs, vec![ PathBuf::from(&format!("{tempdir_path}/file_a")), - PathBuf::from(&format!("{tempdir_path}/file_b")) + PathBuf::from(&format!("{tempdir_path}/file_b")), + PathBuf::from(&format!("{tempdir_path}/file_d")) ] ) } #[test] + /// Out of logs filtered on type = "type_one", that is: { file_a, file_b, file_d }. + /// Only logs filtered on metadata remain, that is { file_b, file_d }. + /// + /// This is because: + /// + /// file_a has timestamp: 1970/01/01 00:00:02 + /// file_b has timestamp: 1970/01/01 00:00:03 + /// file_d has timestamp: (current, not modified) + /// + /// The order of the output is { file_d, file_b }, because files are sorted from + /// most recent to oldest fn test_filter_logs_path_on_metadata() { - let smartrest_obj = SmartRestLogRequest::from_smartrest( - "522,DeviceSerial,type_one,2021-01-01T00:00:00+0200,2021-01-10T00:00:00+0200,,1000", + let (tempdir, logs_config) = prepare().unwrap(); + let smartrest_obj = build_smartrest_log_request_object("type_one".to_string(), None, 1000); + let logs = filter_logs_on_type(&smartrest_obj, &logs_config).unwrap(); + let logs = filter_logs_path_on_metadata(&smartrest_obj, logs).unwrap(); + + assert_eq!( + logs, + vec![ + PathBuf::from(format!("{}/file_d", tempdir.path().to_str().unwrap())), + PathBuf::from(format!("{}/file_b", tempdir.path().to_str().unwrap())), + ] ) - .unwrap(); - let (_tempdir, logs) = get_filter_on_logs_type().unwrap(); - filter_logs_path_on_metadata(&smartrest_obj, logs).unwrap(); } #[test] + /// Inserting 5 log lines in { file_a }: + /// [ + /// this is the first line. + /// this is the second line. + /// this is the third line. + /// this is the fourth line. + /// this is the fifth line. + /// ] + /// + /// Requesting back only 4. Note that because we read the logs in reverse order, the first line + /// should be ommited. The result sould be: + /// [ + /// this is the second line. + /// this is the third line. + /// this is the fourth line. + /// this is the fifth line. + /// ] + /// fn test_read_log_content() { - let tempdir = TempDir::new().unwrap(); - let tempdir_path = tempdir - .path() - .to_str() - .ok_or_else(|| anyhow::anyhow!("temp dir not created")) - .unwrap(); - let file_path = &format!("{tempdir_path}/file_a.log"); - + let (tempdir, _logs_config) = prepare().unwrap(); + let path = tempdir.path().to_str().unwrap(); + let file_path = &format!("{path}/file_a"); let mut log_file = std::fs::OpenOptions::new() .append(true) - .create(true) + .create(false) .write(true) .open(file_path) .unwrap(); @@ -313,6 +446,58 @@ mod tests { read_log_content(Path::new(file_path), line_counter, max_lines, &filter_text).unwrap(); assert_eq!(line_counter, max_lines); - assert_eq!(result, "filename: file_a.log\nthis is the second line.\nthis is the third line.\nthis is the forth line.\nthis is the fifth line.\n"); + assert_eq!(result, "filename: file_a\nthis is the second line.\nthis is the third line.\nthis is the forth line.\nthis is the fifth line.\n"); + } + + #[test] + /// Inserting 5 lines of logs for each log file { file_a, ..., file_d }. + /// Each line contains the text: "this is the { line_number } line of { file_name } + /// where line_number { first, second, third, forth, fifth } + /// where file_name { file_a, ..., file_d } + /// + /// Requesting logs for log_type = "type_one", that are older than: + /// timestamp: 1970/01/01 00:00:03 + /// + /// These are: + /// file_b and file_d + /// + /// file_d is the newest file, so its logs are read first. then file_b. + /// + /// Because only 7 lines are requested (and each file has 5 lines), the expedted + /// result is: + /// + /// - all logs from file_d (5) + /// - last two logs from file_b (2) + fn test_read_log_content_multiple_files() { + let (tempdir, logs_config) = prepare().unwrap(); + let tempdir_path = tempdir.path().to_str().unwrap(); + + for (file_name, m_time) in [ + ("file_a", 2), + ("file_b", 3), + ("file_c", 11), + ("file_d", 100), + ] { + let file_path = &format!("{tempdir_path}/{file_name}"); + + let mut log_file = std::fs::OpenOptions::new() + .append(true) + .create(false) + .write(true) + .open(file_path) + .unwrap(); + + let data = &format!("this is the first line of {file_name}.\nthis is the second line of {file_name}.\nthis is the third line of {file_name}.\nthis is the forth line of {file_name}.\nthis is the fifth line of {file_name}."); + + let () = log_file.write_all(data.as_bytes()).unwrap(); + + let new_mtime = FileTime::from_unix_time(m_time, 0); + set_file_mtime(file_path, new_mtime).unwrap(); + } + + let smartrest_obj = build_smartrest_log_request_object("type_one".to_string(), None, 7); + + let result = new_read_logs(&smartrest_obj, &logs_config).unwrap(); + assert_eq!(result, String::from("filename: file_d\nthis is the first line of file_d.\nthis is the second line of file_d.\nthis is the third line of file_d.\nthis is the forth line of file_d.\nthis is the fifth line of file_d.\nfilename: file_b\nthis is the forth line of file_b.\nthis is the fifth line of file_b.\n")) } } diff --git a/plugins/c8y_log_plugin/src/main.rs b/plugins/c8y_log_plugin/src/main.rs index 90b16946..ce133bef 100644 --- a/plugins/c8y_log_plugin/src/main.rs +++ b/plugins/c8y_log_plugin/src/main.rs @@ -106,6 +106,7 @@ async fn run( if let Ok(payload) = message.payload_str() { let result = match payload.split(',').next().unwrap_or_default() { "522" => { + info!("Log request received: {payload}"); // retrieve smartrest object from payload let smartrest_obj = SmartRestLogRequest::from_smartrest(&payload)?; handle_logfile_request_operation( @@ -123,7 +124,8 @@ async fn run( }; if let Err(err) = result { - error!("Handling of operation: '{}' failed with {}", payload, err); + let error_message = format!("Handling of operation: '{}' failed with {}", payload, err); + error!("{}", error_message); } } } |