diff options
-rw-r--r-- | configuration/debian/tedge/preinst | 6 | ||||
-rw-r--r-- | configuration/debian/tedge_agent/postinst | 2 | ||||
-rw-r--r-- | configuration/debian/tedge_mapper/postinst | 4 | ||||
-rw-r--r-- | crates/core/tedge_agent/Cargo.toml | 2 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/alarm_converter.rs | 165 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/converter.rs | 161 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/mod.rs | 1 | ||||
-rw-r--r-- | crates/core/tedge_watchdog/Cargo.toml | 2 | ||||
-rw-r--r-- | plugins/c8y_configuration_plugin/Cargo.toml | 2 | ||||
-rw-r--r-- | plugins/c8y_log_plugin/Cargo.toml | 2 | ||||
-rw-r--r-- | plugins/tedge_apama_plugin/Cargo.toml | 2 | ||||
-rw-r--r-- | plugins/tedge_apt_plugin/Cargo.toml | 2 | ||||
-rw-r--r-- | plugins/tedge_dummy_plugin/Cargo.toml | 2 |
13 files changed, 189 insertions, 164 deletions
diff --git a/configuration/debian/tedge/preinst b/configuration/debian/tedge/preinst index b2843104..ec91f6e8 100644 --- a/configuration/debian/tedge/preinst +++ b/configuration/debian/tedge/preinst @@ -3,13 +3,13 @@ set -e ### Create groups if ! getent group tedge >/dev/null; then - addgroup --quiet --system tedge + groupadd --system tedge fi ### Create users -# Create user tedge with no additional info(--gecos "") no home(--no-create-home), no login(--shell) and in group tedge(--ingroup) +# Create user tedge with no home(--no-create-home), no login(--shell) and in group tedge(--gid) if ! getent passwd tedge >/dev/null; then - adduser --quiet --system --gecos "" --no-create-home --disabled-login --shell /sbin/nologin --ingroup tedge tedge + useradd --system --no-create-home --shell /sbin/nologin --gid tedge tedge fi ### Create file in /etc/sudoers.d directory. With this configuration, the tedge user have the right to call the tedge command with sudo rights, which is required for system-wide configuration in "/etc/tedge" diff --git a/configuration/debian/tedge_agent/postinst b/configuration/debian/tedge_agent/postinst index 4059b182..348ebe12 100644 --- a/configuration/debian/tedge_agent/postinst +++ b/configuration/debian/tedge_agent/postinst @@ -13,6 +13,6 @@ if command -v systemctl >/dev/null; then fi # Initialize the agent -runuser -u tedge -- tedge_agent --init +sudo -u tedge -- tedge_agent --init #DEBHELPER# diff --git a/configuration/debian/tedge_mapper/postinst b/configuration/debian/tedge_mapper/postinst index bb12342a..a637165f 100644 --- a/configuration/debian/tedge_mapper/postinst +++ b/configuration/debian/tedge_mapper/postinst @@ -4,6 +4,6 @@ set -e ### Initialize the sm mapper -runuser -u tedge -- tedge_mapper --init c8y -runuser -u tedge -- tedge_mapper --init az +sudo -u tedge -- tedge_mapper --init c8y +sudo -u tedge -- tedge_mapper --init az #DEBHELPER# diff --git a/crates/core/tedge_agent/Cargo.toml b/crates/core/tedge_agent/Cargo.toml index 77732974..55097eca 100644 --- a/crates/core/tedge_agent/Cargo.toml +++ b/crates/core/tedge_agent/Cargo.toml @@ -6,6 +6,8 @@ edition = "2021" rust-version = "1.58.1" license = "Apache-2.0" description = "tedge_agent interacts with a Cloud Mapper and one or more Software Plugins" +homepage = "https://thin-edge.io" +repository = "https://github.com/thin-edge/thin-edge.io" [package.metadata.deb] pre-depends = "tedge_mapper" diff --git a/crates/core/tedge_mapper/src/c8y/alarm_converter.rs b/crates/core/tedge_mapper/src/c8y/alarm_converter.rs new file mode 100644 index 00000000..f921eef1 --- /dev/null +++ b/crates/core/tedge_mapper/src/c8y/alarm_converter.rs @@ -0,0 +1,165 @@ +use std::collections::{hash_map::Entry, HashMap}; + +use c8y_smartrest::alarm; +use mqtt_channel::{Message, Topic}; +use thin_edge_json::alarm::ThinEdgeAlarm; + +use crate::core::error::ConversionError; + +const TEDGE_ALARMS_TOPIC: &str = "tedge/alarms/"; +const INTERNAL_ALARMS_TOPIC: &str = "c8y-internal/alarms/"; +const SMARTREST_PUBLISH_TOPIC: &str = "c8y/s/us"; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) enum AlarmConverter { + Syncing { + pending_alarms_map: HashMap<String, Message>, + old_alarms_map: HashMap<String, Message>, + }, + Synced, +} + +impl AlarmConverter { + pub(crate) fn new() -> Self { + AlarmConverter::Syncing { + old_alarms_map: HashMap::new(), + pending_alarms_map: HashMap::new(), + } + } + + pub(crate) fn try_convert_alarm( + &mut self, + input_message: &Message, + ) -> Result<Vec<Message>, ConversionError> { + let mut output_messages: Vec<Message> = Vec::new(); + match self { + Self::Syncing { + pending_alarms_map, + old_alarms_map: _, + } => { + let alarm_id = input_message + .topic + .name + .strip_prefix(TEDGE_ALARMS_TOPIC) + .expect("Expected tedge/alarms prefix") + .to_string(); + pending_alarms_map.insert(alarm_id, input_message.clone()); + } + Self::Synced => { + //Regular conversion phase + let tedge_alarm = ThinEdgeAlarm::try_from( + input_message.topic.name.as_str(), + input_message.payload_str()?, + )?; + let smartrest_alarm = alarm::serialize_alarm(tedge_alarm)?; + let c8y_alarm_topic = Topic::new_unchecked( + self.get_c8y_alarm_topic(input_message.topic.name.as_str())? + .as_str(), + ); + output_messages.push(Message::new(&c8y_alarm_topic, smartrest_alarm)); + + // Persist a copy of the alarm to an internal topic for reconciliation on next restart + let alarm_id = input_message + .topic + .name + .strip_prefix(TEDGE_ALARMS_TOPIC) + .expect("Expected tedge/alarms prefix") + .to_string(); + let topic = + Topic::new_unchecked(format!("{INTERNAL_ALARMS_TOPIC}{alarm_id}").as_str()); + let alarm_copy = + Message::new(&topic, input_message.payload_bytes().to_owned()).with_retain(); + output_messages.push(alarm_copy); + } + } + Ok(output_messages) + } + + pub(crate) fn get_c8y_alarm_topic(&self, topic: &str) -> Result<String, ConversionError> { + let topic_split: Vec<&str> = topic.split('/').collect(); + if topic_split.len() == 4 { + Ok(SMARTREST_PUBLISH_TOPIC.to_string()) + } else if topic_split.len() == 5 { + Ok(format!("{SMARTREST_PUBLISH_TOPIC}/{}", topic_split[4])) + } else { + Err(ConversionError::UnsupportedTopic(topic.to_string())) + } + } + + pub(crate) fn process_internal_alarm(&mut self, input: &Message) { + match self { + Self::Syncing { + pending_alarms_map: _, + old_alarms_map, + } => { + let alarm_id = input + .topic + .name + .strip_prefix(INTERNAL_ALARMS_TOPIC) + .expect("Expected c8y-internal/alarms prefix") + .to_string(); + old_alarms_map.insert(alarm_id, input.clone()); + } + Self::Synced => { + // Ignore + } + } + } + + /// Detect and sync any alarms that were raised/cleared while this mapper process was not running. + /// For this syncing logic, converter maintains an internal journal of all the alarms processed by this mapper, + /// which is compared against all the live alarms seen by the mapper on every startup. + /// + /// All the live alarms are received from tedge/alarms topic on startup. + /// Similarly, all the previously processed alarms are received from c8y-internal/alarms topic. + /// Sync detects the difference between these two sets, which are the missed messages. + /// + /// An alarm that is present in c8y-internal/alarms, but not in tedge/alarms topic + /// is assumed to have been cleared while the mapper process was down. + /// Similarly, an alarm that is present in tedge/alarms, but not in c8y-internal/alarms topic + /// is one that was raised while the mapper process was down. + /// An alarm present in both, if their payload is the same, is one that was already processed before the restart + /// and hence can be ignored during sync. + pub(crate) fn sync(&mut self) -> Vec<Message> { + let mut sync_messages: Vec<Message> = Vec::new(); + + match self { + Self::Syncing { + pending_alarms_map, + old_alarms_map, + } => { + // Compare the differences between alarms in tedge/alarms topic to the ones in c8y-internal/alarms topic + old_alarms_map.drain().for_each(|(alarm_id, old_message)| { + match pending_alarms_map.entry(alarm_id.clone()) { + // If an alarm that is present in c8y-internal/alarms topic is not present in tedge/alarms topic, + // it is assumed to have been cleared while the mapper process was down + Entry::Vacant(_) => { + let topic = Topic::new_unchecked( + format!("{TEDGE_ALARMS_TOPIC}{alarm_id}").as_str(), + ); + let message = Message::new(&topic, vec![]).with_retain(); + // Recreate the clear alarm message and add it to the pending alarms list to be processed later + sync_messages.push(message); + } + + // If the payload of a message received from tedge/alarms is same as one received from c8y-internal/alarms, + // it is assumed to be one that was already processed earlier and hence removed from the pending alarms list. + Entry::Occupied(entry) => { + if entry.get().payload_bytes() == old_message.payload_bytes() { + entry.remove(); + } + } + } + }); + + pending_alarms_map + .drain() + .for_each(|(_key, message)| sync_messages.push(message)); + } + Self::Synced => { + // Ignore + } + } + sync_messages + } +} diff --git a/crates/core/tedge_mapper/src/c8y/converter.rs b/crates/core/tedge_mapper/src/c8y/converter.rs index 8a3b7197..37c08048 100644 --- a/crates/core/tedge_mapper/src/c8y/converter.rs +++ b/crates/core/tedge_mapper/src/c8y/converter.rs @@ -12,7 +12,6 @@ use c8y_api::{ }; use c8y_smartrest::smartrest_deserializer::SmartRestRequestGeneric; use c8y_smartrest::{ - alarm, error::SmartRestDeserializerError, operations::{get_operation, Operations}, smartrest_deserializer::{SmartRestRestartRequest, SmartRestUpdateSoftware}, @@ -28,17 +27,18 @@ use logged_command::LoggedCommand; use mqtt_channel::{Message, Topic, TopicFilter}; use plugin_sm::operation_logs::OperationLogs; use std::{ - collections::{hash_map::Entry, HashMap, HashSet}, + collections::HashSet, fs::File, io::Read, path::{Path, PathBuf}, }; use tedge_config::{get_tedge_config, ConfigSettingAccessor, LogPathSetting}; -use thin_edge_json::{alarm::ThinEdgeAlarm, event::ThinEdgeEvent}; +use thin_edge_json::event::ThinEdgeEvent; use time::format_description::well_known::Rfc3339; use tracing::{debug, info, log::error}; +use super::alarm_converter::AlarmConverter; use super::{ error::CumulocityMapperError, fragments::{C8yAgentFragment, C8yDeviceDataFragment}, @@ -51,7 +51,6 @@ const INVENTORY_FRAGMENTS_FILE_LOCATION: &str = "/etc/tedge/device/inventory.jso const SUPPORTED_OPERATIONS_DIRECTORY: &str = "/etc/tedge/operations"; const INVENTORY_MANAGED_OBJECTS_TOPIC: &str = "c8y/inventory/managedObjects/update"; const SMARTREST_PUBLISH_TOPIC: &str = "c8y/s/us"; -const TEDGE_ALARMS_TOPIC: &str = "tedge/alarms/"; const INTERNAL_ALARMS_TOPIC: &str = "c8y-internal/alarms/"; const TEDGE_EVENTS_TOPIC: &str = "tedge/events/"; const C8Y_JSON_MQTT_EVENTS_TOPIC: &str = "c8y/event/events/create"; @@ -480,160 +479,6 @@ async fn parse_c8y_topics( } } -#[derive(Debug, Clone, PartialEq, Eq)] -enum AlarmConverter { - Syncing { - pending_alarms_map: HashMap<String, Message>, - old_alarms_map: HashMap<String, Message>, - }, - Synced, -} - -impl AlarmConverter { - fn new() -> Self { - AlarmConverter::Syncing { - old_alarms_map: HashMap::new(), - pending_alarms_map: HashMap::new(), - } - } - - fn try_convert_alarm( - &mut self, - input_message: &Message, - ) -> Result<Vec<Message>, ConversionError> { - let mut output_messages: Vec<Message> = Vec::new(); - match self { - Self::Syncing { - pending_alarms_map, - old_alarms_map: _, - } => { - let alarm_id = input_message - .topic - .name - .strip_prefix(TEDGE_ALARMS_TOPIC) - .expect("Expected tedge/alarms prefix") - .to_string(); - pending_alarms_map.insert(alarm_id, input_message.clone()); - } - Self::Synced => { - //Regular conversion phase - let tedge_alarm = ThinEdgeAlarm::try_from( - input_message.topic.name.as_str(), - input_message.payload_str()?, - )?; - let smartrest_alarm = alarm::serialize_alarm(tedge_alarm)?; - let c8y_alarm_topic = Topic::new_unchecked( - self.get_c8y_alarm_topic(input_message.topic.name.as_str())? - .as_str(), - ); - output_messages.push(Message::new(&c8y_alarm_topic, smartrest_alarm)); - - // Persist a copy of the alarm to an internal topic for reconciliation on next restart - let alarm_id = input_message - .topic - .name - .strip_prefix(TEDGE_ALARMS_TOPIC) - .expect("Expected tedge/alarms prefix") - .to_string(); - let topic = - Topic::new_unchecked(format!("{INTERNAL_ALARMS_TOPIC}{alarm_id}").as_str()); - let alarm_copy = - Message::new(&topic, input_message.payload_bytes().to_owned()).with_retain(); - output_messages.push(alarm_copy); - } - } - Ok(output_messages) - } - - fn get_c8y_alarm_topic(&self, topic: &str) -> Result<String, ConversionError> { - let topic_split: Vec<&str> = topic.split('/').collect(); - if topic_split.len() == 4 { - Ok(SMARTREST_PUBLISH_TOPIC.to_string()) - } else if topic_split.len() == 5 { - Ok(format!("{SMARTREST_PUBLISH_TOPIC}/{}", topic_split[4])) - } else { - Err(ConversionError::UnsupportedTopic(topic.to_string())) - } - } - - fn process_internal_alarm(&mut self, input: &Message) { - match self { - Self::Syncing { - pending_alarms_map: _, - old_alarms_map, - } => { - let alarm_id = input - .topic - .name - .strip_prefix(INTERNAL_ALARMS_TOPIC) - .expect("Expected c8y-internal/alarms prefix") - .to_string(); - old_alarms_map.insert(alarm_id, input.clone()); - } - Self::Synced => { - // Ignore - } - } - } - - /// Detect and sync any alarms that were raised/cleared while this mapper process was not running. - /// For this syncing logic, converter maintains an internal journal of all the alarms processed by this mapper, - /// which is compared against all the live alarms seen by the mapper on every startup. - /// - /// All the live alarms are received from tedge/alarms topic on startup. - /// Similarly, all the previously processed alarms are received from c8y-internal/alarms topic. - /// Sync detects the difference between these two sets, which are the missed messages. - /// - /// An alarm that is present in c8y-internal/alarms, but not in tedge/alarms topic - /// is assumed to have been cleared while the mapper process was down. - /// Similarly, an alarm that is present in tedge/alarms, but not in c8y-internal/alarms topic - /// is one that was raised while the mapper process was down. - /// An alarm present in both, if their payload is the same, is one that was already processed before the restart - /// and hence can be ignored during sync. - fn sync(&mut self) -> Vec<Message> { - let mut sync_messages: Vec<Message> = Vec::new(); - - match self { - Self::Syncing { - pending_alarms_map, - old_alarms_map, - } => { - // Compare the differences between alarms in tedge/alarms topic to the ones in c8y-internal/alarms topic - old_alarms_map.drain().for_each(|(alarm_id, old_message)| { - match pending_alarms_map.entry(alarm_id.clone()) { - // If an alarm that is present in c8y-internal/alarms topic is not present in tedge/alarms topic, - // it is assumed to have been cleared while the mapper process was down - Entry::Vacant(_) => { - let topic = Topic::new_unchecked( - format!("{TEDGE_ALARMS_TOPIC}{alarm_id}").as_str(), - ); - let message = Message::new(&topic, vec![]).with_retain(); - // Recreate the clear alarm message and add it to the pending alarms list to be processed later - sync_messages.push(message); - } - - // If the payload of a message received from tedge/alarms is same as one received from c8y-internal/alarms, - // it is assumed to be one that was already processed earlier and hence removed from the pending alarms list. - Entry::Occupied(entry) => { - if entry.get().payload_bytes() == old_message.payload_bytes() { - entry.remove(); - } - } - } - }); - - pending_alarms_map - .drain() - .for_each(|(_key, message)| sync_messages.push(message)); - } - Self::Synced => { - // Ignore - } - } - sync_messages - } -} - fn create_device_data_fragments( device_name: &str, device_type: &str, diff --git a/crates/core/tedge_mapper/src/c8y/mod.rs b/crates/core/tedge_mapper/src/c8y/mod.rs index 87b994ea..77261b07 100644 --- a/crates/core/tedge_mapper/src/c8y/mod.rs +++ b/crates/core/tedge_mapper/src/c8y/mod.rs @@ -1,3 +1,4 @@ +pub mod alarm_converter; pub mod converter; pub mod dynamic_discovery; pub mod error; diff --git a/crates/core/tedge_watchdog/Cargo.toml b/crates/core/tedge_watchdog/Cargo.toml index 538a5918..6648c9f4 100644 --- a/crates/core/tedge_watchdog/Cargo.toml +++ b/crates/core/tedge_watchdog/Cargo.toml @@ -6,6 +6,8 @@ edition = "2021" rust-version = "1.58.1" license = "Apache-2.0" description = "tedge_watchdog checks the health of all the thin-edge.io components/services." +homepage = "https://thin-edge.io" +repository = "https://github.com/thin-edge/thin-edge.io" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/plugins/c8y_configuration_plugin/Cargo.toml b/plugins/c8y_configuration_plugin/Cargo.toml index 385f916b..3d72f7f4 100644 --- a/plugins/c8y_configuration_plugin/Cargo.toml +++ b/plugins/c8y_configuration_plugin/Cargo.toml @@ -6,6 +6,8 @@ edition = "2021" rust-version = "1.58.1" license = "Apache-2.0" description = "Thin-edge device configuration management for Cumulocity" +homepage = "https://thin-edge.io" +repository = "https://github.com/thin-edge/thin-edge.io" [package.metadata.deb] maintainer-scripts = "../../configuration/debian/c8y_configuration_plugin" diff --git a/plugins/c8y_log_plugin/Cargo.toml b/plugins/c8y_log_plugin/Cargo.toml index 9defecac..73a50f48 100644 --- a/plugins/c8y_log_plugin/Cargo.toml +++ b/plugins/c8y_log_plugin/Cargo.toml @@ -6,6 +6,8 @@ edition = "2021" rust-version = "1.58.1" license = "Apache-2.0" description = "Thin-edge device log file retriever for Cumulocity" +homepage = "https://thin-edge.io" +repository = "https://github.com/thin-edge/thin-edge.io" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [package.metadata.deb] diff --git a/plugins/tedge_apama_plugin/Cargo.toml b/plugins/tedge_apama_plugin/Cargo.toml index 56a2943f..3bd37b2b 100644 --- a/plugins/tedge_apama_plugin/Cargo.toml +++ b/plugins/tedge_apama_plugin/Cargo.toml @@ -6,6 +6,8 @@ edition = "2021" rust-version = "1.58.1" license = "Apache-2.0" description = "thin.edge.io plugin for installing apama projects" +homepage = "https://thin-edge.io" +repository = "https://github.com/thin-edge/thin-edge.io" [package.metadata.deb] assets = [ diff --git a/plugins/tedge_apt_plugin/Cargo.toml b/plugins/tedge_apt_plugin/Cargo.toml index de2056d9..99acbc0a 100644 --- a/plugins/tedge_apt_plugin/Cargo.toml +++ b/plugins/tedge_apt_plugin/Cargo.toml @@ -6,6 +6,8 @@ edition = "2021" rust-version = "1.58.1" license = "Apache-2.0" description = "Thin.edge.io plugin for software management using apt" +homepage = "https://thin-edge.io" +repository = "https://github.com/thin-edge/thin-edge.io" [package.metadata.deb] assets = [ diff --git a/plugins/tedge_dummy_plugin/Cargo.toml b/plugins/tedge_dummy_plugin/Cargo.toml index 25656696..7dc38715 100644 --- a/plugins/tedge_dummy_plugin/Cargo.toml +++ b/plugins/tedge_dummy_plugin/Cargo.toml @@ -6,6 +6,8 @@ edition = "2021" rust-version = "1.58.1" license = "Apache-2.0" description = "thin.edge.io dummy plugin for testing" +homepage = "https://thin-edge.io" +repository = "https://github.com/thin-edge/thin-edge.io" [dependencies] clap = { version = "3", features = ["derive"] } |