summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_mapper/src
diff options
context:
space:
mode:
authorAlex Solomes <alex.solomes@softwareag.com>2022-06-09 17:09:06 +0100
committerGitHub <noreply@github.com>2022-06-09 17:09:06 +0100
commitc95fe8e202f470e79123bf4c992f779df249ec42 (patch)
tree848540c0275a70b143440edfdacea0533e28d8ac /crates/core/tedge_mapper/src
parentb91cc0205399fef3b73e4a41e56b02fd8cfc7d87 (diff)
Testing utility: TempTedgeDir POC (#1148)
* temp tedge dir poc Signed-off-by: initard <solo@softwareag.com> * renaming crate and removing TedgeChildTempDir - renamed the crate to tedge_test_utils - removed TedgeChildTempDir as TempTedgeDir already had the same functions Signed-off-by: Alex Solomes <alex.solomes@softwareag.com> * changing agent.rs tests to use TempTedgeDir - changing agent.rs to use TempTedgeDir - un-ignoring a test to check if agent restart creates a file in the right place Signed-off-by: Alex Solomes <alex.solomes@softwareag.com> * changing tedge_mapper tests to use TempTedgeDir Signed-off-by: Alex Solomes <alex.solomes@softwareag.com> * changing tedge_config tests to use TempTedgeDir Signed-off-by: Alex Solomes <alex.solomes@softwareag.com> * changing c8y_configuration_plugin tests to use TempTedgeDir Signed-off-by: Alex Solomes <alex.solomes@softwareag.com> * changing tedge_apama_plugin tests to use TempTedgeDir Signed-off-by: Alex Solomes <alex.solomes@softwareag.com> * changing logged_command tests to use TempTedgeDir Signed-off-by: Alex Solomes <alex.solomes@softwareag.com> * adding another method to TempTedgeDir Signed-off-by: initard <solo@softwareag.com> Co-authored-by: initard <solo@softwareag.com>
Diffstat (limited to 'crates/core/tedge_mapper/src')
-rw-r--r--crates/core/tedge_mapper/src/c8y/converter.rs4
-rw-r--r--crates/core/tedge_mapper/src/c8y/dynamic_discovery.rs6
-rw-r--r--crates/core/tedge_mapper/src/c8y/mapper.rs6
-rw-r--r--crates/core/tedge_mapper/src/c8y/tests.rs98
4 files changed, 53 insertions, 61 deletions
diff --git a/crates/core/tedge_mapper/src/c8y/converter.rs b/crates/core/tedge_mapper/src/c8y/converter.rs
index a7468ebe..9ec58cb0 100644
--- a/crates/core/tedge_mapper/src/c8y/converter.rs
+++ b/crates/core/tedge_mapper/src/c8y/converter.rs
@@ -837,11 +837,11 @@ pub fn get_child_id_from_topic(topic: &str) -> Result<Option<String>, Conversion
#[cfg(test)]
mod tests {
use plugin_sm::operation_logs::OperationLogs;
- use tempfile::TempDir;
+ use tedge_test_utils::fs::TempTedgeDir;
#[tokio::test]
async fn test_execute_operation_is_not_blocked() {
- let log_dir = TempDir::new().unwrap();
+ let log_dir = TempTedgeDir::new();
let operation_logs = OperationLogs::try_new(log_dir.path().to_path_buf()).unwrap();
let now = std::time::Instant::now();
diff --git a/crates/core/tedge_mapper/src/c8y/dynamic_discovery.rs b/crates/core/tedge_mapper/src/c8y/dynamic_discovery.rs
index 535f396b..fca9057d 100644
--- a/crates/core/tedge_mapper/src/c8y/dynamic_discovery.rs
+++ b/crates/core/tedge_mapper/src/c8y/dynamic_discovery.rs
@@ -94,8 +94,8 @@ fn create_inotify_with_non_existing_dir() {
#[test]
fn create_inotify_with_right_directory() {
- use tempfile::TempDir;
- let dir = TempDir::new().unwrap().into_path();
- let res = create_inotify_watch(dir);
+ use tedge_test_utils::fs::TempTedgeDir;
+ let dir = TempTedgeDir::new();
+ let res = create_inotify_watch(dir.path().to_path_buf());
assert!(res.is_ok());
}
diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs
index 24dfde84..8ea3b4fc 100644
--- a/crates/core/tedge_mapper/src/c8y/mapper.rs
+++ b/crates/core/tedge_mapper/src/c8y/mapper.rs
@@ -130,7 +130,7 @@ mod tests {
use mqtt_tests::{assert_received_all_expected, test_mqtt_broker};
use serde_json::json;
use std::time::Duration;
- use tempfile::TempDir;
+ use tedge_test_utils::fs::TempTedgeDir;
use test_case::test_case;
const TEST_TIMEOUT_SECS: Duration = Duration::from_secs(5);
@@ -188,7 +188,7 @@ mod tests {
let size_threshold = SizeThreshold(MQTT_MESSAGE_SIZE_THRESHOLD);
let operations = Operations::default();
- let tmp_dir = TempDir::new().unwrap();
+ let tmp_dir = TempTedgeDir::new();
let converter = Box::new(
CumulocityConverter::from_logs_path(
size_threshold,
@@ -196,7 +196,7 @@ mod tests {
DEVICE_TYPE.into(),
operations,
proxy,
- tmp_dir.into_path(),
+ tmp_dir.path().to_path_buf(),
)
.unwrap(),
);
diff --git a/crates/core/tedge_mapper/src/c8y/tests.rs b/crates/core/tedge_mapper/src/c8y/tests.rs
index 8596f594..78eee84c 100644
--- a/crates/core/tedge_mapper/src/c8y/tests.rs
+++ b/crates/core/tedge_mapper/src/c8y/tests.rs
@@ -19,7 +19,7 @@ use mqtt_tests::test_mqtt_server::MqttProcessHandler;
use serde_json::json;
use serial_test::serial;
use std::{path::Path, time::Duration};
-use tempfile::TempDir;
+use tedge_test_utils::fs::TempTedgeDir;
use test_case::test_case;
use tokio::task::JoinHandle;
@@ -39,12 +39,12 @@ async fn mapper_publishes_a_software_list_request() {
.await;
// Start the SM Mapper
- let sm_mapper = start_c8y_mapper(broker.port).await;
+ let (_tmp_dir, sm_mapper) = start_c8y_mapper(broker.port).await.unwrap();
// Expect on `tedge/commands/req/software/list` a software list request.
mqtt_tests::assert_received_all_expected(&mut messages, TEST_TIMEOUT_MS, &[r#"{"id":"#]).await;
- sm_mapper.unwrap().abort();
+ sm_mapper.abort();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
@@ -55,12 +55,12 @@ async fn mapper_publishes_a_supported_operation_and_a_pending_operations_onto_c8
let mut messages = broker.messages_published_on("c8y/s/us").await;
// Start SM Mapper
- let sm_mapper = start_c8y_mapper(broker.port).await;
+ let (_tmp_dir, sm_mapper) = start_c8y_mapper(broker.port).await.unwrap();
// Expect 500 messages has been received on `c8y/s/us`, if no msg received for the timeout the test fails.
mqtt_tests::assert_received_all_expected(&mut messages, TEST_TIMEOUT_MS, &["500\n"]).await;
- sm_mapper.unwrap().abort();
+ sm_mapper.abort();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
@@ -73,8 +73,7 @@ async fn mapper_publishes_software_update_request() {
.messages_published_on("tedge/commands/req/software/update")
.await;
- let sm_mapper = start_c8y_mapper(broker.port).await;
-
+ let (_tmp_dir, sm_mapper) = start_c8y_mapper(broker.port).await.unwrap();
// Prepare and publish a software update smartrest request on `c8y/s/ds`.
let smartrest = r#"528,external_id,nodered,1.0.0::debian,,install"#;
let _ = broker.publish("c8y/s/ds", smartrest).await.unwrap();
@@ -101,7 +100,7 @@ async fn mapper_publishes_software_update_request() {
)
.await;
- sm_mapper.unwrap().abort();
+ sm_mapper.abort();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
@@ -114,7 +113,7 @@ async fn mapper_publishes_software_update_status_onto_c8y_topic() {
let mut messages = broker.messages_published_on("c8y/s/us").await;
// Start SM Mapper
- let sm_mapper = start_c8y_mapper(broker.port).await;
+ let (_tmp_dir, sm_mapper) = start_c8y_mapper(broker.port).await.unwrap();
let _ = publish_a_fake_jwt_token(broker).await;
// Prepare and publish a software update status response message `executing` on `tedge/commands/res/software/update`.
@@ -159,7 +158,7 @@ async fn mapper_publishes_software_update_status_onto_c8y_topic() {
)
.await;
- sm_mapper.unwrap().abort();
+ sm_mapper.abort();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
@@ -169,7 +168,7 @@ async fn mapper_publishes_software_update_failed_status_onto_c8y_topic() {
let mut messages = broker.messages_published_on("c8y/s/us").await;
// Start SM Mapper
- let sm_mapper = start_c8y_mapper(broker.port).await;
+ let (_tmp_dir, sm_mapper) = start_c8y_mapper(broker.port).await.unwrap();
let _ = publish_a_fake_jwt_token(broker).await;
// The agent publish an error
@@ -206,7 +205,7 @@ async fn mapper_publishes_software_update_failed_status_onto_c8y_topic() {
)
.await;
- sm_mapper.unwrap().abort();
+ sm_mapper.abort();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
@@ -232,7 +231,7 @@ async fn mapper_fails_during_sw_update_recovers_and_process_response() -> Result
let mut responses = broker.messages_published_on("c8y/s/us").await;
// Start SM Mapper
- let sm_mapper = start_c8y_mapper(broker.port).await?;
+ let (_tmp_dir, sm_mapper) = start_c8y_mapper(broker.port).await.unwrap();
// Prepare and publish a software update smartrest request on `c8y/s/ds`.
let smartrest = r#"528,external_id,nodered,1.0.0::debian,,install"#;
@@ -288,7 +287,7 @@ async fn mapper_fails_during_sw_update_recovers_and_process_response() -> Result
.unwrap();
// Restart SM Mapper
- let sm_mapper = start_c8y_mapper(broker.port).await?;
+ let (_tmp_dir, sm_mapper) = start_c8y_mapper(broker.port).await.unwrap();
// Validate that the mapper process the response and forward it on 'c8y/s/us'
// Expect init messages followed by a 503 (success)
@@ -317,8 +316,7 @@ async fn mapper_publishes_software_update_request_with_wrong_action() {
// Create a subscriber to receive messages on `c8y/s/us` topic.
let mut messages = broker.messages_published_on("c8y/s/us").await;
- let _sm_mapper = start_c8y_mapper(broker.port).await;
-
+ let (_tmp_dir, _sm_mapper) = start_c8y_mapper(broker.port).await.unwrap();
// Prepare and publish a c8y_SoftwareUpdate smartrest request on `c8y/s/ds` that contains a wrong action `remove`, that is not known by c8y.
let smartrest = r#"528,external_id,nodered,1.0.0::debian,,remove"#;
let _ = broker.publish("c8y/s/ds", smartrest).await.unwrap();
@@ -341,7 +339,7 @@ async fn c8y_mapper_alarm_mapping_to_smartrest() {
let mut messages = broker.messages_published_on("c8y/s/us").await;
// Start the C8Y Mapper
- let c8y_mapper = start_c8y_mapper(broker.port).await.unwrap();
+ let (_tmp_dir, sm_mapper) = start_c8y_mapper(broker.port).await.unwrap();
let _ = broker
.publish_with_opts(
@@ -372,7 +370,7 @@ async fn c8y_mapper_alarm_mapping_to_smartrest() {
.await
.unwrap();
- c8y_mapper.abort();
+ sm_mapper.abort();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
@@ -382,7 +380,7 @@ async fn c8y_mapper_syncs_pending_alarms_on_startup() {
let mut messages = broker.messages_published_on("c8y/s/us").await;
// Start the C8Y Mapper
- let c8y_mapper = start_c8y_mapper(broker.port).await.unwrap();
+ let (_tmp_dir, sm_mapper) = start_c8y_mapper(broker.port).await.unwrap();
let mut internal_messages = broker
.messages_published_on("c8y-internal/alarms/critical/temperature_alarm")
@@ -415,7 +413,7 @@ async fn c8y_mapper_syncs_pending_alarms_on_startup() {
.await;
// stop the mapper
- c8y_mapper.abort();
+ sm_mapper.abort();
//Publish a new alarm while the mapper is down
let _ = broker
@@ -441,7 +439,7 @@ async fn c8y_mapper_syncs_pending_alarms_on_startup() {
// .unwrap();
// Restart the C8Y Mapper
- let c8y_mapper = start_c8y_mapper(broker.port).await.unwrap();
+ let (_tmp_dir, sm_mapper) = start_c8y_mapper(broker.port).await.unwrap();
// Ignored until the rumqttd broker bug that doesn't handle empty retained messages
// Expect the previously missed clear temperature alarm message
@@ -461,13 +459,13 @@ async fn c8y_mapper_syncs_pending_alarms_on_startup() {
)
.await;
- c8y_mapper.abort();
+ sm_mapper.abort();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
async fn test_sync_alarms() {
- let mut converter = create_c8y_converter();
+ let (_temp_dir, mut converter) = create_c8y_converter();
let alarm_topic = "tedge/alarms/critical/temperature_alarm";
let alarm_payload = r#"{ "text": "Temperature very high" }"#;
@@ -521,7 +519,7 @@ async fn test_sync_alarms() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
async fn convert_thin_edge_json_with_child_id() {
- let mut converter = create_c8y_converter();
+ let (_temp_dir, mut converter) = create_c8y_converter();
let in_topic = "tedge/measurements/child1";
let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#;
@@ -554,7 +552,7 @@ async fn convert_thin_edge_json_with_child_id() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
async fn convert_first_thin_edge_json_invalid_then_valid_with_child_id() {
- let mut converter = create_c8y_converter();
+ let (_temp_dir, mut converter) = create_c8y_converter();
let in_topic = "tedge/measurements/child1";
let in_invalid_payload = r#"{"temp": invalid}"#;
@@ -589,7 +587,7 @@ async fn convert_first_thin_edge_json_invalid_then_valid_with_child_id() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
async fn convert_two_thin_edge_json_messages_given_different_child_id() {
- let mut converter = create_c8y_converter();
+ let (_temp_dir, mut converter) = create_c8y_converter();
let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#;
// First message from "child1"
@@ -655,7 +653,7 @@ fn extract_child_id(in_topic: &str, expected_child_id: Option<String>) {
#[tokio::test]
async fn check_c8y_threshold_packet_size() -> Result<(), anyhow::Error> {
- let mut converter = create_c8y_converter();
+ let (_temp_dir, mut converter) = create_c8y_converter();
let alarm_topic = "tedge/alarms/critical/temperature_alarm";
let big_alarm_text = create_packet(1024 * 20);
@@ -675,7 +673,7 @@ async fn check_c8y_threshold_packet_size() -> Result<(), anyhow::Error> {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn convert_event_with_known_fields_to_c8y_smartrest() -> Result<()> {
- let mut converter = create_c8y_converter();
+ let (_temp_dir, mut converter) = create_c8y_converter();
let event_topic = "tedge/events/click_event";
let event_payload = r#"{ "text": "Someone clicked", "time": "2020-02-02T01:02:03+05:30" }"#;
let event_message = Message::new(&Topic::new_unchecked(event_topic), event_payload);
@@ -695,7 +693,7 @@ async fn convert_event_with_known_fields_to_c8y_smartrest() -> Result<()> {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn convert_event_with_extra_fields_to_c8y_json() -> Result<()> {
- let mut converter = create_c8y_converter();
+ let (_temp_dir, mut converter) = create_c8y_converter();
let event_topic = "tedge/events/click_event";
let event_payload = r#"{ "text": "tick", "foo": "bar" }"#;
let event_message = Message::new(&Topic::new_unchecked(event_topic), event_payload);
@@ -720,7 +718,7 @@ async fn convert_event_with_extra_fields_to_c8y_json() -> Result<()> {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_convert_big_event() {
- let mut converter = create_c8y_converter();
+ let (_temp_dir, mut converter) = create_c8y_converter();
let event_topic = "tedge/events/click_event";
let big_event_text = create_packet((16 + 1) * 1024); // Event payload > size_threshold
@@ -732,7 +730,7 @@ async fn test_convert_big_event() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_convert_big_measurement() {
- let mut converter = create_c8y_converter();
+ let (_temp_dir, mut converter) = create_c8y_converter();
let measurement_topic = "tedge/measurements";
let big_measurement_payload = create_thin_edge_measurement(10 * 1024); // Measurement payload > size_threshold after converting to c8y json
@@ -761,7 +759,7 @@ async fn test_convert_big_measurement() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_convert_small_measurement() {
- let mut converter = create_c8y_converter();
+ let (_temp_dir, mut converter) = create_c8y_converter();
let measurement_topic = "tedge/measurements";
let big_measurement_payload = create_thin_edge_measurement(20); // Measurement payload size is 20 bytes
@@ -785,7 +783,7 @@ async fn test_convert_small_measurement() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_convert_big_measurement_for_child_device() {
- let mut converter = create_c8y_converter();
+ let (_temp_dir, mut converter) = create_c8y_converter();
let measurement_topic = "tedge/measurements/child1";
let big_measurement_payload = create_thin_edge_measurement(10 * 1024); // Measurement payload > size_threshold after converting to c8y json
@@ -822,7 +820,7 @@ async fn test_convert_small_measurement_for_child_device() {
&Topic::new_unchecked(measurement_topic),
big_measurement_payload,
);
- let mut converter = create_c8y_converter();
+ let (_temp_dir, mut converter) = create_c8y_converter();
let result = converter.convert(&big_measurement_message).await;
assert!(result
@@ -913,8 +911,8 @@ impl C8YHttpProxy for FakeC8YHttpProxy {
}
}
-async fn start_c8y_mapper(mqtt_port: u16) -> Result<JoinHandle<()>, anyhow::Error> {
- let converter = create_c8y_converter();
+async fn start_c8y_mapper(mqtt_port: u16) -> Result<(TempTedgeDir, JoinHandle<()>), anyhow::Error> {
+ let (_temp_dir, converter) = create_c8y_converter();
let mut mapper = create_mapper(
"c8y-mapper-test",
MQTT_HOST.to_string(),
@@ -926,37 +924,31 @@ async fn start_c8y_mapper(mqtt_port: u16) -> Result<JoinHandle<()>, anyhow::Erro
let mapper_task = tokio::spawn(async move {
let _ = mapper.run(None).await;
});
- Ok(mapper_task)
+ Ok((_temp_dir, mapper_task))
}
-fn create_c8y_converter() -> CumulocityConverter<FakeC8YHttpProxy> {
+fn create_c8y_converter() -> (TempTedgeDir, CumulocityConverter<FakeC8YHttpProxy>) {
let size_threshold = SizeThreshold(16 * 1024);
let device_name = "test-device".into();
let device_type = "test-device-type".into();
let operations = Operations::default();
let http_proxy = FakeC8YHttpProxy {};
- let tmp_dir = TempDir::new().unwrap();
- std::fs::create_dir_all(&format!(
- "{}/tedge/agent/",
- &tmp_dir.path().to_str().unwrap()
- ))
- .unwrap();
- std::fs::File::create(&format!(
- "{}/tedge/agent/software-list-2011-11-11T11:11:11Z.log",
- &tmp_dir.path().to_str().unwrap()
- ))
- .unwrap();
-
- CumulocityConverter::from_logs_path(
+ let tmp_dir = TempTedgeDir::new();
+ tmp_dir
+ .dir("tedge")
+ .dir("agent")
+ .file("software-list-2011-11-11T11:11:11Z.log");
+ let converter = CumulocityConverter::from_logs_path(
size_threshold,
device_name,
device_type,
operations,
http_proxy,
- tmp_dir.into_path(),
+ tmp_dir.path().to_path_buf(),
)
- .unwrap()
+ .unwrap();
+ (tmp_dir, converter)
}
fn remove_whitespace(s: &str) -> String {