summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--crates/common/download/src/download.rs9
-rw-r--r--crates/core/tedge_mapper/src/az_mapper.rs5
-rw-r--r--crates/core/tedge_mapper/src/c8y_converter.rs246
-rw-r--r--crates/core/tedge_mapper/src/c8y_mapper.rs7
-rw-r--r--crates/core/tedge_mapper/src/converter.rs11
-rw-r--r--crates/core/tedge_mapper/src/main.rs3
-rw-r--r--crates/core/tedge_mapper/src/mapper.rs41
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs6
-rw-r--r--crates/core/tedge_mapper/src/tests.rs187
-rw-r--r--crates/tests/mqtt_tests/src/test_mqtt_client.rs17
-rw-r--r--crates/tests/mqtt_tests/src/test_mqtt_server.rs13
11 files changed, 498 insertions, 47 deletions
diff --git a/crates/common/download/src/download.rs b/crates/common/download/src/download.rs
index 92d7c168..472e11e0 100644
--- a/crates/common/download/src/download.rs
+++ b/crates/common/download/src/download.rs
@@ -1,9 +1,8 @@
use crate::error::DownloadError;
use backoff::{future::retry, ExponentialBackoff};
-use nix::{
- fcntl::{fallocate, FallocateFlags},
- sys::statvfs,
-};
+#[cfg(target_os = "linux")]
+use nix::fcntl::{fallocate, FallocateFlags};
+use nix::sys::statvfs;
use serde::{Deserialize, Serialize};
use std::{
fs::File,
@@ -177,6 +176,7 @@ fn create_file_and_try_pre_allocate_space(
return Err(DownloadError::InsufficientSpace);
}
// Reserve diskspace
+ #[cfg(target_os = "linux")]
let _ = fallocate(
file.as_raw_fd(),
FallocateFlags::empty(),
@@ -239,6 +239,7 @@ mod tests {
Ok(())
}
+ #[cfg(target_os = "linux")]
#[tokio::test]
async fn downloader_download_with_content_length_larger_than_usable_disk_space(
) -> anyhow::Result<()> {
diff --git a/crates/core/tedge_mapper/src/az_mapper.rs b/crates/core/tedge_mapper/src/az_mapper.rs
index 4d2e2af2..57a94b66 100644
--- a/crates/core/tedge_mapper/src/az_mapper.rs
+++ b/crates/core/tedge_mapper/src/az_mapper.rs
@@ -4,8 +4,8 @@ use crate::mapper::*;
use crate::size_threshold::SizeThreshold;
use async_trait::async_trait;
use clock::WallClock;
-use tedge_config::ConfigSettingAccessor;
use tedge_config::{AzureMapperTimestamp, TEdgeConfig};
+use tedge_config::{ConfigSettingAccessor, MqttPortSetting};
use tracing::{info_span, Instrument};
const AZURE_MAPPER_NAME: &str = "tedge-mapper-az";
@@ -22,12 +22,13 @@ impl AzureMapper {
impl TEdgeComponent for AzureMapper {
async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> {
let add_timestamp = tedge_config.query(AzureMapperTimestamp)?.is_set();
+ let mqtt_port = tedge_config.query(MqttPortSetting)?.into();
let clock = Box::new(WallClock);
let size_threshold = SizeThreshold(255 * 1024);
let converter = Box::new(AzureConverter::new(add_timestamp, clock, size_threshold));
- let mut mapper = create_mapper(AZURE_MAPPER_NAME, &tedge_config, converter).await?;
+ let mut mapper = create_mapper(AZURE_MAPPER_NAME, mqtt_port, converter).await?;
mapper
.run()
diff --git a/crates/core/tedge_mapper/src/c8y_converter.rs b/crates/core/tedge_mapper/src/c8y_converter.rs
index 40c47f5f..ccc8a975 100644
--- a/crates/core/tedge_mapper/src/c8y_converter.rs
+++ b/crates/core/tedge_mapper/src/c8y_converter.rs
@@ -6,7 +6,8 @@ use c8y_smartrest::alarm;
use c8y_smartrest::smartrest_serializer::{SmartRestSerializer, SmartRestSetSupportedOperations};
use c8y_translator::json;
use mqtt_channel::{Message, Topic};
-use std::collections::HashSet;
+use std::collections::hash_map::Entry;
+use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::io::Read;
use std::path::Path;
@@ -18,6 +19,8 @@ const INVENTORY_FRAGMENTS_FILE_LOCATION: &str = "/etc/tedge/device/inventory.jso
const INVENTORY_MANAGED_OBJECTS_TOPIC: &str = "c8y/inventory/managedObjects/update";
const SUPPORTED_OPERATIONS_DIRECTORY: &str = "/etc/tedge/operations";
const SMARTREST_PUBLISH_TOPIC: &str = "c8y/s/us";
+const TEDGE_ALARMS_TOPIC: &str = "tedge/alarms/";
+const INTERNAL_ALARMS_TOPIC: &str = "c8y-internal/alarms/";
pub struct CumulocityConverter {
pub(crate) size_threshold: SizeThreshold,
@@ -25,32 +28,38 @@ pub struct CumulocityConverter {
pub(crate) mapper_config: MapperConfig,
device_name: String,
device_type: String,
+
+ alarm_converter: AlarmConverter,
}
impl CumulocityConverter {
pub fn new(size_threshold: SizeThreshold, device_name: String, device_type: String) -> Self {
- let mut topic_fiter = make_valid_topic_filter_or_panic("tedge/measurements");
- let () = topic_fiter
- .add("tedge/measurements/+")
- .expect("invalid measurement topic filter");
- let () = topic_fiter
- .add("tedge/alarms/+/+")
- .expect("invalid alarm topic filter");
+ let topics = vec![
+ "tedge/measurements",
+ "tedge/measurements/+",
+ "tedge/alarms/+/+",
+ "c8y-internal/alarms/+/+",
+ ]
+ .try_into()
+ .expect("topics that mapper should subscribe to");
let mapper_config = MapperConfig {
- in_topic_filter: topic_fiter,
+ in_topic_filter: topics,
out_topic: make_valid_topic_or_panic("c8y/measurement/measurements/create"),
errors_topic: make_valid_topic_or_panic("tedge/errors"),
};
let children: HashSet<String> = HashSet::new();
+ let alarm_converter = AlarmConverter::new();
+
CumulocityConverter {
size_threshold,
children,
mapper_config,
device_name,
device_type,
+ alarm_converter,
}
}
@@ -90,17 +99,6 @@ impl CumulocityConverter {
}
Ok(vec)
}
-
- fn try_convert_alarm(&self, input: &Message) -> Result<Vec<Message>, ConversionError> {
- let c8y_alarm_topic = Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC);
- let mut vec: Vec<Message> = Vec::new();
-
- let tedge_alarm = ThinEdgeAlarm::try_from(input.topic.name.as_str(), input.payload_str()?)?;
- let smartrest_alarm = alarm::serialize_alarm(tedge_alarm)?;
- vec.push(Message::new(&c8y_alarm_topic, smartrest_alarm));
-
- Ok(vec)
- }
}
impl Converter for CumulocityConverter {
@@ -114,8 +112,11 @@ impl Converter for CumulocityConverter {
let () = self.size_threshold.validate(input.payload_str()?)?;
if input.topic.name.starts_with("tedge/measurement") {
self.try_convert_measurement(input)
- } else if input.topic.name.starts_with("tedge/alarms") {
- self.try_convert_alarm(input)
+ } else if input.topic.name.starts_with(TEDGE_ALARMS_TOPIC) {
+ self.alarm_converter.try_convert_alarm(input)
+ } else if input.topic.name.starts_with(INTERNAL_ALARMS_TOPIC) {
+ self.alarm_converter.process_internal_alarm(input);
+ Ok(vec![])
} else {
Err(ConversionError::UnsupportedTopic(input.topic.name.clone()))
}
@@ -135,6 +136,149 @@ impl Converter for CumulocityConverter {
inventory_fragments_message,
])
}
+
+ fn sync_messages(&mut self) -> Vec<Message> {
+ let sync_messages: Vec<Message> = self.alarm_converter.sync();
+ self.alarm_converter = AlarmConverter::Synced;
+ sync_messages
+ }
+}
+
+enum AlarmConverter {
+ Syncing {
+ pending_alarms_map: HashMap<String, Message>,
+ old_alarms_map: HashMap<String, Message>,
+ },
+ Synced,
+}
+
+impl AlarmConverter {
+ fn new() -> Self {
+ AlarmConverter::Syncing {
+ old_alarms_map: HashMap::new(),
+ pending_alarms_map: HashMap::new(),
+ }
+ }
+
+ fn try_convert_alarm(&mut self, input: &Message) -> Result<Vec<Message>, ConversionError> {
+ let mut vec: Vec<Message> = Vec::new();
+
+ match self {
+ Self::Syncing {
+ pending_alarms_map,
+ old_alarms_map: _,
+ } => {
+ let alarm_id = input
+ .topic
+ .name
+ .strip_prefix(TEDGE_ALARMS_TOPIC)
+ .expect("Expected tedge/alarms prefix")
+ .to_string();
+ pending_alarms_map.insert(alarm_id.clone(), input.clone());
+ }
+ Self::Synced => {
+ //Regular conversion phase
+ let tedge_alarm =
+ ThinEdgeAlarm::try_from(input.topic.name.as_str(), input.payload_str()?)?;
+ let smartrest_alarm = alarm::serialize_alarm(tedge_alarm)?;
+ let c8y_alarm_topic = Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC);
+ vec.push(Message::new(&c8y_alarm_topic, smartrest_alarm));
+
+ // Persist a copy of the alarm to an internal topic for reconciliation on next restart
+ let alarm_id = input
+ .topic
+ .name
+ .strip_prefix(TEDGE_ALARMS_TOPIC)
+ .expect("Expected tedge/alarms prefix")
+ .to_string();
+ let topic =
+ Topic::new_unchecked(format!("{INTERNAL_ALARMS_TOPIC}{alarm_id}").as_str());
+ let alarm_copy =
+ Message::new(&topic, input.payload_bytes().to_owned()).with_retain();
+ vec.push(alarm_copy);
+ }
+ }
+
+ Ok(vec)
+ }
+
+ fn process_internal_alarm(&mut self, input: &Message) {
+ match self {
+ Self::Syncing {
+ pending_alarms_map: _,
+ old_alarms_map,
+ } => {
+ let alarm_id = input
+ .topic
+ .name
+ .strip_prefix(INTERNAL_ALARMS_TOPIC)
+ .expect("Expected c8y-internal/alarms prefix")
+ .to_string();
+ old_alarms_map.insert(alarm_id, input.clone());
+ }
+ Self::Synced => {
+ // Ignore
+ }
+ }
+ }
+
+ /// Detect and sync any alarms that were raised/cleared while this mapper process was not running.
+ /// For this syncing logic, converter maintains an internal journal of all the alarms processed by this mapper,
+ /// which is compared against all the live alarms seen by the mapper on every startup.
+ ///
+ /// All the live alarms are received from tedge/alarms topic on startup.
+ /// Similarly, all the previously processed alarms are received from c8y-internal/alarms topic.
+ /// Sync detects the difference between these two sets, which are the missed messages.
+ ///
+ /// An alarm that is present in c8y-internal/alarms, but not in tedge/alarms topic
+ /// is assumed to have been cleared while the mapper process was down.
+ /// Similarly, an alarm that is present in tedge/alarms, but not in c8y-internal/alarms topic
+ /// is one that was raised while the mapper process was down.
+ /// An alarm present in both, if their payload is the same, is one that was already processed before the restart
+ /// and hence can be ignored during sync.
+ fn sync(&mut self) -> Vec<Message> {
+ let mut sync_messages: Vec<Message> = Vec::new();
+
+ match self {
+ Self::Syncing {
+ pending_alarms_map,
+ old_alarms_map,
+ } => {
+ // Compare the differences between alarms in tedge/alarms topic to the ones in c8y-internal/alarms topic
+ old_alarms_map.drain().for_each(|(alarm_id, old_message)| {
+ match pending_alarms_map.entry(alarm_id.clone()) {
+ // If an alarm that is present in c8y-internal/alarms topic is not present in tedge/alarms topic,
+ // it is assumed to have been cleared while the mapper process was down
+ Entry::Vacant(_) => {
+ let topic = Topic::new_unchecked(
+ format!("{}{}", TEDGE_ALARMS_TOPIC, alarm_id).as_str(),
+ );
+ let message = Message::new(&topic, vec![]).with_retain();
+ // Recreate the clear alarm message and add it to the pending alarms list to be processed later
+ sync_messages.push(message);
+ }
+
+ // If the payload of a message received from tedge/alarms is same as one received from c8y-internal/alarms,
+ // it is assumed to be one that was already processed earlier and hence removed from the pending alarms list.
+ Entry::Occupied(entry) => {
+ if entry.get().payload_bytes() == old_message.payload_bytes() {
+ entry.remove();
+ }
+ }
+ }
+ });
+
+ pending_alarms_map
+ .drain()
+ .for_each(|(_key, message)| sync_messages.push(message));
+ }
+ Self::Synced => {
+ // Ignore
+ }
+ }
+
+ sync_messages
+ }
}
fn create_device_data_fragments(
@@ -404,4 +548,62 @@ mod test {
}
buffer
}
+
+ #[test]
+ fn test_sync_alarms() {
+ let size_threshold = SizeThreshold(16 * 1024);
+ let device_name = String::from("test");
+ let device_type = String::from("test_type");
+
+ let mut converter = CumulocityConverter::new(size_threshold, device_name, device_type);
+
+ let alarm_topic = "tedge/alarms/critical/temperature_alarm";
+ let alarm_payload = r#"{ "message": "Temperature very high" }"#;
+ let alarm_message = Message::new(&Topic::new_unchecked(alarm_topic), alarm_payload);
+
+ // During the sync phase, alarms are not converted immediately, but only cached to be synced later
+ assert!(converter.convert(&alarm_message).is_empty());
+
+ let non_alarm_topic = "tedge/measurements";
+ let non_alarm_payload = r#"{"temp": 1}"#;
+ let non_alarm_message =
+ Message::new(&Topic::new_unchecked(non_alarm_topic), non_alarm_payload);
+
+ // But non-alarms are converted immediately, even during the sync phase
+ assert!(!converter.convert(&non_alarm_message).is_empty());
+
+ let internal_alarm_topic = "c8y-internal/alarms/major/pressure_alarm";
+ let internal_alarm_payload = r#"{ "message": "Temperature very high" }"#;
+ let internal_alarm_message = Message::new(
+ &Topic::new_unchecked(internal_alarm_topic),
+ internal_alarm_payload,
+ );
+
+ // During the sync phase, internal alarms are not converted, but only cached to be synced later
+ assert!(converter.convert(&internal_alarm_message).is_empty());
+
+ // When sync phase is complete, all pending alarms are returned
+ let sync_messages = converter.sync_messages();
+ assert_eq!(sync_messages.len(), 2);
+
+ // The first message will be clear alarm message for pressure_alarm
+ let alarm_message = sync_messages.get(0).unwrap();
+ assert_eq!(
+ alarm_message.topic.name,
+ "tedge/alarms/major/pressure_alarm"
+ );
+ assert_eq!(alarm_message.payload_bytes().len(), 0); //Clear messages are empty messages
+
+ // The second message will be the temperature_alarm
+ let alarm_message = sync_messages.get(1).unwrap();
+ assert_eq!(alarm_message.topic.name, alarm_topic);
+ assert_eq!(alarm_message.payload_str().unwrap(), alarm_payload);
+
+ // After the sync phase, the conversion of both non-alarms as well as alarms are done immediately
+ assert!(!converter.convert(&alarm_message).is_empty());
+ assert!(!converter.convert(&non_alarm_message).is_empty());
+
+ // But, even after the sync phase, internal alarms are not converted and just ignored, as they are purely internal
+ assert!(converter.convert(&internal_alarm_message).is_empty());
+ }
}
diff --git a/crates/core/tedge_mapper/src/c8y_mapper.rs b/crates/core/tedge_mapper/src/c8y_mapper.rs
index cf38dee9..9bdac114 100644
--- a/crates/core/tedge_mapper/src/c8y_mapper.rs
+++ b/crates/core/tedge_mapper/src/c8y_mapper.rs
@@ -3,7 +3,9 @@ use crate::component::TEdgeComponent;
use crate::mapper::*;
use crate::size_threshold::SizeThreshold;
use async_trait::async_trait;
-use tedge_config::{ConfigSettingAccessor, DeviceIdSetting, DeviceTypeSetting, TEdgeConfig};
+use tedge_config::{
+ ConfigSettingAccessor, DeviceIdSetting, DeviceTypeSetting, MqttPortSetting, TEdgeConfig,
+};
use tracing::{info_span, Instrument};
const CUMULOCITY_MAPPER_NAME: &str = "tedge-mapper-c8y";
@@ -23,6 +25,7 @@ impl TEdgeComponent for CumulocityMapper {
let device_name = tedge_config.query(DeviceIdSetting)?;
let device_type = tedge_config.query(DeviceTypeSetting)?;
+ let mqtt_port = tedge_config.query(MqttPortSetting)?.into();
let converter = Box::new(CumulocityConverter::new(
size_threshold,
@@ -30,7 +33,7 @@ impl TEdgeComponent for CumulocityMapper {
device_type,
));
- let mut mapper = create_mapper(CUMULOCITY_MAPPER_NAME, &tedge_config, converter).await?;
+ let mut mapper = create_mapper(CUMULOCITY_MAPPER_NAME, mqtt_port, converter).await?;
mapper
.run()
diff --git a/crates/core/tedge_mapper/src/converter.rs b/crates/core/tedge_mapper/src/converter.rs
index 071a9b3a..6b86486d 100644
--- a/crates/core/tedge_mapper/src/converter.rs
+++ b/crates/core/tedge_mapper/src/converter.rs
@@ -42,6 +42,8 @@ pub trait Converter: Send + Sync {
Ok(vec![])
}
+ /// This function will be the first method that's called on the converter after it's instantiated.
+ /// Return any initialization messages that must be processed before the converter starts converting regular messages.
fn init_messages(&self) -> Vec<Message> {
match self.try_init_messages() {
Ok(messages) => messages,
@@ -54,6 +56,15 @@ pub trait Converter: Send + Sync {
}
}
}
+
+ /// This function will be the called after a brief period(sync window) after the converter starts converting messages.
+ /// This gives the converter an opportunity to process the messages received during the sync window and
+ /// produce any additional messages as "sync messages" as a result of this processing.
+ /// These sync messages will be processed by the mapper right after the sync window before it starts converting further messages.
+ /// Typically used to do some processing on all messages received on mapper startup and derive additional messages out of those.
+ fn sync_messages(&mut self) -> Vec<Message> {
+ vec![]
+ }
}
pub fn make_valid_topic_or_panic(topic_name: &str) -> Topic {
diff --git a/crates/core/tedge_mapper/src/main.rs b/crates/core/tedge_mapper/src/main.rs
index 0d524627..b28cb0ee 100644
--- a/crates/core/tedge_mapper/src/main.rs
+++ b/crates/core/tedge_mapper/src/main.rs
@@ -24,6 +24,9 @@ mod operations;
mod size_threshold;
mod sm_c8y_mapper;
+#[cfg(test)]
+mod tests;
+
fn lookup_component(component_name: &MapperName) -> Box<dyn TEdgeComponent> {
match component_name {
MapperName::Az => Box::new(AzureMapper::new()),
diff --git a/crates/core/tedge_mapper/src/mapper.rs b/crates/core/tedge_mapper/src/mapper.rs
index 83ce47e1..e38325f3 100644
--- a/crates/core/tedge_mapper/src/mapper.rs
+++ b/crates/core/tedge_mapper/src/mapper.rs
@@ -1,3 +1,5 @@
+use std::time::Duration;
+
use crate::converter::*;
use crate::error::*;
@@ -5,12 +7,13 @@ use mqtt_channel::{
Connection, Message, MqttError, SinkExt, StreamExt, TopicFilter, UnboundedReceiver,
UnboundedSender,
};
-use tedge_config::{ConfigSettingAccessor, MqttPortSetting, TEdgeConfig};
use tracing::{error, info, instrument};
+const SYNC_WINDOW: Duration = Duration::from_secs(3);
+
pub async fn create_mapper<'a>(
app_name: &'a str,
- tedge_config: &'a TEdgeConfig,
+ mqtt_port: u16,
converter: Box<dyn Converter<Error = ConversionError>>,
) -> Result<Mapper, anyhow::Error> {
info!("{} starting", app_name);
@@ -18,7 +21,7 @@ pub async fn create_mapper<'a>(
let mapper_config = converter.get_mapper_config();
let mqtt_client = Connection::new(&mqtt_config(
app_name,
- tedge_config,
+ mqtt_port,
mapper_config.in_topic_filter.clone().into(),
)?)
.await?;
@@ -34,11 +37,11 @@ pub async fn create_mapper<'a>(
pub(crate) fn mqtt_config(
name: &str,
- tedge_config: &TEdgeConfig,
+ port: u16,
topics: TopicFilter,
) -> Result<mqtt_channel::Config, anyhow::Error> {
Ok(mqtt_channel::Config::default()
- .with_port(tedge_config.query(MqttPortSetting)?.into())
+ .with_port(port)
.with_session_name(name)
.with_subscriptions(topics)
.with_max_packet_size(10 * 1024 * 1024))
@@ -85,14 +88,34 @@ impl Mapper {
let _ = self.output.send(init_message).await;
}
- while let Some(message) = &mut self.input.next().await {
- let converted_messages = self.converter.convert(&message);
- for converted_message in converted_messages.into_iter() {
- let _ = self.output.send(converted_message).await;
+ // Start the sync phase here and process messages until the sync window times out
+ let _ = tokio::time::timeout(SYNC_WINDOW, async {
+ while let Some(message) = self.input.next().await {
+ self.process_message(message).await;
}
+ })
+ .await;
+
+ // Once the sync phase is complete, retrieve all sync messages from the converter and process them
+ let sync_messages = self.converter.sync_messages();
+ for message in sync_messages {
+ self.process_message(message).await;
+ }
+
+ // Continue processing messages after the sync period
+ while let Some(message) = self.input.next().await {
+ self.process_message(message).await;
}
+
Ok(())
}
+
+ async fn process_message(&mut self, message: Message) {
+ let converted_messages = self.converter.convert(&message);
+ for converted_message in converted_messages.into_iter() {
+ let _ = self.output.send(converted_message).await;
+ }
+ }
}
#[cfg(test)]
diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs
index 3637fe56..88077a09 100644
--- a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs
+++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs
@@ -32,7 +32,7 @@ use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::{convert::TryInto, process::Stdio};
-use tedge_config::TEdgeConfig;
+use tedge_config::{ConfigSettingAccessor, MqttPortSetting, TEdgeConfig};
use tracing::{debug, error, info, instrument};
const AGENT_LOG_DIR: &str = "/var/log/tedge/agent";
@@ -125,7 +125,9 @@ where
operations: Operations,
) -> Result<Self, anyhow::Error> {
let mqtt_topic = CumulocitySoftwareManagementMapper::subscriptions(&operations)?;
- let mqtt_config = crate::mapper::mqtt_config(SM_MAPPER, &tedge_config, mqtt_topic)?;
+ let mqtt_port = tedge_config.query(MqttPortSetting)?.into();
+
+ let mqtt_config = crate::mapper::mqtt_config(SM_MAPPER, mqtt_port, mqtt_topic)?;
let client = Connection::new(&mqtt_config).await?;
Ok(Self {
diff --git a/crates/core/tedge_mapper/src/tests.rs b/crates/core/tedge_mapper/src/tests.rs
new file mode 100644
index 00000000..35da6fe5
--- /dev/null
+++ b/crates/core/tedge_mapper/src/tests.rs
@@ -0,0 +1,187 @@
+use std::time::Duration;
+
+use mqtt_tests::with_timeout::{Maybe, WithTimeout};
+use serial_test::serial;
+use tokio::task::JoinHandle;
+
+use crate::{
+ c8y_converter::CumulocityConverter, mapper::create_mapper, size_threshold::SizeThreshold,
+};
+
+const ALARM_SYNC_TIMEOUT_MS: Duration = Duration::from_millis(5000);
+
+#[tokio::test]
+#[serial]
+async fn c8y_mapper_alarm_mapping_to_smartrest() {
+ let broker = mqtt_tests::test_mqtt_broker();
+
+ let mut messages = broker.messages_published_on("c8y/s/us").await;
+
+ // Start the C8Y Mapper
+ let c8y_mapper = start_c8y_mapper(broker.port).await.unwrap();
+
+ let _ = broker
+ .publish_with_opts(
+ "tedge/alarms/major/temperature_alarm",
+ r#"{ "message": "Temperature high" }"#,
+ mqtt_channel::QoS::AtLeastOnce,
+ true,
+ )
+ .await
+ .unwrap();
+
+ let mut msg = messages
+ .recv()
+ .with_timeout(ALARM_SYNC_TIMEOUT_MS)
+ .await
+ .expect_or("No message received before timeout");
+ dbg!(&msg);
+
+ // The first message could be SmartREST 114 for supported operations
+ if msg.contains("114") {
+ // Fetch the next message which should be the alarm
+ msg = messages
+ .recv()
+ .with_timeout(ALARM_SYNC_TIMEOUT_MS)
+ .await
+ .expect_or("No message received before timeout");
+ }
+
+ // Expect converted temperature alarm message
+ dbg!(&msg);
+ assert!(msg.contains("302,temperature_alarm"));
+
+ //Clear the previously published alarm
+ let _ = broker
+ .publish_with_opts(
+ "tedge/alarms/major/temperature_alarm",
+ "",
+ mqtt_channel::QoS::AtLeastOnce,
+ true,
+ )
+ .await
+ .unwrap();
+
+ c8y_mapper.abort();
+}
+
+#[tokio::test]
+#[serial]
+async fn c8y_mapper_syncs_pending_alarms_on_startup() {
+ let broker = mqtt_tests::test_mqtt_broker();
+
+ let mut messages = broker.messages_published_on("c8y/s/us").await;
+
+ // Start the C8Y Mapper
+ let c8y_mapper = start_c8y_mapper(broker.port).await.unwrap();
+
+ let _ = broker
+ .publish_with_opts(
+ "tedge/alarms/critical/temperature_alarm",
+ r#"{ "message": "Temperature very high" }"#,
+ mqtt_channel::QoS::AtLeastOnce,
+ true,
+ )
+ .await
+ .unwrap();
+
+ let mut msg = messages
+ .recv()
+ .with_timeout(ALARM_SYNC_TIMEOUT_MS)
+ .await
+ .expect_or("No message received before timeout");
+ dbg!(&msg);
+
+ // The first message could be SmartREST 114 for supported operations
+ if msg.contains("114") {
+ // Fetch the next message which should be the alarm
+ msg = messages
+ .recv()
+ .with_timeout(ALARM_SYNC_TIMEOUT_MS)
+ .await
+ .expect_or("No message received before timeout");
+ dbg!(&msg);
+ }
+
+ // Expect converted temperature alarm message
+ assert!(&msg.contains("301,temperature_alarm"));
+
+ c8y_mapper.abort();
+
+ //Publish a new alarm while the mapper is down
+ let _ = broker
+ .publish_with_opts(
+ "tedge/alarms/critical/pressure_alarm",
+ r#"{ "message": "Pressure very high" }"#,
+ mqtt_channel::QoS::AtLeastOnce,
+ true,
+ )
+ .await
+ .unwrap();
+
+ // Ignored until the rumqttd broker bug that doesn't handle empty retained messages
+ //Clear the existing alarm while the mapper is down
+ // let _ = broker
+ // .publish_with_opts(
+ // "tedge/alarms/critical/temperature_alarm",
+ // "",
+ // mqtt_channel::QoS::AtLeastOnce,
+ // true,
+ // )
+ // .await
+ // .unwrap();
+
+ // Restart the C8Y Mapper
+ let _ = start_c8y_mapper(broker.port).await.unwrap();
+
+ let mut msg = messages
+ .recv()
+ .with_timeout(ALARM_SYNC_TIMEOUT_MS)
+ .await
+ .expect_or("No message received before timeout");
+ dbg!(&msg);
+
+ // The first message could be SmartREST 114 for supported operations
+ if msg.contains("114") {
+ // Fetch the next message which should be the alarm
+ msg = messages
+ .recv()
+ .with_timeout(ALARM_SYNC_TIMEOUT_MS)
+ .await
+ .expect_or("No message received before timeout");
+ dbg!(&msg);
+ }
+
+ // Ignored until the rumqttd broker bug that doesn't handle empty retained messages
+ // Expect the previously missed clear temperature alarm message
+ // let msg = messages
+ // .recv()
+ // .with_timeout(ALARM_SYNC_TIMEOUT_MS)
+ // .await
+ // .expect_or("No message received after a second.");
+ // dbg!(&msg);
+ // assert!(&msg.contains("306,temperature_alarm"));
+
+ // Expect the new pressure alarm message
+ assert!(&msg.contains("301,pressure_alarm"));
+
+ c8y_mapper.abort();
+}
+
+async fn start_c8y_mapper(mqtt_port: u16) -> Result<JoinHandle<()>, anyhow::Error> {
+ let device_name = "test-device".into();
+ let device_type = "test-device-type".into();
+ let size_threshold = SizeThreshold(16 * 1024);
+ let converter = Box::new(CumulocityConverter::new(
+ size_threshold,
+ device_name,
+ device_type,
+ ));
+
+ let mut mapper = create_mapper("c8y-mapper-test", mqtt_port, converter).await?;
+
+ let mapper_task = tokio::spawn(async move {
+ let _ = mapper.run().await;
+ });
+ Ok(mapper_task)
+}
diff --git a/crates/tests/mqtt_tests/src/test_mqtt_client.rs b/crates/tests/mqtt_tests/src/test_mqtt_client.rs
index 7efa73a5..b4fe0fa4 100644
--- a/crates/tests/mqtt_tests/src/test_mqtt_client.rs
+++ b/crates/tests/mqtt_tests/src/test_mqtt_client.rs
@@ -54,10 +54,16 @@ pub async fn assert_received<T>(
/// Publish a message
///
/// Return only when the message has been acknowledged.
-pub async fn publish(mqtt_port: u16, topic: &str, payload: &str) -> Result<(), anyhow::Error> {
+pub async fn publish(
+ mqtt_port: u16,
+ topic: &str,
+ payload: &str,
+ qos: QoS,
+ retain: bool,
+) -> Result<(), anyhow::Error> {
let mut con = TestCon::new(mqtt_port);
- con.publish(topic, QoS::AtLeastOnce, payload).await
+ con.publish(topic, qos, retain, payload).await
}
/// Publish the `pub_message` on the `pub_topic` only when ready to receive a message on `sub_topic`.
@@ -77,7 +83,7 @@ pub async fn wait_for_response_on_publish(
let mut con = TestCon::new(mqtt_port);
con.subscribe(sub_topic, QoS::AtLeastOnce).await.ok()?;
- con.publish(pub_topic, QoS::AtLeastOnce, pub_message)
+ con.publish(pub_topic, QoS::AtLeastOnce, false, pub_message)
.await
.ok()?;
match tokio::time::timeout(timeout, con.next_message()).await {
@@ -100,7 +106,7 @@ where
if let Ok(message) = con.next_topic_payload().await {
dbg!(&message);
for (topic, response) in func(message).iter() {
- let _ = con.publish(topic, QoS::AtLeastOnce, response).await;
+ let _ = con.publish(topic, QoS::AtLeastOnce, false, response).await;
}
}
}
@@ -143,9 +149,10 @@ impl TestCon {
&mut self,
topic: &str,
qos: QoS,
+ retain: bool,
payload: &str,