author    Alex Solomes <alex.solomes@softwareag.com>    2022-05-25 11:26:25 +0100
committer GitHub <noreply@github.com>    2022-05-25 11:26:25 +0100
commit    355d5111fc0ba139473da4643df4dd21e708674e (patch)
tree      b39c9e3c577450c4c86850f452dea204c3ce0518 /plugins
parent    ae88bb1eb08631be648f84521d513f48b40f4631 (diff)
Porting c8y log plugin hotfixes to main (#1163)
* log plugin bugfix, tests, doc and service update
  - added logic to sort files by file metadata
  - added tests for the c8y log plugin
  - updated documentation
  - updated the service file to enable the service when connected to c8y
  - log plugin now handles the failure case and emits info logs
  - the request is marked as failed if there is an issue retrieving the log
  - the empty-file error case now returns an empty string
Diffstat (limited to 'plugins')
-rw-r--r--    plugins/c8y_log_plugin/Cargo.toml              |   1
-rw-r--r--    plugins/c8y_log_plugin/src/error.rs            |  17
-rw-r--r--    plugins/c8y_log_plugin/src/logfile_request.rs  | 313
-rw-r--r--    plugins/c8y_log_plugin/src/main.rs             |   4
4 files changed, 270 insertions, 65 deletions
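The headline change is the new sort-by-metadata step added to filter_logs_path_on_metadata (see the logfile_request.rs diff below). A minimal standalone sketch of that logic, assuming the time crate; sort_newest_first is a hypothetical helper name, not part of the plugin:

use std::path::PathBuf;
use time::OffsetDateTime;

// Sort paths so the most recently modified files come first; unreadable
// metadata falls back to UNIX_EPOCH so such files sort last.
fn sort_newest_first(mut paths: Vec<PathBuf>) -> Vec<PathBuf> {
    paths.sort_by_key(|path| {
        std::fs::metadata(path)
            .and_then(|metadata| metadata.modified())
            .map(OffsetDateTime::from)
            .unwrap_or(OffsetDateTime::UNIX_EPOCH)
    });
    paths.reverse(); // newest first
    paths
}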
diff --git a/plugins/c8y_log_plugin/Cargo.toml b/plugins/c8y_log_plugin/Cargo.toml
index 54b202c4..02d55ccd 100644
--- a/plugins/c8y_log_plugin/Cargo.toml
+++ b/plugins/c8y_log_plugin/Cargo.toml
@@ -37,6 +37,7 @@ tracing = { version = "0.1", features = ["attributes", "log"] }
[dev-dependencies]
assert_matches = "1.5"
+filetime = "0.2"
mockall = "0.11"
tempfile = "3.3"
test-case = "2.0"
diff --git a/plugins/c8y_log_plugin/src/error.rs b/plugins/c8y_log_plugin/src/error.rs
index 045d4bf3..b120684f 100644
--- a/plugins/c8y_log_plugin/src/error.rs
+++ b/plugins/c8y_log_plugin/src/error.rs
@@ -5,4 +5,21 @@ pub enum LogRetrievalError {
#[error(transparent)]
FromConfigSetting(#[from] tedge_config::ConfigSettingError),
+
+ #[error(transparent)]
+ FromStdIo(#[from] std::io::Error),
+
+ #[error(transparent)]
+ FromGlobPatternError(#[from] glob::PatternError),
+
+ #[error(transparent)]
+ FromGlobError(#[from] glob::GlobError),
+
+ // NOTE: `MaxLines` is not a client-facing error. It is used
+ // to break out of `read_log_content`.
+ #[error("Log file has maximum number of lines.")]
+ MaxLines,
+
+ #[error("No logs available for log type: {log_type}. Hint: is your `path` key correct?")]
+ NoLogsAvailableForType { log_type: String },
}
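For illustration, a self-contained sketch of how these #[from] variants let the ? operator convert underlying std::io and glob errors into LogRetrievalError (assuming the thiserror and glob crates; list_logs is a hypothetical function, not the plugin's API):

use std::path::PathBuf;
use thiserror::Error;

// Minimal sketch of the error type; variants mirror the diff above.
#[derive(Error, Debug)]
pub enum LogRetrievalError {
    #[error(transparent)]
    FromStdIo(#[from] std::io::Error),

    #[error(transparent)]
    FromGlobPatternError(#[from] glob::PatternError),

    #[error("No logs available for log type: {log_type}. Hint: is your `path` key correct?")]
    NoLogsAvailableForType { log_type: String },
}

// `?` now converts glob::PatternError (and std::io::Error) automatically.
fn list_logs(pattern: &str) -> Result<Vec<PathBuf>, LogRetrievalError> {
    let paths: Vec<PathBuf> = glob::glob(pattern)?
        .filter_map(Result::ok) // skip unreadable entries for brevity
        .collect();
    if paths.is_empty() {
        return Err(LogRetrievalError::NoLogsAvailableForType {
            log_type: pattern.to_string(),
        });
    }
    Ok(paths)
}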
diff --git a/plugins/c8y_log_plugin/src/logfile_request.rs b/plugins/c8y_log_plugin/src/logfile_request.rs
index aa889058..b538f714 100644
--- a/plugins/c8y_log_plugin/src/logfile_request.rs
+++ b/plugins/c8y_log_plugin/src/logfile_request.rs
@@ -6,8 +6,9 @@ use std::{
use easy_reader::EasyReader;
use glob::glob;
use time::OffsetDateTime;
+use tracing::info;
-use crate::config::LogPluginConfig;
+use crate::{config::LogPluginConfig, error::LogRetrievalError};
use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy};
use c8y_smartrest::{
smartrest_deserializer::SmartRestLogRequest,
@@ -63,45 +64,47 @@ fn read_log_content(
mut line_counter: usize,
max_lines: usize,
filter_text: &Option<String>,
-) -> Result<(usize, String), anyhow::Error> {
+) -> Result<(usize, String), LogRetrievalError> {
if line_counter >= max_lines {
- Err(anyhow::anyhow!(
- "`max_lines` filled. No more logs to return."
- ))
+ Err(LogRetrievalError::MaxLines)
} else {
let mut file_content_as_vec = VecDeque::new();
let file = std::fs::File::open(&logfile)?;
let file_name = format!(
"filename: {}\n",
- logfile.file_name().unwrap().to_str().unwrap()
+ logfile.file_name().unwrap().to_str().unwrap() // never fails because we check file exists
);
- let mut reader = EasyReader::new(file)?;
- reader.eof();
-
- while line_counter < max_lines {
- if let Some(haystack) = reader.prev_line()? {
- if let Some(needle) = &filter_text {
- if haystack.contains(needle) {
- file_content_as_vec.push_front(format!("{}\n", haystack));
- line_counter += 1;
+ let reader = EasyReader::new(file);
+ match reader {
+ Ok(mut reader) => {
+ reader.eof();
+ while line_counter < max_lines {
+ if let Some(haystack) = reader.prev_line()? {
+ if let Some(needle) = &filter_text {
+ if haystack.contains(needle) {
+ file_content_as_vec.push_front(format!("{}\n", haystack));
+ line_counter += 1;
+ }
+ } else {
+ file_content_as_vec.push_front(format!("{}\n", haystack));
+ line_counter += 1;
+ }
+ } else {
+ // prev_line() returned None: there are no more lines to read
+ break;
}
- } else {
- file_content_as_vec.push_front(format!("{}\n", haystack));
- line_counter += 1;
}
- } else {
- // there are no more lines.prev_line()
- break;
- }
- }
- file_content_as_vec.push_front(file_name);
+ file_content_as_vec.push_front(file_name);
- let file_content = file_content_as_vec
- .iter()
- .map(|x| x.to_string())
- .collect::<String>();
- Ok((line_counter, file_content))
+ let file_content = file_content_as_vec
+ .iter()
+ .map(|x| x.to_string())
+ .collect::<String>();
+ Ok((line_counter, file_content))
+ }
+ Err(_err) => Ok((line_counter, String::new())),
+ }
}
}
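The reverse reading relies on the easy_reader API as used above (EasyReader::new, eof, prev_line). A hedged, self-contained sketch of that tail-reading pattern; last_n_lines is a hypothetical helper:

use std::collections::VecDeque;
use std::fs::File;
use std::path::Path;

use easy_reader::EasyReader;

// Position the reader at end-of-file, then walk backwards with prev_line()
// until enough lines are collected or the start of the file is reached.
fn last_n_lines(path: &Path, n: usize) -> std::io::Result<Vec<String>> {
    let mut lines = VecDeque::new();
    let mut reader = EasyReader::new(File::open(path)?)?;
    reader.eof();
    while lines.len() < n {
        match reader.prev_line()? {
            Some(line) => lines.push_front(line),
            None => break, // no more lines above this point
        }
    }
    Ok(lines.into())
}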
@@ -127,10 +130,12 @@ pub fn new_read_logs(
line_counter = lines;
output.push_str(&file_content);
}
- Err(_e) => {
- // TODO filter this error for `max_lines` error only
+ Err(_error @ LogRetrievalError::MaxLines) => {
break;
}
+ Err(error) => {
+ return Err(error.into());
+ }
};
}
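As the NOTE in error.rs says, MaxLines is only a loop-termination signal, not a client-facing error. A self-contained toy sketch of the pattern; ReadError, read_chunk and collect are illustrative names, not the plugin's API:

#[derive(Debug)]
enum ReadError {
    MaxLines,           // internal signal: the requested line budget is used up
    Io(std::io::Error), // a real failure that should reach the caller
}

fn read_chunk(remaining: usize) -> Result<String, ReadError> {
    if remaining == 0 {
        return Err(ReadError::MaxLines);
    }
    Ok("line\n".to_string())
}

fn collect(mut remaining: usize) -> Result<String, ReadError> {
    let mut output = String::new();
    loop {
        match read_chunk(remaining) {
            Ok(chunk) => {
                output.push_str(&chunk);
                remaining -= 1;
            }
            // Stop cleanly once the budget is exhausted...
            Err(_error @ ReadError::MaxLines) => break,
            // ...but propagate genuine errors.
            Err(error) => return Err(error),
        }
    }
    Ok(output)
}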
@@ -140,7 +145,7 @@ pub fn new_read_logs(
fn filter_logs_on_type(
smartrest_obj: &SmartRestLogRequest,
plugin_config: &LogPluginConfig,
-) -> Result<Vec<PathBuf>, anyhow::Error> {
+) -> Result<Vec<PathBuf>, LogRetrievalError> {
let mut files_to_send = Vec::new();
for files in &plugin_config.files {
let maybe_file_path = files.path.as_str(); // because it can be a glob pattern
@@ -155,15 +160,34 @@ fn filter_logs_on_type(
}
}
}
- Ok(files_to_send)
+ if files_to_send.is_empty() {
+ Err(LogRetrievalError::NoLogsAvailableForType {
+ log_type: smartrest_obj.log_type.to_string(),
+ })
+ } else {
+ Ok(files_to_send)
+ }
}
/// filter a vector of pathbufs according to `smartrest_obj.date_from` and `smartrest_obj.date_to`
fn filter_logs_path_on_metadata(
smartrest_obj: &SmartRestLogRequest,
- logs_path_vec: Vec<PathBuf>,
-) -> Result<Vec<PathBuf>, anyhow::Error> {
+ mut logs_path_vec: Vec<PathBuf>,
+) -> Result<Vec<PathBuf>, LogRetrievalError> {
let mut out = vec![];
+
+ logs_path_vec.sort_by_key(|pathbuf| {
+ if let Ok(metadata) = std::fs::metadata(&pathbuf) {
+ if let Ok(file_modified_time) = metadata.modified() {
+ return OffsetDateTime::from(file_modified_time);
+ }
+ };
+ // if the file metadata cannot be read, fall back to
+ // UNIX_EPOCH (Jan 1st 1970) as the sort key
+ return OffsetDateTime::UNIX_EPOCH;
+ });
+ logs_path_vec.reverse(); // to get most recent
+
for file_pathbuf in logs_path_vec {
let metadata = std::fs::metadata(&file_pathbuf)?;
let datetime_modified = OffsetDateTime::from(metadata.modified()?);
@@ -171,7 +195,14 @@ fn filter_logs_path_on_metadata(
out.push(file_pathbuf);
}
}
- Ok(out)
+
+ if out.is_empty() {
+ Err(LogRetrievalError::NoLogsAvailableForType {
+ log_type: smartrest_obj.log_type.to_string(),
+ })
+ } else {
+ Ok(out)
+ }
}
/// executes the log file request
@@ -179,7 +210,7 @@ fn filter_logs_path_on_metadata(
/// - sends request executing (mqtt)
/// - uploads log content (http)
/// - sends request successful (mqtt)
-pub async fn handle_logfile_request_operation(
+async fn execute_logfile_request_operation(
smartrest_request: &SmartRestLogRequest,
plugin_config: &LogPluginConfig,
mqtt_client: &mut Connection,
@@ -197,8 +228,32 @@ pub async fn handle_logfile_request_operation(
let successful = LogfileRequest::successful(Some(upload_event_url))?;
let () = mqtt_client.published.send(successful).await?;
+ info!("Log request processed.");
Ok(())
}
+pub async fn handle_logfile_request_operation(
+ smartrest_request: &SmartRestLogRequest,
+ plugin_config: &LogPluginConfig,
+ mqtt_client: &mut Connection,
+ http_client: &mut JwtAuthHttpProxy,
+) -> Result<(), anyhow::Error> {
+ match execute_logfile_request_operation(
+ smartrest_request,
+ plugin_config,
+ mqtt_client,
+ http_client,
+ )
+ .await
+ {
+ Ok(()) => Ok(()),
+ Err(error) => {
+ let error_message = format!("Handling of operation failed with {}", error);
+ let failed_msg = LogfileRequest::failed(error_message)?;
+ let () = mqtt_client.published.send(failed_msg).await?;
+ Err(error)
+ }
+ }
+}
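The new public wrapper reports any failure over MQTT before propagating it. A hedged sketch of that wrap-and-report pattern, assuming anyhow and an async runtime; send_failed is a hypothetical stand-in for publishing the SmartRest "failed" message:

async fn send_failed(_reason: String) -> Result<(), anyhow::Error> {
    Ok(())
}

async fn handle_with_failure_report<F, Fut>(operation: F) -> Result<(), anyhow::Error>
where
    F: FnOnce() -> Fut,
    Fut: std::future::Future<Output = Result<(), anyhow::Error>>,
{
    match operation().await {
        Ok(()) => Ok(()),
        Err(error) => {
            // Tell the cloud the operation failed before returning the error locally.
            send_failed(format!("Handling of operation failed with {}", error)).await?;
            Err(error)
        }
    }
}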
/// updates the log types on Cumulocity
/// sends 118,typeA,typeB,... on mqtt
@@ -219,14 +274,32 @@ mod tests {
path::{Path, PathBuf},
};
- use c8y_smartrest::smartrest_deserializer::{SmartRestLogRequest, SmartRestRequestGeneric};
+ use c8y_smartrest::smartrest_deserializer::SmartRestLogRequest;
+ use filetime::{set_file_mtime, FileTime};
use tempfile::TempDir;
+ use time::macros::datetime;
- use crate::config::{FileEntry, LogPluginConfig};
+ use crate::{
+ config::{FileEntry, LogPluginConfig},
+ logfile_request::new_read_logs,
+ };
use super::{filter_logs_on_type, filter_logs_path_on_metadata, read_log_content};
- fn get_filter_on_logs_type() -> Result<(TempDir, Vec<PathBuf>), anyhow::Error> {
+ /// Preparing a temp directory containing four files, with
+ /// two types { type_one, type_two }:
+ ///
+ /// file_a, type_one
+ /// file_b, type_one
+ /// file_c, type_two
+ /// file_d, type_one
+ ///
+ /// each file has the following modification ("file update") timestamp:
+ /// file_a has timestamp: 1970/01/01 00:00:02
+ /// file_b has timestamp: 1970/01/01 00:00:03
+ /// file_c has timestamp: 1970/01/01 00:00:11
+ /// file_d has timestamp: (current, not modified)
+ fn prepare() -> Result<(TempDir, LogPluginConfig), anyhow::Error> {
let tempdir = TempDir::new()?;
let tempdir_path = tempdir
.path()
@@ -236,6 +309,16 @@ mod tests {
std::fs::File::create(&format!("{tempdir_path}/file_a"))?;
std::fs::File::create(&format!("{tempdir_path}/file_b"))?;
std::fs::File::create(&format!("{tempdir_path}/file_c"))?;
+ std::fs::File::create(&format!("{tempdir_path}/file_d"))?;
+
+ let new_mtime = FileTime::from_unix_time(2, 0);
+ set_file_mtime(&format!("{tempdir_path}/file_a"), new_mtime).unwrap();
+
+ let new_mtime = FileTime::from_unix_time(3, 0);
+ set_file_mtime(&format!("{tempdir_path}/file_b"), new_mtime).unwrap();
+
+ let new_mtime = FileTime::from_unix_time(11, 0);
+ set_file_mtime(&format!("{tempdir_path}/file_c"), new_mtime).unwrap();
let files = vec![
FileEntry {
@@ -250,53 +333,103 @@ mod tests {
path: format!("{tempdir_path}/file_c"),
config_type: "type_two".to_string(),
},
+ FileEntry {
+ path: format!("{tempdir_path}/file_d"),
+ config_type: "type_one".to_string(),
+ },
];
let logs_config = LogPluginConfig { files: files };
+ Ok((tempdir, logs_config))
+ }
- let smartrest_obj = SmartRestLogRequest::from_smartrest(
- "522,DeviceSerial,type_one,2021-01-01T00:00:00+0200,2021-01-10T00:00:00+0200,,1000",
- )?;
-
- let after_file = filter_logs_on_type(&smartrest_obj, &logs_config)?;
- Ok((tempdir, after_file))
+ fn build_smartrest_log_request_object(
+ log_type: String,
+ needle: Option<String>,
+ lines: usize,
+ ) -> SmartRestLogRequest {
+ SmartRestLogRequest {
+ message_id: "522".to_string(),
+ device: "device".to_string(),
+ log_type: log_type,
+ date_from: datetime!(1970-01-01 00:00:03 +00:00),
+ date_to: datetime!(1970-01-01 00:00:00 +00:00), // not used
+ needle: needle,
+ lines: lines,
+ }
}
#[test]
+ /// Filter on type = "type_one".
+ /// There are four logs created in tempdir { file_a, file_b, file_c, file_d }
+ /// Of which, { file_a, file_b, file_d } are "type_one"
fn test_filter_logs_on_type() {
- let (tempdir, after_file) = get_filter_on_logs_type().unwrap();
+ let (tempdir, logs_config) = prepare().unwrap();
let tempdir_path = tempdir.path().to_str().unwrap();
+ let smartrest_obj = build_smartrest_log_request_object("type_one".to_string(), None, 1000);
+ let logs = filter_logs_on_type(&smartrest_obj, &logs_config).unwrap();
assert_eq!(
- after_file,
+ logs,
vec![
PathBuf::from(&format!("{tempdir_path}/file_a")),
- PathBuf::from(&format!("{tempdir_path}/file_b"))
+ PathBuf::from(&format!("{tempdir_path}/file_b")),
+ PathBuf::from(&format!("{tempdir_path}/file_d"))
]
)
}
#[test]
+ /// Out of the logs filtered on type = "type_one", that is { file_a, file_b, file_d },
+ /// only the logs that also pass the metadata filter remain, that is { file_b, file_d }.
+ ///
+ /// This is because the request's date_from is 1970/01/01 00:00:03 and:
+ ///
+ /// file_a has timestamp: 1970/01/01 00:00:02 (too old)
+ /// file_b has timestamp: 1970/01/01 00:00:03
+ /// file_d has timestamp: (current, not modified)
+ ///
+ /// The order of the output is { file_d, file_b }, because files are sorted from
+ /// most recent to oldest
fn test_filter_logs_path_on_metadata() {
- let smartrest_obj = SmartRestLogRequest::from_smartrest(
- "522,DeviceSerial,type_one,2021-01-01T00:00:00+0200,2021-01-10T00:00:00+0200,,1000",
+ let (tempdir, logs_config) = prepare().unwrap();
+ let smartrest_obj = build_smartrest_log_request_object("type_one".to_string(), None, 1000);
+ let logs = filter_logs_on_type(&smartrest_obj, &logs_config).unwrap();
+ let logs = filter_logs_path_on_metadata(&smartrest_obj, logs).unwrap();
+
+ assert_eq!(
+ logs,
+ vec![
+ PathBuf::from(format!("{}/file_d", tempdir.path().to_str().unwrap())),
+ PathBuf::from(format!("{}/file_b", tempdir.path().to_str().unwrap())),
+ ]
)
- .unwrap();
- let (_tempdir, logs) = get_filter_on_logs_type().unwrap();
- filter_logs_path_on_metadata(&smartrest_obj, logs).unwrap();
}
#[test]
+ /// Inserting 5 log lines in { file_a }:
+ /// [
+ /// this is the first line.
+ /// this is the second line.
+ /// this is the third line.
+ /// this is the fourth line.
+ /// this is the fifth line.
+ /// ]
+ ///
+ /// Requesting back only 4. Note that because we read the logs in reverse order, the first line
+ /// should be omitted. The result should be:
+ /// [
+ /// this is the second line.
+ /// this is the third line.
+ /// this is the fourth line.
+ /// this is the fifth line.
+ /// ]
+ ///
fn test_read_log_content() {
- let tempdir = TempDir::new().unwrap();
- let tempdir_path = tempdir
- .path()
- .to_str()
- .ok_or_else(|| anyhow::anyhow!("temp dir not created"))
- .unwrap();
- let file_path = &format!("{tempdir_path}/file_a.log");
-
+ let (tempdir, _logs_config) = prepare().unwrap();
+ let path = tempdir.path().to_str().unwrap();
+ let file_path = &format!("{path}/file_a");
let mut log_file = std::fs::OpenOptions::new()
.append(true)
- .create(true)
+ .create(false)
.write(true)
.open(file_path)
.unwrap();
@@ -313,6 +446,58 @@ mod tests {
read_log_content(Path::new(file_path), line_counter, max_lines, &filter_text).unwrap();
assert_eq!(line_counter, max_lines);
- assert_eq!(result, "filename: file_a.log\nthis is the second line.\nthis is the third line.\nthis is the forth line.\nthis is the fifth line.\n");
+ assert_eq!(result, "filename: file_a\nthis is the second line.\nthis is the third line.\nthis is the forth line.\nthis is the fifth line.\n");
+ }
+
+ #[test]
+ /// Inserting 5 lines of logs for each log file { file_a, ..., file_d }.
+ /// Each line contains the text: "this is the { line_number } line of { file_name }."
+ /// where line_number { first, second, third, forth, fifth }
+ /// where file_name { file_a, ..., file_d }
+ ///
+ /// Requesting logs for log_type = "type_one" that were modified at or after:
+ /// timestamp: 1970/01/01 00:00:03
+ ///
+ /// These are:
+ /// file_b and file_d
+ ///
+ /// file_d is the newest file, so its logs are read first, then file_b's.
+ ///
+ /// Because only 7 lines are requested (and each file has 5 lines), the expected
+ /// result is:
+ ///
+ /// - all logs from file_d (5)
+ /// - last two logs from file_b (2)
+ fn test_read_log_content_multiple_files() {
+ let (tempdir, logs_config) = prepare().unwrap();
+ let tempdir_path = tempdir.path().to_str().unwrap();
+
+ for (file_name, m_time) in [
+ ("file_a", 2),
+ ("file_b", 3),
+ ("file_c", 11),
+ ("file_d", 100),
+ ] {
+ let file_path = &format!("{tempdir_path}/{file_name}");
+
+ let mut log_file = std::fs::OpenOptions::new()
+ .append(true)
+ .create(false)
+ .write(true)
+ .open(file_path)
+ .unwrap();
+
+ let data = &format!("this is the first line of {file_name}.\nthis is the second line of {file_name}.\nthis is the third line of {file_name}.\nthis is the forth line of {file_name}.\nthis is the fifth line of {file_name}.");
+
+ let () = log_file.write_all(data.as_bytes()).unwrap();
+
+ let new_mtime = FileTime::from_unix_time(m_time, 0);
+ set_file_mtime(file_path, new_mtime).unwrap();
+ }
+
+ let smartrest_obj = build_smartrest_log_request_object("type_one".to_string(), None, 7);
+
+ let result = new_read_logs(&smartrest_obj, &logs_config).unwrap();
+ assert_eq!(result, String::from("filename: file_d\nthis is the first line of file_d.\nthis is the second line of file_d.\nthis is the third line of file_d.\nthis is the forth line of file_d.\nthis is the fifth line of file_d.\nfilename: file_b\nthis is the forth line of file_b.\nthis is the fifth line of file_b.\n"))
}
}
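The new tests pin file modification times with the filetime dev-dependency added in Cargo.toml. A small sketch of that fixture pattern, assuming the filetime and tempfile crates; make_backdated_file is a hypothetical helper:

use std::path::PathBuf;

use filetime::{set_file_mtime, FileTime};
use tempfile::TempDir;

// Create an empty file inside a temp directory and backdate its mtime so
// that sorting by file metadata is deterministic in tests.
fn make_backdated_file(dir: &TempDir, name: &str, unix_secs: i64) -> std::io::Result<PathBuf> {
    let path = dir.path().join(name);
    std::fs::File::create(&path)?;
    set_file_mtime(&path, FileTime::from_unix_time(unix_secs, 0))?;
    Ok(path)
}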
diff --git a/plugins/c8y_log_plugin/src/main.rs b/plugins/c8y_log_plugin/src/main.rs
index 90b16946..ce133bef 100644
--- a/plugins/c8y_log_plugin/src/main.rs
+++ b/plugins/c8y_log_plugin/src/main.rs
@@ -106,6 +106,7 @@ async fn run(
if let Ok(payload) = message.payload_str() {
let result = match payload.split(',').next().unwrap_or_default() {
"522" => {
+ info!("Log request received: {payload}");
// retrieve smartrest object from payload
let smartrest_obj = SmartRestLogRequest::from_smartrest(&payload)?;
handle_logfile_request_operation(
@@ -123,7 +124,8 @@ async fn run(
};
if let Err(err) = result {
- error!("Handling of operation: '{}' failed with {}", payload, err);
+ let error_message = format!("Handling of operation: '{}' failed with {}", payload, err);
+ error!("{}", error_message);
}
}
}
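For context, the run() loop dispatches on the SmartRest message id, the first comma-separated field of the payload; "522" is the log file request handled above. A minimal sketch of that check; is_log_request is illustrative only:

// The SmartRest message id is the first comma-separated field of the payload.
fn is_log_request(payload: &str) -> bool {
    payload.split(',').next().unwrap_or_default() == "522"
}

#[cfg(test)]
mod dispatch_tests {
    use super::is_log_request;

    #[test]
    fn recognises_a_522_log_request() {
        assert!(is_log_request("522,DeviceSerial,type_one"));
        assert!(!is_log_request("510,DeviceSerial"));
    }
}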