diff options
Diffstat (limited to 'crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs')
-rw-r--r-- | crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs | 29 |
1 files changed, 27 insertions, 2 deletions
diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs index 4235146b..3637fe56 100644 --- a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs +++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs @@ -27,7 +27,7 @@ use c8y_smartrest::{ }; use chrono::{DateTime, FixedOffset}; use download::{Auth, DownloadInfo}; -use mqtt_channel::{Connection, MqttError, SinkExt, StreamExt, Topic, TopicFilter}; +use mqtt_channel::{Config, Connection, MqttError, SinkExt, StreamExt, Topic, TopicFilter}; use serde::{Deserialize, Serialize}; use std::path::PathBuf; @@ -36,6 +36,7 @@ use tedge_config::TEdgeConfig; use tracing::{debug, error, info, instrument}; const AGENT_LOG_DIR: &str = "/var/log/tedge/agent"; +const SM_MAPPER: &str = "SM-C8Y-Mapper"; pub struct CumulocitySoftwareManagementMapper {} @@ -56,6 +57,30 @@ impl CumulocitySoftwareManagementMapper { Ok(topic_filter) } + + pub async fn init_session(&mut self) -> Result<(), anyhow::Error> { + info!("Initialize tedge sm mapper session"); + let operations = Operations::try_new("/etc/tedge/operations", "c8y")?; + let mqtt_topic = CumulocitySoftwareManagementMapper::subscriptions(&operations)?; + let config = Config::default() + .with_session_name(SM_MAPPER) + .with_clean_session(false) + .with_subscriptions(mqtt_topic); + mqtt_channel::init_session(&config).await?; + Ok(()) + } + + pub async fn clear_session(&mut self) -> Result<(), anyhow::Error> { + info!("Clear tedge sm mapper session"); + let operations = Operations::try_new("/etc/tedge/operations", "c8y")?; + let mqtt_topic = CumulocitySoftwareManagementMapper::subscriptions(&operations)?; + let config = Config::default() + .with_session_name(SM_MAPPER) + .with_clean_session(true) + .with_subscriptions(mqtt_topic); + mqtt_channel::clear_session(&config).await?; + Ok(()) + } } #[async_trait] @@ -100,7 +125,7 @@ where operations: Operations, ) -> Result<Self, anyhow::Error> { let mqtt_topic = CumulocitySoftwareManagementMapper::subscriptions(&operations)?; - let mqtt_config = crate::mapper::mqtt_config("SM-C8Y-Mapper", &tedge_config, mqtt_topic)?; + let mqtt_config = crate::mapper::mqtt_config(SM_MAPPER, &tedge_config, mqtt_topic)?; let client = Connection::new(&mqtt_config).await?; Ok(Self { |