summaryrefslogtreecommitdiffstats
path: root/plugins
diff options
context:
space:
mode:
authorinitard <solo@softwareag.com>2022-05-20 15:51:19 +0100
committerinitard <solo@softwareag.com>2022-05-20 16:09:23 +0100
commitc325c533aa6c05f73b26f8d8569b199b81b84edf (patch)
tree083137ff8fc03e6387e1a6914b753b984bf36ebb /plugins
parentbcf887cb468c04271bf38e06a904da5e615f382e (diff)
logs are read in reverse and use modified date
- logs are now read in reverse order into a VecDeque - log files now use metadata for modified time Signed-off-by: initard <solo@softwareag.com>
Diffstat (limited to 'plugins')
-rw-r--r--plugins/c8y_log_plugin/Cargo.toml2
-rw-r--r--plugins/c8y_log_plugin/src/logfile_request.rs287
-rw-r--r--plugins/c8y_log_plugin/src/main.rs6
3 files changed, 219 insertions, 76 deletions
diff --git a/plugins/c8y_log_plugin/Cargo.toml b/plugins/c8y_log_plugin/Cargo.toml
index d8158abe..ab98265d 100644
--- a/plugins/c8y_log_plugin/Cargo.toml
+++ b/plugins/c8y_log_plugin/Cargo.toml
@@ -21,6 +21,7 @@ c8y_api = { path = "../../crates/core/c8y_api" }
c8y_smartrest = { path = "../../crates/core/c8y_smartrest" }
clap = { version = "3.0", features = ["cargo", "derive"] }
csv = "1.1"
+easy_reader = "0.5"
glob = "0.3"
inotify = "0.10"
mqtt_channel = { path = "../../crates/common/mqtt_channel" }
@@ -28,6 +29,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tedge_config = { path = "../../crates/common/tedge_config" }
tedge_utils = { path = "../../crates/common/tedge_utils", features = ["logging"] }
+time = { version = "0.3" }
thiserror = "1.0"
tokio = { version = "1.9", default_features = false, features = [ "fs", "io-util", "macros", "rt-multi-thread","signal"] }
toml = "0.5"
diff --git a/plugins/c8y_log_plugin/src/logfile_request.rs b/plugins/c8y_log_plugin/src/logfile_request.rs
index 325fb7a5..aa889058 100644
--- a/plugins/c8y_log_plugin/src/logfile_request.rs
+++ b/plugins/c8y_log_plugin/src/logfile_request.rs
@@ -1,11 +1,16 @@
-use std::path::Path;
+use std::{
+ collections::VecDeque,
+ path::{Path, PathBuf},
+};
+use easy_reader::EasyReader;
use glob::glob;
+use time::OffsetDateTime;
use crate::config::LogPluginConfig;
use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy};
use c8y_smartrest::{
- smartrest_deserializer::{get_datetime_from_file_path, SmartRestLogRequest},
+ smartrest_deserializer::SmartRestLogRequest,
smartrest_serializer::{
CumulocitySupportedOperations, SmartRestSerializer, SmartRestSetOperationToExecuting,
SmartRestSetOperationToFailed, SmartRestSetOperationToSuccessful,
@@ -52,29 +57,90 @@ impl TryIntoOperationStatusMessage for LogfileRequest {
.to_smartrest()
}
}
-/// Reads tedge logs according to `SmartRestLogRequest`.
-///
-/// If needed, logs are concatenated.
-///
-/// Logs are sorted alphanumerically from oldest to newest.
-///
-/// # Examples
-///
-/// ```
-/// let smartrest_obj = SmartRestLogRequest::from_smartrest(
-/// "522,DeviceSerial,syslog,2021-01-01T00:00:00+0200,2021-01-10T00:00:00+0200,,1000",
-/// )
-/// .unwrap();
-///
-/// let log = read_tedge_system_logs(&smartrest_obj, "/var/log/tedge").unwrap();
-/// ```
-pub fn read_tedge_logs(
+
+fn read_log_content(
+ logfile: &Path,
+ mut line_counter: usize,
+ max_lines: usize,
+ filter_text: &Option<String>,
+) -> Result<(usize, String), anyhow::Error> {
+ if line_counter >= max_lines {
+ Err(anyhow::anyhow!(
+ "`max_lines` filled. No more logs to return."
+ ))
+ } else {
+ let mut file_content_as_vec = VecDeque::new();
+ let file = std::fs::File::open(&logfile)?;
+ let file_name = format!(
+ "filename: {}\n",
+ logfile.file_name().unwrap().to_str().unwrap()
+ );
+ let mut reader = EasyReader::new(file)?;
+ reader.eof();
+
+ while line_counter < max_lines {
+ if let Some(haystack) = reader.prev_line()? {
+ if let Some(needle) = &filter_text {
+ if haystack.contains(needle) {
+ file_content_as_vec.push_front(format!("{}\n", haystack));
+ line_counter += 1;
+ }
+ } else {
+ file_content_as_vec.push_front(format!("{}\n", haystack));
+ line_counter += 1;
+ }
+ } else {
+ // there are no more lines.prev_line()
+ break;
+ }
+ }
+
+ file_content_as_vec.push_front(file_name);
+
+ let file_content = file_content_as_vec
+ .iter()
+ .map(|x| x.to_string())
+ .collect::<String>();
+ Ok((line_counter, file_content))
+ }
+}
+
+/// read any log file comming from `smartrest_obj.log_type`
+pub fn new_read_logs(
smartrest_obj: &SmartRestLogRequest,
- plugin_config_path: &Path,
+ plugin_config: &LogPluginConfig,
) -> Result<String, anyhow::Error> {
- let plugin_config = LogPluginConfig::new(&plugin_config_path);
let mut output = String::new();
+ // first filter logs on type
+ let mut logfiles_to_read = filter_logs_on_type(&smartrest_obj, &plugin_config)?;
+ logfiles_to_read = filter_logs_path_on_metadata(&smartrest_obj, logfiles_to_read)?;
+
+ let mut line_counter = 0usize;
+ for logfile in logfiles_to_read {
+ match read_log_content(
+ logfile.as_path(),
+ line_counter,
+ smartrest_obj.lines,
+ &smartrest_obj.needle,
+ ) {
+ Ok((lines, file_content)) => {
+ line_counter = lines;
+ output.push_str(&file_content);
+ }
+ Err(_e) => {
+ // TODO filter this error for `max_lines` error only
+ break;
+ }
+ };
+ }
+
+ Ok(output)
+}
+fn filter_logs_on_type(
+ smartrest_obj: &SmartRestLogRequest,
+ plugin_config: &LogPluginConfig,
+) -> Result<Vec<PathBuf>, anyhow::Error> {
let mut files_to_send = Vec::new();
for files in &plugin_config.files {
let maybe_file_path = files.path.as_str(); // because it can be a glob pattern
@@ -82,60 +148,30 @@ pub fn read_tedge_logs(
if !file_type.eq(&smartrest_obj.log_type) {
continue;
- }
-
- // NOTE: According to the glob documentation paths are yielded in alphabetical order hence re-ordering is no longer required see:
- // https://github.com/thin-edge/thin-edge.io/blob/0320741b109f50d1b0f7cda44e33dc31ba04902d/plugins/log_request_plugin/src/smartrest.rs#L24
- for entry in glob(maybe_file_path)? {
- let file_path = entry?;
- if let Some(dt_from_file) = get_datetime_from_file_path(&file_path) {
- if !(dt_from_file < smartrest_obj.date_from || dt_from_file > smartrest_obj.date_to)
- {
- files_to_send.push(file_path);
- }
- } else {
- files_to_send.push(file_path);
+ } else {
+ for entry in glob(maybe_file_path)? {
+ let file_path = entry?;
+ files_to_send.push(file_path)
}
}
}
+ Ok(files_to_send)
+}
- // loop sorted vector and push store log file to `output`
- let mut line_counter: usize = 0;
- for entry in files_to_send {
- dbg!("files to read:", &entry);
- let file_content = std::fs::read_to_string(&entry)?;
- if file_content.is_empty() {
- continue;
- }
-
- // adding file header only if line_counter permits more lines to be added
- match &entry.file_stem().and_then(|f| f.to_str()) {
- Some(file_name) if line_counter < smartrest_obj.lines => {
- output.push_str(&format!("filename: {}\n", file_name));
- }
- _ => {}
- }
-
- // split at new line delimiter ("\n")
- let mut lines = file_content.lines().rev();
- while line_counter < smartrest_obj.lines {
- if let Some(haystack) = lines.next() {
- if let Some(needle) = &smartrest_obj.needle {
- if haystack.contains(needle) {
- output.push_str(&format!("{}\n", haystack));
- line_counter += 1;
- }
- } else {
- output.push_str(&format!("{}\n", haystack));
- line_counter += 1;
- }
- } else {
- // there are no lines.next()
- break;
- }
+/// filter a vector of pathbufs according to `smartrest_obj.date_from` and `smartrest_obj.date_to`
+fn filter_logs_path_on_metadata(
+ smartrest_obj: &SmartRestLogRequest,
+ logs_path_vec: Vec<PathBuf>,
+) -> Result<Vec<PathBuf>, anyhow::Error> {
+ let mut out = vec![];
+ for file_pathbuf in logs_path_vec {
+ let metadata = std::fs::metadata(&file_pathbuf)?;
+ let datetime_modified = OffsetDateTime::from(metadata.modified()?);
+ if datetime_modified >= smartrest_obj.date_from {
+ out.push(file_pathbuf);
}
}
- Ok(output)
+ Ok(out)
}
/// executes the log file request
@@ -145,14 +181,14 @@ pub fn read_tedge_logs(
/// - sends request successful (mqtt)
pub async fn handle_logfile_request_operation(
smartrest_request: &SmartRestLogRequest,
- plugin_config_path: &Path,
+ plugin_config: &LogPluginConfig,
mqtt_client: &mut Connection,
http_client: &mut JwtAuthHttpProxy,
) -> Result<(), anyhow::Error> {
let executing = LogfileRequest::executing()?;
let () = mqtt_client.published.send(executing).await?;
- let log_content = read_tedge_logs(&smartrest_request, &plugin_config_path)?;
+ let log_content = new_read_logs(&smartrest_request, &plugin_config)?;
let upload_event_url = http_client
.upload_log_binary(&smartrest_request.log_type, &log_content)
@@ -169,9 +205,114 @@ pub async fn handle_logfile_request_operation(
pub async fn handle_dynamic_log_type_update(
mqtt_client: &mut Connection,
config_dir: &Path,
-) -> Result<(), anyhow::Error> {
+) -> Result<LogPluginConfig, anyhow::Error> {
let plugin_config = LogPluginConfig::new(config_dir);
let msg = plugin_config.to_supported_config_types_message()?;
let () = mqtt_client.published.send(msg).await?;
- Ok(())
+ Ok(plugin_config)
+}
+
+#[cfg(test)]
+mod tests {
+ use std::{
+ io::Write,
+ path::{Path, PathBuf},
+ };
+
+ use c8y_smartrest::smartrest_deserializer::{SmartRestLogRequest, SmartRestRequestGeneric};
+ use tempfile::TempDir;
+
+ use crate::config::{FileEntry, LogPluginConfig};
+
+ use super::{filter_logs_on_type, filter_logs_path_on_metadata, read_log_content};
+
+ fn get_filter_on_logs_type() -> Result<(TempDir, Vec<PathBuf>), anyhow::Error> {
+ let tempdir = TempDir::new()?;
+ let tempdir_path = tempdir
+ .path()
+ .to_str()
+ .ok_or_else(|| anyhow::anyhow!("temp dir not created"))?;
+
+ std::fs::File::create(&format!("{tempdir_path}/file_a"))?;
+ std::fs::File::create(&format!("{tempdir_path}/file_b"))?;
+ std::fs::File::create(&format!("{tempdir_path}/file_c"))?;
+
+ let files = vec![
+ FileEntry {
+ path: format!("{tempdir_path}/file_a"),
+ config_type: "type_one".to_string(),
+ },
+ FileEntry {
+ path: format!("{tempdir_path}/file_b"),
+ config_type: "type_one".to_string(),
+ },
+ FileEntry {
+ path: format!("{tempdir_path}/file_c"),
+ config_type: "type_two".to_string(),
+ },
+ ];
+ let logs_config = LogPluginConfig { files: files };
+
+ let smartrest_obj = SmartRestLogRequest::from_smartrest(
+ "522,DeviceSerial,type_one,2021-01-01T00:00:00+0200,2021-01-10T00:00:00+0200,,1000",
+ )?;
+
+ let after_file = filter_logs_on_type(&smartrest_obj, &logs_config)?;
+ Ok((tempdir, after_file))
+ }
+
+ #[test]
+ fn test_filter_logs_on_type() {
+ let (tempdir, after_file) = get_filter_on_logs_type().unwrap();
+ let tempdir_path = tempdir.path().to_str().unwrap();
+ assert_eq!(
+ after_file,
+ vec![
+ PathBuf::from(&format!("{tempdir_path}/file_a")),
+ PathBuf::from(&format!("{tempdir_path}/file_b"))
+ ]
+ )
+ }
+
+ #[test]
+ fn test_filter_logs_path_on_metadata() {
+ let smartrest_obj = SmartRestLogRequest::from_smartrest(
+ "522,DeviceSerial,type_one,2021-01-01T00:00:00+0200,2021-01-10T00:00:00+0200,,1000",
+ )
+ .unwrap();
+ let (_tempdir, logs) = get_filter_on_logs_type().unwrap();
+ filter_logs_path_on_metadata(&smartrest_obj, logs).unwrap();
+ }
+
+ #[test]
+ fn test_read_log_content() {
+ let tempdir = TempDir::new().unwrap();
+ let tempdir_path = tempdir
+ .path()
+ .to_str()
+ .ok_or_else(|| anyhow::anyhow!("temp dir not created"))
+ .unwrap();
+ let file_path = &format!("{tempdir_path}/file_a.log");
+
+ let mut log_file = std::fs::OpenOptions::new()
+ .append(true)
+ .create(true)
+ .write(true)
+ .open(file_path)
+ .unwrap();
+
+ let data = "this is the first line.\nthis is the second line.\nthis is the third line.\nthis is the forth line.\nthis is the fifth line.";
+
+ let () = log_file.write_all(data.as_bytes()).unwrap();
+
+ let line_counter = 0;
+ let max_lines = 4;
+ let filter_text = None;
+
+ let (line_counter, result) =
+ read_log_content(Path::new(file_path), line_counter, max_lines, &filter_text).unwrap();
+
+ assert_eq!(line_counter, max_lines);
+ assert_eq!(result, "filename: file_a.log\nthis is the second line.\nthis is the third line.\nthis is the forth line.\nthis is the fifth line.\n");
+ }
}
diff --git a/plugins/c8y_log_plugin/src/main.rs b/plugins/c8y_log_plugin/src/main.rs
index e6c05693..90b16946 100644
--- a/plugins/c8y_log_plugin/src/main.rs
+++ b/plugins/c8y_log_plugin/src/main.rs
@@ -95,7 +95,7 @@ async fn run(
mqtt_client: &mut Connection,
http_client: &mut JwtAuthHttpProxy,
) -> Result<(), anyhow::Error> {
- let () = handle_dynamic_log_type_update(mqtt_client, config_file).await?;
+ let mut plugin_config = handle_dynamic_log_type_update(mqtt_client, config_file).await?;
let mut inotify_stream = create_inofity_file_watch_stream(config_file)?;
@@ -110,7 +110,7 @@ async fn run(
let smartrest_obj = SmartRestLogRequest::from_smartrest(&payload)?;
handle_logfile_request_operation(
&smartrest_obj,
- &config_file,
+ &plugin_config,
mqtt_client,
http_client,
)
@@ -136,7 +136,7 @@ async fn run(
if let Ok(event) = event_or_error {
match event.mask {
EventMask::CLOSE_WRITE => {
- let () = handle_dynamic_log_type_update(mqtt_client, config_file).await?;
+ plugin_config = handle_dynamic_log_type_update(mqtt_client, config_file).await?;
}
_ => {}
}