summaryrefslogtreecommitdiffstats
path: root/crates
diff options
context:
space:
mode:
authorinitard <solo@softwareag.com>2022-04-26 19:12:31 +0100
committerinitard <solo@softwareag.com>2022-05-03 17:54:08 +0100
commite27c8e758610166d50c2e1c7ebd64e709c5bf571 (patch)
tree822a079a5006f5c5dfb3401ae593e9686bead0ef /crates
parentda4f34614fd832c0e8699745325688ca825b1313 (diff)
operation logs improvement #1027
Signed-off-by: initard <solo@softwareag.com>
Diffstat (limited to 'crates')
-rw-r--r--crates/common/logged_command/src/logged_command.rs1
-rw-r--r--crates/core/c8y_smartrest/src/smartrest_serializer.rs8
-rw-r--r--crates/core/plugin_sm/src/operation_logs.rs112
-rw-r--r--crates/core/tedge_agent/src/agent.rs4
-rw-r--r--crates/core/tedge_mapper/Cargo.toml2
-rw-r--r--crates/core/tedge_mapper/src/c8y/converter.rs180
-rw-r--r--crates/core/tedge_mapper/src/c8y/error.rs7
-rw-r--r--crates/core/tedge_mapper/src/c8y/mapper.rs19
-rw-r--r--crates/core/tedge_mapper/src/c8y/tests.rs9
-rw-r--r--crates/core/tedge_mapper/src/core/error.rs3
10 files changed, 242 insertions, 103 deletions
diff --git a/crates/common/logged_command/src/logged_command.rs b/crates/common/logged_command/src/logged_command.rs
index 08ce5990..ef40ab7d 100644
--- a/crates/common/logged_command/src/logged_command.rs
+++ b/crates/common/logged_command/src/logged_command.rs
@@ -9,6 +9,7 @@ use tokio::{
process::{Child, Command},
};
+#[derive(Debug)]
pub struct LoggingChild {
command_line: String,
pub inner_child: Child,
diff --git a/crates/core/c8y_smartrest/src/smartrest_serializer.rs b/crates/core/c8y_smartrest/src/smartrest_serializer.rs
index cd3867b5..5eabba04 100644
--- a/crates/core/c8y_smartrest/src/smartrest_serializer.rs
+++ b/crates/core/c8y_smartrest/src/smartrest_serializer.rs
@@ -38,14 +38,14 @@ where
#[derive(Debug, Deserialize, Serialize, PartialEq)]
pub struct SmartRestSetSupportedLogType {
pub message_id: &'static str,
- pub supported_operations: Vec<&'static str>,
+ pub supported_operations: Vec<String>,
}
-impl Default for SmartRestSetSupportedLogType {
- fn default() -> Self {
+impl From<Vec<String>> for SmartRestSetSupportedLogType {
+ fn from(operation_types: Vec<String>) -> Self {
Self {
message_id: "118",
- supported_operations: vec!["software-management"],
+ supported_operations: operation_types,
}
}
}
diff --git a/crates/core/plugin_sm/src/operation_logs.rs b/crates/core/plugin_sm/src/operation_logs.rs
index aa768725..7bf08eab 100644
--- a/crates/core/plugin_sm/src/operation_logs.rs
+++ b/crates/core/plugin_sm/src/operation_logs.rs
@@ -1,6 +1,6 @@
-use std::cmp::Reverse;
-use std::collections::BinaryHeap;
use std::path::PathBuf;
+use std::{cmp::Reverse, collections::HashMap};
+use std::{collections::BinaryHeap, path::Path};
use time::{format_description, OffsetDateTime};
use tracing::log;
@@ -13,33 +13,29 @@ pub enum OperationLogsError {
#[error(transparent)]
FromTimeFormat(#[from] time::error::Format),
+
+ #[error("Incorrect file format. Expected: `operation_name`-`timestamp`.log")]
+ FileFormatError,
}
#[derive(Debug)]
pub struct OperationLogs {
- log_dir: PathBuf,
- max_update_logs: usize,
- max_list_logs: usize,
+ pub log_dir: PathBuf,
}
pub enum LogKind {
SoftwareUpdate,
SoftwareList,
- Operation,
+ Operation(String),
}
const UPDATE_PREFIX: &str = "software-update";
const LIST_PREFIX: &str = "software-list";
-const OPERATION_PREFIX: &str = "operation";
impl OperationLogs {
pub fn try_new(log_dir: PathBuf) -> Result<OperationLogs, OperationLogsError> {
std::fs::create_dir_all(log_dir.clone())?;
- let operation_logs = OperationLogs {
- log_dir,
- max_update_logs: 5,
- max_list_logs: 1,
- };
+ let operation_logs = OperationLogs { log_dir };
if let Err(err) = operation_logs.remove_outdated_logs() {
// In no case a log-cleaning error should prevent the agent to run.
@@ -50,11 +46,7 @@ impl OperationLogs {
Ok(operation_logs)
}
- pub async fn new_log_file(
- &self,
- kind: LogKind,
- operation_name: Option<&str>,
- ) -> Result<LogFile, OperationLogsError> {
+ pub async fn new_log_file(&self, kind: LogKind) -> Result<LogFile, OperationLogsError> {
if let Err(err) = self.remove_outdated_logs() {
// In no case a log-cleaning error should prevent the agent to run.
// Hence the error is logged but not returned.
@@ -63,20 +55,12 @@ impl OperationLogs {
let now = OffsetDateTime::now_utc();
- let operation_name = {
- if let Some(operation_name) = operation_name {
- let operation_name = &format!("{OPERATION_PREFIX}-{}", operation_name);
- operation_name.to_string()
- } else {
- OPERATION_PREFIX.to_string()
- }
- };
-
let file_prefix = match kind {
LogKind::SoftwareUpdate => UPDATE_PREFIX,
LogKind::SoftwareList => LIST_PREFIX,
- LogKind::Operation => operation_name.as_str(),
+ LogKind::Operation(ref operation_name) => operation_name.as_str(),
};
+
let file_name = format!(
"{}-{}.log",
file_prefix,
@@ -92,42 +76,64 @@ impl OperationLogs {
}
pub fn remove_outdated_logs(&self) -> Result<(), OperationLogsError> {
- let mut update_logs = BinaryHeap::new();
- let mut list_logs = BinaryHeap::new();
+ let mut log_tracker: HashMap<String, BinaryHeap<Reverse<String>>> = HashMap::new();
+
+ // FIXME: this is a hotfix to map "software-list" and "software-update" to "software-management"
+ // this should be fixed in https://github.com/thin-edge/thin-edge.io/issues/1077
for file in (self.log_dir.read_dir()?).flatten() {
if let Some(path) = file.path().file_name().and_then(|name| name.to_str()) {
- if path.starts_with(UPDATE_PREFIX) {
- // The paths are pushed using the reverse alphabetic order
- update_logs.push(Reverse(path.to_string()));
- } else if path.starts_with(LIST_PREFIX) {
- // The paths are pushed using the reverse alphabetic order
- list_logs.push(Reverse(path.to_string()));
+ if path.starts_with("software-list") {
+ log_tracker
+ .entry("software-list".to_string())
+ .or_insert_with(BinaryHeap::new)
+ .push(Reverse(path.to_string()));
+ } else if path.starts_with("software-update") {
+ log_tracker
+ .entry("software-update".to_string())
+ .or_insert_with(BinaryHeap::new)
+ .push(Reverse(path.to_string()));
+ } else {
+ let file_name = path
+ .split('-')
+ .next()
+ .ok_or(OperationLogsError::FileFormatError)?;
+ log_tracker
+ .entry(file_name.to_string())
+ .or_insert_with(BinaryHeap::new)
+ .push(Reverse(path.to_string()));
}
}
}
- while update_logs.len() > self.max_update_logs {
- if let Some(rname) = update_logs.pop() {
- let name = rname.0;
- let path = self.log_dir.join(name.clone());
- if let Err(err) = std::fs::remove_file(&path) {
- log::warn!("Fail to remove out-dated log file {} : {}", name, err);
- }
+ for (key, value) in log_tracker.iter_mut() {
+ if key.starts_with("software-list") {
+ // only allow one update list file in logs
+ let () = remove_old_logs(value, &self.log_dir, 1)?;
+ } else {
+ // allow most recent five
+ let () = remove_old_logs(value, &self.log_dir, 5)?;
}
}
- while list_logs.len() > self.max_list_logs {
- if let Some(rname) = list_logs.pop() {
- let name = rname.0;
- let path = self.log_dir.join(name.clone());
- if let Err(err) = std::fs::remove_file(&path) {
- log::warn!("Fail to remove out-dated log file {} : {}", name, err);
- }
+ Ok(())
+ }
+}
+
+fn remove_old_logs(
+ log_tracker: &mut BinaryHeap<Reverse<String>>,
+ dir_path: &Path,
+ n: usize,
+) -> Result<(), OperationLogsError> {
+ while log_tracker.len() > n {
+ if let Some(rname) = log_tracker.pop() {
+ let name = rname.0;
+ let path = dir_path.join(name.clone());
+ if let Err(err) = std::fs::remove_file(&path) {
+ log::warn!("Fail to remove out-dated log file {} : {}", name, err);
}
}
-
- Ok(())
}
+ Ok(())
}
#[cfg(test)]
@@ -197,9 +203,7 @@ mod tests {
let update_log_7 = create_file(log_dir.path(), "software-update-1996-12-25T16:39:57z");
// Create a new log file
- let new_log = operation_logs
- .new_log_file(LogKind::SoftwareUpdate, None)
- .await?;
+ let new_log = operation_logs.new_log_file(LogKind::SoftwareUpdate).await?;
// The new log has been created
let new_path = Path::new(new_log.path());
diff --git a/crates/core/tedge_agent/src/agent.rs b/crates/core/tedge_agent/src/agent.rs
index 77bc6429..b7d946b0 100644
--- a/crates/core/tedge_agent/src/agent.rs
+++ b/crates/core/tedge_agent/src/agent.rs
@@ -413,7 +413,7 @@ impl SmAgent {
let response = match self
.operation_logs
- .new_log_file(LogKind::SoftwareList, None)
+ .new_log_file(LogKind::SoftwareList)
.await
{
Ok(log_file) => plugins.lock().await.list(&request, log_file).await,
@@ -476,7 +476,7 @@ impl SmAgent {
let response = match self
.operation_logs
- .new_log_file(LogKind::SoftwareUpdate, None)
+ .new_log_file(LogKind::SoftwareUpdate)
.await
{
Ok(log_file) => {
diff --git a/crates/core/tedge_mapper/Cargo.toml b/crates/core/tedge_mapper/Cargo.toml
index cf96048c..34e8e8da 100644
--- a/crates/core/tedge_mapper/Cargo.toml
+++ b/crates/core/tedge_mapper/Cargo.toml
@@ -37,8 +37,10 @@ csv = "1.1"
download = { path = "../../common/download" }
flockfile = { path = "../../common/flockfile" }
futures = "0.3"
+logged_command = { path = "../../common/logged_command" }
mockall = "0.11"
mqtt_channel = { path = "../../common/mqtt_channel" }
+plugin_sm = { path = "../plugin_sm" }
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
diff --git a/crates/core/tedge_mapper/src/c8y/converter.rs b/crates/core/tedge_mapper/src/c8y/converter.rs
index d71e12a0..e830a7a4 100644
--- a/crates/core/tedge_mapper/src/c8y/converter.rs
+++ b/crates/core/tedge_mapper/src/c8y/converter.rs
@@ -23,14 +23,17 @@ use c8y_smartrest::{
},
};
use c8y_translator::json;
+
+use logged_command::LoggedCommand;
use mqtt_channel::{Message, Topic, TopicFilter};
+use plugin_sm::operation_logs::{OperationLogs, OperationLogsError};
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
- fs::File,
+ fs::{self, File},
io::Read,
- path::Path,
- process::Stdio,
+ path::{Path, PathBuf},
};
+use tedge_config::{get_tedge_config, ConfigSettingAccessor, LogPathSetting};
use thin_edge_json::{alarm::ThinEdgeAlarm, event::ThinEdgeEvent};
use time::format_description::well_known::Rfc3339;
use tracing::{debug, info, log::error};
@@ -51,6 +54,7 @@ const TEDGE_ALARMS_TOPIC: &str = "tedge/alarms/";
const INTERNAL_ALARMS_TOPIC: &str = "c8y-internal/alarms/";
const TEDGE_EVENTS_TOPIC: &str = "tedge/events/";
const C8Y_JSON_MQTT_EVENTS_TOPIC: &str = "c8y/event/events/create";
+const TEDGE_AGENT_LOG_DIR: &str = "tedge/agent";
const CREATE_EVENT_SMARTREST_CODE: u16 = 400;
@@ -66,6 +70,7 @@ where
device_type: String,
alarm_converter: AlarmConverter,
operations: Operations,
+ operation_logs: OperationLogs,
http_proxy: Proxy,
}
@@ -79,7 +84,7 @@ where
device_type: String,
operations: Operations,
http_proxy: Proxy,
- ) -> Self {
+ ) -> Result<Self, CumulocityMapperError> {
let mut topic_filter: TopicFilter = vec![
"tedge/measurements",
"tedge/measurements/+",
@@ -102,7 +107,14 @@ where
let children: HashSet<String> = HashSet::new();
- CumulocityConverter {
+ let tedge_config = get_tedge_config()?;
+ let logs_path = tedge_config.query(LogPathSetting)?;
+
+ let log_dir = PathBuf::from(&format!("{}/{TEDGE_AGENT_LOG_DIR}", logs_path));
+
+ let operation_logs = OperationLogs::try_new(log_dir)?;
+
+ Ok(CumulocityConverter {
size_threshold,
children,
mapper_config,
@@ -110,8 +122,60 @@ where
device_type,
alarm_converter,
operations,
+ operation_logs,
http_proxy,
- }
+ })
+ }
+
+ #[cfg(test)]
+ pub fn from_logs_path(
+ size_threshold: SizeThreshold,
+ device_name: String,
+ device_type: String,
+ operations: Operations,
+ http_proxy: Proxy,
+ logs_path: PathBuf,
+ ) -> Result<Self, CumulocityMapperError> {
+ let mut topic_filter: TopicFilter = vec![
+ "tedge/measurements",
+ "tedge/measurements/+",
+ "tedge/alarms/+/+",
+ "c8y-internal/alarms/+/+",
+ "tedge/events/+",
+ ]
+ .try_into()
+ .expect("topics that mapper should subscribe to");
+
+ let () = topic_filter.add_all(CumulocityMapper::subscriptions(&operations).unwrap());
+
+ let mapper_config = MapperConfig {
+ in_topic_filter: topic_filter,
+ out_topic: make_valid_topic_or_panic("c8y/measurement/measurements/create"),
+ errors_topic: make_valid_topic_or_panic("tedge/errors"),
+ };
+
+ let alarm_converter = AlarmConverter::new();
+
+ let children: HashSet<String> = HashSet::new();
+
+ let log_dir = PathBuf::from(&format!(
+ "{}/{TEDGE_AGENT_LOG_DIR}",
+ logs_path.to_str().unwrap()
+ ));
+
+ let operation_logs = OperationLogs::try_new(log_dir)?;
+
+ Ok(CumulocityConverter {
+ size_threshold,
+ children,
+ mapper_config,
+ device_name,
+ device_type,
+ alarm_converter,
+ operations,
+ operation_logs,
+ http_proxy,
+ })
}
fn try_convert_measurement(
@@ -244,8 +308,13 @@ where
Ok(publish_restart_operation_status(message.payload_str()?).await?)
}
Ok(MapperSubscribeTopic::C8yTopic(_)) => {
- debug!("Cumulocity");
- parse_c8y_topics(message, &self.operations, &mut self.http_proxy).await
+ parse_c8y_topics(
+ message,
+ &self.operations,
+ &mut self.http_proxy,
+ &self.operation_logs,
+ )
+ .await
}
_ => Err(ConversionError::UnsupportedTopic(
message.topic.name.clone(),
@@ -263,7 +332,10 @@ where
&self.device_name,
&self.device_type,
));
- let supported_log_types_message = self.wrap_error(create_supported_log_types_message());
+
+ let supported_log_types_message = self.wrap_error(create_supported_log_types_message(
+ &self.operation_logs.log_dir,
+ ));
let pending_operations_message = self.wrap_error(create_get_pending_operations_message());
let software_list_message = self.wrap_error(create_get_software_list_message());
@@ -288,8 +360,16 @@ async fn parse_c8y_topics(
message: &Message,
operations: &Operations,
http_proxy: &mut impl C8YHttpProxy,
+ operation_logs: &OperationLogs,
) -> Result<Vec<Message>, ConversionError> {
- match process_smartrest(message.payload_str()?, operations, http_proxy).await {
+ match process_smartrest(
+ message.payload_str()?,
+ operations,
+ http_proxy,
+ operation_logs,
+ )
+ .await
+ {
Err(
ref err @ CumulocityMapperError::FromSmartRestDeserializer(
SmartRestDeserializerError::InvalidParameter { ref operation, .. },
@@ -476,8 +556,35 @@ fn create_get_pending_operations_message() -> Result<Message, ConversionError> {
Ok(Message::new(&topic, payload))
}
-fn create_supported_log_types_message() -> Result<Message, ConversionError> {
- let payload = SmartRestSetSupportedLogType::default().to_smartrest()?;
+fn get_supported_log_types(log_dir: &Path) -> Result<Vec<String>, ConversionError> {
+ let mut result = HashSet::new();
+
+ let paths = fs::read_dir(&log_dir).unwrap();
+ for path in paths {
+ let path = path?;
+ if fs::metadata(path.path())?.is_file() {
+ let file_name = path.file_name();
+ let file_name = file_name
+ .to_str()
+ .ok_or(OperationLogsError::FileFormatError)?;
+
+ // FIXME: this is a hotfix to map "software-list" and "software-update" to "software-management"
+ // this should be fixed in https://github.com/thin-edge/thin-edge.io/issues/1077
+ if file_name.starts_with("software-list") | file_name.starts_with("software-update") {
+ result.insert("software-management".to_string());
+ } else {
+ let log_type = file_name.split('-').next();
+ let log_type = log_type.ok_or(OperationLogsError::FileFormatError)?;
+ result.insert(log_type.to_string());
+ }
+ }
+ }
+ Ok(Vec::from_iter(result))
+}
+
+fn create_supported_log_types_message(log_dir: &Path) -> Result<Message, ConversionError> {
+ let supported_operation_types = get_supported_log_types(log_dir)?;
+ let payload = SmartRestSetSupportedLogType::from(supported_operation_types).to_smartrest()?;
let topic = C8yTopic::SmartRestResponse.to_topic()?;
Ok(Message::new(&topic, payload))
}
@@ -588,15 +695,15 @@ async fn execute_operation(
payload: &str,
command: &str,
operation_name: &str,
+ operation_logs: &OperationLogs,
) -> Result<(), CumulocityMapperError> {
let command = command.to_owned();
let payload = payload.to_string();
- let child = tokio::process::Command::new(&command)
- .args(&[&payload])
- .stdin(Stdio::null())
- .stdout(Stdio::null())
- .stderr(Stdio::null())
+ let mut logged = LoggedCommand::new(&command);
+ logged.arg(&payload);
+
+ let child = logged
.spawn()
.map_err(|e| CumulocityMapperError::ExecuteFailed {
error_message: e.to_string(),
@@ -604,29 +711,35 @@ async fn execute_operation(
operation_name: operation_name.to_string(),
});
+ let mut log_file = operation_logs
+ .new_log_file(plugin_sm::operation_logs::LogKind::Operation(
+ operation_name.to_string(),
+ ))
+ .await?;
+
match child {
- Ok(mut child) => {
- let _handle = tokio::spawn(async move {
- let _ = child.wait().await;
+ Ok(child) => {
+ tokio::spawn(async move {
+ let logger = log_file.buffer();
+ let _result = child.wait_with_output(logger).await.unwrap();
});
- return Ok(());
- }
- Err(err) => {
- return Err(err);
+ Ok(())
}
- };
+ Err(err) => Err(err),
+ }
}
async fn process_smartrest(
payload: &str,
operations: &Operations,
http_proxy: &mut impl C8YHttpProxy,
+ operation_logs: &OperationLogs,
) -> Result<Vec<Message>, CumulocityMapperError> {
let message_id: &str = &payload[..3];
match message_id {
"528" => forward_software_request(payload, http_proxy).await,
"510" => forward_restart_request(payload),
- template => forward_operation_request(payload, template, operations).await,
+ template => forward_operation_request(payload, template, operations, operation_logs).await,
}
}
@@ -677,11 +790,13 @@ async fn forward_operation_request(
payload: &str,
template: &str,
operations: &Operations,
+ operation_logs: &OperationLogs,
) -> Result<Vec<Message>, CumulocityMapperError> {
match operations.matching_smartrest_template(template) {
Some(operation) => {
if let Some(command) = operation.command() {
- execute_operation(payload, command.as_str(), &operation.name).await?;
+ execute_operation(payload, command.as_str(), &operation.name, operation_logs)
+ .await?;
}
Ok(vec![])
}
@@ -739,14 +854,21 @@ pub fn get_child_id_from_topic(topic: &str) -> Result<Option<String>, Conversion
}
}
+#[cfg(test)]
mod tests {
+ use plugin_sm::operation_logs::OperationLogs;
+ use tempfile::TempDir;
+
#[tokio::test]
async fn test_execute_operation_is_not_blocked() {
+ let log_dir = TempDir::new().unwrap();
+ let operation_logs = OperationLogs::try_new(log_dir.path().to_path_buf()).unwrap();
+
let now = std::time::Instant::now();
- let () = super::execute_operation("5", "sleep", "sleep_one")
+ let () = super::execute_operation("5", "sleep", "sleep_one", &operation_logs)
.await
.unwrap();
- let () = super::execute_operation("5", "sleep", "sleep_two")
+ let () = super::execute_operation("5", "sleep", "sleep_two", &operation_logs)
.await
.unwrap();
diff --git a/crates/core/tedge_mapper/src/c8y/error.rs b/crates/core/tedge_mapper/src/c8y/error.rs
index add72d82..6a4f0a71 100644
--- a/crates/core/tedge_mapper/src/c8y/error.rs
+++ b/crates/core/tedge_mapper/src/c8y/error.rs
@@ -1,6 +1,7 @@
use c8y_smartrest::error::{
SMCumulocityMapperError, SmartRestDeserializerError, SmartRestSerializerError,
};
+use plugin_sm::operation_logs::OperationLogsError;
#[derive(thiserror::Error, Debug)]
#[allow(clippy::enum_variant_names)]
@@ -50,4 +51,10 @@ pub enum CumulocityMapperError {
#[error("An unknown operation template: {0}")]
UnknownOperation(String),
+
+ #[error(transparent)]
+ FromOperationLogs(#[from] OperationLogsError),
+
+ #[error(transparent)]
+ TedgeConfig(#[from] tedge_config::TEdgeConfigError),
}
diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs
index 3842b3a5..b5f9c275 100644
--- a/crates/core/tedge_mapper/src/c8y/mapper.rs
+++ b/crates/core/tedge_mapper/src/c8y/mapper.rs
@@ -77,7 +77,7 @@ impl TEdgeComponent for CumulocityMapper {
device_type,
operations,
http_proxy,
- ));
+ )?);
let mut mapper =
create_mapper(CUMULOCITY_MAPPER_NAME, mqtt_host, mqtt_port, converter).await?;
@@ -180,13 +180,16 @@ mod tests {
let size_threshold = SizeThreshold(MQTT_MESSAGE_SIZE_THRESHOLD);
let operations = Operations::default();
- let converter = Box::new(CumulocityConverter::new(
- size_threshold,
- DEVICE_NAME.into(),
- DEVICE_TYPE.into(),
- operations,
- proxy,
- ));
+ let converter = Box::new(
+ CumulocityConverter::new(
+ size_threshold,
+ DEVICE_NAME.into(),
+ DEVICE_TYPE.into(),
+ operations,
+ proxy,
+ )
+ .unwrap(),
+ );
let broker = test_mqtt_broker();
diff --git a/crates/core/tedge_mapper/src/c8y/tests.rs b/crates/core/tedge_mapper/src/c8y/tests.rs
index b591672e..03477298 100644
--- a/crates/core/tedge_mapper/src/c8y/tests.rs
+++ b/crates/core/tedge_mapper/src/c8y/tests.rs
@@ -60,7 +60,7 @@ async fn mapper_publishes_a_supported_operation_and_a_pending_operations_onto_c8
mqtt_tests::assert_received_all_expected(
&mut messages,
TEST_TIMEOUT_MS,
- &["118,software-management\n", "500\n"],
+ &["software-management", "500\n"],
)
.await;
@@ -299,11 +299,7 @@ async fn mapper_fails_during_sw_update_recovers_and_process_response() -> Result
mqtt_tests::assert_received_all_expected(
&mut responses,
TEST_TIMEOUT_MS,
- &[
- "118,software-management\n",
- "500\n",
- "503,c8y_SoftwareUpdate,\n",
- ],
+ &["software-management", "500\n", "503,c8y_SoftwareUpdate,\n"],
)
.await;
@@ -950,6 +946,7 @@ fn create_c8y_converter() -> CumulocityConverter<FakeC8YHttpProxy> {
operations,
http_proxy,
)
+ .unwrap()
}
fn remove_whitespace(s: &str) -> String {
diff --git a/crates/core/tedge_mapper/src/core/error.rs b/crates/core/tedge_mapper/src/core/error.rs
index 743fd477..4280c738 100644
--- a/crates/core/tedge_mapper/src/core/error.rs
+++ b/crates/core/tedge_mapper/src/core/error.rs
@@ -98,4 +98,7 @@ pub enum ConversionError {
actual_size: usize,
threshold: usize,
},
+
+ #[error(transparent)]
+ FromOperationLogsError(#[from] plugin_sm::operation_logs::OperationLogsError),
}