diff options
author | initard <solo@softwareag.com> | 2022-04-26 19:12:31 +0100 |
---|---|---|
committer | initard <solo@softwareag.com> | 2022-05-03 17:54:08 +0100 |
commit | e27c8e758610166d50c2e1c7ebd64e709c5bf571 (patch) | |
tree | 822a079a5006f5c5dfb3401ae593e9686bead0ef /crates/core | |
parent | da4f34614fd832c0e8699745325688ca825b1313 (diff) |
operation logs improvement #1027
Signed-off-by: initard <solo@softwareag.com>
Diffstat (limited to 'crates/core')
-rw-r--r-- | crates/core/c8y_smartrest/src/smartrest_serializer.rs | 8 | ||||
-rw-r--r-- | crates/core/plugin_sm/src/operation_logs.rs | 112 | ||||
-rw-r--r-- | crates/core/tedge_agent/src/agent.rs | 4 | ||||
-rw-r--r-- | crates/core/tedge_mapper/Cargo.toml | 2 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/converter.rs | 180 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/error.rs | 7 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/mapper.rs | 19 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/tests.rs | 9 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/core/error.rs | 3 |
9 files changed, 241 insertions, 103 deletions
diff --git a/crates/core/c8y_smartrest/src/smartrest_serializer.rs b/crates/core/c8y_smartrest/src/smartrest_serializer.rs index cd3867b5..5eabba04 100644 --- a/crates/core/c8y_smartrest/src/smartrest_serializer.rs +++ b/crates/core/c8y_smartrest/src/smartrest_serializer.rs @@ -38,14 +38,14 @@ where #[derive(Debug, Deserialize, Serialize, PartialEq)] pub struct SmartRestSetSupportedLogType { pub message_id: &'static str, - pub supported_operations: Vec<&'static str>, + pub supported_operations: Vec<String>, } -impl Default for SmartRestSetSupportedLogType { - fn default() -> Self { +impl From<Vec<String>> for SmartRestSetSupportedLogType { + fn from(operation_types: Vec<String>) -> Self { Self { message_id: "118", - supported_operations: vec!["software-management"], + supported_operations: operation_types, } } } diff --git a/crates/core/plugin_sm/src/operation_logs.rs b/crates/core/plugin_sm/src/operation_logs.rs index aa768725..7bf08eab 100644 --- a/crates/core/plugin_sm/src/operation_logs.rs +++ b/crates/core/plugin_sm/src/operation_logs.rs @@ -1,6 +1,6 @@ -use std::cmp::Reverse; -use std::collections::BinaryHeap; use std::path::PathBuf; +use std::{cmp::Reverse, collections::HashMap}; +use std::{collections::BinaryHeap, path::Path}; use time::{format_description, OffsetDateTime}; use tracing::log; @@ -13,33 +13,29 @@ pub enum OperationLogsError { #[error(transparent)] FromTimeFormat(#[from] time::error::Format), + + #[error("Incorrect file format. Expected: `operation_name`-`timestamp`.log")] + FileFormatError, } #[derive(Debug)] pub struct OperationLogs { - log_dir: PathBuf, - max_update_logs: usize, - max_list_logs: usize, + pub log_dir: PathBuf, } pub enum LogKind { SoftwareUpdate, SoftwareList, - Operation, + Operation(String), } const UPDATE_PREFIX: &str = "software-update"; const LIST_PREFIX: &str = "software-list"; -const OPERATION_PREFIX: &str = "operation"; impl OperationLogs { pub fn try_new(log_dir: PathBuf) -> Result<OperationLogs, OperationLogsError> { std::fs::create_dir_all(log_dir.clone())?; - let operation_logs = OperationLogs { - log_dir, - max_update_logs: 5, - max_list_logs: 1, - }; + let operation_logs = OperationLogs { log_dir }; if let Err(err) = operation_logs.remove_outdated_logs() { // In no case a log-cleaning error should prevent the agent to run. @@ -50,11 +46,7 @@ impl OperationLogs { Ok(operation_logs) } - pub async fn new_log_file( - &self, - kind: LogKind, - operation_name: Option<&str>, - ) -> Result<LogFile, OperationLogsError> { + pub async fn new_log_file(&self, kind: LogKind) -> Result<LogFile, OperationLogsError> { if let Err(err) = self.remove_outdated_logs() { // In no case a log-cleaning error should prevent the agent to run. // Hence the error is logged but not returned. @@ -63,20 +55,12 @@ impl OperationLogs { let now = OffsetDateTime::now_utc(); - let operation_name = { - if let Some(operation_name) = operation_name { - let operation_name = &format!("{OPERATION_PREFIX}-{}", operation_name); - operation_name.to_string() - } else { - OPERATION_PREFIX.to_string() - } - }; - let file_prefix = match kind { LogKind::SoftwareUpdate => UPDATE_PREFIX, LogKind::SoftwareList => LIST_PREFIX, - LogKind::Operation => operation_name.as_str(), + LogKind::Operation(ref operation_name) => operation_name.as_str(), }; + let file_name = format!( "{}-{}.log", file_prefix, @@ -92,42 +76,64 @@ impl OperationLogs { } pub fn remove_outdated_logs(&self) -> Result<(), OperationLogsError> { - let mut update_logs = BinaryHeap::new(); - let mut list_logs = BinaryHeap::new(); + let mut log_tracker: HashMap<String, BinaryHeap<Reverse<String>>> = HashMap::new(); + + // FIXME: this is a hotfix to map "software-list" and "software-update" to "software-management" + // this should be fixed in https://github.com/thin-edge/thin-edge.io/issues/1077 for file in (self.log_dir.read_dir()?).flatten() { if let Some(path) = file.path().file_name().and_then(|name| name.to_str()) { - if path.starts_with(UPDATE_PREFIX) { - // The paths are pushed using the reverse alphabetic order - update_logs.push(Reverse(path.to_string())); - } else if path.starts_with(LIST_PREFIX) { - // The paths are pushed using the reverse alphabetic order - list_logs.push(Reverse(path.to_string())); + if path.starts_with("software-list") { + log_tracker + .entry("software-list".to_string()) + .or_insert_with(BinaryHeap::new) + .push(Reverse(path.to_string())); + } else if path.starts_with("software-update") { + log_tracker + .entry("software-update".to_string()) + .or_insert_with(BinaryHeap::new) + .push(Reverse(path.to_string())); + } else { + let file_name = path + .split('-') + .next() + .ok_or(OperationLogsError::FileFormatError)?; + log_tracker + .entry(file_name.to_string()) + .or_insert_with(BinaryHeap::new) + .push(Reverse(path.to_string())); } } } - while update_logs.len() > self.max_update_logs { - if let Some(rname) = update_logs.pop() { - let name = rname.0; - let path = self.log_dir.join(name.clone()); - if let Err(err) = std::fs::remove_file(&path) { - log::warn!("Fail to remove out-dated log file {} : {}", name, err); - } + for (key, value) in log_tracker.iter_mut() { + if key.starts_with("software-list") { + // only allow one update list file in logs + let () = remove_old_logs(value, &self.log_dir, 1)?; + } else { + // allow most recent five + let () = remove_old_logs(value, &self.log_dir, 5)?; } } - while list_logs.len() > self.max_list_logs { - if let Some(rname) = list_logs.pop() { - let name = rname.0; - let path = self.log_dir.join(name.clone()); - if let Err(err) = std::fs::remove_file(&path) { - log::warn!("Fail to remove out-dated log file {} : {}", name, err); - } + Ok(()) + } +} + +fn remove_old_logs( + log_tracker: &mut BinaryHeap<Reverse<String>>, + dir_path: &Path, + n: usize, +) -> Result<(), OperationLogsError> { + while log_tracker.len() > n { + if let Some(rname) = log_tracker.pop() { + let name = rname.0; + let path = dir_path.join(name.clone()); + if let Err(err) = std::fs::remove_file(&path) { + log::warn!("Fail to remove out-dated log file {} : {}", name, err); } } - - Ok(()) } + Ok(()) } #[cfg(test)] @@ -197,9 +203,7 @@ mod tests { let update_log_7 = create_file(log_dir.path(), "software-update-1996-12-25T16:39:57z"); // Create a new log file - let new_log = operation_logs - .new_log_file(LogKind::SoftwareUpdate, None) - .await?; + let new_log = operation_logs.new_log_file(LogKind::SoftwareUpdate).await?; // The new log has been created let new_path = Path::new(new_log.path()); diff --git a/crates/core/tedge_agent/src/agent.rs b/crates/core/tedge_agent/src/agent.rs index 77bc6429..b7d946b0 100644 --- a/crates/core/tedge_agent/src/agent.rs +++ b/crates/core/tedge_agent/src/agent.rs @@ -413,7 +413,7 @@ impl SmAgent { let response = match self .operation_logs - .new_log_file(LogKind::SoftwareList, None) + .new_log_file(LogKind::SoftwareList) .await { Ok(log_file) => plugins.lock().await.list(&request, log_file).await, @@ -476,7 +476,7 @@ impl SmAgent { let response = match self .operation_logs - .new_log_file(LogKind::SoftwareUpdate, None) + .new_log_file(LogKind::SoftwareUpdate) .await { Ok(log_file) => { diff --git a/crates/core/tedge_mapper/Cargo.toml b/crates/core/tedge_mapper/Cargo.toml index cf96048c..34e8e8da 100644 --- a/crates/core/tedge_mapper/Cargo.toml +++ b/crates/core/tedge_mapper/Cargo.toml @@ -37,8 +37,10 @@ csv = "1.1" download = { path = "../../common/download" } flockfile = { path = "../../common/flockfile" } futures = "0.3" +logged_command = { path = "../../common/logged_command" } mockall = "0.11" mqtt_channel = { path = "../../common/mqtt_channel" } +plugin_sm = { path = "../plugin_sm" } reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/crates/core/tedge_mapper/src/c8y/converter.rs b/crates/core/tedge_mapper/src/c8y/converter.rs index d71e12a0..e830a7a4 100644 --- a/crates/core/tedge_mapper/src/c8y/converter.rs +++ b/crates/core/tedge_mapper/src/c8y/converter.rs @@ -23,14 +23,17 @@ use c8y_smartrest::{ }, }; use c8y_translator::json; + +use logged_command::LoggedCommand; use mqtt_channel::{Message, Topic, TopicFilter}; +use plugin_sm::operation_logs::{OperationLogs, OperationLogsError}; use std::{ collections::{hash_map::Entry, HashMap, HashSet}, - fs::File, + fs::{self, File}, io::Read, - path::Path, - process::Stdio, + path::{Path, PathBuf}, }; +use tedge_config::{get_tedge_config, ConfigSettingAccessor, LogPathSetting}; use thin_edge_json::{alarm::ThinEdgeAlarm, event::ThinEdgeEvent}; use time::format_description::well_known::Rfc3339; use tracing::{debug, info, log::error}; @@ -51,6 +54,7 @@ const TEDGE_ALARMS_TOPIC: &str = "tedge/alarms/"; const INTERNAL_ALARMS_TOPIC: &str = "c8y-internal/alarms/"; const TEDGE_EVENTS_TOPIC: &str = "tedge/events/"; const C8Y_JSON_MQTT_EVENTS_TOPIC: &str = "c8y/event/events/create"; +const TEDGE_AGENT_LOG_DIR: &str = "tedge/agent"; const CREATE_EVENT_SMARTREST_CODE: u16 = 400; @@ -66,6 +70,7 @@ where device_type: String, alarm_converter: AlarmConverter, operations: Operations, + operation_logs: OperationLogs, http_proxy: Proxy, } @@ -79,7 +84,7 @@ where device_type: String, operations: Operations, http_proxy: Proxy, - ) -> Self { + ) -> Result<Self, CumulocityMapperError> { let mut topic_filter: TopicFilter = vec![ "tedge/measurements", "tedge/measurements/+", @@ -102,7 +107,14 @@ where let children: HashSet<String> = HashSet::new(); - CumulocityConverter { + let tedge_config = get_tedge_config()?; + let logs_path = tedge_config.query(LogPathSetting)?; + + let log_dir = PathBuf::from(&format!("{}/{TEDGE_AGENT_LOG_DIR}", logs_path)); + + let operation_logs = OperationLogs::try_new(log_dir)?; + + Ok(CumulocityConverter { size_threshold, children, mapper_config, @@ -110,8 +122,60 @@ where device_type, alarm_converter, operations, + operation_logs, http_proxy, - } + }) + } + + #[cfg(test)] + pub fn from_logs_path( + size_threshold: SizeThreshold, + device_name: String, + device_type: String, + operations: Operations, + http_proxy: Proxy, + logs_path: PathBuf, + ) -> Result<Self, CumulocityMapperError> { + let mut topic_filter: TopicFilter = vec![ + "tedge/measurements", + "tedge/measurements/+", + "tedge/alarms/+/+", + "c8y-internal/alarms/+/+", + "tedge/events/+", + ] + .try_into() + .expect("topics that mapper should subscribe to"); + + let () = topic_filter.add_all(CumulocityMapper::subscriptions(&operations).unwrap()); + + let mapper_config = MapperConfig { + in_topic_filter: topic_filter, + out_topic: make_valid_topic_or_panic("c8y/measurement/measurements/create"), + errors_topic: make_valid_topic_or_panic("tedge/errors"), + }; + + let alarm_converter = AlarmConverter::new(); + + let children: HashSet<String> = HashSet::new(); + + let log_dir = PathBuf::from(&format!( + "{}/{TEDGE_AGENT_LOG_DIR}", + logs_path.to_str().unwrap() + )); + + let operation_logs = OperationLogs::try_new(log_dir)?; + + Ok(CumulocityConverter { + size_threshold, + children, + mapper_config, + device_name, + device_type, + alarm_converter, + operations, + operation_logs, + http_proxy, + }) } fn try_convert_measurement( @@ -244,8 +308,13 @@ where Ok(publish_restart_operation_status(message.payload_str()?).await?) } Ok(MapperSubscribeTopic::C8yTopic(_)) => { - debug!("Cumulocity"); - parse_c8y_topics(message, &self.operations, &mut self.http_proxy).await + parse_c8y_topics( + message, + &self.operations, + &mut self.http_proxy, + &self.operation_logs, + ) + .await } _ => Err(ConversionError::UnsupportedTopic( message.topic.name.clone(), @@ -263,7 +332,10 @@ where &self.device_name, &self.device_type, )); - let supported_log_types_message = self.wrap_error(create_supported_log_types_message()); + + let supported_log_types_message = self.wrap_error(create_supported_log_types_message( + &self.operation_logs.log_dir, + )); let pending_operations_message = self.wrap_error(create_get_pending_operations_message()); let software_list_message = self.wrap_error(create_get_software_list_message()); @@ -288,8 +360,16 @@ async fn parse_c8y_topics( message: &Message, operations: &Operations, http_proxy: &mut impl C8YHttpProxy, + operation_logs: &OperationLogs, ) -> Result<Vec<Message>, ConversionError> { - match process_smartrest(message.payload_str()?, operations, http_proxy).await { + match process_smartrest( + message.payload_str()?, + operations, + http_proxy, + operation_logs, + ) + .await + { Err( ref err @ CumulocityMapperError::FromSmartRestDeserializer( SmartRestDeserializerError::InvalidParameter { ref operation, .. }, @@ -476,8 +556,35 @@ fn create_get_pending_operations_message() -> Result<Message, ConversionError> { Ok(Message::new(&topic, payload)) } -fn create_supported_log_types_message() -> Result<Message, ConversionError> { - let payload = SmartRestSetSupportedLogType::default().to_smartrest()?; +fn get_supported_log_types(log_dir: &Path) -> Result<Vec<String>, ConversionError> { + let mut result = HashSet::new(); + + let paths = fs::read_dir(&log_dir).unwrap(); + for path in paths { + let path = path?; + if fs::metadata(path.path())?.is_file() { + let file_name = path.file_name(); + let file_name = file_name + .to_str() + .ok_or(OperationLogsError::FileFormatError)?; + + // FIXME: this is a hotfix to map "software-list" and "software-update" to "software-management" + // this should be fixed in https://github.com/thin-edge/thin-edge.io/issues/1077 + if file_name.starts_with("software-list") | file_name.starts_with("software-update") { + result.insert("software-management".to_string()); + } else { + let log_type = file_name.split('-').next(); + let log_type = log_type.ok_or(OperationLogsError::FileFormatError)?; + result.insert(log_type.to_string()); + } + } + } + Ok(Vec::from_iter(result)) +} + +fn create_supported_log_types_message(log_dir: &Path) -> Result<Message, ConversionError> { + let supported_operation_types = get_supported_log_types(log_dir)?; + let payload = SmartRestSetSupportedLogType::from(supported_operation_types).to_smartrest()?; let topic = C8yTopic::SmartRestResponse.to_topic()?; Ok(Message::new(&topic, payload)) } @@ -588,15 +695,15 @@ async fn execute_operation( payload: &str, command: &str, operation_name: &str, + operation_logs: &OperationLogs, ) -> Result<(), CumulocityMapperError> { let command = command.to_owned(); let payload = payload.to_string(); - let child = tokio::process::Command::new(&command) - .args(&[&payload]) - .stdin(Stdio::null()) - .stdout(Stdio::null()) - .stderr(Stdio::null()) + let mut logged = LoggedCommand::new(&command); + logged.arg(&payload); + + let child = logged .spawn() .map_err(|e| CumulocityMapperError::ExecuteFailed { error_message: e.to_string(), @@ -604,29 +711,35 @@ async fn execute_operation( operation_name: operation_name.to_string(), }); + let mut log_file = operation_logs + .new_log_file(plugin_sm::operation_logs::LogKind::Operation( + operation_name.to_string(), + )) + .await?; + match child { - Ok(mut child) => { - let _handle = tokio::spawn(async move { - let _ = child.wait().await; + Ok(child) => { + tokio::spawn(async move { + let logger = log_file.buffer(); + let _result = child.wait_with_output(logger).await.unwrap(); }); - return Ok(()); - } - Err(err) => { - return Err(err); + Ok(()) } - }; + Err(err) => Err(err), + } } async fn process_smartrest( payload: &str, operations: &Operations, http_proxy: &mut impl C8YHttpProxy, + operation_logs: &OperationLogs, ) -> Result<Vec<Message>, CumulocityMapperError> { let message_id: &str = &payload[..3]; match message_id { "528" => forward_software_request(payload, http_proxy).await, "510" => forward_restart_request(payload), - template => forward_operation_request(payload, template, operations).await, + template => forward_operation_request(payload, template, operations, operation_logs).await, } } @@ -677,11 +790,13 @@ async fn forward_operation_request( payload: &str, template: &str, operations: &Operations, + operation_logs: &OperationLogs, ) -> Result<Vec<Message>, CumulocityMapperError> { match operations.matching_smartrest_template(template) { Some(operation) => { if let Some(command) = operation.command() { - execute_operation(payload, command.as_str(), &operation.name).await?; + execute_operation(payload, command.as_str(), &operation.name, operation_logs) + .await?; } Ok(vec![]) } @@ -739,14 +854,21 @@ pub fn get_child_id_from_topic(topic: &str) -> Result<Option<String>, Conversion } } +#[cfg(test)] mod tests { + use plugin_sm::operation_logs::OperationLogs; + use tempfile::TempDir; + #[tokio::test] async fn test_execute_operation_is_not_blocked() { + let log_dir = TempDir::new().unwrap(); + let operation_logs = OperationLogs::try_new(log_dir.path().to_path_buf()).unwrap(); + let now = std::time::Instant::now(); - let () = super::execute_operation("5", "sleep", "sleep_one") + let () = super::execute_operation("5", "sleep", "sleep_one", &operation_logs) .await .unwrap(); - let () = super::execute_operation("5", "sleep", "sleep_two") + let () = super::execute_operation("5", "sleep", "sleep_two", &operation_logs) .await .unwrap(); diff --git a/crates/core/tedge_mapper/src/c8y/error.rs b/crates/core/tedge_mapper/src/c8y/error.rs index add72d82..6a4f0a71 100644 --- a/crates/core/tedge_mapper/src/c8y/error.rs +++ b/crates/core/tedge_mapper/src/c8y/error.rs @@ -1,6 +1,7 @@ use c8y_smartrest::error::{ SMCumulocityMapperError, SmartRestDeserializerError, SmartRestSerializerError, }; +use plugin_sm::operation_logs::OperationLogsError; #[derive(thiserror::Error, Debug)] #[allow(clippy::enum_variant_names)] @@ -50,4 +51,10 @@ pub enum CumulocityMapperError { #[error("An unknown operation template: {0}")] UnknownOperation(String), + + #[error(transparent)] + FromOperationLogs(#[from] OperationLogsError), + + #[error(transparent)] + TedgeConfig(#[from] tedge_config::TEdgeConfigError), } diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs index 3842b3a5..b5f9c275 100644 --- a/crates/core/tedge_mapper/src/c8y/mapper.rs +++ b/crates/core/tedge_mapper/src/c8y/mapper.rs @@ -77,7 +77,7 @@ impl TEdgeComponent for CumulocityMapper { device_type, operations, http_proxy, - )); + )?); let mut mapper = create_mapper(CUMULOCITY_MAPPER_NAME, mqtt_host, mqtt_port, converter).await?; @@ -180,13 +180,16 @@ mod tests { let size_threshold = SizeThreshold(MQTT_MESSAGE_SIZE_THRESHOLD); let operations = Operations::default(); - let converter = Box::new(CumulocityConverter::new( - size_threshold, - DEVICE_NAME.into(), - DEVICE_TYPE.into(), - operations, - proxy, - )); + let converter = Box::new( + CumulocityConverter::new( + size_threshold, + DEVICE_NAME.into(), + DEVICE_TYPE.into(), + operations, + proxy, + ) + .unwrap(), + ); let broker = test_mqtt_broker(); diff --git a/crates/core/tedge_mapper/src/c8y/tests.rs b/crates/core/tedge_mapper/src/c8y/tests.rs index b591672e..03477298 100644 --- a/crates/core/tedge_mapper/src/c8y/tests.rs +++ b/crates/core/tedge_mapper/src/c8y/tests.rs @@ -60,7 +60,7 @@ async fn mapper_publishes_a_supported_operation_and_a_pending_operations_onto_c8 mqtt_tests::assert_received_all_expected( &mut messages, TEST_TIMEOUT_MS, - &["118,software-management\n", "500\n"], + &["software-management", "500\n"], ) .await; @@ -299,11 +299,7 @@ async fn mapper_fails_during_sw_update_recovers_and_process_response() -> Result mqtt_tests::assert_received_all_expected( &mut responses, TEST_TIMEOUT_MS, - &[ - "118,software-management\n", - "500\n", - "503,c8y_SoftwareUpdate,\n", - ], + &["software-management", "500\n", "503,c8y_SoftwareUpdate,\n"], ) .await; @@ -950,6 +946,7 @@ fn create_c8y_converter() -> CumulocityConverter<FakeC8YHttpProxy> { operations, http_proxy, ) + .unwrap() } fn remove_whitespace(s: &str) -> String { diff --git a/crates/core/tedge_mapper/src/core/error.rs b/crates/core/tedge_mapper/src/core/error.rs index 743fd477..4280c738 100644 --- a/crates/core/tedge_mapper/src/core/error.rs +++ b/crates/core/tedge_mapper/src/core/error.rs @@ -98,4 +98,7 @@ pub enum ConversionError { actual_size: usize, threshold: usize, }, + + #[error(transparent)] + FromOperationLogsError(#[from] plugin_sm::operation_logs::OperationLogsError), } |