summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_mapper/src
diff options
context:
space:
mode:
authorinitard <solo@softwareag.com>2022-04-26 19:12:31 +0100
committerinitard <solo@softwareag.com>2022-05-03 17:54:08 +0100
commite27c8e758610166d50c2e1c7ebd64e709c5bf571 (patch)
tree822a079a5006f5c5dfb3401ae593e9686bead0ef /crates/core/tedge_mapper/src
parentda4f34614fd832c0e8699745325688ca825b1313 (diff)
operation logs improvement #1027
Signed-off-by: initard <solo@softwareag.com>
Diffstat (limited to 'crates/core/tedge_mapper/src')
-rw-r--r--crates/core/tedge_mapper/src/c8y/converter.rs180
-rw-r--r--crates/core/tedge_mapper/src/c8y/error.rs7
-rw-r--r--crates/core/tedge_mapper/src/c8y/mapper.rs19
-rw-r--r--crates/core/tedge_mapper/src/c8y/tests.rs9
-rw-r--r--crates/core/tedge_mapper/src/core/error.rs3
5 files changed, 175 insertions, 43 deletions
diff --git a/crates/core/tedge_mapper/src/c8y/converter.rs b/crates/core/tedge_mapper/src/c8y/converter.rs
index d71e12a0..e830a7a4 100644
--- a/crates/core/tedge_mapper/src/c8y/converter.rs
+++ b/crates/core/tedge_mapper/src/c8y/converter.rs
@@ -23,14 +23,17 @@ use c8y_smartrest::{
},
};
use c8y_translator::json;
+
+use logged_command::LoggedCommand;
use mqtt_channel::{Message, Topic, TopicFilter};
+use plugin_sm::operation_logs::{OperationLogs, OperationLogsError};
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
- fs::File,
+ fs::{self, File},
io::Read,
- path::Path,
- process::Stdio,
+ path::{Path, PathBuf},
};
+use tedge_config::{get_tedge_config, ConfigSettingAccessor, LogPathSetting};
use thin_edge_json::{alarm::ThinEdgeAlarm, event::ThinEdgeEvent};
use time::format_description::well_known::Rfc3339;
use tracing::{debug, info, log::error};
@@ -51,6 +54,7 @@ const TEDGE_ALARMS_TOPIC: &str = "tedge/alarms/";
const INTERNAL_ALARMS_TOPIC: &str = "c8y-internal/alarms/";
const TEDGE_EVENTS_TOPIC: &str = "tedge/events/";
const C8Y_JSON_MQTT_EVENTS_TOPIC: &str = "c8y/event/events/create";
+const TEDGE_AGENT_LOG_DIR: &str = "tedge/agent";
const CREATE_EVENT_SMARTREST_CODE: u16 = 400;
@@ -66,6 +70,7 @@ where
device_type: String,
alarm_converter: AlarmConverter,
operations: Operations,
+ operation_logs: OperationLogs,
http_proxy: Proxy,
}
@@ -79,7 +84,7 @@ where
device_type: String,
operations: Operations,
http_proxy: Proxy,
- ) -> Self {
+ ) -> Result<Self, CumulocityMapperError> {
let mut topic_filter: TopicFilter = vec![
"tedge/measurements",
"tedge/measurements/+",
@@ -102,7 +107,14 @@ where
let children: HashSet<String> = HashSet::new();
- CumulocityConverter {
+ let tedge_config = get_tedge_config()?;
+ let logs_path = tedge_config.query(LogPathSetting)?;
+
+ let log_dir = PathBuf::from(&format!("{}/{TEDGE_AGENT_LOG_DIR}", logs_path));
+
+ let operation_logs = OperationLogs::try_new(log_dir)?;
+
+ Ok(CumulocityConverter {
size_threshold,
children,
mapper_config,
@@ -110,8 +122,60 @@ where
device_type,
alarm_converter,
operations,
+ operation_logs,
http_proxy,
- }
+ })
+ }
+
+ #[cfg(test)]
+ pub fn from_logs_path(
+ size_threshold: SizeThreshold,
+ device_name: String,
+ device_type: String,
+ operations: Operations,
+ http_proxy: Proxy,
+ logs_path: PathBuf,
+ ) -> Result<Self, CumulocityMapperError> {
+ let mut topic_filter: TopicFilter = vec![
+ "tedge/measurements",
+ "tedge/measurements/+",
+ "tedge/alarms/+/+",
+ "c8y-internal/alarms/+/+",
+ "tedge/events/+",
+ ]
+ .try_into()
+ .expect("topics that mapper should subscribe to");
+
+ let () = topic_filter.add_all(CumulocityMapper::subscriptions(&operations).unwrap());
+
+ let mapper_config = MapperConfig {
+ in_topic_filter: topic_filter,
+ out_topic: make_valid_topic_or_panic("c8y/measurement/measurements/create"),
+ errors_topic: make_valid_topic_or_panic("tedge/errors"),
+ };
+
+ let alarm_converter = AlarmConverter::new();
+
+ let children: HashSet<String> = HashSet::new();
+
+ let log_dir = PathBuf::from(&format!(
+ "{}/{TEDGE_AGENT_LOG_DIR}",
+ logs_path.to_str().unwrap()
+ ));
+
+ let operation_logs = OperationLogs::try_new(log_dir)?;
+
+ Ok(CumulocityConverter {
+ size_threshold,
+ children,
+ mapper_config,
+ device_name,
+ device_type,
+ alarm_converter,
+ operations,
+ operation_logs,
+ http_proxy,
+ })
}
fn try_convert_measurement(
@@ -244,8 +308,13 @@ where
Ok(publish_restart_operation_status(message.payload_str()?).await?)
}
Ok(MapperSubscribeTopic::C8yTopic(_)) => {
- debug!("Cumulocity");
- parse_c8y_topics(message, &self.operations, &mut self.http_proxy).await
+ parse_c8y_topics(
+ message,
+ &self.operations,
+ &mut self.http_proxy,
+ &self.operation_logs,
+ )
+ .await
}
_ => Err(ConversionError::UnsupportedTopic(
message.topic.name.clone(),
@@ -263,7 +332,10 @@ where
&self.device_name,
&self.device_type,
));
- let supported_log_types_message = self.wrap_error(create_supported_log_types_message());
+
+ let supported_log_types_message = self.wrap_error(create_supported_log_types_message(
+ &self.operation_logs.log_dir,
+ ));
let pending_operations_message = self.wrap_error(create_get_pending_operations_message());
let software_list_message = self.wrap_error(create_get_software_list_message());
@@ -288,8 +360,16 @@ async fn parse_c8y_topics(
message: &Message,
operations: &Operations,
http_proxy: &mut impl C8YHttpProxy,
+ operation_logs: &OperationLogs,
) -> Result<Vec<Message>, ConversionError> {
- match process_smartrest(message.payload_str()?, operations, http_proxy).await {
+ match process_smartrest(
+ message.payload_str()?,
+ operations,
+ http_proxy,
+ operation_logs,
+ )
+ .await
+ {
Err(
ref err @ CumulocityMapperError::FromSmartRestDeserializer(
SmartRestDeserializerError::InvalidParameter { ref operation, .. },
@@ -476,8 +556,35 @@ fn create_get_pending_operations_message() -> Result<Message, ConversionError> {
Ok(Message::new(&topic, payload))
}
-fn create_supported_log_types_message() -> Result<Message, ConversionError> {
- let payload = SmartRestSetSupportedLogType::default().to_smartrest()?;
+fn get_supported_log_types(log_dir: &Path) -> Result<Vec<String>, ConversionError> {
+ let mut result = HashSet::new();
+
+ let paths = fs::read_dir(&log_dir).unwrap();
+ for path in paths {
+ let path = path?;
+ if fs::metadata(path.path())?.is_file() {
+ let file_name = path.file_name();
+ let file_name = file_name
+ .to_str()
+ .ok_or(OperationLogsError::FileFormatError)?;
+
+ // FIXME: this is a hotfix to map "software-list" and "software-update" to "software-management"
+ // this should be fixed in https://github.com/thin-edge/thin-edge.io/issues/1077
+ if file_name.starts_with("software-list") | file_name.starts_with("software-update") {
+ result.insert("software-management".to_string());
+ } else {
+ let log_type = file_name.split('-').next();
+ let log_type = log_type.ok_or(OperationLogsError::FileFormatError)?;
+ result.insert(log_type.to_string());
+ }
+ }
+ }
+ Ok(Vec::from_iter(result))
+}
+
+fn create_supported_log_types_message(log_dir: &Path) -> Result<Message, ConversionError> {
+ let supported_operation_types = get_supported_log_types(log_dir)?;
+ let payload = SmartRestSetSupportedLogType::from(supported_operation_types).to_smartrest()?;
let topic = C8yTopic::SmartRestResponse.to_topic()?;
Ok(Message::new(&topic, payload))
}
@@ -588,15 +695,15 @@ async fn execute_operation(
payload: &str,
command: &str,
operation_name: &str,
+ operation_logs: &OperationLogs,
) -> Result<(), CumulocityMapperError> {
let command = command.to_owned();
let payload = payload.to_string();
- let child = tokio::process::Command::new(&command)
- .args(&[&payload])
- .stdin(Stdio::null())
- .stdout(Stdio::null())
- .stderr(Stdio::null())
+ let mut logged = LoggedCommand::new(&command);
+ logged.arg(&payload);
+
+ let child = logged
.spawn()
.map_err(|e| CumulocityMapperError::ExecuteFailed {
error_message: e.to_string(),
@@ -604,29 +711,35 @@ async fn execute_operation(
operation_name: operation_name.to_string(),
});
+ let mut log_file = operation_logs
+ .new_log_file(plugin_sm::operation_logs::LogKind::Operation(
+ operation_name.to_string(),
+ ))
+ .await?;
+
match child {
- Ok(mut child) => {
- let _handle = tokio::spawn(async move {
- let _ = child.wait().await;
+ Ok(child) => {
+ tokio::spawn(async move {
+ let logger = log_file.buffer();
+ let _result = child.wait_with_output(logger).await.unwrap();
});
- return Ok(());
- }
- Err(err) => {
- return Err(err);
+ Ok(())
}
- };
+ Err(err) => Err(err),
+ }
}
async fn process_smartrest(
payload: &str,
operations: &Operations,
http_proxy: &mut impl C8YHttpProxy,
+ operation_logs: &OperationLogs,
) -> Result<Vec<Message>, CumulocityMapperError> {
let message_id: &str = &payload[..3];
match message_id {
"528" => forward_software_request(payload, http_proxy).await,
"510" => forward_restart_request(payload),
- template => forward_operation_request(payload, template, operations).await,
+ template => forward_operation_request(payload, template, operations, operation_logs).await,
}
}
@@ -677,11 +790,13 @@ async fn forward_operation_request(
payload: &str,
template: &str,
operations: &Operations,
+ operation_logs: &OperationLogs,
) -> Result<Vec<Message>, CumulocityMapperError> {
match operations.matching_smartrest_template(template) {
Some(operation) => {
if let Some(command) = operation.command() {
- execute_operation(payload, command.as_str(), &operation.name).await?;
+ execute_operation(payload, command.as_str(), &operation.name, operation_logs)
+ .await?;
}
Ok(vec![])
}
@@ -739,14 +854,21 @@ pub fn get_child_id_from_topic(topic: &str) -> Result<Option<String>, Conversion
}
}
+#[cfg(test)]
mod tests {
+ use plugin_sm::operation_logs::OperationLogs;
+ use tempfile::TempDir;
+
#[tokio::test]
async fn test_execute_operation_is_not_blocked() {
+ let log_dir = TempDir::new().unwrap();
+ let operation_logs = OperationLogs::try_new(log_dir.path().to_path_buf()).unwrap();
+
let now = std::time::Instant::now();
- let () = super::execute_operation("5", "sleep", "sleep_one")
+ let () = super::execute_operation("5", "sleep", "sleep_one", &operation_logs)
.await
.unwrap();
- let () = super::execute_operation("5", "sleep", "sleep_two")
+ let () = super::execute_operation("5", "sleep", "sleep_two", &operation_logs)
.await
.unwrap();
diff --git a/crates/core/tedge_mapper/src/c8y/error.rs b/crates/core/tedge_mapper/src/c8y/error.rs
index add72d82..6a4f0a71 100644
--- a/crates/core/tedge_mapper/src/c8y/error.rs
+++ b/crates/core/tedge_mapper/src/c8y/error.rs
@@ -1,6 +1,7 @@
use c8y_smartrest::error::{
SMCumulocityMapperError, SmartRestDeserializerError, SmartRestSerializerError,
};
+use plugin_sm::operation_logs::OperationLogsError;
#[derive(thiserror::Error, Debug)]
#[allow(clippy::enum_variant_names)]
@@ -50,4 +51,10 @@ pub enum CumulocityMapperError {
#[error("An unknown operation template: {0}")]
UnknownOperation(String),
+
+ #[error(transparent)]
+ FromOperationLogs(#[from] OperationLogsError),
+
+ #[error(transparent)]
+ TedgeConfig(#[from] tedge_config::TEdgeConfigError),
}
diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs
index 3842b3a5..b5f9c275 100644
--- a/crates/core/tedge_mapper/src/c8y/mapper.rs
+++ b/crates/core/tedge_mapper/src/c8y/mapper.rs
@@ -77,7 +77,7 @@ impl TEdgeComponent for CumulocityMapper {
device_type,
operations,
http_proxy,
- ));
+ )?);
let mut mapper =
create_mapper(CUMULOCITY_MAPPER_NAME, mqtt_host, mqtt_port, converter).await?;
@@ -180,13 +180,16 @@ mod tests {
let size_threshold = SizeThreshold(MQTT_MESSAGE_SIZE_THRESHOLD);
let operations = Operations::default();
- let converter = Box::new(CumulocityConverter::new(
- size_threshold,
- DEVICE_NAME.into(),
- DEVICE_TYPE.into(),
- operations,
- proxy,
- ));
+ let converter = Box::new(
+ CumulocityConverter::new(
+ size_threshold,
+ DEVICE_NAME.into(),
+ DEVICE_TYPE.into(),
+ operations,
+ proxy,
+ )
+ .unwrap(),
+ );
let broker = test_mqtt_broker();
diff --git a/crates/core/tedge_mapper/src/c8y/tests.rs b/crates/core/tedge_mapper/src/c8y/tests.rs
index b591672e..03477298 100644
--- a/crates/core/tedge_mapper/src/c8y/tests.rs
+++ b/crates/core/tedge_mapper/src/c8y/tests.rs
@@ -60,7 +60,7 @@ async fn mapper_publishes_a_supported_operation_and_a_pending_operations_onto_c8
mqtt_tests::assert_received_all_expected(
&mut messages,
TEST_TIMEOUT_MS,
- &["118,software-management\n", "500\n"],
+ &["software-management", "500\n"],
)
.await;
@@ -299,11 +299,7 @@ async fn mapper_fails_during_sw_update_recovers_and_process_response() -> Result
mqtt_tests::assert_received_all_expected(
&mut responses,
TEST_TIMEOUT_MS,
- &[
- "118,software-management\n",
- "500\n",
- "503,c8y_SoftwareUpdate,\n",
- ],
+ &["software-management", "500\n", "503,c8y_SoftwareUpdate,\n"],
)
.await;
@@ -950,6 +946,7 @@ fn create_c8y_converter() -> CumulocityConverter<FakeC8YHttpProxy> {
operations,
http_proxy,
)
+ .unwrap()
}
fn remove_whitespace(s: &str) -> String {
diff --git a/crates/core/tedge_mapper/src/core/error.rs b/crates/core/tedge_mapper/src/core/error.rs
index 743fd477..4280c738 100644
--- a/crates/core/tedge_mapper/src/core/error.rs
+++ b/crates/core/tedge_mapper/src/core/error.rs
@@ -98,4 +98,7 @@ pub enum ConversionError {
actual_size: usize,
threshold: usize,
},
+
+ #[error(transparent)]
+ FromOperationLogsError(#[from] plugin_sm::operation_logs::OperationLogsError),
}