diff options
-rw-r--r-- | Cargo.lock | 78 | ||||
-rw-r--r-- | Cargo.toml | 2 | ||||
-rw-r--r-- | configuration/debian/c8y_log_plugin/postinst | 8 | ||||
-rw-r--r-- | configuration/init/systemd/c8y-log-plugin.service | 12 | ||||
-rw-r--r-- | crates/core/c8y_api/src/http_proxy.rs | 18 | ||||
-rw-r--r-- | crates/core/c8y_smartrest/src/smartrest_deserializer.rs | 29 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/converter.rs | 44 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/tests.rs | 12 | ||||
-rw-r--r-- | plugins/c8y_log_plugin/Cargo.toml (renamed from plugins/log_request_plugin/Cargo.toml) | 27 | ||||
-rw-r--r-- | plugins/c8y_log_plugin/src/config.rs | 101 | ||||
-rw-r--r-- | plugins/c8y_log_plugin/src/error.rs | 8 | ||||
-rw-r--r-- | plugins/c8y_log_plugin/src/logfile_request.rs | 171 | ||||
-rw-r--r-- | plugins/c8y_log_plugin/src/main.rs | 232 | ||||
-rw-r--r-- | plugins/log_request_plugin/src/main.rs | 66 | ||||
-rw-r--r-- | plugins/log_request_plugin/src/smartrest.rs | 191 |
15 files changed, 627 insertions, 372 deletions
@@ -416,6 +416,33 @@ dependencies = [ ] [[package]] +name = "c8y_log_plugin" +version = "0.6.4" +dependencies = [ + "anyhow", + "assert_matches", + "c8y_api", + "c8y_smartrest", + "clap 3.1.18", + "csv", + "glob", + "inotify", + "mockall 0.11.0", + "mqtt_channel", + "serde", + "serde_json", + "serial_test", + "tedge_config", + "tedge_utils", + "tempfile", + "test-case", + "thiserror", + "tokio", + "toml", + "tracing", +] + +[[package]] name = "c8y_smartrest" version = "0.6.4" dependencies = [ @@ -1124,6 +1151,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78cc372d058dcf6d5ecd98510e7fbc9e5aec4d21de70f65fea8fecebcd881bd4" [[package]] +name = "glob" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" + +[[package]] name = "h2" version = "0.3.13" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1321,6 +1354,28 @@ dependencies = [ ] [[package]] +name = "inotify" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "abf888f9575c290197b2c948dc9e9ff10bd1a39ad1ea8585f734585fa6b9d3f9" +dependencies = [ + "bitflags", + "futures-core", + "inotify-sys", + "libc", + "tokio", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + +[[package]] name = "instant" version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2902,29 +2957,6 @@ dependencies = [ ] [[package]] -name = "tedge_logfile_request_plugin" -version = "0.6.4" -dependencies = [ - "anyhow", - "async-trait", - "c8y_api", - "c8y_smartrest", - "csv", - "futures", - "mockall 0.10.2", - "mqtt_channel", - "reqwest", - "serde", - "serde_json", - "tedge_config", - "tempfile", - "thiserror", - "tokio", - "toml", - "tracing", -] - -[[package]] name = "tedge_mapper" version = "0.6.4" dependencies = [ @@ -4,10 +4,10 @@ members = [ "crates/core/*", "crates/tests/*", "plugins/c8y_configuration_plugin", + "plugins/c8y_log_plugin", "plugins/tedge_apt_plugin", "plugins/tedge_dummy_plugin", "plugins/tedge_apama_plugin", - "plugins/log_request_plugin", ] resolver = "2" diff --git a/configuration/debian/c8y_log_plugin/postinst b/configuration/debian/c8y_log_plugin/postinst new file mode 100644 index 00000000..7f28420f --- /dev/null +++ b/configuration/debian/c8y_log_plugin/postinst @@ -0,0 +1,8 @@ +#!/bin/sh + +set -e + +### Create supported operation files +c8y_log_plugin --init + +#DEBHELPER# diff --git a/configuration/init/systemd/c8y-log-plugin.service b/configuration/init/systemd/c8y-log-plugin.service new file mode 100644 index 00000000..d1f758e9 --- /dev/null +++ b/configuration/init/systemd/c8y-log-plugin.service @@ -0,0 +1,12 @@ +[Unit] +Description=Thin-edge logfile retriever for Cumulocity +After=syslog.target network.target mosquitto.service + +[Service] +User=root +ExecStart=/usr/bin/c8y_log_plugin +Restart=on-failure +RestartPreventExitStatus=255 + +[Install] +WantedBy=multi-user.target diff --git a/crates/core/c8y_api/src/http_proxy.rs b/crates/core/c8y_api/src/http_proxy.rs index 78d8280b..b9f78e32 100644 --- a/crates/core/c8y_api/src/http_proxy.rs +++ b/crates/core/c8y_api/src/http_proxy.rs @@ -42,6 +42,7 @@ pub trait C8YHttpProxy: Send + Sync { async fn upload_log_binary( &mut self, + log_type: &str, log_content: &str, ) -> Result<String, SMCumulocityMapperError>; @@ -269,20 +270,6 @@ impl JwtAuthHttpProxy { Ok(internal_id) } - fn create_log_event(&self) -> C8yCreateEvent { - let c8y_managed_object = C8yManagedObject { - id: self.end_point.c8y_internal_id.clone(), - }; - - C8yCreateEvent::new( - Some(c8y_managed_object), - "c8y_Logfile".to_string(), - OffsetDateTime::now_utc(), - "software-management".to_string(), - HashMap::new(), - ) - } - fn create_event( &self, event_type: String, @@ -388,11 +375,12 @@ impl C8YHttpProxy for JwtAuthHttpProxy { async fn upload_log_binary( &mut self, + log_type: &str, log_content: &str, ) -> Result<String, SMCumulocityMapperError> { let token = self.get_jwt_token().await?; - let log_event = self.create_log_event(); + let log_event = self.create_event(log_type.to_string(), None, None); let event_response_id = self.send_event_internal(log_event).await?; let binary_upload_event_url = self .end_point diff --git a/crates/core/c8y_smartrest/src/smartrest_deserializer.rs b/crates/core/c8y_smartrest/src/smartrest_deserializer.rs index ff5c177c..0d2fb0c5 100644 --- a/crates/core/c8y_smartrest/src/smartrest_deserializer.rs +++ b/crates/core/c8y_smartrest/src/smartrest_deserializer.rs @@ -1,4 +1,4 @@ -use crate::error::{SMCumulocityMapperError, SmartRestDeserializerError}; +use crate::error::SmartRestDeserializerError; use agent_interface::{SoftwareModule, SoftwareModuleUpdate, SoftwareUpdateRequest}; use csv::ReaderBuilder; use download::DownloadInfo; @@ -306,9 +306,7 @@ impl SmartRestJwtResponse { /// path.push("/path/to/file/with/date/in/path-2021-10-27T10:29:58Z"); /// let path_bufdate_time = get_datetime_from_file_path(&path).unwrap(); /// ``` -pub fn get_datetime_from_file_path( - log_path: &Path, -) -> Result<OffsetDateTime, SMCumulocityMapperError> { +pub fn get_datetime_from_file_path(log_path: &Path) -> Option<OffsetDateTime> { if let Some(stem_string) = log_path.file_stem().and_then(|s| s.to_str()) { // a typical file stem looks like this: software-list-2021-10-27T10:29:58Z. // to extract the date, rsplit string on "-" and take (last) 3 @@ -317,16 +315,17 @@ pub fn get_datetime_from_file_path( stem_string_vec.reverse(); // join on '-' to get the date string let date_string = stem_string_vec.join("-"); - let dt = OffsetDateTime::parse(&date_string, &format_description::well_known::Rfc3339)?; - - return Ok(dt); - } - match log_path.to_str() { - Some(path) => Err(SMCumulocityMapperError::InvalidDateInFileName( - path.to_string(), - )), - None => Err(SMCumulocityMapperError::InvalidUtf8Path), + let dt = OffsetDateTime::parse(&date_string, &format_description::well_known::Rfc3339); + match dt { + Ok(dt) => { + return Some(dt); + } + Err(_) => { + return None; + } + } } + None } #[cfg(test)] @@ -650,7 +649,7 @@ mod tests { // this should return an Ok Result. let path_buf = PathBuf::from_str(file_path).unwrap(); let path_buf_datetime = get_datetime_from_file_path(&path_buf); - assert!(path_buf_datetime.is_ok()); + assert!(path_buf_datetime.is_some()); } #[test_case("/path/to/software-list-2021-10-27-10:44:44Z.log")] @@ -662,6 +661,6 @@ mod tests { // this should return an err. let path_buf = PathBuf::from_str(file_path).unwrap(); let path_buf_datetime = get_datetime_from_file_path(&path_buf); - assert!(path_buf_datetime.is_err()); + assert!(path_buf_datetime.is_none()); } } diff --git a/crates/core/tedge_mapper/src/c8y/converter.rs b/crates/core/tedge_mapper/src/c8y/converter.rs index e830a7a4..dc407100 100644 --- a/crates/core/tedge_mapper/src/c8y/converter.rs +++ b/crates/core/tedge_mapper/src/c8y/converter.rs @@ -18,18 +18,17 @@ use c8y_smartrest::{ smartrest_serializer::{ CumulocitySupportedOperations, SmartRestGetPendingOperations, SmartRestSerializer, SmartRestSetOperationToExecuting, SmartRestSetOperationToFailed, - SmartRestSetOperationToSuccessful, SmartRestSetSupportedLogType, - SmartRestSetSupportedOperations, + SmartRestSetOperationToSuccessful, SmartRestSetSupportedOperations, }, }; use c8y_translator::json; use logged_command::LoggedCommand; use mqtt_channel::{Message, Topic, TopicFilter}; -use plugin_sm::operation_logs::{OperationLogs, OperationLogsError}; +use plugin_sm::operation_logs::OperationLogs; use std::{ collections::{hash_map::Entry, HashMap, HashSet}, - fs::{self, File}, + fs::File, io::Read, path::{Path, PathBuf}, }; @@ -333,9 +332,6 @@ where &self.device_type, )); - let supported_log_types_message = self.wrap_error(create_supported_log_types_message( - &self.operation_logs.log_dir, - )); let pending_operations_message = self.wrap_error(create_get_pending_operations_message()); let software_list_message = self.wrap_error(create_get_software_list_message()); @@ -343,7 +339,6 @@ where inventory_fragments_message, supported_operations_message, device_data_message, - supported_log_types_message, pending_operations_message, software_list_message, ]) @@ -556,39 +551,6 @@ fn create_get_pending_operations_message() -> Result<Message, ConversionError> { Ok(Message::new(&topic, payload)) } -fn get_supported_log_types(log_dir: &Path) -> Result<Vec<String>, ConversionError> { - let mut result = HashSet::new(); - - let paths = fs::read_dir(&log_dir).unwrap(); - for path in paths { - let path = path?; - if fs::metadata(path.path())?.is_file() { - let file_name = path.file_name(); - let file_name = file_name - .to_str() - .ok_or(OperationLogsError::FileFormatError)?; - - // FIXME: this is a hotfix to map "software-list" and "software-update" to "software-management" - // this should be fixed in https://github.com/thin-edge/thin-edge.io/issues/1077 - if file_name.starts_with("software-list") | file_name.starts_with("software-update") { - result.insert("software-management".to_string()); - } else { - let log_type = file_name.split('-').next(); - let log_type = log_type.ok_or(OperationLogsError::FileFormatError)?; - result.insert(log_type.to_string()); - } - } - } - Ok(Vec::from_iter(result)) -} - -fn create_supported_log_types_message(log_dir: &Path) -> Result<Message, ConversionError> { - let supported_operation_types = get_supported_log_types(log_dir)?; - let payload = SmartRestSetSupportedLogType::from(supported_operation_types).to_smartrest()?; - let topic = C8yTopic::SmartRestResponse.to_topic()?; - Ok(Message::new(&topic, payload)) -} - fn create_supported_operations_fragments_message() -> Result<Message, ConversionError> { let ops = Operations::try_new(SUPPORTED_OPERATIONS_DIRECTORY, C8Y_CLOUD)?; let ops = ops.get_operations_list(); diff --git a/crates/core/tedge_mapper/src/c8y/tests.rs b/crates/core/tedge_mapper/src/c8y/tests.rs index c15ce8f9..117bdced 100644 --- a/crates/core/tedge_mapper/src/c8y/tests.rs +++ b/crates/core/tedge_mapper/src/c8y/tests.rs @@ -57,13 +57,8 @@ async fn mapper_publishes_a_supported_operation_and_a_pending_operations_onto_c8 // Start SM Mapper let sm_mapper = start_c8y_mapper(broker.port).await; - // Expect both 118 and 500 messages has been received on `c8y/s/us`, if no msg received for the timeout the test fails. - mqtt_tests::assert_received_all_expected( - &mut messages, - TEST_TIMEOUT_MS, - &["software-management", "500\n"], - ) - .await; + // Expect 500 messages has been received on `c8y/s/us`, if no msg received for the timeout the test fails. + mqtt_tests::assert_received_all_expected(&mut messages, TEST_TIMEOUT_MS, &["500\n"]).await; sm_mapper.unwrap().abort(); } @@ -300,7 +295,7 @@ async fn mapper_fails_during_sw_update_recovers_and_process_response() -> Result mqtt_tests::assert_received_all_expected( &mut responses, TEST_TIMEOUT_MS, - &["software-management", "500\n", "503,c8y_SoftwareUpdate,\n"], + &["500\n", "503,c8y_SoftwareUpdate,\n"], ) .await; @@ -896,6 +891,7 @@ impl C8YHttpProxy for FakeC8YHttpProxy { async fn upload_log_binary( &mut self, + _log_type: &str, _log_content: &str, ) -> Result<String, SMCumulocityMapperError> { Ok("fake/upload/url".into()) diff --git a/plugins/log_request_plugin/Cargo.toml b/plugins/c8y_log_plugin/Cargo.toml index 4cb07317..d8158abe 100644 --- a/plugins/log_request_plugin/Cargo.toml +++ b/plugins/c8y_log_plugin/Cargo.toml @@ -1,38 +1,41 @@ [package] -name = "tedge_logfile_request_plugin" +name = "c8y_log_plugin" version = "0.6.4" authors = ["thin-edge.io team <info@thin-edge.io>"] edition = "2021" rust-version = "1.58.1" license = "Apache-2.0" -description = "Thin.edge.io operation plugin for Cumulocity log request" +description = "Thin-edge device log file retriever for Cumulocity" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [package.metadata.deb] -pre-depends = "tedge_mapper" +maintainer-scripts = "../../configuration/debian/c8y_log_plugin" assets = [ - ["../../configuration/contrib/operations/c8y/c8y_LogfileRequest", "/etc/tedge/operations/c8y/", "644"], - ["target/release/tedge_logfile_request_plugin", "/usr/bin/tedge_logfile_request_plugin", "755"], + ["../../configuration/init/systemd/c8y-log-plugin.service", "/lib/systemd/system/c8y-log-plugin.service", "644"], + ["target/release/c8y_log_plugin", "/usr/bin/c8y_log_plugin", "755"], ] [dependencies] anyhow = "1.0" -async-trait = "0.1" c8y_api = { path = "../../crates/core/c8y_api" } c8y_smartrest = { path = "../../crates/core/c8y_smartrest" } +clap = { version = "3.0", features = ["cargo", "derive"] } csv = "1.1" -futures = "0.3" -mockall = "0.10" -reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] } +glob = "0.3" +inotify = "0.10" +mqtt_channel = { path = "../../crates/common/mqtt_channel" } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" +tedge_config = { path = "../../crates/common/tedge_config" } +tedge_utils = { path = "../../crates/common/tedge_utils", features = ["logging"] } thiserror = "1.0" tokio = { version = "1.9", default_features = false, features = [ "fs", "io-util", "macros", "rt-multi-thread","signal"] } toml = "0.5" tracing = { version = "0.1", features = ["attributes", "log"] } -tedge_config = { path = "../../crates/common/tedge_config" } -mqtt_channel = { path = "../../crates/common/mqtt_channel" } [dev-dependencies] +assert_matches = "1.5" +mockall = "0.11" tempfile = "3.3" +test-case = "2.0" +serial_test = "0.6" diff --git a/plugins/c8y_log_plugin/src/config.rs b/plugins/c8y_log_plugin/src/config.rs new file mode 100644 index 00000000..580fba95 --- /dev/null +++ b/plugins/c8y_log_plugin/src/config.rs @@ -0,0 +1,101 @@ +use c8y_smartrest::topic::C8yTopic; +use mqtt_channel::Message; +use serde::Deserialize; +use std::{borrow::Borrow, path::Path}; +use std::{collections::HashSet, fs}; +use tracing::warn; + +#[derive(Deserialize, Debug, Eq, PartialEq, Default)] +#[serde(deny_unknown_fields)] +pub struct LogPluginConfig { + pub files: Vec<FileEntry>, +} + +#[derive(Deserialize, Debug, Eq, Default, Clone)] +#[serde(deny_unknown_fields)] +pub struct FileEntry { + pub(crate) path: String, + #[serde(rename = "type")] + pub config_type: String, +} + +impl PartialEq for FileEntry { + fn eq(&self, other: &Self) -> bool { + self.config_type == other.config_type + } +} + +impl Borrow<String> for FileEntry { + fn borrow(&self) -> &String { + &self.config_type + } +} + +impl LogPluginConfig { + pub fn new(config_file_path: &Path) -> Self { + let config = Self::read_config(config_file_path); + config + } + + pub fn read_config(path: &Path) -> Self { + let path_str = path.display().to_string(); + match fs::read_to_string(path) { + Ok(contents) => match toml::from_str(contents.as_str()) { + Ok(config) => config, + _ => { + warn!("The config file {} is malformed.", path_str); + Self::default() + } + }, + Err(_) => { + warn!( + "The config file {} does not exist or is not readable.", + path_str + ); + Self::default() + } + } + } + + pub fn to_supported_config_types_message(&self) -> Result<Message, anyhow::Error> { + let topic = C8yTopic::SmartRestResponse.to_topic()?; + Ok(Message::new(&topic, self.to_smartrest_payload())) + } + + pub fn get_all_file_types(&self) -> Vec<String> { + self.files + .iter() + .map(|x| x.config_type.to_string()) + .collect::<HashSet<_>>() + .iter() + .map(|x| x.to_string()) + .collect::<Vec<_>>() + } + + // 118,typeA,typeB,... + fn to_smartrest_payload(&self) -> String { + let mut config_types = self.get_all_file_types(); + let () = config_types.sort(); + let supported_config_types = config_types.join(","); + format!("118,{supported_config_types}") + } +} + +#[test] +fn test_no_duplicated_file_types() { + let files = vec![ + FileEntry { + path: "a/path".to_string(), + config_type: "type_one".to_string(), + }, + FileEntry { + path: "some/path".to_string(), + config_type: "type_one".to_string(), + }, + ]; + let logs_config = LogPluginConfig { files: files }; + assert_eq!( + logs_config.get_all_file_types(), + vec!["type_one".to_string()] + ); +} diff --git a/plugins/c8y_log_plugin/src/error.rs b/plugins/c8y_log_plugin/src/error.rs new file mode 100644 index 00000000..045d4bf3 --- /dev/null +++ b/plugins/c8y_log_plugin/src/error.rs @@ -0,0 +1,8 @@ +#[derive(thiserror::Error, Debug)] +pub enum LogRetrievalError { + #[error(transparent)] + FromTEdgeConfig(#[from] tedge_config::TEdgeConfigError), + + #[error(transparent)] + FromConfigSetting(#[from] tedge_config::ConfigSettingError), +} diff --git a/plugins/c8y_log_plugin/src/logfile_request.rs b/plugins/c8y_log_plugin/src/logfile_request.rs new file mode 100644 index 00000000..35867948 --- /dev/null +++ b/plugins/c8y_log_plugin/src/logfile_request.rs @@ -0,0 +1,171 @@ +use std::path::Path; + +use glob::glob; + +use crate::config::LogPluginConfig; +use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy}; +use c8y_smartrest::{ + smartrest_deserializer::{get_datetime_from_file_path, SmartRestLogRequest}, + smartrest_serializer::{ + CumulocitySupportedOperations, SmartRestSerializer, SmartRestSetOperationToExecuting, + SmartRestSetOperationToFailed, SmartRestSetOperationToSuccessful, + TryIntoOperationStatusMessage, + }, +}; +use mqtt_channel::{Connection, SinkExt}; + +pub struct LogfileRequest {} + +impl TryIntoOperationStatusMessage for LogfileRequest { + /// returns a c8y message specifying to set log status to executing. + /// + /// example message: '501,c8y_LogfileRequest' + fn status_executing() -> Result< + c8y_smartrest::smartrest_serializer::SmartRest, + c8y_smartrest::error::SmartRestSerializerError, + > { + SmartRestSetOperationToExecuting::new(CumulocitySupportedOperations::C8yLogFileRequest) + .to_smartrest() + } + + fn status_successful( + parameter: Option<String>, + ) -> Result< + c8y_smartrest::smartrest_serializer::SmartRest, + c8y_smartrest::error::SmartRestSerializerError, + > { + SmartRestSetOperationToSuccessful::new(CumulocitySupportedOperations::C8yLogFileRequest) + .with_response_parameter(¶meter.unwrap()) + .to_smartrest() + } + + fn status_failed( + failure_reason: String, + ) -> Result< + c8y_smartrest::smartrest_serializer::SmartRest, + c8y_smartrest::error::SmartRestSerializerError, + > { + SmartRestSetOperationToFailed::new( + CumulocitySupportedOperations::C8yLogFileRequest, + failure_reason, + ) + .to_smartrest() + } +} +/// Reads tedge logs according to `SmartRestLogRequest`. +/// +/// If needed, logs are concatenated. +/// +/// Logs are sorted alphanumerically from oldest to newest. +/// +/// # Examples +/// +/// ``` +/// let smartrest_obj = SmartRestLogRequest::from_smartrest( +/// "522,DeviceSerial,syslog,2021-01-01T00:00:00+0200,2021-01-10T00:00:00+0200,,1000", +/// ) +/// .unwrap(); +/// +/// let log = read_tedge_system_logs(&smartrest_obj, "/var/log/tedge").unwrap(); +/// ``` +pub fn read_tedge_logs( + smartrest_obj: &SmartRestLogRequest, + plugin_config_path: &Path, +) -> Result<String, anyhow::Error> { + let plugin_config = LogPluginConfig::new(&plugin_config_path); + let mut output = String::new(); + + let mut files_to_send = Vec::new(); + for files in &plugin_config.files { + let maybe_file_path = files.path.as_str(); // because it can be a glob pattern + let file_type = files.config_type.as_str(); + + if !file_type.eq(&smartrest_obj.log_type) { + continue; + } + + // NOTE: According to the glob documentation paths are yielded in alphabetical order hence re-ordering is no longer required see: + // https://github.com/thin-edge/thin-edge.io/blob/0320741b109f50d1b0f7cda44e33dc31ba04902d/plugins/log_request_plugin/src/smartrest.rs#L24 + for entry in glob(maybe_file_path)? { + let file_path = entry?; + if let Some(dt_from_file) = get_datetime_from_file_path(&file_path) { + if !(dt_from_file < smartrest_obj.date_from || dt_from_file > smartrest_obj.date_to) + { + files_to_send.push(file_path); + } + } else { + files_to_send.push(file_path); + } + } + } + + // loop sorted vector and push store log file to `output` + let mut line_counter: usize = 0; + for entry in files_to_send { + dbg!("files to read:", &entry); + let file_content = std::fs::read_to_string(&entry)?; + if file_content.is_empty() { + continue; + } + + // adding file header only if line_counter permits more lines to be added + match &entry.file_stem().and_then(|f| f.to_str()) { + Some(file_name) if line_counter < smartrest_obj.lines => { + output.push_str(&format!("filename: {}\n", file_name)); + } + _ => {} + } + + // split at new line delimiter ("\n") + let mut lines = file_content.lines().rev(); + while line_counter < smartrest_obj.lines { + if let Some(haystack) = lines.next() { + if let Some(needle) = &smartrest_obj.needle { + if haystack.contains(needle) { + output.push_str(&format!("{}\n", haystack)); + line_counter += 1; + } + } else { + output.push_str(&format!("{}\n", haystack)); + line_counter += 1; + } + } else { + // there are no lines.next() + break; + } + } + } + Ok(output) +} + +pub async fn handle_logfile_request_operation( + smartrest_request: &SmartRestLogRequest, + plugin_config_path: &Path, + mqtt_client: &mut Connection, + http_client: &mut JwtAuthHttpProxy, +) -> Result<(), anyhow::Error> { + // executing + let executing = LogfileRequest::executing()?; + let () = mqtt_client.published.send(executing).await?; + + let log_content = read_tedge_logs(&smartrest_request, &plugin_config_path)?; + + let upload_event_url = http_client + .upload_log_binary(&smartrest_request.log_type, &log_content) + .await?; + + let successful = LogfileRequest::successful(Some(upload_event_url))?; + let () = mqtt_client.published.send(successful).await?; + + Ok(()) +} + +pub async fn handle_dynamic_log_type_update( + mqtt_client: &mut Connection, + config_dir: &Path, +) -> Result<(), anyhow::Error> { + let plugin_config = LogPluginConfig::new(config_dir); + let msg = plugin_config.to_supported_config_types_message()?; + let () = mqtt_client.published.send(msg).await?; + Ok(()) +} diff --git a/plugins/c8y_log_plugin/src/main.rs b/plugins/c8y_log_plugin/src/main.rs new file mode 100644 index 00000000..a799a9b2 --- /dev/null +++ b/plugins/c8y_log_plugin/src/main.rs @@ -0,0 +1,232 @@ +mod config; +mod error; +mod logfile_request; + +use anyhow::Result; +use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy}; +use c8y_smartrest::smartrest_deserializer::{SmartRestLogRequest, SmartRestRequestGeneric}; +use c8y_smartrest::topic::C8yTopic; +use clap::Parser; + +use inotify::{EventMask, EventStream}; +use inotify::{Inotify, WatchMask}; +use mqtt_channel::{Connection, StreamExt}; +use std::path::{Path, PathBuf}; +use tedge_config::{ + ConfigRepository, ConfigSettingAccessor, LogPathSetting, MqttPortSetting, TEdgeConfig, + DEFAULT_TEDGE_CONFIG_PATH, +}; +use tedge_utils::file::{create_directory_with_user_group, create_file_with_user_group}; +use tracing::{error, info}; + +use crate::logfile_request::{handle_dynamic_log_type_update, handle_logfile_request_operation}; + +const DEFAULT_PLUGIN_CONFIG_FILE: &str = "c8y/c8y-log-plugin.toml"; +const AFTER_HELP_TEXT: &str = r#"On start, `c8y_log_plugin` notifies the cloud tenant of the log types listed in the `CONFIG_FILE`, sending this list with a `118` on `c8y/s/us`. +`c8y_log_plugin` subscribes then to `c8y/s/ds` listening for logfile operation requests (`522`) notifying the Cumulocity tenant of their progress (messages `501`, `502` and `503`). + +The thin-edge `CONFIG_DIR` is used to store: + * c8y-log-plugin.toml - the configuration file that specifies which logs to be retrived"#; + +#[derive(Debug, clap::Parser, Clone)] +#[clap( +name = clap::crate_name!(), +version = clap::crate_version!(), +about = clap::crate_description!(), +after_help = AFTER_HELP_TEXT +)] +pub struct LogfileRequestPluginOpt { + /// Turn-on the debug log level. + /// + /// If off only reports ERROR, WARN, and INFO + /// If on also reports DEBUG and TRACE + #[clap(long)] + pub debug: bool, + + /// Create supported operation files + #[clap(short, long)] + pub init: bool, + + #[clap(long = "config-dir", default_value = DEFAULT_TEDGE_CONFIG_PATH)] + pub config_dir: PathBuf, +} + +async fn create_mqtt_client( + tedge_config: &TEdgeConfig, +) -> Result<mqtt_channel::Connection, anyhow::Error> { + let mqtt_port = tedge_config.query(MqttPortSetting)?.into(); + let mqtt_config = mqtt_channel::Config::default() + .with_port(mqtt_port) + .with_subscriptions(mqtt_channel::TopicFilter::new_unchecked( + C8yTopic::SmartRestRequest.as_str(), + )); + + let mqtt_client = mqtt_channel::Connection::new(&mqtt_config).await?; + Ok(mqtt_client) +} + +pub async fn create_http_client( + tedge_config: &TEdgeConfig, +) -> Result<JwtAuthHttpProxy, anyhow::Error> { + let mut http_proxy = JwtAuthHttpProxy::try_new(tedge_config).await?; + let () = http_proxy.init().await?; + Ok(http_proxy) +} + +fn create_inofity_file_watch_stream( + config_file: &Path, +) -> Result<EventStream<[u8; 1024]>, anyhow::Error> { + let buffer = [0; 1024]; + let mut inotify = |