summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPradeepKiruvale <pradeepkumar.kj@softwareag.com>2022-08-30 12:07:28 +0530
committerGitHub <noreply@github.com>2022-08-30 12:07:28 +0530
commit61882a9660170cbbd5b9e8ad010102d8ffd91580 (patch)
tree187dbc89c98658b6f08de0a37290ab3b666be716
parent26d9f9a5b71ee137714eb095ac1537da71cdef98 (diff)
Move alarms converter code to a separate file (#1366)
* Move c8y alarms converter code to a separate file Signed-off-by: Pradeep Kumar K J <pradeepkumar.kj@softwareag.com>
-rw-r--r--crates/core/tedge_mapper/src/c8y/alarm_converter.rs165
-rw-r--r--crates/core/tedge_mapper/src/c8y/converter.rs161
-rw-r--r--crates/core/tedge_mapper/src/c8y/mod.rs1
3 files changed, 169 insertions, 158 deletions
diff --git a/crates/core/tedge_mapper/src/c8y/alarm_converter.rs b/crates/core/tedge_mapper/src/c8y/alarm_converter.rs
new file mode 100644
index 00000000..f921eef1
--- /dev/null
+++ b/crates/core/tedge_mapper/src/c8y/alarm_converter.rs
@@ -0,0 +1,165 @@
+use std::collections::{hash_map::Entry, HashMap};
+
+use c8y_smartrest::alarm;
+use mqtt_channel::{Message, Topic};
+use thin_edge_json::alarm::ThinEdgeAlarm;
+
+use crate::core::error::ConversionError;
+
+const TEDGE_ALARMS_TOPIC: &str = "tedge/alarms/";
+const INTERNAL_ALARMS_TOPIC: &str = "c8y-internal/alarms/";
+const SMARTREST_PUBLISH_TOPIC: &str = "c8y/s/us";
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub(crate) enum AlarmConverter {
+ Syncing {
+ pending_alarms_map: HashMap<String, Message>,
+ old_alarms_map: HashMap<String, Message>,
+ },
+ Synced,
+}
+
+impl AlarmConverter {
+ pub(crate) fn new() -> Self {
+ AlarmConverter::Syncing {
+ old_alarms_map: HashMap::new(),
+ pending_alarms_map: HashMap::new(),
+ }
+ }
+
+ pub(crate) fn try_convert_alarm(
+ &mut self,
+ input_message: &Message,
+ ) -> Result<Vec<Message>, ConversionError> {
+ let mut output_messages: Vec<Message> = Vec::new();
+ match self {
+ Self::Syncing {
+ pending_alarms_map,
+ old_alarms_map: _,
+ } => {
+ let alarm_id = input_message
+ .topic
+ .name
+ .strip_prefix(TEDGE_ALARMS_TOPIC)
+ .expect("Expected tedge/alarms prefix")
+ .to_string();
+ pending_alarms_map.insert(alarm_id, input_message.clone());
+ }
+ Self::Synced => {
+ //Regular conversion phase
+ let tedge_alarm = ThinEdgeAlarm::try_from(
+ input_message.topic.name.as_str(),
+ input_message.payload_str()?,
+ )?;
+ let smartrest_alarm = alarm::serialize_alarm(tedge_alarm)?;
+ let c8y_alarm_topic = Topic::new_unchecked(
+ self.get_c8y_alarm_topic(input_message.topic.name.as_str())?
+ .as_str(),
+ );
+ output_messages.push(Message::new(&c8y_alarm_topic, smartrest_alarm));
+
+ // Persist a copy of the alarm to an internal topic for reconciliation on next restart
+ let alarm_id = input_message
+ .topic
+ .name
+ .strip_prefix(TEDGE_ALARMS_TOPIC)
+ .expect("Expected tedge/alarms prefix")
+ .to_string();
+ let topic =
+ Topic::new_unchecked(format!("{INTERNAL_ALARMS_TOPIC}{alarm_id}").as_str());
+ let alarm_copy =
+ Message::new(&topic, input_message.payload_bytes().to_owned()).with_retain();
+ output_messages.push(alarm_copy);
+ }
+ }
+ Ok(output_messages)
+ }
+
+ pub(crate) fn get_c8y_alarm_topic(&self, topic: &str) -> Result<String, ConversionError> {
+ let topic_split: Vec<&str> = topic.split('/').collect();
+ if topic_split.len() == 4 {
+ Ok(SMARTREST_PUBLISH_TOPIC.to_string())
+ } else if topic_split.len() == 5 {
+ Ok(format!("{SMARTREST_PUBLISH_TOPIC}/{}", topic_split[4]))
+ } else {
+ Err(ConversionError::UnsupportedTopic(topic.to_string()))
+ }
+ }
+
+ pub(crate) fn process_internal_alarm(&mut self, input: &Message) {
+ match self {
+ Self::Syncing {
+ pending_alarms_map: _,
+ old_alarms_map,
+ } => {
+ let alarm_id = input
+ .topic
+ .name
+ .strip_prefix(INTERNAL_ALARMS_TOPIC)
+ .expect("Expected c8y-internal/alarms prefix")
+ .to_string();
+ old_alarms_map.insert(alarm_id, input.clone());
+ }
+ Self::Synced => {
+ // Ignore
+ }
+ }
+ }
+
+ /// Detect and sync any alarms that were raised/cleared while this mapper process was not running.
+ /// For this syncing logic, converter maintains an internal journal of all the alarms processed by this mapper,
+ /// which is compared against all the live alarms seen by the mapper on every startup.
+ ///
+ /// All the live alarms are received from tedge/alarms topic on startup.
+ /// Similarly, all the previously processed alarms are received from c8y-internal/alarms topic.
+ /// Sync detects the difference between these two sets, which are the missed messages.
+ ///
+ /// An alarm that is present in c8y-internal/alarms, but not in tedge/alarms topic
+ /// is assumed to have been cleared while the mapper process was down.
+ /// Similarly, an alarm that is present in tedge/alarms, but not in c8y-internal/alarms topic
+ /// is one that was raised while the mapper process was down.
+ /// An alarm present in both, if their payload is the same, is one that was already processed before the restart
+ /// and hence can be ignored during sync.
+ pub(crate) fn sync(&mut self) -> Vec<Message> {
+ let mut sync_messages: Vec<Message> = Vec::new();
+
+ match self {
+ Self::Syncing {
+ pending_alarms_map,
+ old_alarms_map,
+ } => {
+ // Compare the differences between alarms in tedge/alarms topic to the ones in c8y-internal/alarms topic
+ old_alarms_map.drain().for_each(|(alarm_id, old_message)| {
+ match pending_alarms_map.entry(alarm_id.clone()) {
+ // If an alarm that is present in c8y-internal/alarms topic is not present in tedge/alarms topic,
+ // it is assumed to have been cleared while the mapper process was down
+ Entry::Vacant(_) => {
+ let topic = Topic::new_unchecked(
+ format!("{TEDGE_ALARMS_TOPIC}{alarm_id}").as_str(),
+ );
+ let message = Message::new(&topic, vec![]).with_retain();
+ // Recreate the clear alarm message and add it to the pending alarms list to be processed later
+ sync_messages.push(message);
+ }
+
+ // If the payload of a message received from tedge/alarms is same as one received from c8y-internal/alarms,
+ // it is assumed to be one that was already processed earlier and hence removed from the pending alarms list.
+ Entry::Occupied(entry) => {
+ if entry.get().payload_bytes() == old_message.payload_bytes() {
+ entry.remove();
+ }
+ }
+ }
+ });
+
+ pending_alarms_map
+ .drain()
+ .for_each(|(_key, message)| sync_messages.push(message));
+ }
+ Self::Synced => {
+ // Ignore
+ }
+ }
+ sync_messages
+ }
+}
diff --git a/crates/core/tedge_mapper/src/c8y/converter.rs b/crates/core/tedge_mapper/src/c8y/converter.rs
index 8a3b7197..37c08048 100644
--- a/crates/core/tedge_mapper/src/c8y/converter.rs
+++ b/crates/core/tedge_mapper/src/c8y/converter.rs
@@ -12,7 +12,6 @@ use c8y_api::{
};
use c8y_smartrest::smartrest_deserializer::SmartRestRequestGeneric;
use c8y_smartrest::{
- alarm,
error::SmartRestDeserializerError,
operations::{get_operation, Operations},
smartrest_deserializer::{SmartRestRestartRequest, SmartRestUpdateSoftware},
@@ -28,17 +27,18 @@ use logged_command::LoggedCommand;
use mqtt_channel::{Message, Topic, TopicFilter};
use plugin_sm::operation_logs::OperationLogs;
use std::{
- collections::{hash_map::Entry, HashMap, HashSet},
+ collections::HashSet,
fs::File,
io::Read,
path::{Path, PathBuf},
};
use tedge_config::{get_tedge_config, ConfigSettingAccessor, LogPathSetting};
-use thin_edge_json::{alarm::ThinEdgeAlarm, event::ThinEdgeEvent};
+use thin_edge_json::event::ThinEdgeEvent;
use time::format_description::well_known::Rfc3339;
use tracing::{debug, info, log::error};
+use super::alarm_converter::AlarmConverter;
use super::{
error::CumulocityMapperError,
fragments::{C8yAgentFragment, C8yDeviceDataFragment},
@@ -51,7 +51,6 @@ const INVENTORY_FRAGMENTS_FILE_LOCATION: &str = "/etc/tedge/device/inventory.jso
const SUPPORTED_OPERATIONS_DIRECTORY: &str = "/etc/tedge/operations";
const INVENTORY_MANAGED_OBJECTS_TOPIC: &str = "c8y/inventory/managedObjects/update";
const SMARTREST_PUBLISH_TOPIC: &str = "c8y/s/us";
-const TEDGE_ALARMS_TOPIC: &str = "tedge/alarms/";
const INTERNAL_ALARMS_TOPIC: &str = "c8y-internal/alarms/";
const TEDGE_EVENTS_TOPIC: &str = "tedge/events/";
const C8Y_JSON_MQTT_EVENTS_TOPIC: &str = "c8y/event/events/create";
@@ -480,160 +479,6 @@ async fn parse_c8y_topics(
}
}
-#[derive(Debug, Clone, PartialEq, Eq)]
-enum AlarmConverter {
- Syncing {
- pending_alarms_map: HashMap<String, Message>,
- old_alarms_map: HashMap<String, Message>,
- },
- Synced,
-}
-
-impl AlarmConverter {
- fn new() -> Self {
- AlarmConverter::Syncing {
- old_alarms_map: HashMap::new(),
- pending_alarms_map: HashMap::new(),
- }
- }
-
- fn try_convert_alarm(
- &mut self,
- input_message: &Message,
- ) -> Result<Vec<Message>, ConversionError> {
- let mut output_messages: Vec<Message> = Vec::new();
- match self {
- Self::Syncing {
- pending_alarms_map,
- old_alarms_map: _,
- } => {
- let alarm_id = input_message
- .topic
- .name
- .strip_prefix(TEDGE_ALARMS_TOPIC)
- .expect("Expected tedge/alarms prefix")
- .to_string();
- pending_alarms_map.insert(alarm_id, input_message.clone());
- }
- Self::Synced => {
- //Regular conversion phase
- let tedge_alarm = ThinEdgeAlarm::try_from(
- input_message.topic.name.as_str(),
- input_message.payload_str()?,
- )?;
- let smartrest_alarm = alarm::serialize_alarm(tedge_alarm)?;
- let c8y_alarm_topic = Topic::new_unchecked(
- self.get_c8y_alarm_topic(input_message.topic.name.as_str())?
- .as_str(),
- );
- output_messages.push(Message::new(&c8y_alarm_topic, smartrest_alarm));
-
- // Persist a copy of the alarm to an internal topic for reconciliation on next restart
- let alarm_id = input_message
- .topic
- .name
- .strip_prefix(TEDGE_ALARMS_TOPIC)
- .expect("Expected tedge/alarms prefix")
- .to_string();
- let topic =
- Topic::new_unchecked(format!("{INTERNAL_ALARMS_TOPIC}{alarm_id}").as_str());
- let alarm_copy =
- Message::new(&topic, input_message.payload_bytes().to_owned()).with_retain();
- output_messages.push(alarm_copy);
- }
- }
- Ok(output_messages)
- }
-
- fn get_c8y_alarm_topic(&self, topic: &str) -> Result<String, ConversionError> {
- let topic_split: Vec<&str> = topic.split('/').collect();
- if topic_split.len() == 4 {
- Ok(SMARTREST_PUBLISH_TOPIC.to_string())
- } else if topic_split.len() == 5 {
- Ok(format!("{SMARTREST_PUBLISH_TOPIC}/{}", topic_split[4]))
- } else {
- Err(ConversionError::UnsupportedTopic(topic.to_string()))
- }
- }
-
- fn process_internal_alarm(&mut self, input: &Message) {
- match self {
- Self::Syncing {
- pending_alarms_map: _,
- old_alarms_map,
- } => {
- let alarm_id = input
- .topic
- .name
- .strip_prefix(INTERNAL_ALARMS_TOPIC)
- .expect("Expected c8y-internal/alarms prefix")
- .to_string();
- old_alarms_map.insert(alarm_id, input.clone());
- }
- Self::Synced => {
- // Ignore
- }
- }
- }
-
- /// Detect and sync any alarms that were raised/cleared while this mapper process was not running.
- /// For this syncing logic, converter maintains an internal journal of all the alarms processed by this mapper,
- /// which is compared against all the live alarms seen by the mapper on every startup.
- ///
- /// All the live alarms are received from tedge/alarms topic on startup.
- /// Similarly, all the previously processed alarms are received from c8y-internal/alarms topic.
- /// Sync detects the difference between these two sets, which are the missed messages.
- ///
- /// An alarm that is present in c8y-internal/alarms, but not in tedge/alarms topic
- /// is assumed to have been cleared while the mapper process was down.
- /// Similarly, an alarm that is present in tedge/alarms, but not in c8y-internal/alarms topic
- /// is one that was raised while the mapper process was down.
- /// An alarm present in both, if their payload is the same, is one that was already processed before the restart
- /// and hence can be ignored during sync.
- fn sync(&mut self) -> Vec<Message> {
- let mut sync_messages: Vec<Message> = Vec::new();
-
- match self {
- Self::Syncing {
- pending_alarms_map,
- old_alarms_map,
- } => {
- // Compare the differences between alarms in tedge/alarms topic to the ones in c8y-internal/alarms topic
- old_alarms_map.drain().for_each(|(alarm_id, old_message)| {
- match pending_alarms_map.entry(alarm_id.clone()) {
- // If an alarm that is present in c8y-internal/alarms topic is not present in tedge/alarms topic,
- // it is assumed to have been cleared while the mapper process was down
- Entry::Vacant(_) => {
- let topic = Topic::new_unchecked(
- format!("{TEDGE_ALARMS_TOPIC}{alarm_id}").as_str(),
- );
- let message = Message::new(&topic, vec![]).with_retain();
- // Recreate the clear alarm message and add it to the pending alarms list to be processed later
- sync_messages.push(message);
- }
-
- // If the payload of a message received from tedge/alarms is same as one received from c8y-internal/alarms,
- // it is assumed to be one that was already processed earlier and hence removed from the pending alarms list.
- Entry::Occupied(entry) => {
- if entry.get().payload_bytes() == old_message.payload_bytes() {
- entry.remove();
- }
- }
- }
- });
-
- pending_alarms_map
- .drain()
- .for_each(|(_key, message)| sync_messages.push(message));
- }
- Self::Synced => {
- // Ignore
- }
- }
- sync_messages
- }
-}
-
fn create_device_data_fragments(
device_name: &str,
device_type: &str,
diff --git a/crates/core/tedge_mapper/src/c8y/mod.rs b/crates/core/tedge_mapper/src/c8y/mod.rs
index 87b994ea..77261b07 100644
--- a/crates/core/tedge_mapper/src/c8y/mod.rs
+++ b/crates/core/tedge_mapper/src/c8y/mod.rs
@@ -1,3 +1,4 @@
+pub mod alarm_converter;
pub mod converter;
pub mod dynamic_discovery;
pub mod error;