summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_mapper/src/c8y/mapper.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/core/tedge_mapper/src/c8y/mapper.rs')
-rw-r--r--crates/core/tedge_mapper/src/c8y/mapper.rs65
1 files changed, 36 insertions, 29 deletions
diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs
index 07abc637..cb9a4f93 100644
--- a/crates/core/tedge_mapper/src/c8y/mapper.rs
+++ b/crates/core/tedge_mapper/src/c8y/mapper.rs
@@ -9,9 +9,10 @@ use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy};
use c8y_smartrest::operations::Operations;
use mqtt_channel::TopicFilter;
use tedge_config::{
- ConfigRepository, ConfigSettingAccessor, DeviceIdSetting, DeviceTypeSetting,
- MqttBindAddressSetting, MqttPortSetting, TEdgeConfig,
+ ConfigSettingAccessor, DeviceIdSetting, DeviceTypeSetting, MqttBindAddressSetting,
+ MqttPortSetting, TEdgeConfig,
};
+use tedge_utils::file::*;
use tracing::{info, info_span, Instrument};
use super::topic::C8yTopic;
@@ -38,39 +39,23 @@ impl CumulocityMapper {
Ok(topic_filter)
}
+}
- pub async fn init_session(&mut self) -> Result<(), anyhow::Error> {
- info!("Initialize tedge mapper session");
- mqtt_channel::init_session(&self.get_mqtt_config()?).await?;
- Ok(())
- }
-
- pub async fn clear_session(&mut self) -> Result<(), anyhow::Error> {
- info!("Clear tedge mapper session");
- mqtt_channel::clear_session(&self.get_mqtt_config()?).await?;
- Ok(())
+#[async_trait]
+impl TEdgeComponent for CumulocityMapper {
+ fn session_name(&self) -> &str {
+ CUMULOCITY_MAPPER_NAME
}
- fn get_mqtt_config(&mut self) -> Result<mqtt_channel::Config, anyhow::Error> {
+ async fn init(&self) -> Result<(), anyhow::Error> {
+ info!("Initialize tedge mapper c8y");
+ create_directories()?;
let operations = Operations::try_new("/etc/tedge/operations", "c8y")?;
- let mqtt_topic = Self::subscriptions(&operations)?;
- let config_repository =
- tedge_config::TEdgeConfigRepository::new(tedge_config::TEdgeConfigLocation::default());
- let tedge_config = config_repository.load()?;
-
- let mqtt_config = mqtt_channel::Config::default()
- .with_host(tedge_config.query(MqttBindAddressSetting)?.to_string())
- .with_port(tedge_config.query(MqttPortSetting)?.into())
- .with_session_name(CUMULOCITY_MAPPER_NAME)
- .with_clean_session(false)
- .with_subscriptions(mqtt_topic);
-
- Ok(mqtt_config)
+ self.init_session(CumulocityMapper::subscriptions(&operations)?)
+ .await?;
+ Ok(())
}
-}
-#[async_trait]
-impl TEdgeComponent for CumulocityMapper {
async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> {
let size_threshold = SizeThreshold(MQTT_MESSAGE_SIZE_THRESHOLD);
@@ -101,3 +86,25 @@ impl TEdgeComponent for CumulocityMapper {
Ok(())
}
}
+
+fn create_directories() -> Result<(), anyhow::Error> {
+ create_directory_with_user_group(
+ "/etc/tedge/operations/c8y",
+ "tedge-mapper",
+ "tedge-mapper",
+ 0o775,
+ )?;
+ create_file_with_user_group(
+ "/etc/tedge/operations/c8y/c8y_SoftwareUpdate",
+ "tedge-mapper",
+ "tedge-mapper",
+ 0o644,
+ )?;
+ create_file_with_user_group(
+ "/etc/tedge/operations/c8y/c8y_Restart",
+ "tedge-mapper",
+ "tedge-mapper",
+ 0o644,
+ )?;
+ Ok(())
+}