summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPradeepKiruvale <pradeepkumar.kj@softwareag.com>2022-08-08 16:56:00 +0530
committerGitHub <noreply@github.com>2022-08-08 16:56:00 +0530
commit098cd1bc0b8a5fb46d114733c5f5c632fc25587e (patch)
treeabb80a0228d80966ea100db3ef402507ddbe3bf1
parentdfbe050440728b5996c8255959b67cdb42ed166a (diff)
Alarms for child devices (#1314)
With this feature the child device now able to send an alarm message to Cumulocity cloud. The child device must use the mqtt topic to publish the alarm message, then thin-edge device will pickup this alarm message and forward it to the cumulocity cloud. Signed-off-by: Pradeep Kumar K J <pradeepkumar.kj@softwareag.com>
-rw-r--r--crates/core/tedge_mapper/src/c8y/converter.rs92
-rw-r--r--crates/core/tedge_mapper/src/c8y/tests.rs211
-rw-r--r--crates/core/thin_edge_json/src/alarm.rs79
3 files changed, 358 insertions, 24 deletions
diff --git a/crates/core/tedge_mapper/src/c8y/converter.rs b/crates/core/tedge_mapper/src/c8y/converter.rs
index 9912a10b..12f3ec3c 100644
--- a/crates/core/tedge_mapper/src/c8y/converter.rs
+++ b/crates/core/tedge_mapper/src/c8y/converter.rs
@@ -90,7 +90,9 @@ where
"tedge/measurements",
"tedge/measurements/+",
"tedge/alarms/+/+",
+ "tedge/alarms/+/+/+",
"c8y-internal/alarms/+/+",
+ "c8y-internal/alarms/+/+/+",
"tedge/events/+",
"tedge/events/+/+",
]
@@ -142,7 +144,9 @@ where
"tedge/measurements",
"tedge/measurements/+",
"tedge/alarms/+/+",
+ "tedge/alarms/+/+/+",
"c8y-internal/alarms/+/+",
+ "c8y-internal/alarms/+/+/+",
"tedge/events/+",
"tedge/events/+/+",
]
@@ -263,6 +267,37 @@ where
Ok(messages)
}
+ pub fn process_alarm_messages(
+ &mut self,
+ topic: &Topic,
+ message: &Message,
+ ) -> Result<Vec<Message>, ConversionError> {
+ if topic.name.starts_with("tedge/alarms") {
+ let mut mqtt_messages: Vec<Message> = Vec::new();
+ let topic_split: Vec<&str> = topic.name.split('/').collect();
+ if topic_split.len() == 5 {
+ let child_id = topic_split[4];
+ // Create a child device, if it does not exists already
+ if !child_id.is_empty() && !self.children.contains(child_id) {
+ self.children.insert(child_id.to_string());
+ mqtt_messages.push(Message::new(
+ &Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC),
+ format!("101,{child_id},{child_id},thin-edge.io-child"),
+ ));
+ }
+ }
+ self.size_threshold.validate(message)?;
+ let mut messages = self.alarm_converter.try_convert_alarm(message)?;
+ mqtt_messages.append(&mut messages);
+ Ok(mqtt_messages)
+ } else if topic.name.starts_with(INTERNAL_ALARMS_TOPIC) {
+ self.alarm_converter.process_internal_alarm(message);
+ Ok(vec![])
+ } else {
+ Err(ConversionError::UnsupportedTopic(topic.name.clone()))
+ }
+ }
+
fn serialize_to_smartrest(c8y_event: &C8yCreateEvent) -> Result<String, ConversionError> {
Ok(format!(
"{},{},\"{}\",{}",
@@ -313,13 +348,11 @@ where
let () = self.size_threshold.validate(message)?;
self.try_convert_measurement(message)
}
- topic if topic.name.starts_with("tedge/alarms") => {
- let () = self.size_threshold.validate(message)?;
- self.alarm_converter.try_convert_alarm(message)
- }
- topic if topic.name.starts_with(INTERNAL_ALARMS_TOPIC) => {
- self.alarm_converter.process_internal_alarm(message);
- Ok(vec![])
+ topic
+ if topic.name.starts_with("tedge/alarms")
+ | topic.name.starts_with(INTERNAL_ALARMS_TOPIC) =>
+ {
+ self.process_alarm_messages(topic, message)
}
topic if topic.name.starts_with(TEDGE_EVENTS_TOPIC) => {
self.try_convert_event(message).await
@@ -461,32 +494,39 @@ impl AlarmConverter {
}
}
- fn try_convert_alarm(&mut self, input: &Message) -> Result<Vec<Message>, ConversionError> {
- let mut vec: Vec<Message> = Vec::new();
-
+ fn try_convert_alarm(
+ &mut self,
+ input_message: &Message,
+ ) -> Result<Vec<Message>, ConversionError> {
+ let mut output_messages: Vec<Message> = Vec::new();
match self {
Self::Syncing {
pending_alarms_map,
old_alarms_map: _,
} => {
- let alarm_id = input
+ let alarm_id = input_message
.topic
.name
.strip_prefix(TEDGE_ALARMS_TOPIC)
.expect("Expected tedge/alarms prefix")
.to_string();
- pending_alarms_map.insert(alarm_id, input.clone());
+ pending_alarms_map.insert(alarm_id, input_message.clone());
}
Self::Synced => {
//Regular conversion phase
- let tedge_alarm =
- ThinEdgeAlarm::try_from(input.topic.name.as_str(), input.payload_str()?)?;
+ let tedge_alarm = ThinEdgeAlarm::try_from(
+ input_message.topic.name.as_str(),
+ input_message.payload_str()?,
+ )?;
let smartrest_alarm = alarm::serialize_alarm(tedge_alarm)?;
- let c8y_alarm_topic = Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC);
- vec.push(Message::new(&c8y_alarm_topic, smartrest_alarm));
+ let c8y_alarm_topic = Topic::new_unchecked(
+ self.get_c8y_alarm_topic(input_message.topic.name.as_str())?
+ .as_str(),
+ );
+ output_messages.push(Message::new(&c8y_alarm_topic, smartrest_alarm));
// Persist a copy of the alarm to an internal topic for reconciliation on next restart
- let alarm_id = input
+ let alarm_id = input_message
.topic
.name
.strip_prefix(TEDGE_ALARMS_TOPIC)
@@ -495,12 +535,22 @@ impl AlarmConverter {
let topic =
Topic::new_unchecked(format!("{INTERNAL_ALARMS_TOPIC}{alarm_id}").as_str());
let alarm_copy =
- Message::new(&topic, input.payload_bytes().to_owned()).with_retain();
- vec.push(alarm_copy);
+ Message::new(&topic, input_message.payload_bytes().to_owned()).with_retain();
+ output_messages.push(alarm_copy);
}
}
+ Ok(output_messages)
+ }
- Ok(vec)
+ fn get_c8y_alarm_topic(&self, topic: &str) -> Result<String, ConversionError> {
+ let topic_split: Vec<&str> = topic.split('/').collect();
+ if topic_split.len() == 4 {
+ Ok(SMARTREST_PUBLISH_TOPIC.to_string())
+ } else if topic_split.len() == 5 {
+ Ok(format!("{SMARTREST_PUBLISH_TOPIC}/{}", topic_split[4]))
+ } else {
+ Err(ConversionError::UnsupportedTopic(topic.to_string()))
+ }
}
fn process_internal_alarm(&mut self, input: &Message) {
@@ -577,10 +627,10 @@ impl AlarmConverter {
// Ignore
}
}
-
sync_messages
}
}
+
fn create_device_data_fragments(
device_name: &str,
device_type: &str,
diff --git a/crates/core/tedge_mapper/src/c8y/tests.rs b/crates/core/tedge_mapper/src/c8y/tests.rs
index 03125102..0413c33a 100644
--- a/crates/core/tedge_mapper/src/c8y/tests.rs
+++ b/crates/core/tedge_mapper/src/c8y/tests.rs
@@ -372,6 +372,61 @@ async fn c8y_mapper_alarm_mapping_to_smartrest() {
sm_mapper.abort();
}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+#[serial]
+async fn c8y_mapper_child_alarm_mapping_to_smartrest() {
+ let broker = mqtt_tests::test_mqtt_broker();
+
+ let mut messages = broker
+ .messages_published_on("c8y/s/us/external_sensor")
+ .await;
+
+ // Start the C8Y Mapper
+ let (_tmp_dir, sm_mapper) = start_c8y_mapper(broker.port).await.unwrap();
+
+ let _ = broker
+ .publish_with_opts(
+ "tedge/alarms/minor/temperature_high/external_sensor",
+ r#"{ "text": "Temperature high" }"#,
+ mqtt_channel::QoS::AtLeastOnce,
+ true,
+ )
+ .await
+ .unwrap();
+
+ let _ = broker
+ .publish_with_opts(
+ "tedge/alarms/minor/temperature_high/external_sensor",
+ r#"{ "text": "Temperature high" }"#,
+ mqtt_channel::QoS::AtLeastOnce,
+ true,
+ )
+ .await
+ .unwrap();
+
+ // Expect converted temperature alarm message
+ mqtt_tests::assert_received_all_expected(
+ &mut messages,
+ TEST_TIMEOUT_MS,
+ &["303,temperature_high"],
+ )
+ .await;
+
+ //Clear the previously published alarm
+ let _ = broker
+ .publish_with_opts(
+ "tedge/alarms/minor/temperature_high/external_sensor",
+ "",
+ mqtt_channel::QoS::AtLeastOnce,
+ true,
+ )
+ .await
+ .unwrap();
+
+ sm_mapper.abort();
+}
+
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
async fn c8y_mapper_syncs_pending_alarms_on_startup() {
@@ -464,6 +519,108 @@ async fn c8y_mapper_syncs_pending_alarms_on_startup() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
+async fn c8y_mapper_syncs_pending_child_alarms_on_startup() {
+ let broker = mqtt_tests::test_mqtt_broker();
+
+ let mut messages = broker
+ .messages_published_on("c8y/s/us/external_sensor")
+ .await;
+
+ // Start the C8Y Mapper
+ let (_tmp_dir, sm_mapper) = start_c8y_mapper(broker.port).await.unwrap();
+
+ let mut internal_messages = broker
+ .messages_published_on("c8y-internal/alarms/critical/temperature_alarm/external_sensor")
+ .await;
+
+ let _ = broker
+ .publish_with_opts(
+ "tedge/alarms/critical/temperature_alarm/external_sensor",
+ r#"{ "text": "Temperature very high" }"#,
+ mqtt_channel::QoS::AtLeastOnce,
+ true,
+ )
+ .await
+ .unwrap();
+
+ // Expect converted temperature alarm message
+ mqtt_tests::assert_received_all_expected(
+ &mut messages,
+ TEST_TIMEOUT_MS,
+ &["301,temperature_alarm"],
+ )
+ .await;
+
+ // Wait till the message get synced to internal topic
+ mqtt_tests::assert_received_all_expected(
+ &mut internal_messages,
+ TEST_TIMEOUT_MS,
+ &["Temperature very high"],
+ )
+ .await;
+
+ // stop the mapper
+ sm_mapper.abort();
+
+ //Publish a new alarm while the mapper is down
+ let _ = broker
+ .publish_with_opts(
+ "tedge/alarms/critical/pressure_alarm/external_sensor",
+ r#"{ "text": "Pressure very high" }"#,
+ mqtt_channel::QoS::AtLeastOnce,
+ true,
+ )
+ .await
+ .unwrap();
+
+ let _ = broker
+ .publish_with_opts(
+ "tedge/alarms/critical/pressure_alarm/external_sensor",
+ r#"{ "text": "Pressure very high" }"#,
+ mqtt_channel::QoS::AtLeastOnce,
+ true,
+ )
+ .await
+ .unwrap();
+
+ // Ignored until the rumqttd broker bug that doesn't handle empty retained messages
+ //Clear the existing alarm while the mapper is down
+ // let _ = broker
+ // .publish_with_opts(
+ // "tedge/alarms/critical/temperature_alarm/external_sensor",
+ // "",
+ // mqtt_channel::QoS::AtLeastOnce,
+ // true,
+ // )
+ // .await
+ // .unwrap();
+
+ // Restart the C8Y Mapper
+ let (_tmp_dir, sm_mapper) = start_c8y_mapper(broker.port).await.unwrap();
+
+ // Ignored until the rumqttd broker bug that doesn't handle empty retained messages
+ // Expect the previously missed clear temperature alarm message
+ // let msg = messages
+ // .next()
+ // .with_timeout(ALARM_SYNC_TIMEOUT_MS)
+ // .await
+ // .expect_or("No message received after a second.");
+ // dbg!(&msg);
+ // assert!(&msg.contains("306,temperature_alarm"));
+
+ // Expect the new pressure alarm message
+ mqtt_tests::assert_received_all_expected(
+ &mut messages,
+ TEST_TIMEOUT_MS,
+ &["301,pressure_alarm"],
+ )
+ .await;
+
+ sm_mapper.abort();
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+#[serial]
async fn test_sync_alarms() {
let (_temp_dir, mut converter) = create_c8y_converter();
@@ -518,6 +675,60 @@ async fn test_sync_alarms() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
+async fn test_sync_child_alarms() {
+ let (_temp_dir, mut converter) = create_c8y_converter();
+
+ let alarm_topic = "tedge/alarms/critical/temperature_alarm/external_sensor";
+ let alarm_payload = r#"{ "text": "Temperature very high" }"#;
+ let alarm_message = Message::new(&Topic::new_unchecked(alarm_topic), alarm_payload);
+
+ // During the sync phase, alarms are not converted immediately, but only cached to be synced later
+ assert_eq!(converter.convert(&alarm_message).await.len(), 1);
+
+ let non_alarm_topic = "tedge/measurements/external_sensor";
+ let non_alarm_payload = r#"{"temp": 1}"#;
+ let non_alarm_message = Message::new(&Topic::new_unchecked(non_alarm_topic), non_alarm_payload);
+
+ // But non-alarms are converted immediately, even during the sync phase
+ assert!(!converter.convert(&non_alarm_message).await.is_empty());
+
+ let internal_alarm_topic = "c8y-internal/alarms/major/pressure_alarm/external_sensor";
+ let internal_alarm_payload = r#"{ "text": "Temperature very high" }"#;
+ let internal_alarm_message = Message::new(
+ &Topic::new_unchecked(internal_alarm_topic),
+ internal_alarm_payload,
+ );
+
+ // During the sync phase, internal alarms are not converted, but only cached to be synced later
+ assert!(converter.convert(&internal_alarm_message).await.is_empty());
+
+ // When sync phase is complete, all pending alarms are returned
+ let sync_messages = converter.sync_messages();
+ assert_eq!(sync_messages.len(), 2);
+
+ // The first message will be clear alarm message for pressure_alarm
+ let alarm_message = sync_messages.get(0).unwrap();
+ assert_eq!(
+ alarm_message.topic.name,
+ "tedge/alarms/major/pressure_alarm/external_sensor"
+ );
+ assert_eq!(alarm_message.payload_bytes().len(), 0); //Clear messages are empty messages
+
+ // The second message will be the temperature_alarm
+ let alarm_message = sync_messages.get(1).unwrap();
+ assert_eq!(alarm_message.topic.name, alarm_topic);
+ assert_eq!(alarm_message.payload_str().unwrap(), alarm_payload);
+
+ // After the sync phase, the conversion of both non-alarms as well as alarms are done immediately
+ assert!(!converter.convert(alarm_message).await.is_empty());
+ assert!(!converter.convert(&non_alarm_message).await.is_empty());
+
+ // But, even after the sync phase, internal alarms are not converted and just ignored, as they are purely internal
+ assert!(converter.convert(&internal_alarm_message).await.is_empty());
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+#[serial]
async fn convert_thin_edge_json_with_child_id() {
let (_temp_dir, mut converter) = create_c8y_converter();
diff --git a/crates/core/thin_edge_json/src/alarm.rs b/crates/core/thin_edge_json/src/alarm.rs
index 9029d673..783fc355 100644
--- a/crates/core/thin_edge_json/src/alarm.rs
+++ b/crates/core/thin_edge_json/src/alarm.rs
@@ -39,6 +39,9 @@ pub enum ThinEdgeJsonDeserializerError {
#[error(transparent)]
SerdeJsonError(#[from] serde_json::error::Error),
+
+ #[error("Unsupported external device ID in topic: {0}")]
+ UnsupportedExternalDeviceId(String),
}
impl TryFrom<&str> for AlarmSeverity {
@@ -63,15 +66,28 @@ impl ThinEdgeAlarm {
mqtt_payload: &str,
) -> Result<Self, ThinEdgeJsonDeserializerError> {
let topic_split: Vec<&str> = mqtt_topic.split('/').collect();
- if topic_split.len() == 4 {
+ if topic_split.len() == 4 || topic_split.len() == 5 {
+ let alarm_severity = topic_split[2];
let alarm_name = topic_split[3];
+
+ if alarm_severity.is_empty() {
+ return Err(ThinEdgeJsonDeserializerError::UnsupportedAlarmSeverity(
+ mqtt_topic.into(),
+ ));
+ }
+
if alarm_name.is_empty() {
return Err(ThinEdgeJsonDeserializerError::UnsupportedTopic(
mqtt_topic.into(),
));
}
- let alarm_severity = topic_split[2];
+ // Return error if child id in the topic is empty
+ if topic_split.len() == 5 && topic_split[4].is_empty() {
+ return Err(ThinEdgeJsonDeserializerError::UnsupportedExternalDeviceId(
+ mqtt_topic.into(),
+ ));
+ }
let alarm_data = if mqtt_payload.is_empty() {
None
@@ -159,6 +175,22 @@ mod tests {
};
"warning alarm parsing without text or timestamp"
)]
+ #[test_case(
+ "tedge/alarms/critical/temperature_alarm/extern_sensor",
+ json!({
+ "text": "I raised it",
+ "time": "2021-04-23T19:00:00+05:00",
+ }),
+ ThinEdgeAlarm {
+ name: "temperature_alarm".into(),
+ severity: AlarmSeverity::Critical,
+ data: Some(ThinEdgeAlarmData {
+ text: Some("I raised it".into()),
+ time: Some(datetime!(2021-04-23 19:00:00 +05:00)),
+ }),
+ };
+ "critical alarm parsing with childId"
+ )]
fn parse_thin_edge_alarm_json(
alarm_topic: &str,
alarm_payload: Value,
@@ -196,7 +228,7 @@ mod tests {
assert_matches!(
result,
- Err(ThinEdgeJsonDeserializerError::UnsupportedTopic(_))
+ Err(ThinEdgeJsonDeserializerError::UnsupportedAlarmSeverity(_))
);
}
@@ -215,4 +247,45 @@ mod tests {
let result = ThinEdgeAlarm::try_from("tedge/alarms/critical/temperature_high_alarm", "");
assert_matches!(result.unwrap().data, None);
}
+
+ #[test]
+ fn alarm_translation_invalid_topic_levels() {
+ let result = ThinEdgeAlarm::try_from("tedge/alarms/critical/temperature_alarm//", "{}");
+ assert_matches!(
+ result,
+ Err(ThinEdgeJsonDeserializerError::UnsupportedTopic(_))
+ );
+ }
+
+ #[test]
+ fn child_alarm_translation_empty_external_device_name() {
+ let result = ThinEdgeAlarm::try_from("tedge/alarms/critical/temperature_alarm/", "{}");
+
+ assert_matches!(
+ result,
+ Err(ThinEdgeJsonDeserializerError::UnsupportedExternalDeviceId(
+ _
+ ))
+ );
+ }
+
+ #[test]
+ fn child_alarm_translation_empty_alarm_name() {
+ let result = ThinEdgeAlarm::try_from("tedge/alarms/critical//external_sensor", "{}");
+
+ assert_matches!(
+ result,
+ Err(ThinEdgeJsonDeserializerError::UnsupportedTopic(_))
+ );
+ }
+
+ #[test]
+ fn child_alarm_translation_empty_severity() {
+ let result = ThinEdgeAlarm::try_from("tedge/alarms//some_alarm/external_sensor", "{}");
+
+ assert_matches!(
+ result,
+ Err(ThinEdgeJsonDeserializerError::UnsupportedAlarmSeverity(_))
+ );
+ }
}