summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs')
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs29
1 files changed, 27 insertions, 2 deletions
diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs
index 4235146b..3637fe56 100644
--- a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs
+++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs
@@ -27,7 +27,7 @@ use c8y_smartrest::{
};
use chrono::{DateTime, FixedOffset};
use download::{Auth, DownloadInfo};
-use mqtt_channel::{Connection, MqttError, SinkExt, StreamExt, Topic, TopicFilter};
+use mqtt_channel::{Config, Connection, MqttError, SinkExt, StreamExt, Topic, TopicFilter};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
@@ -36,6 +36,7 @@ use tedge_config::TEdgeConfig;
use tracing::{debug, error, info, instrument};
const AGENT_LOG_DIR: &str = "/var/log/tedge/agent";
+const SM_MAPPER: &str = "SM-C8Y-Mapper";
pub struct CumulocitySoftwareManagementMapper {}
@@ -56,6 +57,30 @@ impl CumulocitySoftwareManagementMapper {
Ok(topic_filter)
}
+
+ pub async fn init_session(&mut self) -> Result<(), anyhow::Error> {
+ info!("Initialize tedge sm mapper session");
+ let operations = Operations::try_new("/etc/tedge/operations", "c8y")?;
+ let mqtt_topic = CumulocitySoftwareManagementMapper::subscriptions(&operations)?;
+ let config = Config::default()
+ .with_session_name(SM_MAPPER)
+ .with_clean_session(false)
+ .with_subscriptions(mqtt_topic);
+ mqtt_channel::init_session(&config).await?;
+ Ok(())
+ }
+
+ pub async fn clear_session(&mut self) -> Result<(), anyhow::Error> {
+ info!("Clear tedge sm mapper session");
+ let operations = Operations::try_new("/etc/tedge/operations", "c8y")?;
+ let mqtt_topic = CumulocitySoftwareManagementMapper::subscriptions(&operations)?;
+ let config = Config::default()
+ .with_session_name(SM_MAPPER)
+ .with_clean_session(true)
+ .with_subscriptions(mqtt_topic);
+ mqtt_channel::clear_session(&config).await?;
+ Ok(())
+ }
}
#[async_trait]
@@ -100,7 +125,7 @@ where
operations: Operations,
) -> Result<Self, anyhow::Error> {
let mqtt_topic = CumulocitySoftwareManagementMapper::subscriptions(&operations)?;
- let mqtt_config = crate::mapper::mqtt_config("SM-C8Y-Mapper", &tedge_config, mqtt_topic)?;
+ let mqtt_config = crate::mapper::mqtt_config(SM_MAPPER, &tedge_config, mqtt_topic)?;
let client = Connection::new(&mqtt_config).await?;
Ok(Self {