summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_mapper/src/c8y/mapper.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/core/tedge_mapper/src/c8y/mapper.rs')
-rw-r--r--crates/core/tedge_mapper/src/c8y/mapper.rs40
1 files changed, 24 insertions, 16 deletions
diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs
index a5df4d0b..def4808c 100644
--- a/crates/core/tedge_mapper/src/c8y/mapper.rs
+++ b/crates/core/tedge_mapper/src/c8y/mapper.rs
@@ -7,9 +7,10 @@ use agent_interface::topic::ResponseTopic;
use async_trait::async_trait;
use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy};
use c8y_smartrest::operations::Operations;
-use mqtt_channel::{Config, TopicFilter};
+use mqtt_channel::TopicFilter;
use tedge_config::{
- ConfigSettingAccessor, DeviceIdSetting, DeviceTypeSetting, MqttPortSetting, TEdgeConfig,
+ ConfigRepository, ConfigSettingAccessor, DeviceIdSetting, DeviceTypeSetting,
+ MqttBindAddressSetting, MqttPortSetting, TEdgeConfig,
};
use tracing::{info, info_span, Instrument};
@@ -39,27 +40,32 @@ impl CumulocityMapper {
}
pub async fn init_session(&mut self) -> Result<(), anyhow::Error> {
- info!("Initialize tedge sm mapper session");
- let operations = Operations::try_new("/etc/tedge/operations", "c8y")?;
- let mqtt_topic = Self::subscriptions(&operations)?;
- let config = Config::default()
- .with_session_name(CUMULOCITY_MAPPER_NAME)
- .with_clean_session(false)
- .with_subscriptions(mqtt_topic);
- mqtt_channel::init_session(&config).await?;
+ info!("Initialize tedge mapper session");
+ mqtt_channel::init_session(&self.get_mqtt_config()?).await?;
Ok(())
}
pub async fn clear_session(&mut self) -> Result<(), anyhow::Error> {
- info!("Clear tedge sm mapper session");
+ info!("Clear tedge mapper session");
+ mqtt_channel::clear_session(&self.get_mqtt_config()?).await?;
+ Ok(())
+ }
+
+ fn get_mqtt_config(&mut self) -> Result<mqtt_channel::Config, anyhow::Error> {
let operations = Operations::try_new("/etc/tedge/operations", "c8y")?;
let mqtt_topic = Self::subscriptions(&operations)?;
- let config = Config::default()
+ let config_repository =
+ tedge_config::TEdgeConfigRepository::new(tedge_config::TEdgeConfigLocation::default());
+ let tedge_config = config_repository.load()?;
+
+ let mqtt_config = mqtt_channel::Config::default()
+ .with_host(tedge_config.query(MqttBindAddressSetting)?.to_string())
+ .with_port(tedge_config.query(MqttPortSetting)?.into())
.with_session_name(CUMULOCITY_MAPPER_NAME)
- .with_clean_session(true)
+ .with_clean_session(false)
.with_subscriptions(mqtt_topic);
- mqtt_channel::clear_session(&config).await?;
- Ok(())
+
+ Ok(mqtt_config)
}
}
@@ -74,6 +80,7 @@ impl TEdgeComponent for CumulocityMapper {
let device_name = tedge_config.query(DeviceIdSetting)?;
let device_type = tedge_config.query(DeviceTypeSetting)?;
let mqtt_port = tedge_config.query(MqttPortSetting)?.into();
+ let mqtt_host = tedge_config.query(MqttBindAddressSetting)?.to_string();
let converter = Box::new(CumulocityConverter::new(
size_threshold,
@@ -83,7 +90,8 @@ impl TEdgeComponent for CumulocityMapper {
http_proxy,
));
- let mut mapper = create_mapper(CUMULOCITY_MAPPER_NAME, mqtt_port, converter).await?;
+ let mut mapper =
+ create_mapper(CUMULOCITY_MAPPER_NAME, mqtt_host, mqtt_port, converter).await?;
mapper
.run()