diff options
Diffstat (limited to 'crates/core/tedge_mapper/src/c8y/mapper.rs')
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/mapper.rs | 40 |
1 files changed, 24 insertions, 16 deletions
diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs index a5df4d0b..def4808c 100644 --- a/crates/core/tedge_mapper/src/c8y/mapper.rs +++ b/crates/core/tedge_mapper/src/c8y/mapper.rs @@ -7,9 +7,10 @@ use agent_interface::topic::ResponseTopic; use async_trait::async_trait; use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy}; use c8y_smartrest::operations::Operations; -use mqtt_channel::{Config, TopicFilter}; +use mqtt_channel::TopicFilter; use tedge_config::{ - ConfigSettingAccessor, DeviceIdSetting, DeviceTypeSetting, MqttPortSetting, TEdgeConfig, + ConfigRepository, ConfigSettingAccessor, DeviceIdSetting, DeviceTypeSetting, + MqttBindAddressSetting, MqttPortSetting, TEdgeConfig, }; use tracing::{info, info_span, Instrument}; @@ -39,27 +40,32 @@ impl CumulocityMapper { } pub async fn init_session(&mut self) -> Result<(), anyhow::Error> { - info!("Initialize tedge sm mapper session"); - let operations = Operations::try_new("/etc/tedge/operations", "c8y")?; - let mqtt_topic = Self::subscriptions(&operations)?; - let config = Config::default() - .with_session_name(CUMULOCITY_MAPPER_NAME) - .with_clean_session(false) - .with_subscriptions(mqtt_topic); - mqtt_channel::init_session(&config).await?; + info!("Initialize tedge mapper session"); + mqtt_channel::init_session(&self.get_mqtt_config()?).await?; Ok(()) } pub async fn clear_session(&mut self) -> Result<(), anyhow::Error> { - info!("Clear tedge sm mapper session"); + info!("Clear tedge mapper session"); + mqtt_channel::clear_session(&self.get_mqtt_config()?).await?; + Ok(()) + } + + fn get_mqtt_config(&mut self) -> Result<mqtt_channel::Config, anyhow::Error> { let operations = Operations::try_new("/etc/tedge/operations", "c8y")?; let mqtt_topic = Self::subscriptions(&operations)?; - let config = Config::default() + let config_repository = + tedge_config::TEdgeConfigRepository::new(tedge_config::TEdgeConfigLocation::default()); + let tedge_config = config_repository.load()?; + + let mqtt_config = mqtt_channel::Config::default() + .with_host(tedge_config.query(MqttBindAddressSetting)?.to_string()) + .with_port(tedge_config.query(MqttPortSetting)?.into()) .with_session_name(CUMULOCITY_MAPPER_NAME) - .with_clean_session(true) + .with_clean_session(false) .with_subscriptions(mqtt_topic); - mqtt_channel::clear_session(&config).await?; - Ok(()) + + Ok(mqtt_config) } } @@ -74,6 +80,7 @@ impl TEdgeComponent for CumulocityMapper { let device_name = tedge_config.query(DeviceIdSetting)?; let device_type = tedge_config.query(DeviceTypeSetting)?; let mqtt_port = tedge_config.query(MqttPortSetting)?.into(); + let mqtt_host = tedge_config.query(MqttBindAddressSetting)?.to_string(); let converter = Box::new(CumulocityConverter::new( size_threshold, @@ -83,7 +90,8 @@ impl TEdgeComponent for CumulocityMapper { http_proxy, )); - let mut mapper = create_mapper(CUMULOCITY_MAPPER_NAME, mqtt_port, converter).await?; + let mut mapper = + create_mapper(CUMULOCITY_MAPPER_NAME, mqtt_host, mqtt_port, converter).await?; mapper .run() |