diff options
author | Albin Suresh <albin.suresh@softwareag.com> | 2022-02-04 21:32:32 +0530 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-02-04 21:32:32 +0530 |
commit | c15cfc4bee97473411ed173903f808c1132b8d8c (patch) | |
tree | eb9c1337c25b81764ede6ad1834705c62ae72959 /crates | |
parent | 43bf45a3bd868586afe83a645be67f5042d9ac5d (diff) |
[#735] Detect and reconcile alarms updated while mapper was down (#791)
* [#735] Detect and reconcile cleared alarms while mapper was down on its restart
* Rust integration test for C8Y mapper
* Rust integration test for alarm syncing on startup
* New publish apis in mqtt_test crate that supports retain flag and QoS
* Refactor alarm conversion logic to dedicated AlarmConverter enum
Diffstat (limited to 'crates')
-rw-r--r-- | crates/common/download/src/download.rs | 9 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/az_mapper.rs | 5 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y_converter.rs | 246 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y_mapper.rs | 7 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/converter.rs | 11 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/main.rs | 3 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/mapper.rs | 41 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs | 6 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/tests.rs | 187 | ||||
-rw-r--r-- | crates/tests/mqtt_tests/src/test_mqtt_client.rs | 17 | ||||
-rw-r--r-- | crates/tests/mqtt_tests/src/test_mqtt_server.rs | 13 |
11 files changed, 498 insertions, 47 deletions
diff --git a/crates/common/download/src/download.rs b/crates/common/download/src/download.rs index 92d7c168..472e11e0 100644 --- a/crates/common/download/src/download.rs +++ b/crates/common/download/src/download.rs @@ -1,9 +1,8 @@ use crate::error::DownloadError; use backoff::{future::retry, ExponentialBackoff}; -use nix::{ - fcntl::{fallocate, FallocateFlags}, - sys::statvfs, -}; +#[cfg(target_os = "linux")] +use nix::fcntl::{fallocate, FallocateFlags}; +use nix::sys::statvfs; use serde::{Deserialize, Serialize}; use std::{ fs::File, @@ -177,6 +176,7 @@ fn create_file_and_try_pre_allocate_space( return Err(DownloadError::InsufficientSpace); } // Reserve diskspace + #[cfg(target_os = "linux")] let _ = fallocate( file.as_raw_fd(), FallocateFlags::empty(), @@ -239,6 +239,7 @@ mod tests { Ok(()) } + #[cfg(target_os = "linux")] #[tokio::test] async fn downloader_download_with_content_length_larger_than_usable_disk_space( ) -> anyhow::Result<()> { diff --git a/crates/core/tedge_mapper/src/az_mapper.rs b/crates/core/tedge_mapper/src/az_mapper.rs index 4d2e2af2..57a94b66 100644 --- a/crates/core/tedge_mapper/src/az_mapper.rs +++ b/crates/core/tedge_mapper/src/az_mapper.rs @@ -4,8 +4,8 @@ use crate::mapper::*; use crate::size_threshold::SizeThreshold; use async_trait::async_trait; use clock::WallClock; -use tedge_config::ConfigSettingAccessor; use tedge_config::{AzureMapperTimestamp, TEdgeConfig}; +use tedge_config::{ConfigSettingAccessor, MqttPortSetting}; use tracing::{info_span, Instrument}; const AZURE_MAPPER_NAME: &str = "tedge-mapper-az"; @@ -22,12 +22,13 @@ impl AzureMapper { impl TEdgeComponent for AzureMapper { async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> { let add_timestamp = tedge_config.query(AzureMapperTimestamp)?.is_set(); + let mqtt_port = tedge_config.query(MqttPortSetting)?.into(); let clock = Box::new(WallClock); let size_threshold = SizeThreshold(255 * 1024); let converter = Box::new(AzureConverter::new(add_timestamp, clock, size_threshold)); - let mut mapper = create_mapper(AZURE_MAPPER_NAME, &tedge_config, converter).await?; + let mut mapper = create_mapper(AZURE_MAPPER_NAME, mqtt_port, converter).await?; mapper .run() diff --git a/crates/core/tedge_mapper/src/c8y_converter.rs b/crates/core/tedge_mapper/src/c8y_converter.rs index 40c47f5f..ccc8a975 100644 --- a/crates/core/tedge_mapper/src/c8y_converter.rs +++ b/crates/core/tedge_mapper/src/c8y_converter.rs @@ -6,7 +6,8 @@ use c8y_smartrest::alarm; use c8y_smartrest::smartrest_serializer::{SmartRestSerializer, SmartRestSetSupportedOperations}; use c8y_translator::json; use mqtt_channel::{Message, Topic}; -use std::collections::HashSet; +use std::collections::hash_map::Entry; +use std::collections::{HashMap, HashSet}; use std::fs::File; use std::io::Read; use std::path::Path; @@ -18,6 +19,8 @@ const INVENTORY_FRAGMENTS_FILE_LOCATION: &str = "/etc/tedge/device/inventory.jso const INVENTORY_MANAGED_OBJECTS_TOPIC: &str = "c8y/inventory/managedObjects/update"; const SUPPORTED_OPERATIONS_DIRECTORY: &str = "/etc/tedge/operations"; const SMARTREST_PUBLISH_TOPIC: &str = "c8y/s/us"; +const TEDGE_ALARMS_TOPIC: &str = "tedge/alarms/"; +const INTERNAL_ALARMS_TOPIC: &str = "c8y-internal/alarms/"; pub struct CumulocityConverter { pub(crate) size_threshold: SizeThreshold, @@ -25,32 +28,38 @@ pub struct CumulocityConverter { pub(crate) mapper_config: MapperConfig, device_name: String, device_type: String, + + alarm_converter: AlarmConverter, } impl CumulocityConverter { pub fn new(size_threshold: SizeThreshold, device_name: String, device_type: String) -> Self { - let mut topic_fiter = make_valid_topic_filter_or_panic("tedge/measurements"); - let () = topic_fiter - .add("tedge/measurements/+") - .expect("invalid measurement topic filter"); - let () = topic_fiter - .add("tedge/alarms/+/+") - .expect("invalid alarm topic filter"); + let topics = vec![ + "tedge/measurements", + "tedge/measurements/+", + "tedge/alarms/+/+", + "c8y-internal/alarms/+/+", + ] + .try_into() + .expect("topics that mapper should subscribe to"); let mapper_config = MapperConfig { - in_topic_filter: topic_fiter, + in_topic_filter: topics, out_topic: make_valid_topic_or_panic("c8y/measurement/measurements/create"), errors_topic: make_valid_topic_or_panic("tedge/errors"), }; let children: HashSet<String> = HashSet::new(); + let alarm_converter = AlarmConverter::new(); + CumulocityConverter { size_threshold, children, mapper_config, device_name, device_type, + alarm_converter, } } @@ -90,17 +99,6 @@ impl CumulocityConverter { } Ok(vec) } - - fn try_convert_alarm(&self, input: &Message) -> Result<Vec<Message>, ConversionError> { - let c8y_alarm_topic = Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC); - let mut vec: Vec<Message> = Vec::new(); - - let tedge_alarm = ThinEdgeAlarm::try_from(input.topic.name.as_str(), input.payload_str()?)?; - let smartrest_alarm = alarm::serialize_alarm(tedge_alarm)?; - vec.push(Message::new(&c8y_alarm_topic, smartrest_alarm)); - - Ok(vec) - } } impl Converter for CumulocityConverter { @@ -114,8 +112,11 @@ impl Converter for CumulocityConverter { let () = self.size_threshold.validate(input.payload_str()?)?; if input.topic.name.starts_with("tedge/measurement") { self.try_convert_measurement(input) - } else if input.topic.name.starts_with("tedge/alarms") { - self.try_convert_alarm(input) + } else if input.topic.name.starts_with(TEDGE_ALARMS_TOPIC) { + self.alarm_converter.try_convert_alarm(input) + } else if input.topic.name.starts_with(INTERNAL_ALARMS_TOPIC) { + self.alarm_converter.process_internal_alarm(input); + Ok(vec![]) } else { Err(ConversionError::UnsupportedTopic(input.topic.name.clone())) } @@ -135,6 +136,149 @@ impl Converter for CumulocityConverter { inventory_fragments_message, ]) } + + fn sync_messages(&mut self) -> Vec<Message> { + let sync_messages: Vec<Message> = self.alarm_converter.sync(); + self.alarm_converter = AlarmConverter::Synced; + sync_messages + } +} + +enum AlarmConverter { + Syncing { + pending_alarms_map: HashMap<String, Message>, + old_alarms_map: HashMap<String, Message>, + }, + Synced, +} + +impl AlarmConverter { + fn new() -> Self { + AlarmConverter::Syncing { + old_alarms_map: HashMap::new(), + pending_alarms_map: HashMap::new(), + } + } + + fn try_convert_alarm(&mut self, input: &Message) -> Result<Vec<Message>, ConversionError> { + let mut vec: Vec<Message> = Vec::new(); + + match self { + Self::Syncing { + pending_alarms_map, + old_alarms_map: _, + } => { + let alarm_id = input + .topic + .name + .strip_prefix(TEDGE_ALARMS_TOPIC) + .expect("Expected tedge/alarms prefix") + .to_string(); + pending_alarms_map.insert(alarm_id.clone(), input.clone()); + } + Self::Synced => { + //Regular conversion phase + let tedge_alarm = + ThinEdgeAlarm::try_from(input.topic.name.as_str(), input.payload_str()?)?; + let smartrest_alarm = alarm::serialize_alarm(tedge_alarm)?; + let c8y_alarm_topic = Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC); + vec.push(Message::new(&c8y_alarm_topic, smartrest_alarm)); + + // Persist a copy of the alarm to an internal topic for reconciliation on next restart + let alarm_id = input + .topic + .name + .strip_prefix(TEDGE_ALARMS_TOPIC) + .expect("Expected tedge/alarms prefix") + .to_string(); + let topic = + Topic::new_unchecked(format!("{INTERNAL_ALARMS_TOPIC}{alarm_id}").as_str()); + let alarm_copy = + Message::new(&topic, input.payload_bytes().to_owned()).with_retain(); + vec.push(alarm_copy); + } + } + + Ok(vec) + } + + fn process_internal_alarm(&mut self, input: &Message) { + match self { + Self::Syncing { + pending_alarms_map: _, + old_alarms_map, + } => { + let alarm_id = input + .topic + .name + .strip_prefix(INTERNAL_ALARMS_TOPIC) + .expect("Expected c8y-internal/alarms prefix") + .to_string(); + old_alarms_map.insert(alarm_id, input.clone()); + } + Self::Synced => { + // Ignore + } + } + } + + /// Detect and sync any alarms that were raised/cleared while this mapper process was not running. + /// For this syncing logic, converter maintains an internal journal of all the alarms processed by this mapper, + /// which is compared against all the live alarms seen by the mapper on every startup. + /// + /// All the live alarms are received from tedge/alarms topic on startup. + /// Similarly, all the previously processed alarms are received from c8y-internal/alarms topic. + /// Sync detects the difference between these two sets, which are the missed messages. + /// + /// An alarm that is present in c8y-internal/alarms, but not in tedge/alarms topic + /// is assumed to have been cleared while the mapper process was down. + /// Similarly, an alarm that is present in tedge/alarms, but not in c8y-internal/alarms topic + /// is one that was raised while the mapper process was down. + /// An alarm present in both, if their payload is the same, is one that was already processed before the restart + /// and hence can be ignored during sync. + fn sync(&mut self) -> Vec<Message> { + let mut sync_messages: Vec<Message> = Vec::new(); + + match self { + Self::Syncing { + pending_alarms_map, + old_alarms_map, + } => { + // Compare the differences between alarms in tedge/alarms topic to the ones in c8y-internal/alarms topic + old_alarms_map.drain().for_each(|(alarm_id, old_message)| { + match pending_alarms_map.entry(alarm_id.clone()) { + // If an alarm that is present in c8y-internal/alarms topic is not present in tedge/alarms topic, + // it is assumed to have been cleared while the mapper process was down + Entry::Vacant(_) => { + let topic = Topic::new_unchecked( + format!("{}{}", TEDGE_ALARMS_TOPIC, alarm_id).as_str(), + ); + let message = Message::new(&topic, vec![]).with_retain(); + // Recreate the clear alarm message and add it to the pending alarms list to be processed later + sync_messages.push(message); + } + + // If the payload of a message received from tedge/alarms is same as one received from c8y-internal/alarms, + // it is assumed to be one that was already processed earlier and hence removed from the pending alarms list. + Entry::Occupied(entry) => { + if entry.get().payload_bytes() == old_message.payload_bytes() { + entry.remove(); + } + } + } + }); + + pending_alarms_map + .drain() + .for_each(|(_key, message)| sync_messages.push(message)); + } + Self::Synced => { + // Ignore + } + } + + sync_messages + } } fn create_device_data_fragments( @@ -404,4 +548,62 @@ mod test { } buffer } + + #[test] + fn test_sync_alarms() { + let size_threshold = SizeThreshold(16 * 1024); + let device_name = String::from("test"); + let device_type = String::from("test_type"); + + let mut converter = CumulocityConverter::new(size_threshold, device_name, device_type); + + let alarm_topic = "tedge/alarms/critical/temperature_alarm"; + let alarm_payload = r#"{ "message": "Temperature very high" }"#; + let alarm_message = Message::new(&Topic::new_unchecked(alarm_topic), alarm_payload); + + // During the sync phase, alarms are not converted immediately, but only cached to be synced later + assert!(converter.convert(&alarm_message).is_empty()); + + let non_alarm_topic = "tedge/measurements"; + let non_alarm_payload = r#"{"temp": 1}"#; + let non_alarm_message = + Message::new(&Topic::new_unchecked(non_alarm_topic), non_alarm_payload); + + // But non-alarms are converted immediately, even during the sync phase + assert!(!converter.convert(&non_alarm_message).is_empty()); + + let internal_alarm_topic = "c8y-internal/alarms/major/pressure_alarm"; + let internal_alarm_payload = r#"{ "message": "Temperature very high" }"#; + let internal_alarm_message = Message::new( + &Topic::new_unchecked(internal_alarm_topic), + internal_alarm_payload, + ); + + // During the sync phase, internal alarms are not converted, but only cached to be synced later + assert!(converter.convert(&internal_alarm_message).is_empty()); + + // When sync phase is complete, all pending alarms are returned + let sync_messages = converter.sync_messages(); + assert_eq!(sync_messages.len(), 2); + + // The first message will be clear alarm message for pressure_alarm + let alarm_message = sync_messages.get(0).unwrap(); + assert_eq!( + alarm_message.topic.name, + "tedge/alarms/major/pressure_alarm" + ); + assert_eq!(alarm_message.payload_bytes().len(), 0); //Clear messages are empty messages + + // The second message will be the temperature_alarm + let alarm_message = sync_messages.get(1).unwrap(); + assert_eq!(alarm_message.topic.name, alarm_topic); + assert_eq!(alarm_message.payload_str().unwrap(), alarm_payload); + + // After the sync phase, the conversion of both non-alarms as well as alarms are done immediately + assert!(!converter.convert(&alarm_message).is_empty()); + assert!(!converter.convert(&non_alarm_message).is_empty()); + + // But, even after the sync phase, internal alarms are not converted and just ignored, as they are purely internal + assert!(converter.convert(&internal_alarm_message).is_empty()); + } } diff --git a/crates/core/tedge_mapper/src/c8y_mapper.rs b/crates/core/tedge_mapper/src/c8y_mapper.rs index cf38dee9..9bdac114 100644 --- a/crates/core/tedge_mapper/src/c8y_mapper.rs +++ b/crates/core/tedge_mapper/src/c8y_mapper.rs @@ -3,7 +3,9 @@ use crate::component::TEdgeComponent; use crate::mapper::*; use crate::size_threshold::SizeThreshold; use async_trait::async_trait; -use tedge_config::{ConfigSettingAccessor, DeviceIdSetting, DeviceTypeSetting, TEdgeConfig}; +use tedge_config::{ + ConfigSettingAccessor, DeviceIdSetting, DeviceTypeSetting, MqttPortSetting, TEdgeConfig, +}; use tracing::{info_span, Instrument}; const CUMULOCITY_MAPPER_NAME: &str = "tedge-mapper-c8y"; @@ -23,6 +25,7 @@ impl TEdgeComponent for CumulocityMapper { let device_name = tedge_config.query(DeviceIdSetting)?; let device_type = tedge_config.query(DeviceTypeSetting)?; + let mqtt_port = tedge_config.query(MqttPortSetting)?.into(); let converter = Box::new(CumulocityConverter::new( size_threshold, @@ -30,7 +33,7 @@ impl TEdgeComponent for CumulocityMapper { device_type, )); - let mut mapper = create_mapper(CUMULOCITY_MAPPER_NAME, &tedge_config, converter).await?; + let mut mapper = create_mapper(CUMULOCITY_MAPPER_NAME, mqtt_port, converter).await?; mapper .run() diff --git a/crates/core/tedge_mapper/src/converter.rs b/crates/core/tedge_mapper/src/converter.rs index 071a9b3a..6b86486d 100644 --- a/crates/core/tedge_mapper/src/converter.rs +++ b/crates/core/tedge_mapper/src/converter.rs @@ -42,6 +42,8 @@ pub trait Converter: Send + Sync { Ok(vec![]) } + /// This function will be the first method that's called on the converter after it's instantiated. + /// Return any initialization messages that must be processed before the converter starts converting regular messages. fn init_messages(&self) -> Vec<Message> { match self.try_init_messages() { Ok(messages) => messages, @@ -54,6 +56,15 @@ pub trait Converter: Send + Sync { } } } + + /// This function will be the called after a brief period(sync window) after the converter starts converting messages. + /// This gives the converter an opportunity to process the messages received during the sync window and + /// produce any additional messages as "sync messages" as a result of this processing. + /// These sync messages will be processed by the mapper right after the sync window before it starts converting further messages. + /// Typically used to do some processing on all messages received on mapper startup and derive additional messages out of those. + fn sync_messages(&mut self) -> Vec<Message> { + vec![] + } } pub fn make_valid_topic_or_panic(topic_name: &str) -> Topic { diff --git a/crates/core/tedge_mapper/src/main.rs b/crates/core/tedge_mapper/src/main.rs index 0d524627..b28cb0ee 100644 --- a/crates/core/tedge_mapper/src/main.rs +++ b/crates/core/tedge_mapper/src/main.rs @@ -24,6 +24,9 @@ mod operations; mod size_threshold; mod sm_c8y_mapper; +#[cfg(test)] +mod tests; + fn lookup_component(component_name: &MapperName) -> Box<dyn TEdgeComponent> { match component_name { MapperName::Az => Box::new(AzureMapper::new()), diff --git a/crates/core/tedge_mapper/src/mapper.rs b/crates/core/tedge_mapper/src/mapper.rs index 83ce47e1..e38325f3 100644 --- a/crates/core/tedge_mapper/src/mapper.rs +++ b/crates/core/tedge_mapper/src/mapper.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use crate::converter::*; use crate::error::*; @@ -5,12 +7,13 @@ use mqtt_channel::{ Connection, Message, MqttError, SinkExt, StreamExt, TopicFilter, UnboundedReceiver, UnboundedSender, }; -use tedge_config::{ConfigSettingAccessor, MqttPortSetting, TEdgeConfig}; use tracing::{error, info, instrument}; +const SYNC_WINDOW: Duration = Duration::from_secs(3); + pub async fn create_mapper<'a>( app_name: &'a str, - tedge_config: &'a TEdgeConfig, + mqtt_port: u16, converter: Box<dyn Converter<Error = ConversionError>>, ) -> Result<Mapper, anyhow::Error> { info!("{} starting", app_name); @@ -18,7 +21,7 @@ pub async fn create_mapper<'a>( let mapper_config = converter.get_mapper_config(); let mqtt_client = Connection::new(&mqtt_config( app_name, - tedge_config, + mqtt_port, mapper_config.in_topic_filter.clone().into(), )?) .await?; @@ -34,11 +37,11 @@ pub async fn create_mapper<'a>( pub(crate) fn mqtt_config( name: &str, - tedge_config: &TEdgeConfig, + port: u16, topics: TopicFilter, ) -> Result<mqtt_channel::Config, anyhow::Error> { Ok(mqtt_channel::Config::default() - .with_port(tedge_config.query(MqttPortSetting)?.into()) + .with_port(port) .with_session_name(name) .with_subscriptions(topics) .with_max_packet_size(10 * 1024 * 1024)) @@ -85,14 +88,34 @@ impl Mapper { let _ = self.output.send(init_message).await; } - while let Some(message) = &mut self.input.next().await { - let converted_messages = self.converter.convert(&message); - for converted_message in converted_messages.into_iter() { - let _ = self.output.send(converted_message).await; + // Start the sync phase here and process messages until the sync window times out + let _ = tokio::time::timeout(SYNC_WINDOW, async { + while let Some(message) = self.input.next().await { + self.process_message(message).await; } + }) + .await; + + // Once the sync phase is complete, retrieve all sync messages from the converter and process them + let sync_messages = self.converter.sync_messages(); + for message in sync_messages { + self.process_message(message).await; + } + + // Continue processing messages after the sync period + while let Some(message) = self.input.next().await { + self.process_message(message).await; } + Ok(()) } + + async fn process_message(&mut self, message: Message) { + let converted_messages = self.converter.convert(&message); + for converted_message in converted_messages.into_iter() { + let _ = self.output.send(converted_message).await; + } + } } #[cfg(test)] diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs index 3637fe56..88077a09 100644 --- a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs +++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs @@ -32,7 +32,7 @@ use serde::{Deserialize, Serialize}; use std::path::PathBuf; use std::{convert::TryInto, process::Stdio}; -use tedge_config::TEdgeConfig; +use tedge_config::{ConfigSettingAccessor, MqttPortSetting, TEdgeConfig}; use tracing::{debug, error, info, instrument}; const AGENT_LOG_DIR: &str = "/var/log/tedge/agent"; @@ -125,7 +125,9 @@ where operations: Operations, ) -> Result<Self, anyhow::Error> { let mqtt_topic = CumulocitySoftwareManagementMapper::subscriptions(&operations)?; - let mqtt_config = crate::mapper::mqtt_config(SM_MAPPER, &tedge_config, mqtt_topic)?; + let mqtt_port = tedge_config.query(MqttPortSetting)?.into(); + + let mqtt_config = crate::mapper::mqtt_config(SM_MAPPER, mqtt_port, mqtt_topic)?; let client = Connection::new(&mqtt_config).await?; Ok(Self { diff --git a/crates/core/tedge_mapper/src/tests.rs b/crates/core/tedge_mapper/src/tests.rs new file mode 100644 index 00000000..35da6fe5 --- /dev/null +++ b/crates/core/tedge_mapper/src/tests.rs @@ -0,0 +1,187 @@ +use std::time::Duration; + +use mqtt_tests::with_timeout::{Maybe, WithTimeout}; +use serial_test::serial; +use tokio::task::JoinHandle; + +use crate::{ + c8y_converter::CumulocityConverter, mapper::create_mapper, size_threshold::SizeThreshold, +}; + +const ALARM_SYNC_TIMEOUT_MS: Duration = Duration::from_millis(5000); + +#[tokio::test] +#[serial] +async fn c8y_mapper_alarm_mapping_to_smartrest() { + let broker = mqtt_tests::test_mqtt_broker(); + + let mut messages = broker.messages_published_on("c8y/s/us").await; + + // Start the C8Y Mapper + let c8y_mapper = start_c8y_mapper(broker.port).await.unwrap(); + + let _ = broker + .publish_with_opts( + "tedge/alarms/major/temperature_alarm", + r#"{ "message": "Temperature high" }"#, + mqtt_channel::QoS::AtLeastOnce, + true, + ) + .await + .unwrap(); + + let mut msg = messages + .recv() + .with_timeout(ALARM_SYNC_TIMEOUT_MS) + .await + .expect_or("No message received before timeout"); + dbg!(&msg); + + // The first message could be SmartREST 114 for supported operations + if msg.contains("114") { + // Fetch the next message which should be the alarm + msg = messages + .recv() + .with_timeout(ALARM_SYNC_TIMEOUT_MS) + .await + .expect_or("No message received before timeout"); + } + + // Expect converted temperature alarm message + dbg!(&msg); + assert!(msg.contains("302,temperature_alarm")); + + //Clear the previously published alarm + let _ = broker + .publish_with_opts( + "tedge/alarms/major/temperature_alarm", + "", + mqtt_channel::QoS::AtLeastOnce, + true, + ) + .await + .unwrap(); + + c8y_mapper.abort(); +} + +#[tokio::test] +#[serial] +async fn c8y_mapper_syncs_pending_alarms_on_startup() { + let broker = mqtt_tests::test_mqtt_broker(); + + let mut messages = broker.messages_published_on("c8y/s/us").await; + + // Start the C8Y Mapper + let c8y_mapper = start_c8y_mapper(broker.port).await.unwrap(); + + let _ = broker + .publish_with_opts( + "tedge/alarms/critical/temperature_alarm", + r#"{ "message": "Temperature very high" }"#, + mqtt_channel::QoS::AtLeastOnce, + true, + ) + .await + .unwrap(); + + let mut msg = messages + .recv() + .with_timeout(ALARM_SYNC_TIMEOUT_MS) + .await + .expect_or("No message received before timeout"); + dbg!(&msg); + + // The first message could be SmartREST 114 for supported operations + if msg.contains("114") { + // Fetch the next message which should be the alarm + msg = messages + .recv() + .with_timeout(ALARM_SYNC_TIMEOUT_MS) + .await + .expect_or("No message received before timeout"); + dbg!(&msg); + } + + // Expect converted temperature alarm message + assert!(&msg.contains("301,temperature_alarm")); + + c8y_mapper.abort(); + + //Publish a new alarm while the mapper is down + let _ = broker + .publish_with_opts( + "tedge/alarms/critical/pressure_alarm", + r#"{ "message": "Pressure very high" }"#, + mqtt_channel::QoS::AtLeastOnce, + true, + ) + .await + .unwrap(); + + // Ignored until the rumqttd broker bug that doesn't handle empty retained messages + //Clear the existing alarm while the mapper is down + // let _ = broker + // .publish_with_opts( + // "tedge/alarms/critical/temperature_alarm", + // "", + // mqtt_channel::QoS::AtLeastOnce, + // true, + // ) + // .await + // .unwrap(); + + // Restart the C8Y Mapper + let _ = start_c8y_mapper(broker.port).await.unwrap(); + + let mut msg = messages + .recv() + .with_timeout(ALARM_SYNC_TIMEOUT_MS) + .await + .expect_or("No message received before timeout"); + dbg!(&msg); + + // The first message could be SmartREST 114 for supported operations + if msg.contains("114") { + // Fetch the next message which should be the alarm + msg = messages + .recv() + .with_timeout(ALARM_SYNC_TIMEOUT_MS) + .await + .expect_or("No message received before timeout"); + dbg!(&msg); + } + + // Ignored until the rumqttd broker bug that doesn't handle empty retained messages + // Expect the previously missed clear temperature alarm message + // let msg = messages + // .recv() + // .with_timeout(ALARM_SYNC_TIMEOUT_MS) + // .await + // .expect_or("No message received after a second."); + // dbg!(&msg); + // assert!(&msg.contains("306,temperature_alarm")); + + // Expect the new pressure alarm message + assert!(&msg.contains("301,pressure_alarm")); + + c8y_mapper.abort(); +} + +async fn start_c8y_mapper(mqtt_port: u16) -> Result<JoinHandle<()>, anyhow::Error> { + let device_name = "test-device".into(); + let device_type = "test-device-type".into(); + let size_threshold = SizeThreshold(16 * 1024); + let converter = Box::new(CumulocityConverter::new( + size_threshold, + device_name, + device_type, + )); + + let mut mapper = create_mapper("c8y-mapper-test", mqtt_port, converter).await?; + + let mapper_task = tokio::spawn(async move { + let _ = mapper.run().await; + }); + Ok(mapper_task) +} diff --git a/crates/tests/mqtt_tests/src/test_mqtt_client.rs b/crates/tests/mqtt_tests/src/test_mqtt_client.rs index 7efa73a5..b4fe0fa4 100644 --- a/crates/tests/mqtt_tests/src/test_mqtt_client.rs +++ b/crates/tests/mqtt_tests/src/test_mqtt_client.rs @@ -54,10 +54,16 @@ pub async fn assert_received<T>( /// Publish a message /// /// Return only when the message has been acknowledged. -pub async fn publish(mqtt_port: u16, topic: &str, payload: &str) -> Result<(), anyhow::Error> { +pub async fn publish( + mqtt_port: u16, + topic: &str, + payload: &str, |