summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_mapper/src/c8y/tests.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/core/tedge_mapper/src/c8y/tests.rs')
-rw-r--r--crates/core/tedge_mapper/src/c8y/tests.rs211
1 files changed, 211 insertions, 0 deletions
diff --git a/crates/core/tedge_mapper/src/c8y/tests.rs b/crates/core/tedge_mapper/src/c8y/tests.rs
index 03125102..0413c33a 100644
--- a/crates/core/tedge_mapper/src/c8y/tests.rs
+++ b/crates/core/tedge_mapper/src/c8y/tests.rs
@@ -372,6 +372,61 @@ async fn c8y_mapper_alarm_mapping_to_smartrest() {
sm_mapper.abort();
}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+#[serial]
+async fn c8y_mapper_child_alarm_mapping_to_smartrest() {
+ let broker = mqtt_tests::test_mqtt_broker();
+
+ let mut messages = broker
+ .messages_published_on("c8y/s/us/external_sensor")
+ .await;
+
+ // Start the C8Y Mapper
+ let (_tmp_dir, sm_mapper) = start_c8y_mapper(broker.port).await.unwrap();
+
+ let _ = broker
+ .publish_with_opts(
+ "tedge/alarms/minor/temperature_high/external_sensor",
+ r#"{ "text": "Temperature high" }"#,
+ mqtt_channel::QoS::AtLeastOnce,
+ true,
+ )
+ .await
+ .unwrap();
+
+ let _ = broker
+ .publish_with_opts(
+ "tedge/alarms/minor/temperature_high/external_sensor",
+ r#"{ "text": "Temperature high" }"#,
+ mqtt_channel::QoS::AtLeastOnce,
+ true,
+ )
+ .await
+ .unwrap();
+
+ // Expect converted temperature alarm message
+ mqtt_tests::assert_received_all_expected(
+ &mut messages,
+ TEST_TIMEOUT_MS,
+ &["303,temperature_high"],
+ )
+ .await;
+
+ //Clear the previously published alarm
+ let _ = broker
+ .publish_with_opts(
+ "tedge/alarms/minor/temperature_high/external_sensor",
+ "",
+ mqtt_channel::QoS::AtLeastOnce,
+ true,
+ )
+ .await
+ .unwrap();
+
+ sm_mapper.abort();
+}
+
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
async fn c8y_mapper_syncs_pending_alarms_on_startup() {
@@ -464,6 +519,108 @@ async fn c8y_mapper_syncs_pending_alarms_on_startup() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
+async fn c8y_mapper_syncs_pending_child_alarms_on_startup() {
+ let broker = mqtt_tests::test_mqtt_broker();
+
+ let mut messages = broker
+ .messages_published_on("c8y/s/us/external_sensor")
+ .await;
+
+ // Start the C8Y Mapper
+ let (_tmp_dir, sm_mapper) = start_c8y_mapper(broker.port).await.unwrap();
+
+ let mut internal_messages = broker
+ .messages_published_on("c8y-internal/alarms/critical/temperature_alarm/external_sensor")
+ .await;
+
+ let _ = broker
+ .publish_with_opts(
+ "tedge/alarms/critical/temperature_alarm/external_sensor",
+ r#"{ "text": "Temperature very high" }"#,
+ mqtt_channel::QoS::AtLeastOnce,
+ true,
+ )
+ .await
+ .unwrap();
+
+ // Expect converted temperature alarm message
+ mqtt_tests::assert_received_all_expected(
+ &mut messages,
+ TEST_TIMEOUT_MS,
+ &["301,temperature_alarm"],
+ )
+ .await;
+
+ // Wait till the message get synced to internal topic
+ mqtt_tests::assert_received_all_expected(
+ &mut internal_messages,
+ TEST_TIMEOUT_MS,
+ &["Temperature very high"],
+ )
+ .await;
+
+ // stop the mapper
+ sm_mapper.abort();
+
+ //Publish a new alarm while the mapper is down
+ let _ = broker
+ .publish_with_opts(
+ "tedge/alarms/critical/pressure_alarm/external_sensor",
+ r#"{ "text": "Pressure very high" }"#,
+ mqtt_channel::QoS::AtLeastOnce,
+ true,
+ )
+ .await
+ .unwrap();
+
+ let _ = broker
+ .publish_with_opts(
+ "tedge/alarms/critical/pressure_alarm/external_sensor",
+ r#"{ "text": "Pressure very high" }"#,
+ mqtt_channel::QoS::AtLeastOnce,
+ true,
+ )
+ .await
+ .unwrap();
+
+ // Ignored until the rumqttd broker bug that doesn't handle empty retained messages
+ //Clear the existing alarm while the mapper is down
+ // let _ = broker
+ // .publish_with_opts(
+ // "tedge/alarms/critical/temperature_alarm/external_sensor",
+ // "",
+ // mqtt_channel::QoS::AtLeastOnce,
+ // true,
+ // )
+ // .await
+ // .unwrap();
+
+ // Restart the C8Y Mapper
+ let (_tmp_dir, sm_mapper) = start_c8y_mapper(broker.port).await.unwrap();
+
+ // Ignored until the rumqttd broker bug that doesn't handle empty retained messages
+ // Expect the previously missed clear temperature alarm message
+ // let msg = messages
+ // .next()
+ // .with_timeout(ALARM_SYNC_TIMEOUT_MS)
+ // .await
+ // .expect_or("No message received after a second.");
+ // dbg!(&msg);
+ // assert!(&msg.contains("306,temperature_alarm"));
+
+ // Expect the new pressure alarm message
+ mqtt_tests::assert_received_all_expected(
+ &mut messages,
+ TEST_TIMEOUT_MS,
+ &["301,pressure_alarm"],
+ )
+ .await;
+
+ sm_mapper.abort();
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+#[serial]
async fn test_sync_alarms() {
let (_temp_dir, mut converter) = create_c8y_converter();
@@ -518,6 +675,60 @@ async fn test_sync_alarms() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
+async fn test_sync_child_alarms() {
+ let (_temp_dir, mut converter) = create_c8y_converter();
+
+ let alarm_topic = "tedge/alarms/critical/temperature_alarm/external_sensor";
+ let alarm_payload = r#"{ "text": "Temperature very high" }"#;
+ let alarm_message = Message::new(&Topic::new_unchecked(alarm_topic), alarm_payload);
+
+ // During the sync phase, alarms are not converted immediately, but only cached to be synced later
+ assert_eq!(converter.convert(&alarm_message).await.len(), 1);
+
+ let non_alarm_topic = "tedge/measurements/external_sensor";
+ let non_alarm_payload = r#"{"temp": 1}"#;
+ let non_alarm_message = Message::new(&Topic::new_unchecked(non_alarm_topic), non_alarm_payload);
+
+ // But non-alarms are converted immediately, even during the sync phase
+ assert!(!converter.convert(&non_alarm_message).await.is_empty());
+
+ let internal_alarm_topic = "c8y-internal/alarms/major/pressure_alarm/external_sensor";
+ let internal_alarm_payload = r#"{ "text": "Temperature very high" }"#;
+ let internal_alarm_message = Message::new(
+ &Topic::new_unchecked(internal_alarm_topic),
+ internal_alarm_payload,
+ );
+
+ // During the sync phase, internal alarms are not converted, but only cached to be synced later
+ assert!(converter.convert(&internal_alarm_message).await.is_empty());
+
+ // When sync phase is complete, all pending alarms are returned
+ let sync_messages = converter.sync_messages();
+ assert_eq!(sync_messages.len(), 2);
+
+ // The first message will be clear alarm message for pressure_alarm
+ let alarm_message = sync_messages.get(0).unwrap();
+ assert_eq!(
+ alarm_message.topic.name,
+ "tedge/alarms/major/pressure_alarm/external_sensor"
+ );
+ assert_eq!(alarm_message.payload_bytes().len(), 0); //Clear messages are empty messages
+
+ // The second message will be the temperature_alarm
+ let alarm_message = sync_messages.get(1).unwrap();
+ assert_eq!(alarm_message.topic.name, alarm_topic);
+ assert_eq!(alarm_message.payload_str().unwrap(), alarm_payload);
+
+ // After the sync phase, the conversion of both non-alarms as well as alarms are done immediately
+ assert!(!converter.convert(alarm_message).await.is_empty());
+ assert!(!converter.convert(&non_alarm_message).await.is_empty());
+
+ // But, even after the sync phase, internal alarms are not converted and just ignored, as they are purely internal
+ assert!(converter.convert(&internal_alarm_message).await.is_empty());
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+#[serial]
async fn convert_thin_edge_json_with_child_id() {
let (_temp_dir, mut converter) = create_c8y_converter();