From 8645e200799d40a2d0e7231c457bd096d926d7ec Mon Sep 17 00:00:00 2001 From: initard Date: Mon, 9 May 2022 10:31:16 +0100 Subject: c8y_log_plugin implementation #1077 - log plugin is now a daemon - log file uses a configuration file to request logs from c8y UI - log plugin uses inotify to automatically update c8y of new logs files Signed-off-by: initard --- plugins/c8y_log_plugin/Cargo.toml | 41 +++++ plugins/c8y_log_plugin/src/config.rs | 101 +++++++++++ plugins/c8y_log_plugin/src/error.rs | 8 + plugins/c8y_log_plugin/src/logfile_request.rs | 171 +++++++++++++++++++ plugins/c8y_log_plugin/src/main.rs | 232 ++++++++++++++++++++++++++ plugins/log_request_plugin/Cargo.toml | 38 ----- plugins/log_request_plugin/src/main.rs | 66 -------- plugins/log_request_plugin/src/smartrest.rs | 191 --------------------- 8 files changed, 553 insertions(+), 295 deletions(-) create mode 100644 plugins/c8y_log_plugin/Cargo.toml create mode 100644 plugins/c8y_log_plugin/src/config.rs create mode 100644 plugins/c8y_log_plugin/src/error.rs create mode 100644 plugins/c8y_log_plugin/src/logfile_request.rs create mode 100644 plugins/c8y_log_plugin/src/main.rs delete mode 100644 plugins/log_request_plugin/Cargo.toml delete mode 100644 plugins/log_request_plugin/src/main.rs delete mode 100644 plugins/log_request_plugin/src/smartrest.rs (limited to 'plugins') diff --git a/plugins/c8y_log_plugin/Cargo.toml b/plugins/c8y_log_plugin/Cargo.toml new file mode 100644 index 00000000..d8158abe --- /dev/null +++ b/plugins/c8y_log_plugin/Cargo.toml @@ -0,0 +1,41 @@ +[package] +name = "c8y_log_plugin" +version = "0.6.4" +authors = ["thin-edge.io team "] +edition = "2021" +rust-version = "1.58.1" +license = "Apache-2.0" +description = "Thin-edge device log file retriever for Cumulocity" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[package.metadata.deb] +maintainer-scripts = "../../configuration/debian/c8y_log_plugin" +assets = [ + ["../../configuration/init/systemd/c8y-log-plugin.service", "/lib/systemd/system/c8y-log-plugin.service", "644"], + ["target/release/c8y_log_plugin", "/usr/bin/c8y_log_plugin", "755"], +] + +[dependencies] +anyhow = "1.0" +c8y_api = { path = "../../crates/core/c8y_api" } +c8y_smartrest = { path = "../../crates/core/c8y_smartrest" } +clap = { version = "3.0", features = ["cargo", "derive"] } +csv = "1.1" +glob = "0.3" +inotify = "0.10" +mqtt_channel = { path = "../../crates/common/mqtt_channel" } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +tedge_config = { path = "../../crates/common/tedge_config" } +tedge_utils = { path = "../../crates/common/tedge_utils", features = ["logging"] } +thiserror = "1.0" +tokio = { version = "1.9", default_features = false, features = [ "fs", "io-util", "macros", "rt-multi-thread","signal"] } +toml = "0.5" +tracing = { version = "0.1", features = ["attributes", "log"] } + +[dev-dependencies] +assert_matches = "1.5" +mockall = "0.11" +tempfile = "3.3" +test-case = "2.0" +serial_test = "0.6" diff --git a/plugins/c8y_log_plugin/src/config.rs b/plugins/c8y_log_plugin/src/config.rs new file mode 100644 index 00000000..580fba95 --- /dev/null +++ b/plugins/c8y_log_plugin/src/config.rs @@ -0,0 +1,101 @@ +use c8y_smartrest::topic::C8yTopic; +use mqtt_channel::Message; +use serde::Deserialize; +use std::{borrow::Borrow, path::Path}; +use std::{collections::HashSet, fs}; +use tracing::warn; + +#[derive(Deserialize, Debug, Eq, PartialEq, Default)] +#[serde(deny_unknown_fields)] +pub struct LogPluginConfig { + pub files: Vec, +} + +#[derive(Deserialize, Debug, Eq, Default, Clone)] +#[serde(deny_unknown_fields)] +pub struct FileEntry { + pub(crate) path: String, + #[serde(rename = "type")] + pub config_type: String, +} + +impl PartialEq for FileEntry { + fn eq(&self, other: &Self) -> bool { + self.config_type == other.config_type + } +} + +impl Borrow for FileEntry { + fn borrow(&self) -> &String { + &self.config_type + } +} + +impl LogPluginConfig { + pub fn new(config_file_path: &Path) -> Self { + let config = Self::read_config(config_file_path); + config + } + + pub fn read_config(path: &Path) -> Self { + let path_str = path.display().to_string(); + match fs::read_to_string(path) { + Ok(contents) => match toml::from_str(contents.as_str()) { + Ok(config) => config, + _ => { + warn!("The config file {} is malformed.", path_str); + Self::default() + } + }, + Err(_) => { + warn!( + "The config file {} does not exist or is not readable.", + path_str + ); + Self::default() + } + } + } + + pub fn to_supported_config_types_message(&self) -> Result { + let topic = C8yTopic::SmartRestResponse.to_topic()?; + Ok(Message::new(&topic, self.to_smartrest_payload())) + } + + pub fn get_all_file_types(&self) -> Vec { + self.files + .iter() + .map(|x| x.config_type.to_string()) + .collect::>() + .iter() + .map(|x| x.to_string()) + .collect::>() + } + + // 118,typeA,typeB,... + fn to_smartrest_payload(&self) -> String { + let mut config_types = self.get_all_file_types(); + let () = config_types.sort(); + let supported_config_types = config_types.join(","); + format!("118,{supported_config_types}") + } +} + +#[test] +fn test_no_duplicated_file_types() { + let files = vec![ + FileEntry { + path: "a/path".to_string(), + config_type: "type_one".to_string(), + }, + FileEntry { + path: "some/path".to_string(), + config_type: "type_one".to_string(), + }, + ]; + let logs_config = LogPluginConfig { files: files }; + assert_eq!( + logs_config.get_all_file_types(), + vec!["type_one".to_string()] + ); +} diff --git a/plugins/c8y_log_plugin/src/error.rs b/plugins/c8y_log_plugin/src/error.rs new file mode 100644 index 00000000..045d4bf3 --- /dev/null +++ b/plugins/c8y_log_plugin/src/error.rs @@ -0,0 +1,8 @@ +#[derive(thiserror::Error, Debug)] +pub enum LogRetrievalError { + #[error(transparent)] + FromTEdgeConfig(#[from] tedge_config::TEdgeConfigError), + + #[error(transparent)] + FromConfigSetting(#[from] tedge_config::ConfigSettingError), +} diff --git a/plugins/c8y_log_plugin/src/logfile_request.rs b/plugins/c8y_log_plugin/src/logfile_request.rs new file mode 100644 index 00000000..35867948 --- /dev/null +++ b/plugins/c8y_log_plugin/src/logfile_request.rs @@ -0,0 +1,171 @@ +use std::path::Path; + +use glob::glob; + +use crate::config::LogPluginConfig; +use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy}; +use c8y_smartrest::{ + smartrest_deserializer::{get_datetime_from_file_path, SmartRestLogRequest}, + smartrest_serializer::{ + CumulocitySupportedOperations, SmartRestSerializer, SmartRestSetOperationToExecuting, + SmartRestSetOperationToFailed, SmartRestSetOperationToSuccessful, + TryIntoOperationStatusMessage, + }, +}; +use mqtt_channel::{Connection, SinkExt}; + +pub struct LogfileRequest {} + +impl TryIntoOperationStatusMessage for LogfileRequest { + /// returns a c8y message specifying to set log status to executing. + /// + /// example message: '501,c8y_LogfileRequest' + fn status_executing() -> Result< + c8y_smartrest::smartrest_serializer::SmartRest, + c8y_smartrest::error::SmartRestSerializerError, + > { + SmartRestSetOperationToExecuting::new(CumulocitySupportedOperations::C8yLogFileRequest) + .to_smartrest() + } + + fn status_successful( + parameter: Option, + ) -> Result< + c8y_smartrest::smartrest_serializer::SmartRest, + c8y_smartrest::error::SmartRestSerializerError, + > { + SmartRestSetOperationToSuccessful::new(CumulocitySupportedOperations::C8yLogFileRequest) + .with_response_parameter(¶meter.unwrap()) + .to_smartrest() + } + + fn status_failed( + failure_reason: String, + ) -> Result< + c8y_smartrest::smartrest_serializer::SmartRest, + c8y_smartrest::error::SmartRestSerializerError, + > { + SmartRestSetOperationToFailed::new( + CumulocitySupportedOperations::C8yLogFileRequest, + failure_reason, + ) + .to_smartrest() + } +} +/// Reads tedge logs according to `SmartRestLogRequest`. +/// +/// If needed, logs are concatenated. +/// +/// Logs are sorted alphanumerically from oldest to newest. +/// +/// # Examples +/// +/// ``` +/// let smartrest_obj = SmartRestLogRequest::from_smartrest( +/// "522,DeviceSerial,syslog,2021-01-01T00:00:00+0200,2021-01-10T00:00:00+0200,,1000", +/// ) +/// .unwrap(); +/// +/// let log = read_tedge_system_logs(&smartrest_obj, "/var/log/tedge").unwrap(); +/// ``` +pub fn read_tedge_logs( + smartrest_obj: &SmartRestLogRequest, + plugin_config_path: &Path, +) -> Result { + let plugin_config = LogPluginConfig::new(&plugin_config_path); + let mut output = String::new(); + + let mut files_to_send = Vec::new(); + for files in &plugin_config.files { + let maybe_file_path = files.path.as_str(); // because it can be a glob pattern + let file_type = files.config_type.as_str(); + + if !file_type.eq(&smartrest_obj.log_type) { + continue; + } + + // NOTE: According to the glob documentation paths are yielded in alphabetical order hence re-ordering is no longer required see: + // https://github.com/thin-edge/thin-edge.io/blob/0320741b109f50d1b0f7cda44e33dc31ba04902d/plugins/log_request_plugin/src/smartrest.rs#L24 + for entry in glob(maybe_file_path)? { + let file_path = entry?; + if let Some(dt_from_file) = get_datetime_from_file_path(&file_path) { + if !(dt_from_file < smartrest_obj.date_from || dt_from_file > smartrest_obj.date_to) + { + files_to_send.push(file_path); + } + } else { + files_to_send.push(file_path); + } + } + } + + // loop sorted vector and push store log file to `output` + let mut line_counter: usize = 0; + for entry in files_to_send { + dbg!("files to read:", &entry); + let file_content = std::fs::read_to_string(&entry)?; + if file_content.is_empty() { + continue; + } + + // adding file header only if line_counter permits more lines to be added + match &entry.file_stem().and_then(|f| f.to_str()) { + Some(file_name) if line_counter < smartrest_obj.lines => { + output.push_str(&format!("filename: {}\n", file_name)); + } + _ => {} + } + + // split at new line delimiter ("\n") + let mut lines = file_content.lines().rev(); + while line_counter < smartrest_obj.lines { + if let Some(haystack) = lines.next() { + if let Some(needle) = &smartrest_obj.needle { + if haystack.contains(needle) { + output.push_str(&format!("{}\n", haystack)); + line_counter += 1; + } + } else { + output.push_str(&format!("{}\n", haystack)); + line_counter += 1; + } + } else { + // there are no lines.next() + break; + } + } + } + Ok(output) +} + +pub async fn handle_logfile_request_operation( + smartrest_request: &SmartRestLogRequest, + plugin_config_path: &Path, + mqtt_client: &mut Connection, + http_client: &mut JwtAuthHttpProxy, +) -> Result<(), anyhow::Error> { + // executing + let executing = LogfileRequest::executing()?; + let () = mqtt_client.published.send(executing).await?; + + let log_content = read_tedge_logs(&smartrest_request, &plugin_config_path)?; + + let upload_event_url = http_client + .upload_log_binary(&smartrest_request.log_type, &log_content) + .await?; + + let successful = LogfileRequest::successful(Some(upload_event_url))?; + let () = mqtt_client.published.send(successful).await?; + + Ok(()) +} + +pub async fn handle_dynamic_log_type_update( + mqtt_client: &mut Connection, + config_dir: &Path, +) -> Result<(), anyhow::Error> { + let plugin_config = LogPluginConfig::new(config_dir); + let msg = plugin_config.to_supported_config_types_message()?; + let () = mqtt_client.published.send(msg).await?; + Ok(()) +} diff --git a/plugins/c8y_log_plugin/src/main.rs b/plugins/c8y_log_plugin/src/main.rs new file mode 100644 index 00000000..a799a9b2 --- /dev/null +++ b/plugins/c8y_log_plugin/src/main.rs @@ -0,0 +1,232 @@ +mod config; +mod error; +mod logfile_request; + +use anyhow::Result; +use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy}; +use c8y_smartrest::smartrest_deserializer::{SmartRestLogRequest, SmartRestRequestGeneric}; +use c8y_smartrest::topic::C8yTopic; +use clap::Parser; + +use inotify::{EventMask, EventStream}; +use inotify::{Inotify, WatchMask}; +use mqtt_channel::{Connection, StreamExt}; +use std::path::{Path, PathBuf}; +use tedge_config::{ + ConfigRepository, ConfigSettingAccessor, LogPathSetting, MqttPortSetting, TEdgeConfig, + DEFAULT_TEDGE_CONFIG_PATH, +}; +use tedge_utils::file::{create_directory_with_user_group, create_file_with_user_group}; +use tracing::{error, info}; + +use crate::logfile_request::{handle_dynamic_log_type_update, handle_logfile_request_operation}; + +const DEFAULT_PLUGIN_CONFIG_FILE: &str = "c8y/c8y-log-plugin.toml"; +const AFTER_HELP_TEXT: &str = r#"On start, `c8y_log_plugin` notifies the cloud tenant of the log types listed in the `CONFIG_FILE`, sending this list with a `118` on `c8y/s/us`. +`c8y_log_plugin` subscribes then to `c8y/s/ds` listening for logfile operation requests (`522`) notifying the Cumulocity tenant of their progress (messages `501`, `502` and `503`). + +The thin-edge `CONFIG_DIR` is used to store: + * c8y-log-plugin.toml - the configuration file that specifies which logs to be retrived"#; + +#[derive(Debug, clap::Parser, Clone)] +#[clap( +name = clap::crate_name!(), +version = clap::crate_version!(), +about = clap::crate_description!(), +after_help = AFTER_HELP_TEXT +)] +pub struct LogfileRequestPluginOpt { + /// Turn-on the debug log level. + /// + /// If off only reports ERROR, WARN, and INFO + /// If on also reports DEBUG and TRACE + #[clap(long)] + pub debug: bool, + + /// Create supported operation files + #[clap(short, long)] + pub init: bool, + + #[clap(long = "config-dir", default_value = DEFAULT_TEDGE_CONFIG_PATH)] + pub config_dir: PathBuf, +} + +async fn create_mqtt_client( + tedge_config: &TEdgeConfig, +) -> Result { + let mqtt_port = tedge_config.query(MqttPortSetting)?.into(); + let mqtt_config = mqtt_channel::Config::default() + .with_port(mqtt_port) + .with_subscriptions(mqtt_channel::TopicFilter::new_unchecked( + C8yTopic::SmartRestRequest.as_str(), + )); + + let mqtt_client = mqtt_channel::Connection::new(&mqtt_config).await?; + Ok(mqtt_client) +} + +pub async fn create_http_client( + tedge_config: &TEdgeConfig, +) -> Result { + let mut http_proxy = JwtAuthHttpProxy::try_new(tedge_config).await?; + let () = http_proxy.init().await?; + Ok(http_proxy) +} + +fn create_inofity_file_watch_stream( + config_file: &Path, +) -> Result, anyhow::Error> { + let buffer = [0; 1024]; + let mut inotify = Inotify::init().expect("Error while initializing inotify instance"); + + inotify + .add_watch(&config_file, WatchMask::CLOSE_WRITE) + .expect("Failed to add file watch"); + + Ok(inotify.event_stream(buffer)?) +} + +async fn run( + config_file: &Path, + mqtt_client: &mut Connection, + http_client: &mut JwtAuthHttpProxy, +) -> Result<(), anyhow::Error> { + let () = handle_dynamic_log_type_update(mqtt_client, config_file).await?; + + let mut inotify_stream = create_inofity_file_watch_stream(config_file)?; + + loop { + tokio::select! { + message = mqtt_client.received.next() => { + if let Some(message) = message { + if let Ok(payload) = message.payload_str() { + let result = match payload.split(',').next().unwrap_or_default() { + "522" => { + // retrieve smartrest object from payload + let smartrest_obj = SmartRestLogRequest::from_smartrest(&payload)?; + handle_logfile_request_operation( + &smartrest_obj, + &config_file, + mqtt_client, + http_client, + ) + .await + } + _ => { + // Ignore operation messages not meant for this plugin + Ok(()) + } + }; + + if let Err(err) = result { + error!("Handling of operation: '{}' failed with {}", payload, err); + } + } + } + else { + // message is None and the connection has been closed + return Ok(()); + } + } + Some(event_or_error) = inotify_stream.next() => { + if let Ok(event) = event_or_error { + match event.mask { + EventMask::CLOSE_WRITE => { + let () = handle_dynamic_log_type_update(mqtt_client, config_file).await?; + } + _ => {} + } + } + + } + } + } +} + +#[tokio::main] +async fn main() -> Result<(), anyhow::Error> { + let config_plugin_opt = LogfileRequestPluginOpt::parse(); + let config_file = PathBuf::from(&format!( + "{}/{DEFAULT_PLUGIN_CONFIG_FILE}", + &config_plugin_opt + .config_dir + .to_str() + .unwrap_or(DEFAULT_TEDGE_CONFIG_PATH) + )); + + tedge_utils::logging::initialise_tracing_subscriber(config_plugin_opt.debug); + + // Load tedge config from the provided location + let tedge_config_location = + tedge_config::TEdgeConfigLocation::from_custom_root(&config_plugin_opt.config_dir); + let config_repository = tedge_config::TEdgeConfigRepository::new(tedge_config_location.clone()); + let tedge_config = config_repository.load()?; + + let logs_dir = tedge_config.query(LogPathSetting)?; + let logs_dir = PathBuf::from(logs_dir.to_string()); + + if config_plugin_opt.init { + let () = init(&config_plugin_opt.config_dir, &logs_dir)?; + return Ok(()); + } + + // Create required clients + let mut mqtt_client = create_mqtt_client(&tedge_config).await?; + let mut http_client = create_http_client(&tedge_config).await?; + + let () = run(&config_file, &mut mqtt_client, &mut http_client).await?; + Ok(()) +} + +fn init(config_dir: &PathBuf, logs_dir: &PathBuf) -> Result<(), anyhow::Error> { + info!("Creating supported operation files"); + let config_dir = config_dir.as_path().display().to_string(); + let logs_dir = logs_dir.as_path().display().to_string(); + let () = create_init_logs_directories_and_files(config_dir.as_str(), logs_dir.as_str())?; + Ok(()) +} + +/// for the log plugin to work the following directories and files are needed: +/// +/// Directories: +/// - LOGS_DIR/tedge/agent +/// - config_dir/operations/c8y +/// - CONFIG_DIR/c8y +/// +/// Files: +/// - config_dir/operations/c8y/c8y_LogfileRequest +/// - CONFIG_DIR/c8y/log/c8y-log-plugin.toml +fn create_init_logs_directories_and_files( + config_dir: &str, + logs_dir: &str, +) -> Result<(), anyhow::Error> { + // creating logs_dir + create_directory_with_user_group(&format!("{logs_dir}/tedge"), "tedge", "tedge", 0o755)?; + create_directory_with_user_group(&format!("{logs_dir}/tedge/agent"), "tedge", "tedge", 0o755)?; + // creating /operations/c8y directories + create_directory_with_user_group(&format!("{config_dir}/operations"), "tedge", "tedge", 0o755)?; + create_directory_with_user_group( + &format!("{config_dir}/operations/c8y"), + "tedge", + "tedge", + 0o755, + )?; + // creating c8y_LogfileRequest operation file + create_file_with_user_group( + &format!("{config_dir}/operations/c8y/c8y_LogfileRequest"), + "tedge", + "tedge", + 0o755, + )?; + // creating c8y directory + create_directory_with_user_group(&format!("{config_dir}/c8y"), "tedge", "tedge", 0o755)?; + // creating c8y-log-plugin.toml + // NOTE: file needs 775 permission or inotify can not watch for changes inside the file + create_file_with_user_group( + &format!("{config_dir}/{DEFAULT_PLUGIN_CONFIG_FILE}"), + "tedge", + "tedge", + 0o775, + )?; + Ok(()) +} diff --git a/plugins/log_request_plugin/Cargo.toml b/plugins/log_request_plugin/Cargo.toml deleted file mode 100644 index 4cb07317..00000000 --- a/plugins/log_request_plugin/Cargo.toml +++ /dev/null @@ -1,38 +0,0 @@ -[package] -name = "tedge_logfile_request_plugin" -version = "0.6.4" -authors = ["thin-edge.io team "] -edition = "2021" -rust-version = "1.58.1" -license = "Apache-2.0" -description = "Thin.edge.io operation plugin for Cumulocity log request" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[package.metadata.deb] -pre-depends = "tedge_mapper" -assets = [ - ["../../configuration/contrib/operations/c8y/c8y_LogfileRequest", "/etc/tedge/operations/c8y/", "644"], - ["target/release/tedge_logfile_request_plugin", "/usr/bin/tedge_logfile_request_plugin", "755"], -] - -[dependencies] -anyhow = "1.0" -async-trait = "0.1" -c8y_api = { path = "../../crates/core/c8y_api" } -c8y_smartrest = { path = "../../crates/core/c8y_smartrest" } -csv = "1.1" -futures = "0.3" -mockall = "0.10" -reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] } -serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0" -thiserror = "1.0" -tokio = { version = "1.9", default_features = false, features = [ "fs", "io-util", "macros", "rt-multi-thread","signal"] } -toml = "0.5" -tracing = { version = "0.1", features = ["attributes", "log"] } -tedge_config = { path = "../../crates/common/tedge_config" } -mqtt_channel = { path = "../../crates/common/mqtt_channel" } - -[dev-dependencies] -tempfile = "3.3" diff --git a/plugins/log_request_plugin/src/main.rs b/plugins/log_request_plugin/src/main.rs deleted file mode 100644 index d448124b..00000000 --- a/plugins/log_request_plugin/src/main.rs +++ /dev/null @@ -1,66 +0,0 @@ -mod smartrest; - -use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy}; -use c8y_smartrest::{smartrest_deserializer::SmartRestLogRequest, topic::C8yTopic}; -use tedge_config::{get_tedge_config, ConfigSettingAccessor, MqttPortSetting}; - -use c8y_smartrest::smartrest_deserializer::SmartRestRequestGeneric; -use futures::SinkExt; - -use smartrest::{ - get_log_file_request_done_message, get_log_file_request_executing, read_tedge_logs, -}; - -const AGENT_LOG_DIR: &str = "/var/log/tedge/agent"; - -/// creates an mqtt client -pub async fn create_mqtt_client() -> Result { - let tedge_config = get_tedge_config()?; - let mqtt_port = tedge_config.query(MqttPortSetting)?.into(); - let mqtt_config = mqtt_channel::Config::default() - .with_port(mqtt_port) - .with_subscriptions(mqtt_channel::TopicFilter::new_unchecked( - C8yTopic::SmartRestResponse.as_str(), - )); - - let mqtt_client = mqtt_channel::Connection::new(&mqtt_config).await?; - Ok(mqtt_client) -} - -/// creates an http client -pub async fn create_http_client() -> Result { - let config = get_tedge_config()?; - let mut http_proxy = JwtAuthHttpProxy::try_new(&config).await?; - let () = http_proxy.init().await?; - Ok(http_proxy) -} - -#[tokio::main] -async fn main() -> Result<(), anyhow::Error> { - // reading payload from command line arguments - let payload = std::env::args().nth(1).expect("no payload given"); - - // creating required clients - let mut mqtt_client = create_mqtt_client().await?; - let mut http_client = create_http_client().await?; - - // retrieve smartrest object from payload - let smartrest_obj = SmartRestLogRequest::from_smartrest(&payload)?; - - // 1. set log file request to executing - let msg = get_log_file_request_executing().await?; - let () = mqtt_client.published.send(msg).await?; - // 2. read logs - let log_content = read_tedge_logs(&smartrest_obj, AGENT_LOG_DIR)?; - - // 3. upload log file - let upload_event_url = http_client.upload_log_binary(&log_content).await?; - - // 4. set log file request to done - let msg = get_log_file_request_done_message(&upload_event_url).await?; - let () = mqtt_client.published.send(msg).await?; - - mqtt_client.close().await; - - Ok(()) -} diff --git a/plugins/log_request_plugin/src/smartrest.rs b/plugins/log_request_plugin/src/smartrest.rs deleted file mode 100644 index b9155030..00000000 --- a/plugins/log_request_plugin/src/smartrest.rs +++ /dev/null @@ -1,191 +0,0 @@ -use c8y_smartrest::{ - smartrest_deserializer::{get_datetime_from_file_path, SmartRestLogRequest}, - smartrest_serializer::{ - CumulocitySupportedOperations, SmartRestSerializer, SmartRestSetOperationToExecuting, - SmartRestSetOperationToSuccessful, - }, - topic::C8yTopic, -}; -use mqtt_channel::Message; - -/// returns a c8y message specifying to set log status to executing. -/// -/// example message: '501,c8y_LogfileRequest' -pub async fn get_log_file_request_executing() -> Result { - let topic = C8yTopic::SmartRestResponse.to_topic()?; - let smartrest_set_operation_status = - SmartRestSetOperationToExecuting::new(CumulocitySupportedOperations::C8yLogFileRequest) - .to_smartrest()?; - Ok(Message::new(&topic, smartrest_set_operation_status)) -} - -/// returns a c8y message specifying to set log status to successful. -/// -/// example message: '503,c8y_LogfileRequest,https://{c8y.url}/etc...' -pub async fn get_log_file_request_done_message( - binary_upload_event_url: &str, -) -> Result { - let topic = C8yTopic::SmartRestResponse.to_topic()?; - let smartrest_set_operation_status = - SmartRestSetOperationToSuccessful::new(CumulocitySupportedOperations::C8yLogFileRequest) - .with_response_parameter(binary_upload_event_url) - .to_smartrest()?; - - Ok(Message::new(&topic, smartrest_set_operation_status)) -} - -/// Reads tedge logs according to `SmartRestLogRequest`. -/// -/// If needed, logs are concatenated. -/// -/// Logs are sorted alphanumerically from oldest to newest. -/// -/// # Examples -/// -/// ``` -/// let smartrest_obj = SmartRestLogRequest::from_smartrest( -/// "522,DeviceSerial,syslog,2021-01-01T00:00:00+0200,2021-01-10T00:00:00+0200,,1000", -/// ) -/// .unwrap(); -/// -/// let log = read_tedge_system_logs(&smartrest_obj, "/var/log/tedge").unwrap(); -/// ``` -pub fn read_tedge_logs( - smartrest_obj: &SmartRestLogRequest, - logs_dir: &str, -) -> Result { - let mut output = String::new(); - - // NOTE: As per documentation of std::fs::read_dir: - // "The order in which this iterator returns entries is platform and filesystem dependent." - // Therefore, files are sorted by date. - let mut read_vector: Vec<_> = std::fs::read_dir(logs_dir)? - .filter_map(|r| r.ok()) - .filter(|dir_entry| { - get_datetime_from_file_path(&dir_entry.path()) - .map(|dt| !(dt < smartrest_obj.date_from || dt > smartrest_obj.date_to)) - .unwrap_or(false) - }) - .filter(|dir_entry| { - let file_name = &dir_entry.file_name(); - let mut file_name = file_name.to_str().unwrap(); - - // FIXME: this is a hotfix to map "software-list" and "software-update" to "software-management" - // this should be fixed in https://github.com/thin-edge/thin-edge.io/issues/1077 - if file_name.starts_with("software-list") | file_name.starts_with("software-update") { - file_name = "software-management"; - } - - file_name.starts_with(&smartrest_obj.log_type) - }) - .collect(); - - read_vector.sort_by_key(|dir| dir.path()); - - // loop sorted vector and push store log file to `output` - let mut line_counter: usize = 0; - for entry in read_vector { - let file_path = entry.path(); - let file_content = std::fs::read_to_string(&file_path)?; - if file_content.is_empty() { - continue; - } - - // adding file header only if line_counter permits more lines to be added - match &file_path.file_stem().and_then(|f| f.to_str()) { - Some(file_name) if line_counter < smartrest_obj.lines => { - output.push_str(&format!("filename: {}\n", file_name)); - } - _ => {} - } - - // split at new line delimiter ("\n") - let mut lines = file_content.lines(); - while line_counter < smartrest_obj.lines { - if let Some(haystack) = lines.next() { - if let Some(needle) = &smartrest_obj.needle { - if haystack.contains(needle) { - output.push_str(&format!("{}\n", haystack)); - line_counter += 1; - } - } else { - output.push_str(&format!("{}\n", haystack)); - line_counter += 1; - } - } else { - // there are no lines.next() - break; - } - } - } - Ok(output) -} - -#[cfg(test)] -mod tests { - use super::read_tedge_logs; - use c8y_smartrest::smartrest_deserializer::{SmartRestLogRequest, SmartRestRequestGeneric}; - use std::fs::File; - use std::io::Write; - - fn parse_file_names_from_log_content(log_content: &str) -> [&str; 5] { - let mut files: Vec<&str> = vec![]; - for line in log_content.lines() { - if line.contains("filename: ") { - let filename: &str = line.split("filename: ").last().unwrap(); - files.push(filename); - } - } - match files.try_into() { - Ok(arr) => arr, - Err(_) => panic!("Could not convert to Array &str, size 5"), - } - } - - #[test] - /// testing read_tedge_logs - /// - /// this test creates 5 fake log files in a temporary directory. - /// files are dated 2021-01-0XT01:00Z, where X = a different day. - /// - /// this tests will assert that files are read alphanumerically from oldest to newest - fn test_read_logs() { - // order in which files are created - const LOG_FILE_NAMES: [&str; 5] = [ - "software-list-2021-01-03T01:00:00Z.log", - "software-list-2021-01-02T01:00:00Z.log", - "software-list-2021-01-01T01:00:00Z.log", - "software-update-2021-01-03T01:00:00Z.log", - "software-update-2021-01-02T01:00:00Z.log", - ]; - - // expected (sorted) output - const EXPECTED_OUTPUT: [&str; 5] = [ - "software-list-2021-01-01T01:00:00Z", - "software-list-2021-01-02T01:00:00Z", - "software-list-2021-01-03T01:00:00Z", - "software-update-2021-01-02T01:00:00Z", - "software-update-2021-01-03T01:00:00Z", - ]; - - let smartrest_obj = SmartRestLogRequest::from_smartrest( - "522,DeviceSerial,software-management,2021-01-01T00:00:00+0200,2021-01-10T00:00:00+0200,,1000", - ) - .unwrap(); - - let temp_dir = tempfile::tempdir().unwrap(); - // creating the files - for (idx, file) in LOG_FILE_NAMES.iter().enumerate() { - let file_path = &temp_dir.path().join(file); - let mut file = File::create(file_path).unwrap(); - writeln!(file, "file num {}", idx).unwrap(); - } - - // reading the logs and extracting the file names from the log output. - let output = read_tedge_logs(&smartrest_obj, temp_dir.path().to_str().unwrap()).unwrap(); - let parsed_values = parse_file_names_from_log_content(&output); - - // asserting the order = `EXPECTED_OUTPUT` - assert!(parsed_values.eq(&EXPECTED_OUTPUT)); - } -} -- cgit v1.2.3