diff options
Diffstat (limited to 'plugins/c8y_log_plugin')
-rw-r--r-- | plugins/c8y_log_plugin/Cargo.toml | 41 | ||||
-rw-r--r-- | plugins/c8y_log_plugin/src/config.rs | 101 | ||||
-rw-r--r-- | plugins/c8y_log_plugin/src/error.rs | 8 | ||||
-rw-r--r-- | plugins/c8y_log_plugin/src/logfile_request.rs | 171 | ||||
-rw-r--r-- | plugins/c8y_log_plugin/src/main.rs | 232 |
5 files changed, 553 insertions, 0 deletions
diff --git a/plugins/c8y_log_plugin/Cargo.toml b/plugins/c8y_log_plugin/Cargo.toml new file mode 100644 index 00000000..d8158abe --- /dev/null +++ b/plugins/c8y_log_plugin/Cargo.toml @@ -0,0 +1,41 @@ +[package] +name = "c8y_log_plugin" +version = "0.6.4" +authors = ["thin-edge.io team <info@thin-edge.io>"] +edition = "2021" +rust-version = "1.58.1" +license = "Apache-2.0" +description = "Thin-edge device log file retriever for Cumulocity" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[package.metadata.deb] +maintainer-scripts = "../../configuration/debian/c8y_log_plugin" +assets = [ + ["../../configuration/init/systemd/c8y-log-plugin.service", "/lib/systemd/system/c8y-log-plugin.service", "644"], + ["target/release/c8y_log_plugin", "/usr/bin/c8y_log_plugin", "755"], +] + +[dependencies] +anyhow = "1.0" +c8y_api = { path = "../../crates/core/c8y_api" } +c8y_smartrest = { path = "../../crates/core/c8y_smartrest" } +clap = { version = "3.0", features = ["cargo", "derive"] } +csv = "1.1" +glob = "0.3" +inotify = "0.10" +mqtt_channel = { path = "../../crates/common/mqtt_channel" } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +tedge_config = { path = "../../crates/common/tedge_config" } +tedge_utils = { path = "../../crates/common/tedge_utils", features = ["logging"] } +thiserror = "1.0" +tokio = { version = "1.9", default_features = false, features = [ "fs", "io-util", "macros", "rt-multi-thread","signal"] } +toml = "0.5" +tracing = { version = "0.1", features = ["attributes", "log"] } + +[dev-dependencies] +assert_matches = "1.5" +mockall = "0.11" +tempfile = "3.3" +test-case = "2.0" +serial_test = "0.6" diff --git a/plugins/c8y_log_plugin/src/config.rs b/plugins/c8y_log_plugin/src/config.rs new file mode 100644 index 00000000..580fba95 --- /dev/null +++ b/plugins/c8y_log_plugin/src/config.rs @@ -0,0 +1,101 @@ +use c8y_smartrest::topic::C8yTopic; +use mqtt_channel::Message; +use serde::Deserialize; +use std::{borrow::Borrow, path::Path}; +use std::{collections::HashSet, fs}; +use tracing::warn; + +#[derive(Deserialize, Debug, Eq, PartialEq, Default)] +#[serde(deny_unknown_fields)] +pub struct LogPluginConfig { + pub files: Vec<FileEntry>, +} + +#[derive(Deserialize, Debug, Eq, Default, Clone)] +#[serde(deny_unknown_fields)] +pub struct FileEntry { + pub(crate) path: String, + #[serde(rename = "type")] + pub config_type: String, +} + +impl PartialEq for FileEntry { + fn eq(&self, other: &Self) -> bool { + self.config_type == other.config_type + } +} + +impl Borrow<String> for FileEntry { + fn borrow(&self) -> &String { + &self.config_type + } +} + +impl LogPluginConfig { + pub fn new(config_file_path: &Path) -> Self { + let config = Self::read_config(config_file_path); + config + } + + pub fn read_config(path: &Path) -> Self { + let path_str = path.display().to_string(); + match fs::read_to_string(path) { + Ok(contents) => match toml::from_str(contents.as_str()) { + Ok(config) => config, + _ => { + warn!("The config file {} is malformed.", path_str); + Self::default() + } + }, + Err(_) => { + warn!( + "The config file {} does not exist or is not readable.", + path_str + ); + Self::default() + } + } + } + + pub fn to_supported_config_types_message(&self) -> Result<Message, anyhow::Error> { + let topic = C8yTopic::SmartRestResponse.to_topic()?; + Ok(Message::new(&topic, self.to_smartrest_payload())) + } + + pub fn get_all_file_types(&self) -> Vec<String> { + self.files + .iter() + .map(|x| x.config_type.to_string()) + .collect::<HashSet<_>>() + .iter() + .map(|x| x.to_string()) + .collect::<Vec<_>>() + } + + // 118,typeA,typeB,... + fn to_smartrest_payload(&self) -> String { + let mut config_types = self.get_all_file_types(); + let () = config_types.sort(); + let supported_config_types = config_types.join(","); + format!("118,{supported_config_types}") + } +} + +#[test] +fn test_no_duplicated_file_types() { + let files = vec![ + FileEntry { + path: "a/path".to_string(), + config_type: "type_one".to_string(), + }, + FileEntry { + path: "some/path".to_string(), + config_type: "type_one".to_string(), + }, + ]; + let logs_config = LogPluginConfig { files: files }; + assert_eq!( + logs_config.get_all_file_types(), + vec!["type_one".to_string()] + ); +} diff --git a/plugins/c8y_log_plugin/src/error.rs b/plugins/c8y_log_plugin/src/error.rs new file mode 100644 index 00000000..045d4bf3 --- /dev/null +++ b/plugins/c8y_log_plugin/src/error.rs @@ -0,0 +1,8 @@ +#[derive(thiserror::Error, Debug)] +pub enum LogRetrievalError { + #[error(transparent)] + FromTEdgeConfig(#[from] tedge_config::TEdgeConfigError), + + #[error(transparent)] + FromConfigSetting(#[from] tedge_config::ConfigSettingError), +} diff --git a/plugins/c8y_log_plugin/src/logfile_request.rs b/plugins/c8y_log_plugin/src/logfile_request.rs new file mode 100644 index 00000000..35867948 --- /dev/null +++ b/plugins/c8y_log_plugin/src/logfile_request.rs @@ -0,0 +1,171 @@ +use std::path::Path; + +use glob::glob; + +use crate::config::LogPluginConfig; +use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy}; +use c8y_smartrest::{ + smartrest_deserializer::{get_datetime_from_file_path, SmartRestLogRequest}, + smartrest_serializer::{ + CumulocitySupportedOperations, SmartRestSerializer, SmartRestSetOperationToExecuting, + SmartRestSetOperationToFailed, SmartRestSetOperationToSuccessful, + TryIntoOperationStatusMessage, + }, +}; +use mqtt_channel::{Connection, SinkExt}; + +pub struct LogfileRequest {} + +impl TryIntoOperationStatusMessage for LogfileRequest { + /// returns a c8y message specifying to set log status to executing. + /// + /// example message: '501,c8y_LogfileRequest' + fn status_executing() -> Result< + c8y_smartrest::smartrest_serializer::SmartRest, + c8y_smartrest::error::SmartRestSerializerError, + > { + SmartRestSetOperationToExecuting::new(CumulocitySupportedOperations::C8yLogFileRequest) + .to_smartrest() + } + + fn status_successful( + parameter: Option<String>, + ) -> Result< + c8y_smartrest::smartrest_serializer::SmartRest, + c8y_smartrest::error::SmartRestSerializerError, + > { + SmartRestSetOperationToSuccessful::new(CumulocitySupportedOperations::C8yLogFileRequest) + .with_response_parameter(¶meter.unwrap()) + .to_smartrest() + } + + fn status_failed( + failure_reason: String, + ) -> Result< + c8y_smartrest::smartrest_serializer::SmartRest, + c8y_smartrest::error::SmartRestSerializerError, + > { + SmartRestSetOperationToFailed::new( + CumulocitySupportedOperations::C8yLogFileRequest, + failure_reason, + ) + .to_smartrest() + } +} +/// Reads tedge logs according to `SmartRestLogRequest`. +/// +/// If needed, logs are concatenated. +/// +/// Logs are sorted alphanumerically from oldest to newest. +/// +/// # Examples +/// +/// ``` +/// let smartrest_obj = SmartRestLogRequest::from_smartrest( +/// "522,DeviceSerial,syslog,2021-01-01T00:00:00+0200,2021-01-10T00:00:00+0200,,1000", +/// ) +/// .unwrap(); +/// +/// let log = read_tedge_system_logs(&smartrest_obj, "/var/log/tedge").unwrap(); +/// ``` +pub fn read_tedge_logs( + smartrest_obj: &SmartRestLogRequest, + plugin_config_path: &Path, +) -> Result<String, anyhow::Error> { + let plugin_config = LogPluginConfig::new(&plugin_config_path); + let mut output = String::new(); + + let mut files_to_send = Vec::new(); + for files in &plugin_config.files { + let maybe_file_path = files.path.as_str(); // because it can be a glob pattern + let file_type = files.config_type.as_str(); + + if !file_type.eq(&smartrest_obj.log_type) { + continue; + } + + // NOTE: According to the glob documentation paths are yielded in alphabetical order hence re-ordering is no longer required see: + // https://github.com/thin-edge/thin-edge.io/blob/0320741b109f50d1b0f7cda44e33dc31ba04902d/plugins/log_request_plugin/src/smartrest.rs#L24 + for entry in glob(maybe_file_path)? { + let file_path = entry?; + if let Some(dt_from_file) = get_datetime_from_file_path(&file_path) { + if !(dt_from_file < smartrest_obj.date_from || dt_from_file > smartrest_obj.date_to) + { + files_to_send.push(file_path); + } + } else { + files_to_send.push(file_path); + } + } + } + + // loop sorted vector and push store log file to `output` + let mut line_counter: usize = 0; + for entry in files_to_send { + dbg!("files to read:", &entry); + let file_content = std::fs::read_to_string(&entry)?; + if file_content.is_empty() { + continue; + } + + // adding file header only if line_counter permits more lines to be added + match &entry.file_stem().and_then(|f| f.to_str()) { + Some(file_name) if line_counter < smartrest_obj.lines => { + output.push_str(&format!("filename: {}\n", file_name)); + } + _ => {} + } + + // split at new line delimiter ("\n") + let mut lines = file_content.lines().rev(); + while line_counter < smartrest_obj.lines { + if let Some(haystack) = lines.next() { + if let Some(needle) = &smartrest_obj.needle { + if haystack.contains(needle) { + output.push_str(&format!("{}\n", haystack)); + line_counter += 1; + } + } else { + output.push_str(&format!("{}\n", haystack)); + line_counter += 1; + } + } else { + // there are no lines.next() + break; + } + } + } + Ok(output) +} + +pub async fn handle_logfile_request_operation( + smartrest_request: &SmartRestLogRequest, + plugin_config_path: &Path, + mqtt_client: &mut Connection, + http_client: &mut JwtAuthHttpProxy, +) -> Result<(), anyhow::Error> { + // executing + let executing = LogfileRequest::executing()?; + let () = mqtt_client.published.send(executing).await?; + + let log_content = read_tedge_logs(&smartrest_request, &plugin_config_path)?; + + let upload_event_url = http_client + .upload_log_binary(&smartrest_request.log_type, &log_content) + .await?; + + let successful = LogfileRequest::successful(Some(upload_event_url))?; + let () = mqtt_client.published.send(successful).await?; + + Ok(()) +} + +pub async fn handle_dynamic_log_type_update( + mqtt_client: &mut Connection, + config_dir: &Path, +) -> Result<(), anyhow::Error> { + let plugin_config = LogPluginConfig::new(config_dir); + let msg = plugin_config.to_supported_config_types_message()?; + let () = mqtt_client.published.send(msg).await?; + Ok(()) +} diff --git a/plugins/c8y_log_plugin/src/main.rs b/plugins/c8y_log_plugin/src/main.rs new file mode 100644 index 00000000..a799a9b2 --- /dev/null +++ b/plugins/c8y_log_plugin/src/main.rs @@ -0,0 +1,232 @@ +mod config; +mod error; +mod logfile_request; + +use anyhow::Result; +use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy}; +use c8y_smartrest::smartrest_deserializer::{SmartRestLogRequest, SmartRestRequestGeneric}; +use c8y_smartrest::topic::C8yTopic; +use clap::Parser; + +use inotify::{EventMask, EventStream}; +use inotify::{Inotify, WatchMask}; +use mqtt_channel::{Connection, StreamExt}; +use std::path::{Path, PathBuf}; +use tedge_config::{ + ConfigRepository, ConfigSettingAccessor, LogPathSetting, MqttPortSetting, TEdgeConfig, + DEFAULT_TEDGE_CONFIG_PATH, +}; +use tedge_utils::file::{create_directory_with_user_group, create_file_with_user_group}; +use tracing::{error, info}; + +use crate::logfile_request::{handle_dynamic_log_type_update, handle_logfile_request_operation}; + +const DEFAULT_PLUGIN_CONFIG_FILE: &str = "c8y/c8y-log-plugin.toml"; +const AFTER_HELP_TEXT: &str = r#"On start, `c8y_log_plugin` notifies the cloud tenant of the log types listed in the `CONFIG_FILE`, sending this list with a `118` on `c8y/s/us`. +`c8y_log_plugin` subscribes then to `c8y/s/ds` listening for logfile operation requests (`522`) notifying the Cumulocity tenant of their progress (messages `501`, `502` and `503`). + +The thin-edge `CONFIG_DIR` is used to store: + * c8y-log-plugin.toml - the configuration file that specifies which logs to be retrived"#; + +#[derive(Debug, clap::Parser, Clone)] +#[clap( +name = clap::crate_name!(), +version = clap::crate_version!(), +about = clap::crate_description!(), +after_help = AFTER_HELP_TEXT +)] +pub struct LogfileRequestPluginOpt { + /// Turn-on the debug log level. + /// + /// If off only reports ERROR, WARN, and INFO + /// If on also reports DEBUG and TRACE + #[clap(long)] + pub debug: bool, + + /// Create supported operation files + #[clap(short, long)] + pub init: bool, + + #[clap(long = "config-dir", default_value = DEFAULT_TEDGE_CONFIG_PATH)] + pub config_dir: PathBuf, +} + +async fn create_mqtt_client( + tedge_config: &TEdgeConfig, +) -> Result<mqtt_channel::Connection, anyhow::Error> { + let mqtt_port = tedge_config.query(MqttPortSetting)?.into(); + let mqtt_config = mqtt_channel::Config::default() + .with_port(mqtt_port) + .with_subscriptions(mqtt_channel::TopicFilter::new_unchecked( + C8yTopic::SmartRestRequest.as_str(), + )); + + let mqtt_client = mqtt_channel::Connection::new(&mqtt_config).await?; + Ok(mqtt_client) +} + +pub async fn create_http_client( + tedge_config: &TEdgeConfig, +) -> Result<JwtAuthHttpProxy, anyhow::Error> { + let mut http_proxy = JwtAuthHttpProxy::try_new(tedge_config).await?; + let () = http_proxy.init().await?; + Ok(http_proxy) +} + +fn create_inofity_file_watch_stream( + config_file: &Path, +) -> Result<EventStream<[u8; 1024]>, anyhow::Error> { + let buffer = [0; 1024]; + let mut inotify = Inotify::init().expect("Error while initializing inotify instance"); + + inotify + .add_watch(&config_file, WatchMask::CLOSE_WRITE) + .expect("Failed to add file watch"); + + Ok(inotify.event_stream(buffer)?) +} + +async fn run( + config_file: &Path, + mqtt_client: &mut Connection, + http_client: &mut JwtAuthHttpProxy, +) -> Result<(), anyhow::Error> { + let () = handle_dynamic_log_type_update(mqtt_client, config_file).await?; + + let mut inotify_stream = create_inofity_file_watch_stream(config_file)?; + + loop { + tokio::select! { + message = mqtt_client.received.next() => { + if let Some(message) = message { + if let Ok(payload) = message.payload_str() { + let result = match payload.split(',').next().unwrap_or_default() { + "522" => { + // retrieve smartrest object from payload + let smartrest_obj = SmartRestLogRequest::from_smartrest(&payload)?; + handle_logfile_request_operation( + &smartrest_obj, + &config_file, + mqtt_client, + http_client, + ) + .await + } + _ => { + // Ignore operation messages not meant for this plugin + Ok(()) + } + }; + + if let Err(err) = result { + error!("Handling of operation: '{}' failed with {}", payload, err); + } + } + } + else { + // message is None and the connection has been closed + return Ok(()); + } + } + Some(event_or_error) = inotify_stream.next() => { + if let Ok(event) = event_or_error { + match event.mask { + EventMask::CLOSE_WRITE => { + let () = handle_dynamic_log_type_update(mqtt_client, config_file).await?; + } + _ => {} + } + } + + } + } + } +} + +#[tokio::main] +async fn main() -> Result<(), anyhow::Error> { + let config_plugin_opt = LogfileRequestPluginOpt::parse(); + let config_file = PathBuf::from(&format!( + "{}/{DEFAULT_PLUGIN_CONFIG_FILE}", + &config_plugin_opt + .config_dir + .to_str() + .unwrap_or(DEFAULT_TEDGE_CONFIG_PATH) + )); + + tedge_utils::logging::initialise_tracing_subscriber(config_plugin_opt.debug); + + // Load tedge config from the provided location + let tedge_config_location = + tedge_config::TEdgeConfigLocation::from_custom_root(&config_plugin_opt.config_dir); + let config_repository = tedge_config::TEdgeConfigRepository::new(tedge_config_location.clone()); + let tedge_config = config_repository.load()?; + + let logs_dir = tedge_config.query(LogPathSetting)?; + let logs_dir = PathBuf::from(logs_dir.to_string()); + + if config_plugin_opt.init { + let () = init(&config_plugin_opt.config_dir, &logs_dir)?; + return Ok(()); + } + + // Create required clients + let mut mqtt_client = create_mqtt_client(&tedge_config).await?; + let mut http_client = create_http_client(&tedge_config).await?; + + let () = run(&config_file, &mut mqtt_client, &mut http_client).await?; + Ok(()) +} + +fn init(config_dir: &PathBuf, logs_dir: &PathBuf) -> Result<(), anyhow::Error> { + info!("Creating supported operation files"); + let config_dir = config_dir.as_path().display().to_string(); + let logs_dir = logs_dir.as_path().display().to_string(); + let () = create_init_logs_directories_and_files(config_dir.as_str(), logs_dir.as_str())?; + Ok(()) +} + +/// for the log plugin to work the following directories and files are needed: +/// +/// Directories: +/// - LOGS_DIR/tedge/agent +/// - config_dir/operations/c8y +/// - CONFIG_DIR/c8y +/// +/// Files: +/// - config_dir/operations/c8y/c8y_LogfileRequest +/// - CONFIG_DIR/c8y/log/c8y-log-plugin.toml +fn create_init_logs_directories_and_files( + config_dir: &str, + logs_dir: &str, +) -> Result<(), anyhow::Error> { + // creating logs_dir + create_directory_with_user_group(&format!("{logs_dir}/tedge"), "tedge", "tedge", 0o755)?; + create_directory_with_user_group(&format!("{logs_dir}/tedge/agent"), "tedge", "tedge", 0o755)?; + // creating /operations/c8y directories + create_directory_with_user_group(&format!("{config_dir}/operations"), "tedge", "tedge", 0o755)?; + create_directory_with_user_group( + &format!("{config_dir}/operations/c8y"), + "tedge", + "tedge", + 0o755, + )?; + // creating c8y_LogfileRequest operation file + create_file_with_user_group( + &format!("{config_dir}/operations/c8y/c8y_LogfileRequest"), + "tedge", + "tedge", + 0o755, + )?; + // creating c8y directory + create_directory_with_user_group(&format!("{config_dir}/c8y"), "tedge", "tedge", 0o755)?; + // creating c8y-log-plugin.toml + // NOTE: file needs 775 permission or inotify can not watch for changes inside the file + create_file_with_user_group( + &format!("{config_dir}/{DEFAULT_PLUGIN_CONFIG_FILE}"), + "tedge", + "tedge", + 0o775, + )?; + Ok(()) +} |