diff options
author | Alex Solomes <alex.solomes@softwareag.com> | 2022-06-09 17:09:06 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-06-09 17:09:06 +0100 |
commit | c95fe8e202f470e79123bf4c992f779df249ec42 (patch) | |
tree | 848540c0275a70b143440edfdacea0533e28d8ac /crates/core/tedge_mapper | |
parent | b91cc0205399fef3b73e4a41e56b02fd8cfc7d87 (diff) |
Testing utility: TempTedgeDir POC (#1148)
* temp tedge dir poc
Signed-off-by: initard <solo@softwareag.com>
* renaming crate and removing TedgeChildTempDir
- renamed the crate to tedge_test_utils
- removed TedgeChildTempDir as TempTedgeDir already had the same
functions
Signed-off-by: Alex Solomes <alex.solomes@softwareag.com>
* changing agent.rs tests to use TempTedgeDir
- changing agent.rs to use TempTedgeDir
- un-ignoring a test to check if agent restart creates a file in the
right place
Signed-off-by: Alex Solomes <alex.solomes@softwareag.com>
* changing tedge_mapper tests to use TempTedgeDir
Signed-off-by: Alex Solomes <alex.solomes@softwareag.com>
* changing tedge_config tests to use TempTedgeDir
Signed-off-by: Alex Solomes <alex.solomes@softwareag.com>
* changing c8y_configuration_plugin tests to use TempTedgeDir
Signed-off-by: Alex Solomes <alex.solomes@softwareag.com>
* changing tedge_apama_plugin tests to use TempTedgeDir
Signed-off-by: Alex Solomes <alex.solomes@softwareag.com>
* changing logged_command tests to use TempTedgeDir
Signed-off-by: Alex Solomes <alex.solomes@softwareag.com>
* adding another method to TempTedgeDir
Signed-off-by: initard <solo@softwareag.com>
Co-authored-by: initard <solo@softwareag.com>
Diffstat (limited to 'crates/core/tedge_mapper')
-rw-r--r-- | crates/core/tedge_mapper/Cargo.toml | 2 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/converter.rs | 4 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/dynamic_discovery.rs | 6 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/mapper.rs | 6 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/tests.rs | 98 |
5 files changed, 54 insertions, 62 deletions
diff --git a/crates/core/tedge_mapper/Cargo.toml b/crates/core/tedge_mapper/Cargo.toml index a7876bdf..d7464294 100644 --- a/crates/core/tedge_mapper/Cargo.toml +++ b/crates/core/tedge_mapper/Cargo.toml @@ -64,7 +64,7 @@ mockito = "0.31" mqtt_tests = { path = "../../tests/mqtt_tests" } serde_json = "1.0" serial_test = "0.6" -tempfile = "3.2" +tedge_test_utils = { path = "../../tests/tedge_test_utils" } test-case = "2.0" time = { version = "0.3", features = ["macros"] } tokio-test = "0.4" diff --git a/crates/core/tedge_mapper/src/c8y/converter.rs b/crates/core/tedge_mapper/src/c8y/converter.rs index a7468ebe..9ec58cb0 100644 --- a/crates/core/tedge_mapper/src/c8y/converter.rs +++ b/crates/core/tedge_mapper/src/c8y/converter.rs @@ -837,11 +837,11 @@ pub fn get_child_id_from_topic(topic: &str) -> Result<Option<String>, Conversion #[cfg(test)] mod tests { use plugin_sm::operation_logs::OperationLogs; - use tempfile::TempDir; + use tedge_test_utils::fs::TempTedgeDir; #[tokio::test] async fn test_execute_operation_is_not_blocked() { - let log_dir = TempDir::new().unwrap(); + let log_dir = TempTedgeDir::new(); let operation_logs = OperationLogs::try_new(log_dir.path().to_path_buf()).unwrap(); let now = std::time::Instant::now(); diff --git a/crates/core/tedge_mapper/src/c8y/dynamic_discovery.rs b/crates/core/tedge_mapper/src/c8y/dynamic_discovery.rs index 535f396b..fca9057d 100644 --- a/crates/core/tedge_mapper/src/c8y/dynamic_discovery.rs +++ b/crates/core/tedge_mapper/src/c8y/dynamic_discovery.rs @@ -94,8 +94,8 @@ fn create_inotify_with_non_existing_dir() { #[test] fn create_inotify_with_right_directory() { - use tempfile::TempDir; - let dir = TempDir::new().unwrap().into_path(); - let res = create_inotify_watch(dir); + use tedge_test_utils::fs::TempTedgeDir; + let dir = TempTedgeDir::new(); + let res = create_inotify_watch(dir.path().to_path_buf()); assert!(res.is_ok()); } diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs index 24dfde84..8ea3b4fc 100644 --- a/crates/core/tedge_mapper/src/c8y/mapper.rs +++ b/crates/core/tedge_mapper/src/c8y/mapper.rs @@ -130,7 +130,7 @@ mod tests { use mqtt_tests::{assert_received_all_expected, test_mqtt_broker}; use serde_json::json; use std::time::Duration; - use tempfile::TempDir; + use tedge_test_utils::fs::TempTedgeDir; use test_case::test_case; const TEST_TIMEOUT_SECS: Duration = Duration::from_secs(5); @@ -188,7 +188,7 @@ mod tests { let size_threshold = SizeThreshold(MQTT_MESSAGE_SIZE_THRESHOLD); let operations = Operations::default(); - let tmp_dir = TempDir::new().unwrap(); + let tmp_dir = TempTedgeDir::new(); let converter = Box::new( CumulocityConverter::from_logs_path( size_threshold, @@ -196,7 +196,7 @@ mod tests { DEVICE_TYPE.into(), operations, proxy, - tmp_dir.into_path(), + tmp_dir.path().to_path_buf(), ) .unwrap(), ); diff --git a/crates/core/tedge_mapper/src/c8y/tests.rs b/crates/core/tedge_mapper/src/c8y/tests.rs index 8596f594..78eee84c 100644 --- a/crates/core/tedge_mapper/src/c8y/tests.rs +++ b/crates/core/tedge_mapper/src/c8y/tests.rs @@ -19,7 +19,7 @@ use mqtt_tests::test_mqtt_server::MqttProcessHandler; use serde_json::json; use serial_test::serial; use std::{path::Path, time::Duration}; -use tempfile::TempDir; +use tedge_test_utils::fs::TempTedgeDir; use test_case::test_case; use tokio::task::JoinHandle; @@ -39,12 +39,12 @@ async fn mapper_publishes_a_software_list_request() { .await; // Start the SM Mapper - let sm_mapper = start_c8y_mapper(broker.port).await; + let (_tmp_dir, sm_mapper) = start_c8y_mapper(broker.port).await.unwrap(); // Expect on `tedge/commands/req/software/list` a software list request. mqtt_tests::assert_received_all_expected(&mut messages, TEST_TIMEOUT_MS, &[r#"{"id":"#]).await; - sm_mapper.unwrap().abort(); + sm_mapper.abort(); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -55,12 +55,12 @@ async fn mapper_publishes_a_supported_operation_and_a_pending_operations_onto_c8 let mut messages = broker.messages_published_on("c8y/s/us").await; // Start SM Mapper - let sm_mapper = start_c8y_mapper(broker.port).await; + let (_tmp_dir, sm_mapper) = start_c8y_mapper(broker.port).await.unwrap(); // Expect 500 messages has been received on `c8y/s/us`, if no msg received for the timeout the test fails. mqtt_tests::assert_received_all_expected(&mut messages, TEST_TIMEOUT_MS, &["500\n"]).await; - sm_mapper.unwrap().abort(); + sm_mapper.abort(); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -73,8 +73,7 @@ async fn mapper_publishes_software_update_request() { .messages_published_on("tedge/commands/req/software/update") .await; - let sm_mapper = start_c8y_mapper(broker.port).await; - + let (_tmp_dir, sm_mapper) = start_c8y_mapper(broker.port).await.unwrap(); // Prepare and publish a software update smartrest request on `c8y/s/ds`. let smartrest = r#"528,external_id,nodered,1.0.0::debian,,install"#; let _ = broker.publish("c8y/s/ds", smartrest).await.unwrap(); @@ -101,7 +100,7 @@ async fn mapper_publishes_software_update_request() { ) .await; - sm_mapper.unwrap().abort(); + sm_mapper.abort(); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -114,7 +113,7 @@ async fn mapper_publishes_software_update_status_onto_c8y_topic() { let mut messages = broker.messages_published_on("c8y/s/us").await; // Start SM Mapper - let sm_mapper = start_c8y_mapper(broker.port).await; + let (_tmp_dir, sm_mapper) = start_c8y_mapper(broker.port).await.unwrap(); let _ = publish_a_fake_jwt_token(broker).await; // Prepare and publish a software update status response message `executing` on `tedge/commands/res/software/update`. @@ -159,7 +158,7 @@ async fn mapper_publishes_software_update_status_onto_c8y_topic() { ) .await; - sm_mapper.unwrap().abort(); + sm_mapper.abort(); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -169,7 +168,7 @@ async fn mapper_publishes_software_update_failed_status_onto_c8y_topic() { let mut messages = broker.messages_published_on("c8y/s/us").await; // Start SM Mapper - let sm_mapper = start_c8y_mapper(broker.port).await; + let (_tmp_dir, sm_mapper) = start_c8y_mapper(broker.port).await.unwrap(); let _ = publish_a_fake_jwt_token(broker).await; // The agent publish an error @@ -206,7 +205,7 @@ async fn mapper_publishes_software_update_failed_status_onto_c8y_topic() { ) .await; - sm_mapper.unwrap().abort(); + sm_mapper.abort(); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -232,7 +231,7 @@ async fn mapper_fails_during_sw_update_recovers_and_process_response() -> Result let mut responses = broker.messages_published_on("c8y/s/us").await; // Start SM Mapper - let sm_mapper = start_c8y_mapper(broker.port).await?; + let (_tmp_dir, sm_mapper) = start_c8y_mapper(broker.port).await.unwrap(); // Prepare and publish a software update smartrest request on `c8y/s/ds`. let smartrest = r#"528,external_id,nodered,1.0.0::debian,,install"#; @@ -288,7 +287,7 @@ async fn mapper_fails_during_sw_update_recovers_and_process_response() -> Result .unwrap(); // Restart SM Mapper - let sm_mapper = start_c8y_mapper(broker.port).await?; + let (_tmp_dir, sm_mapper) = start_c8y_mapper(broker.port).await.unwrap(); // Validate that the mapper process the response and forward it on 'c8y/s/us' // Expect init messages followed by a 503 (success) @@ -317,8 +316,7 @@ async fn mapper_publishes_software_update_request_with_wrong_action() { // Create a subscriber to receive messages on `c8y/s/us` topic. let mut messages = broker.messages_published_on("c8y/s/us").await; - let _sm_mapper = start_c8y_mapper(broker.port).await; - + let (_tmp_dir, _sm_mapper) = start_c8y_mapper(broker.port).await.unwrap(); // Prepare and publish a c8y_SoftwareUpdate smartrest request on `c8y/s/ds` that contains a wrong action `remove`, that is not known by c8y. let smartrest = r#"528,external_id,nodered,1.0.0::debian,,remove"#; let _ = broker.publish("c8y/s/ds", smartrest).await.unwrap(); @@ -341,7 +339,7 @@ async fn c8y_mapper_alarm_mapping_to_smartrest() { let mut messages = broker.messages_published_on("c8y/s/us").await; // Start the C8Y Mapper - let c8y_mapper = start_c8y_mapper(broker.port).await.unwrap(); + let (_tmp_dir, sm_mapper) = start_c8y_mapper(broker.port).await.unwrap(); let _ = broker .publish_with_opts( @@ -372,7 +370,7 @@ async fn c8y_mapper_alarm_mapping_to_smartrest() { .await .unwrap(); - c8y_mapper.abort(); + sm_mapper.abort(); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[serial] @@ -382,7 +380,7 @@ async fn c8y_mapper_syncs_pending_alarms_on_startup() { let mut messages = broker.messages_published_on("c8y/s/us").await; // Start the C8Y Mapper - let c8y_mapper = start_c8y_mapper(broker.port).await.unwrap(); + let (_tmp_dir, sm_mapper) = start_c8y_mapper(broker.port).await.unwrap(); let mut internal_messages = broker .messages_published_on("c8y-internal/alarms/critical/temperature_alarm") @@ -415,7 +413,7 @@ async fn c8y_mapper_syncs_pending_alarms_on_startup() { .await; // stop the mapper - c8y_mapper.abort(); + sm_mapper.abort(); //Publish a new alarm while the mapper is down let _ = broker @@ -441,7 +439,7 @@ async fn c8y_mapper_syncs_pending_alarms_on_startup() { // .unwrap(); // Restart the C8Y Mapper - let c8y_mapper = start_c8y_mapper(broker.port).await.unwrap(); + let (_tmp_dir, sm_mapper) = start_c8y_mapper(broker.port).await.unwrap(); // Ignored until the rumqttd broker bug that doesn't handle empty retained messages // Expect the previously missed clear temperature alarm message @@ -461,13 +459,13 @@ async fn c8y_mapper_syncs_pending_alarms_on_startup() { ) .await; - c8y_mapper.abort(); + sm_mapper.abort(); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[serial] async fn test_sync_alarms() { - let mut converter = create_c8y_converter(); + let (_temp_dir, mut converter) = create_c8y_converter(); let alarm_topic = "tedge/alarms/critical/temperature_alarm"; let alarm_payload = r#"{ "text": "Temperature very high" }"#; @@ -521,7 +519,7 @@ async fn test_sync_alarms() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[serial] async fn convert_thin_edge_json_with_child_id() { - let mut converter = create_c8y_converter(); + let (_temp_dir, mut converter) = create_c8y_converter(); let in_topic = "tedge/measurements/child1"; let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#; @@ -554,7 +552,7 @@ async fn convert_thin_edge_json_with_child_id() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[serial] async fn convert_first_thin_edge_json_invalid_then_valid_with_child_id() { - let mut converter = create_c8y_converter(); + let (_temp_dir, mut converter) = create_c8y_converter(); let in_topic = "tedge/measurements/child1"; let in_invalid_payload = r#"{"temp": invalid}"#; @@ -589,7 +587,7 @@ async fn convert_first_thin_edge_json_invalid_then_valid_with_child_id() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[serial] async fn convert_two_thin_edge_json_messages_given_different_child_id() { - let mut converter = create_c8y_converter(); + let (_temp_dir, mut converter) = create_c8y_converter(); let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#; // First message from "child1" @@ -655,7 +653,7 @@ fn extract_child_id(in_topic: &str, expected_child_id: Option<String>) { #[tokio::test] async fn check_c8y_threshold_packet_size() -> Result<(), anyhow::Error> { - let mut converter = create_c8y_converter(); + let (_temp_dir, mut converter) = create_c8y_converter(); let alarm_topic = "tedge/alarms/critical/temperature_alarm"; let big_alarm_text = create_packet(1024 * 20); @@ -675,7 +673,7 @@ async fn check_c8y_threshold_packet_size() -> Result<(), anyhow::Error> { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn convert_event_with_known_fields_to_c8y_smartrest() -> Result<()> { - let mut converter = create_c8y_converter(); + let (_temp_dir, mut converter) = create_c8y_converter(); let event_topic = "tedge/events/click_event"; let event_payload = r#"{ "text": "Someone clicked", "time": "2020-02-02T01:02:03+05:30" }"#; let event_message = Message::new(&Topic::new_unchecked(event_topic), event_payload); @@ -695,7 +693,7 @@ async fn convert_event_with_known_fields_to_c8y_smartrest() -> Result<()> { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn convert_event_with_extra_fields_to_c8y_json() -> Result<()> { - let mut converter = create_c8y_converter(); + let (_temp_dir, mut converter) = create_c8y_converter(); let event_topic = "tedge/events/click_event"; let event_payload = r#"{ "text": "tick", "foo": "bar" }"#; let event_message = Message::new(&Topic::new_unchecked(event_topic), event_payload); @@ -720,7 +718,7 @@ async fn convert_event_with_extra_fields_to_c8y_json() -> Result<()> { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_convert_big_event() { - let mut converter = create_c8y_converter(); + let (_temp_dir, mut converter) = create_c8y_converter(); let event_topic = "tedge/events/click_event"; let big_event_text = create_packet((16 + 1) * 1024); // Event payload > size_threshold @@ -732,7 +730,7 @@ async fn test_convert_big_event() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_convert_big_measurement() { - let mut converter = create_c8y_converter(); + let (_temp_dir, mut converter) = create_c8y_converter(); let measurement_topic = "tedge/measurements"; let big_measurement_payload = create_thin_edge_measurement(10 * 1024); // Measurement payload > size_threshold after converting to c8y json @@ -761,7 +759,7 @@ async fn test_convert_big_measurement() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_convert_small_measurement() { - let mut converter = create_c8y_converter(); + let (_temp_dir, mut converter) = create_c8y_converter(); let measurement_topic = "tedge/measurements"; let big_measurement_payload = create_thin_edge_measurement(20); // Measurement payload size is 20 bytes @@ -785,7 +783,7 @@ async fn test_convert_small_measurement() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_convert_big_measurement_for_child_device() { - let mut converter = create_c8y_converter(); + let (_temp_dir, mut converter) = create_c8y_converter(); let measurement_topic = "tedge/measurements/child1"; let big_measurement_payload = create_thin_edge_measurement(10 * 1024); // Measurement payload > size_threshold after converting to c8y json @@ -822,7 +820,7 @@ async fn test_convert_small_measurement_for_child_device() { &Topic::new_unchecked(measurement_topic), big_measurement_payload, ); - let mut converter = create_c8y_converter(); + let (_temp_dir, mut converter) = create_c8y_converter(); let result = converter.convert(&big_measurement_message).await; assert!(result @@ -913,8 +911,8 @@ impl C8YHttpProxy for FakeC8YHttpProxy { } } -async fn start_c8y_mapper(mqtt_port: u16) -> Result<JoinHandle<()>, anyhow::Error> { - let converter = create_c8y_converter(); +async fn start_c8y_mapper(mqtt_port: u16) -> Result<(TempTedgeDir, JoinHandle<()>), anyhow::Error> { + let (_temp_dir, converter) = create_c8y_converter(); let mut mapper = create_mapper( "c8y-mapper-test", MQTT_HOST.to_string(), @@ -926,37 +924,31 @@ async fn start_c8y_mapper(mqtt_port: u16) -> Result<JoinHandle<()>, anyhow::Erro let mapper_task = tokio::spawn(async move { let _ = mapper.run(None).await; }); - Ok(mapper_task) + Ok((_temp_dir, mapper_task)) } -fn create_c8y_converter() -> CumulocityConverter<FakeC8YHttpProxy> { +fn create_c8y_converter() -> (TempTedgeDir, CumulocityConverter<FakeC8YHttpProxy>) { let size_threshold = SizeThreshold(16 * 1024); let device_name = "test-device".into(); let device_type = "test-device-type".into(); let operations = Operations::default(); let http_proxy = FakeC8YHttpProxy {}; - let tmp_dir = TempDir::new().unwrap(); - std::fs::create_dir_all(&format!( - "{}/tedge/agent/", - &tmp_dir.path().to_str().unwrap() - )) - .unwrap(); - std::fs::File::create(&format!( - "{}/tedge/agent/software-list-2011-11-11T11:11:11Z.log", - &tmp_dir.path().to_str().unwrap() - )) - .unwrap(); - - CumulocityConverter::from_logs_path( + let tmp_dir = TempTedgeDir::new(); + tmp_dir + .dir("tedge") + .dir("agent") + .file("software-list-2011-11-11T11:11:11Z.log"); + let converter = CumulocityConverter::from_logs_path( size_threshold, device_name, device_type, operations, http_proxy, - tmp_dir.into_path(), + tmp_dir.path().to_path_buf(), ) - .unwrap() + .unwrap(); + (tmp_dir, converter) } fn remove_whitespace(s: &str) -> String { |