diff options
Diffstat (limited to 'crates/core/tedge_mapper/src')
-rw-r--r-- | crates/core/tedge_mapper/src/az/converter.rs | 8 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/az/mapper.rs | 20 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/mapper.rs | 65 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/collectd/mapper.rs | 20 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/collectd/monitor.rs | 2 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/core/component.rs | 35 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/main.rs | 20 |
7 files changed, 121 insertions, 49 deletions
diff --git a/crates/core/tedge_mapper/src/az/converter.rs b/crates/core/tedge_mapper/src/az/converter.rs index 7989cfbd..f7f4b7a9 100644 --- a/crates/core/tedge_mapper/src/az/converter.rs +++ b/crates/core/tedge_mapper/src/az/converter.rs @@ -2,7 +2,7 @@ use crate::core::{converter::*, error::*, size_threshold::SizeThreshold}; use async_trait::async_trait; use clock::Clock; -use mqtt_channel::Message; +use mqtt_channel::{Message, TopicFilter}; use thin_edge_json::serialize::ThinEdgeJsonSerializer; pub struct AzureConverter { @@ -15,7 +15,7 @@ pub struct AzureConverter { impl AzureConverter { pub fn new(add_timestamp: bool, clock: Box<dyn Clock>, size_threshold: SizeThreshold) -> Self { let mapper_config = MapperConfig { - in_topic_filter: make_valid_topic_filter_or_panic("tedge/measurements"), + in_topic_filter: Self::in_topic_filter(), out_topic: make_valid_topic_or_panic("az/messages/events/"), errors_topic: make_valid_topic_or_panic("tedge/errors"), }; @@ -26,6 +26,10 @@ impl AzureConverter { mapper_config, } } + + pub fn in_topic_filter() -> TopicFilter { + make_valid_topic_filter_or_panic("tedge/measurements") + } } #[async_trait] diff --git a/crates/core/tedge_mapper/src/az/mapper.rs b/crates/core/tedge_mapper/src/az/mapper.rs index 542e6c1c..0d998eff 100644 --- a/crates/core/tedge_mapper/src/az/mapper.rs +++ b/crates/core/tedge_mapper/src/az/mapper.rs @@ -7,7 +7,8 @@ use async_trait::async_trait; use clock::WallClock; use tedge_config::{AzureMapperTimestamp, MqttBindAddressSetting, TEdgeConfig}; use tedge_config::{ConfigSettingAccessor, MqttPortSetting}; -use tracing::{info_span, Instrument}; +use tedge_utils::file::create_directory_with_user_group; +use tracing::{info, info_span, Instrument}; const AZURE_MAPPER_NAME: &str = "tedge-mapper-az"; @@ -21,6 +22,23 @@ impl AzureMapper { #[async_trait] impl TEdgeComponent for AzureMapper { + fn session_name(&self) -> &str { + AZURE_MAPPER_NAME + } + + async fn init(&self) -> Result<(), anyhow::Error> { + info!("Initialize tedge mapper az"); + create_directory_with_user_group( + "/etc/tedge/operations/az", + "tedge-mapper", + "tedge-mapper", + 0o775, + )?; + + self.init_session(AzureConverter::in_topic_filter()).await?; + Ok(()) + } + async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> { let add_timestamp = tedge_config.query(AzureMapperTimestamp)?.is_set(); let mqtt_port = tedge_config.query(MqttPortSetting)?.into(); diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs index 07abc637..cb9a4f93 100644 --- a/crates/core/tedge_mapper/src/c8y/mapper.rs +++ b/crates/core/tedge_mapper/src/c8y/mapper.rs @@ -9,9 +9,10 @@ use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy}; use c8y_smartrest::operations::Operations; use mqtt_channel::TopicFilter; use tedge_config::{ - ConfigRepository, ConfigSettingAccessor, DeviceIdSetting, DeviceTypeSetting, - MqttBindAddressSetting, MqttPortSetting, TEdgeConfig, + ConfigSettingAccessor, DeviceIdSetting, DeviceTypeSetting, MqttBindAddressSetting, + MqttPortSetting, TEdgeConfig, }; +use tedge_utils::file::*; use tracing::{info, info_span, Instrument}; use super::topic::C8yTopic; @@ -38,39 +39,23 @@ impl CumulocityMapper { Ok(topic_filter) } +} - pub async fn init_session(&mut self) -> Result<(), anyhow::Error> { - info!("Initialize tedge mapper session"); - mqtt_channel::init_session(&self.get_mqtt_config()?).await?; - Ok(()) - } - - pub async fn clear_session(&mut self) -> Result<(), anyhow::Error> { - info!("Clear tedge mapper session"); - mqtt_channel::clear_session(&self.get_mqtt_config()?).await?; - Ok(()) +#[async_trait] +impl TEdgeComponent for CumulocityMapper { + fn session_name(&self) -> &str { + CUMULOCITY_MAPPER_NAME } - fn get_mqtt_config(&mut self) -> Result<mqtt_channel::Config, anyhow::Error> { + async fn init(&self) -> Result<(), anyhow::Error> { + info!("Initialize tedge mapper c8y"); + create_directories()?; let operations = Operations::try_new("/etc/tedge/operations", "c8y")?; - let mqtt_topic = Self::subscriptions(&operations)?; - let config_repository = - tedge_config::TEdgeConfigRepository::new(tedge_config::TEdgeConfigLocation::default()); - let tedge_config = config_repository.load()?; - - let mqtt_config = mqtt_channel::Config::default() - .with_host(tedge_config.query(MqttBindAddressSetting)?.to_string()) - .with_port(tedge_config.query(MqttPortSetting)?.into()) - .with_session_name(CUMULOCITY_MAPPER_NAME) - .with_clean_session(false) - .with_subscriptions(mqtt_topic); - - Ok(mqtt_config) + self.init_session(CumulocityMapper::subscriptions(&operations)?) + .await?; + Ok(()) } -} -#[async_trait] -impl TEdgeComponent for CumulocityMapper { async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> { let size_threshold = SizeThreshold(MQTT_MESSAGE_SIZE_THRESHOLD); @@ -101,3 +86,25 @@ impl TEdgeComponent for CumulocityMapper { Ok(()) } } + +fn create_directories() -> Result<(), anyhow::Error> { + create_directory_with_user_group( + "/etc/tedge/operations/c8y", + "tedge-mapper", + "tedge-mapper", + 0o775, + )?; + create_file_with_user_group( + "/etc/tedge/operations/c8y/c8y_SoftwareUpdate", + "tedge-mapper", + "tedge-mapper", + 0o644, + )?; + create_file_with_user_group( + "/etc/tedge/operations/c8y/c8y_Restart", + "tedge-mapper", + "tedge-mapper", + 0o644, + )?; + Ok(()) +} diff --git a/crates/core/tedge_mapper/src/collectd/mapper.rs b/crates/core/tedge_mapper/src/collectd/mapper.rs index abae07af..a4c6f4c5 100644 --- a/crates/core/tedge_mapper/src/collectd/mapper.rs +++ b/crates/core/tedge_mapper/src/collectd/mapper.rs @@ -3,10 +3,11 @@ use crate::{ core::component::TEdgeComponent, }; use async_trait::async_trait; +use mqtt_channel::TopicFilter; use tedge_config::{ConfigSettingAccessor, MqttBindAddressSetting, MqttPortSetting, TEdgeConfig}; -use tracing::{info_span, Instrument}; +use tracing::{info, info_span, Instrument}; -const APP_NAME: &str = "tedge-mapper-collectd"; +const COLLECTD_MAPPER_NAME: &str = "tedge-mapper-collectd"; pub struct CollectdMapper {} @@ -18,6 +19,19 @@ impl CollectdMapper { #[async_trait] impl TEdgeComponent for CollectdMapper { + fn session_name(&self) -> &str { + COLLECTD_MAPPER_NAME + } + + async fn init(&self) -> Result<(), anyhow::Error> { + info!("Initialize tedge mapper collectd"); + self.init_session(TopicFilter::new( + DeviceMonitorConfig::default().mqtt_source_topic, + )?) + .await?; + Ok(()) + } + async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> { let mqtt_port = tedge_config.query(MqttPortSetting)?.into(); let mqtt_host = tedge_config.query(MqttBindAddressSetting)?.to_string(); @@ -29,7 +43,7 @@ impl TEdgeComponent for CollectdMapper { let device_monitor = DeviceMonitor::new(device_monitor_config); device_monitor .run() - .instrument(info_span!(APP_NAME)) + .instrument(info_span!(COLLECTD_MAPPER_NAME)) .await?; Ok(()) diff --git a/crates/core/tedge_mapper/src/collectd/monitor.rs b/crates/core/tedge_mapper/src/collectd/monitor.rs index c804deb5..93ea68cd 100644 --- a/crates/core/tedge_mapper/src/collectd/monitor.rs +++ b/crates/core/tedge_mapper/src/collectd/monitor.rs @@ -18,7 +18,7 @@ pub struct DeviceMonitorConfig { host: String, port: u16, mqtt_client_id: &'static str, - mqtt_source_topic: &'static str, + pub mqtt_source_topic: &'static str, mqtt_target_topic: &'static str, batching_window: u32, maximum_message_delay: u32, diff --git a/crates/core/tedge_mapper/src/core/component.rs b/crates/core/tedge_mapper/src/core/component.rs index 6e5fedc6..786c78c8 100644 --- a/crates/core/tedge_mapper/src/core/component.rs +++ b/crates/core/tedge_mapper/src/core/component.rs @@ -1,7 +1,38 @@ use async_trait::async_trait; -use tedge_config::TEdgeConfig; +use mqtt_channel::TopicFilter; +use tedge_config::{ + ConfigRepository, ConfigSettingAccessor, MqttBindAddressSetting, MqttPortSetting, TEdgeConfig, +}; +use tracing::info; #[async_trait] -pub trait TEdgeComponent { +pub trait TEdgeComponent: Sync + Send { + fn session_name(&self) -> &str; async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error>; + async fn init(&self) -> Result<(), anyhow::Error>; + async fn init_session(&self, mqtt_topics: TopicFilter) -> Result<(), anyhow::Error> { + mqtt_channel::init_session(&self.get_mqtt_config()?.with_subscriptions(mqtt_topics)) + .await?; + Ok(()) + } + + async fn clear_session(&self) -> Result<(), anyhow::Error> { + info!("Clear {} session", self.session_name()); + mqtt_channel::clear_session(&self.get_mqtt_config()?).await?; + Ok(()) + } + + fn get_mqtt_config(&self) -> Result<mqtt_channel::Config, anyhow::Error> { + let config_repository = + tedge_config::TEdgeConfigRepository::new(tedge_config::TEdgeConfigLocation::default()); + let tedge_config = config_repository.load()?; + + let mqtt_config = mqtt_channel::Config::default() + .with_host(tedge_config.query(MqttBindAddressSetting)?.to_string()) + .with_port(tedge_config.query(MqttPortSetting)?.into()) + .with_session_name(self.session_name()) + .with_clean_session(false); + + Ok(mqtt_config) + } } diff --git a/crates/core/tedge_mapper/src/main.rs b/crates/core/tedge_mapper/src/main.rs index d6de0301..aabcff27 100644 --- a/crates/core/tedge_mapper/src/main.rs +++ b/crates/core/tedge_mapper/src/main.rs @@ -75,26 +75,24 @@ impl fmt::Display for MapperName { #[tokio::main] async fn main() -> anyhow::Result<()> { - let mapper = MapperOpt::parse(); - tedge_utils::logging::initialise_tracing_subscriber(mapper.debug); + let mapper_opt = MapperOpt::parse(); + tedge_utils::logging::initialise_tracing_subscriber(mapper_opt.debug); - let component = lookup_component(&mapper.name); + let component = lookup_component(&mapper_opt.name); let tedge_config_location = - tedge_config::TEdgeConfigLocation::from_custom_root(&mapper.config_dir); + tedge_config::TEdgeConfigLocation::from_custom_root(&mapper_opt.config_dir); let config = tedge_config::TEdgeConfigRepository::new(tedge_config_location.clone()).load()?; // Run only one instance of a mapper let _flock = check_another_instance_is_not_running( - &mapper.name.to_string(), + &mapper_opt.name.to_string(), &config.query(RunPathDefaultSetting)?.into(), )?; - if mapper.init { - let mut mapper = CumulocityMapper::new(); - mapper.init_session().await - } else if mapper.clear { - let mut mapper = CumulocityMapper::new(); - mapper.clear_session().await + if mapper_opt.init { + component.init().await + } else if mapper_opt.clear { + component.clear_session().await } else { component.start(config).await } |