From e27c8e758610166d50c2e1c7ebd64e709c5bf571 Mon Sep 17 00:00:00 2001 From: initard Date: Tue, 26 Apr 2022 19:12:31 +0100 Subject: operation logs improvement #1027 Signed-off-by: initard --- crates/core/tedge_mapper/src/c8y/converter.rs | 180 +++++++++++++++++++++----- crates/core/tedge_mapper/src/c8y/error.rs | 7 + crates/core/tedge_mapper/src/c8y/mapper.rs | 19 +-- crates/core/tedge_mapper/src/c8y/tests.rs | 9 +- crates/core/tedge_mapper/src/core/error.rs | 3 + 5 files changed, 175 insertions(+), 43 deletions(-) (limited to 'crates/core/tedge_mapper/src') diff --git a/crates/core/tedge_mapper/src/c8y/converter.rs b/crates/core/tedge_mapper/src/c8y/converter.rs index d71e12a0..e830a7a4 100644 --- a/crates/core/tedge_mapper/src/c8y/converter.rs +++ b/crates/core/tedge_mapper/src/c8y/converter.rs @@ -23,14 +23,17 @@ use c8y_smartrest::{ }, }; use c8y_translator::json; + +use logged_command::LoggedCommand; use mqtt_channel::{Message, Topic, TopicFilter}; +use plugin_sm::operation_logs::{OperationLogs, OperationLogsError}; use std::{ collections::{hash_map::Entry, HashMap, HashSet}, - fs::File, + fs::{self, File}, io::Read, - path::Path, - process::Stdio, + path::{Path, PathBuf}, }; +use tedge_config::{get_tedge_config, ConfigSettingAccessor, LogPathSetting}; use thin_edge_json::{alarm::ThinEdgeAlarm, event::ThinEdgeEvent}; use time::format_description::well_known::Rfc3339; use tracing::{debug, info, log::error}; @@ -51,6 +54,7 @@ const TEDGE_ALARMS_TOPIC: &str = "tedge/alarms/"; const INTERNAL_ALARMS_TOPIC: &str = "c8y-internal/alarms/"; const TEDGE_EVENTS_TOPIC: &str = "tedge/events/"; const C8Y_JSON_MQTT_EVENTS_TOPIC: &str = "c8y/event/events/create"; +const TEDGE_AGENT_LOG_DIR: &str = "tedge/agent"; const CREATE_EVENT_SMARTREST_CODE: u16 = 400; @@ -66,6 +70,7 @@ where device_type: String, alarm_converter: AlarmConverter, operations: Operations, + operation_logs: OperationLogs, http_proxy: Proxy, } @@ -79,7 +84,7 @@ where device_type: String, operations: Operations, http_proxy: Proxy, - ) -> Self { + ) -> Result { let mut topic_filter: TopicFilter = vec![ "tedge/measurements", "tedge/measurements/+", @@ -102,7 +107,14 @@ where let children: HashSet = HashSet::new(); - CumulocityConverter { + let tedge_config = get_tedge_config()?; + let logs_path = tedge_config.query(LogPathSetting)?; + + let log_dir = PathBuf::from(&format!("{}/{TEDGE_AGENT_LOG_DIR}", logs_path)); + + let operation_logs = OperationLogs::try_new(log_dir)?; + + Ok(CumulocityConverter { size_threshold, children, mapper_config, @@ -110,8 +122,60 @@ where device_type, alarm_converter, operations, + operation_logs, http_proxy, - } + }) + } + + #[cfg(test)] + pub fn from_logs_path( + size_threshold: SizeThreshold, + device_name: String, + device_type: String, + operations: Operations, + http_proxy: Proxy, + logs_path: PathBuf, + ) -> Result { + let mut topic_filter: TopicFilter = vec![ + "tedge/measurements", + "tedge/measurements/+", + "tedge/alarms/+/+", + "c8y-internal/alarms/+/+", + "tedge/events/+", + ] + .try_into() + .expect("topics that mapper should subscribe to"); + + let () = topic_filter.add_all(CumulocityMapper::subscriptions(&operations).unwrap()); + + let mapper_config = MapperConfig { + in_topic_filter: topic_filter, + out_topic: make_valid_topic_or_panic("c8y/measurement/measurements/create"), + errors_topic: make_valid_topic_or_panic("tedge/errors"), + }; + + let alarm_converter = AlarmConverter::new(); + + let children: HashSet = HashSet::new(); + + let log_dir = PathBuf::from(&format!( + "{}/{TEDGE_AGENT_LOG_DIR}", + logs_path.to_str().unwrap() + )); + + let operation_logs = OperationLogs::try_new(log_dir)?; + + Ok(CumulocityConverter { + size_threshold, + children, + mapper_config, + device_name, + device_type, + alarm_converter, + operations, + operation_logs, + http_proxy, + }) } fn try_convert_measurement( @@ -244,8 +308,13 @@ where Ok(publish_restart_operation_status(message.payload_str()?).await?) } Ok(MapperSubscribeTopic::C8yTopic(_)) => { - debug!("Cumulocity"); - parse_c8y_topics(message, &self.operations, &mut self.http_proxy).await + parse_c8y_topics( + message, + &self.operations, + &mut self.http_proxy, + &self.operation_logs, + ) + .await } _ => Err(ConversionError::UnsupportedTopic( message.topic.name.clone(), @@ -263,7 +332,10 @@ where &self.device_name, &self.device_type, )); - let supported_log_types_message = self.wrap_error(create_supported_log_types_message()); + + let supported_log_types_message = self.wrap_error(create_supported_log_types_message( + &self.operation_logs.log_dir, + )); let pending_operations_message = self.wrap_error(create_get_pending_operations_message()); let software_list_message = self.wrap_error(create_get_software_list_message()); @@ -288,8 +360,16 @@ async fn parse_c8y_topics( message: &Message, operations: &Operations, http_proxy: &mut impl C8YHttpProxy, + operation_logs: &OperationLogs, ) -> Result, ConversionError> { - match process_smartrest(message.payload_str()?, operations, http_proxy).await { + match process_smartrest( + message.payload_str()?, + operations, + http_proxy, + operation_logs, + ) + .await + { Err( ref err @ CumulocityMapperError::FromSmartRestDeserializer( SmartRestDeserializerError::InvalidParameter { ref operation, .. }, @@ -476,8 +556,35 @@ fn create_get_pending_operations_message() -> Result { Ok(Message::new(&topic, payload)) } -fn create_supported_log_types_message() -> Result { - let payload = SmartRestSetSupportedLogType::default().to_smartrest()?; +fn get_supported_log_types(log_dir: &Path) -> Result, ConversionError> { + let mut result = HashSet::new(); + + let paths = fs::read_dir(&log_dir).unwrap(); + for path in paths { + let path = path?; + if fs::metadata(path.path())?.is_file() { + let file_name = path.file_name(); + let file_name = file_name + .to_str() + .ok_or(OperationLogsError::FileFormatError)?; + + // FIXME: this is a hotfix to map "software-list" and "software-update" to "software-management" + // this should be fixed in https://github.com/thin-edge/thin-edge.io/issues/1077 + if file_name.starts_with("software-list") | file_name.starts_with("software-update") { + result.insert("software-management".to_string()); + } else { + let log_type = file_name.split('-').next(); + let log_type = log_type.ok_or(OperationLogsError::FileFormatError)?; + result.insert(log_type.to_string()); + } + } + } + Ok(Vec::from_iter(result)) +} + +fn create_supported_log_types_message(log_dir: &Path) -> Result { + let supported_operation_types = get_supported_log_types(log_dir)?; + let payload = SmartRestSetSupportedLogType::from(supported_operation_types).to_smartrest()?; let topic = C8yTopic::SmartRestResponse.to_topic()?; Ok(Message::new(&topic, payload)) } @@ -588,15 +695,15 @@ async fn execute_operation( payload: &str, command: &str, operation_name: &str, + operation_logs: &OperationLogs, ) -> Result<(), CumulocityMapperError> { let command = command.to_owned(); let payload = payload.to_string(); - let child = tokio::process::Command::new(&command) - .args(&[&payload]) - .stdin(Stdio::null()) - .stdout(Stdio::null()) - .stderr(Stdio::null()) + let mut logged = LoggedCommand::new(&command); + logged.arg(&payload); + + let child = logged .spawn() .map_err(|e| CumulocityMapperError::ExecuteFailed { error_message: e.to_string(), @@ -604,29 +711,35 @@ async fn execute_operation( operation_name: operation_name.to_string(), }); + let mut log_file = operation_logs + .new_log_file(plugin_sm::operation_logs::LogKind::Operation( + operation_name.to_string(), + )) + .await?; + match child { - Ok(mut child) => { - let _handle = tokio::spawn(async move { - let _ = child.wait().await; + Ok(child) => { + tokio::spawn(async move { + let logger = log_file.buffer(); + let _result = child.wait_with_output(logger).await.unwrap(); }); - return Ok(()); - } - Err(err) => { - return Err(err); + Ok(()) } - }; + Err(err) => Err(err), + } } async fn process_smartrest( payload: &str, operations: &Operations, http_proxy: &mut impl C8YHttpProxy, + operation_logs: &OperationLogs, ) -> Result, CumulocityMapperError> { let message_id: &str = &payload[..3]; match message_id { "528" => forward_software_request(payload, http_proxy).await, "510" => forward_restart_request(payload), - template => forward_operation_request(payload, template, operations).await, + template => forward_operation_request(payload, template, operations, operation_logs).await, } } @@ -677,11 +790,13 @@ async fn forward_operation_request( payload: &str, template: &str, operations: &Operations, + operation_logs: &OperationLogs, ) -> Result, CumulocityMapperError> { match operations.matching_smartrest_template(template) { Some(operation) => { if let Some(command) = operation.command() { - execute_operation(payload, command.as_str(), &operation.name).await?; + execute_operation(payload, command.as_str(), &operation.name, operation_logs) + .await?; } Ok(vec![]) } @@ -739,14 +854,21 @@ pub fn get_child_id_from_topic(topic: &str) -> Result, Conversion } } +#[cfg(test)] mod tests { + use plugin_sm::operation_logs::OperationLogs; + use tempfile::TempDir; + #[tokio::test] async fn test_execute_operation_is_not_blocked() { + let log_dir = TempDir::new().unwrap(); + let operation_logs = OperationLogs::try_new(log_dir.path().to_path_buf()).unwrap(); + let now = std::time::Instant::now(); - let () = super::execute_operation("5", "sleep", "sleep_one") + let () = super::execute_operation("5", "sleep", "sleep_one", &operation_logs) .await .unwrap(); - let () = super::execute_operation("5", "sleep", "sleep_two") + let () = super::execute_operation("5", "sleep", "sleep_two", &operation_logs) .await .unwrap(); diff --git a/crates/core/tedge_mapper/src/c8y/error.rs b/crates/core/tedge_mapper/src/c8y/error.rs index add72d82..6a4f0a71 100644 --- a/crates/core/tedge_mapper/src/c8y/error.rs +++ b/crates/core/tedge_mapper/src/c8y/error.rs @@ -1,6 +1,7 @@ use c8y_smartrest::error::{ SMCumulocityMapperError, SmartRestDeserializerError, SmartRestSerializerError, }; +use plugin_sm::operation_logs::OperationLogsError; #[derive(thiserror::Error, Debug)] #[allow(clippy::enum_variant_names)] @@ -50,4 +51,10 @@ pub enum CumulocityMapperError { #[error("An unknown operation template: {0}")] UnknownOperation(String), + + #[error(transparent)] + FromOperationLogs(#[from] OperationLogsError), + + #[error(transparent)] + TedgeConfig(#[from] tedge_config::TEdgeConfigError), } diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs index 3842b3a5..b5f9c275 100644 --- a/crates/core/tedge_mapper/src/c8y/mapper.rs +++ b/crates/core/tedge_mapper/src/c8y/mapper.rs @@ -77,7 +77,7 @@ impl TEdgeComponent for CumulocityMapper { device_type, operations, http_proxy, - )); + )?); let mut mapper = create_mapper(CUMULOCITY_MAPPER_NAME, mqtt_host, mqtt_port, converter).await?; @@ -180,13 +180,16 @@ mod tests { let size_threshold = SizeThreshold(MQTT_MESSAGE_SIZE_THRESHOLD); let operations = Operations::default(); - let converter = Box::new(CumulocityConverter::new( - size_threshold, - DEVICE_NAME.into(), - DEVICE_TYPE.into(), - operations, - proxy, - )); + let converter = Box::new( + CumulocityConverter::new( + size_threshold, + DEVICE_NAME.into(), + DEVICE_TYPE.into(), + operations, + proxy, + ) + .unwrap(), + ); let broker = test_mqtt_broker(); diff --git a/crates/core/tedge_mapper/src/c8y/tests.rs b/crates/core/tedge_mapper/src/c8y/tests.rs index b591672e..03477298 100644 --- a/crates/core/tedge_mapper/src/c8y/tests.rs +++ b/crates/core/tedge_mapper/src/c8y/tests.rs @@ -60,7 +60,7 @@ async fn mapper_publishes_a_supported_operation_and_a_pending_operations_onto_c8 mqtt_tests::assert_received_all_expected( &mut messages, TEST_TIMEOUT_MS, - &["118,software-management\n", "500\n"], + &["software-management", "500\n"], ) .await; @@ -299,11 +299,7 @@ async fn mapper_fails_during_sw_update_recovers_and_process_response() -> Result mqtt_tests::assert_received_all_expected( &mut responses, TEST_TIMEOUT_MS, - &[ - "118,software-management\n", - "500\n", - "503,c8y_SoftwareUpdate,\n", - ], + &["software-management", "500\n", "503,c8y_SoftwareUpdate,\n"], ) .await; @@ -950,6 +946,7 @@ fn create_c8y_converter() -> CumulocityConverter { operations, http_proxy, ) + .unwrap() } fn remove_whitespace(s: &str) -> String { diff --git a/crates/core/tedge_mapper/src/core/error.rs b/crates/core/tedge_mapper/src/core/error.rs index 743fd477..4280c738 100644 --- a/crates/core/tedge_mapper/src/core/error.rs +++ b/crates/core/tedge_mapper/src/core/error.rs @@ -98,4 +98,7 @@ pub enum ConversionError { actual_size: usize, threshold: usize, }, + + #[error(transparent)] + FromOperationLogsError(#[from] plugin_sm::operation_logs::OperationLogsError), } -- cgit v1.2.3