summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_mapper/src
diff options
context:
space:
mode:
Diffstat (limited to 'crates/core/tedge_mapper/src')
-rw-r--r--crates/core/tedge_mapper/src/az/converter.rs8
-rw-r--r--crates/core/tedge_mapper/src/az/mapper.rs20
-rw-r--r--crates/core/tedge_mapper/src/c8y/mapper.rs65
-rw-r--r--crates/core/tedge_mapper/src/collectd/mapper.rs20
-rw-r--r--crates/core/tedge_mapper/src/collectd/monitor.rs2
-rw-r--r--crates/core/tedge_mapper/src/core/component.rs35
-rw-r--r--crates/core/tedge_mapper/src/main.rs20
7 files changed, 121 insertions, 49 deletions
diff --git a/crates/core/tedge_mapper/src/az/converter.rs b/crates/core/tedge_mapper/src/az/converter.rs
index 7989cfbd..f7f4b7a9 100644
--- a/crates/core/tedge_mapper/src/az/converter.rs
+++ b/crates/core/tedge_mapper/src/az/converter.rs
@@ -2,7 +2,7 @@ use crate::core::{converter::*, error::*, size_threshold::SizeThreshold};
use async_trait::async_trait;
use clock::Clock;
-use mqtt_channel::Message;
+use mqtt_channel::{Message, TopicFilter};
use thin_edge_json::serialize::ThinEdgeJsonSerializer;
pub struct AzureConverter {
@@ -15,7 +15,7 @@ pub struct AzureConverter {
impl AzureConverter {
pub fn new(add_timestamp: bool, clock: Box<dyn Clock>, size_threshold: SizeThreshold) -> Self {
let mapper_config = MapperConfig {
- in_topic_filter: make_valid_topic_filter_or_panic("tedge/measurements"),
+ in_topic_filter: Self::in_topic_filter(),
out_topic: make_valid_topic_or_panic("az/messages/events/"),
errors_topic: make_valid_topic_or_panic("tedge/errors"),
};
@@ -26,6 +26,10 @@ impl AzureConverter {
mapper_config,
}
}
+
+ pub fn in_topic_filter() -> TopicFilter {
+ make_valid_topic_filter_or_panic("tedge/measurements")
+ }
}
#[async_trait]
diff --git a/crates/core/tedge_mapper/src/az/mapper.rs b/crates/core/tedge_mapper/src/az/mapper.rs
index 542e6c1c..0d998eff 100644
--- a/crates/core/tedge_mapper/src/az/mapper.rs
+++ b/crates/core/tedge_mapper/src/az/mapper.rs
@@ -7,7 +7,8 @@ use async_trait::async_trait;
use clock::WallClock;
use tedge_config::{AzureMapperTimestamp, MqttBindAddressSetting, TEdgeConfig};
use tedge_config::{ConfigSettingAccessor, MqttPortSetting};
-use tracing::{info_span, Instrument};
+use tedge_utils::file::create_directory_with_user_group;
+use tracing::{info, info_span, Instrument};
const AZURE_MAPPER_NAME: &str = "tedge-mapper-az";
@@ -21,6 +22,23 @@ impl AzureMapper {
#[async_trait]
impl TEdgeComponent for AzureMapper {
+ fn session_name(&self) -> &str {
+ AZURE_MAPPER_NAME
+ }
+
+ async fn init(&self) -> Result<(), anyhow::Error> {
+ info!("Initialize tedge mapper az");
+ create_directory_with_user_group(
+ "/etc/tedge/operations/az",
+ "tedge-mapper",
+ "tedge-mapper",
+ 0o775,
+ )?;
+
+ self.init_session(AzureConverter::in_topic_filter()).await?;
+ Ok(())
+ }
+
async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> {
let add_timestamp = tedge_config.query(AzureMapperTimestamp)?.is_set();
let mqtt_port = tedge_config.query(MqttPortSetting)?.into();
diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs
index 07abc637..cb9a4f93 100644
--- a/crates/core/tedge_mapper/src/c8y/mapper.rs
+++ b/crates/core/tedge_mapper/src/c8y/mapper.rs
@@ -9,9 +9,10 @@ use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy};
use c8y_smartrest::operations::Operations;
use mqtt_channel::TopicFilter;
use tedge_config::{
- ConfigRepository, ConfigSettingAccessor, DeviceIdSetting, DeviceTypeSetting,
- MqttBindAddressSetting, MqttPortSetting, TEdgeConfig,
+ ConfigSettingAccessor, DeviceIdSetting, DeviceTypeSetting, MqttBindAddressSetting,
+ MqttPortSetting, TEdgeConfig,
};
+use tedge_utils::file::*;
use tracing::{info, info_span, Instrument};
use super::topic::C8yTopic;
@@ -38,39 +39,23 @@ impl CumulocityMapper {
Ok(topic_filter)
}
+}
- pub async fn init_session(&mut self) -> Result<(), anyhow::Error> {
- info!("Initialize tedge mapper session");
- mqtt_channel::init_session(&self.get_mqtt_config()?).await?;
- Ok(())
- }
-
- pub async fn clear_session(&mut self) -> Result<(), anyhow::Error> {
- info!("Clear tedge mapper session");
- mqtt_channel::clear_session(&self.get_mqtt_config()?).await?;
- Ok(())
+#[async_trait]
+impl TEdgeComponent for CumulocityMapper {
+ fn session_name(&self) -> &str {
+ CUMULOCITY_MAPPER_NAME
}
- fn get_mqtt_config(&mut self) -> Result<mqtt_channel::Config, anyhow::Error> {
+ async fn init(&self) -> Result<(), anyhow::Error> {
+ info!("Initialize tedge mapper c8y");
+ create_directories()?;
let operations = Operations::try_new("/etc/tedge/operations", "c8y")?;
- let mqtt_topic = Self::subscriptions(&operations)?;
- let config_repository =
- tedge_config::TEdgeConfigRepository::new(tedge_config::TEdgeConfigLocation::default());
- let tedge_config = config_repository.load()?;
-
- let mqtt_config = mqtt_channel::Config::default()
- .with_host(tedge_config.query(MqttBindAddressSetting)?.to_string())
- .with_port(tedge_config.query(MqttPortSetting)?.into())
- .with_session_name(CUMULOCITY_MAPPER_NAME)
- .with_clean_session(false)
- .with_subscriptions(mqtt_topic);
-
- Ok(mqtt_config)
+ self.init_session(CumulocityMapper::subscriptions(&operations)?)
+ .await?;
+ Ok(())
}
-}
-#[async_trait]
-impl TEdgeComponent for CumulocityMapper {
async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> {
let size_threshold = SizeThreshold(MQTT_MESSAGE_SIZE_THRESHOLD);
@@ -101,3 +86,25 @@ impl TEdgeComponent for CumulocityMapper {
Ok(())
}
}
+
+fn create_directories() -> Result<(), anyhow::Error> {
+ create_directory_with_user_group(
+ "/etc/tedge/operations/c8y",
+ "tedge-mapper",
+ "tedge-mapper",
+ 0o775,
+ )?;
+ create_file_with_user_group(
+ "/etc/tedge/operations/c8y/c8y_SoftwareUpdate",
+ "tedge-mapper",
+ "tedge-mapper",
+ 0o644,
+ )?;
+ create_file_with_user_group(
+ "/etc/tedge/operations/c8y/c8y_Restart",
+ "tedge-mapper",
+ "tedge-mapper",
+ 0o644,
+ )?;
+ Ok(())
+}
diff --git a/crates/core/tedge_mapper/src/collectd/mapper.rs b/crates/core/tedge_mapper/src/collectd/mapper.rs
index abae07af..a4c6f4c5 100644
--- a/crates/core/tedge_mapper/src/collectd/mapper.rs
+++ b/crates/core/tedge_mapper/src/collectd/mapper.rs
@@ -3,10 +3,11 @@ use crate::{
core::component::TEdgeComponent,
};
use async_trait::async_trait;
+use mqtt_channel::TopicFilter;
use tedge_config::{ConfigSettingAccessor, MqttBindAddressSetting, MqttPortSetting, TEdgeConfig};
-use tracing::{info_span, Instrument};
+use tracing::{info, info_span, Instrument};
-const APP_NAME: &str = "tedge-mapper-collectd";
+const COLLECTD_MAPPER_NAME: &str = "tedge-mapper-collectd";
pub struct CollectdMapper {}
@@ -18,6 +19,19 @@ impl CollectdMapper {
#[async_trait]
impl TEdgeComponent for CollectdMapper {
+ fn session_name(&self) -> &str {
+ COLLECTD_MAPPER_NAME
+ }
+
+ async fn init(&self) -> Result<(), anyhow::Error> {
+ info!("Initialize tedge mapper collectd");
+ self.init_session(TopicFilter::new(
+ DeviceMonitorConfig::default().mqtt_source_topic,
+ )?)
+ .await?;
+ Ok(())
+ }
+
async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> {
let mqtt_port = tedge_config.query(MqttPortSetting)?.into();
let mqtt_host = tedge_config.query(MqttBindAddressSetting)?.to_string();
@@ -29,7 +43,7 @@ impl TEdgeComponent for CollectdMapper {
let device_monitor = DeviceMonitor::new(device_monitor_config);
device_monitor
.run()
- .instrument(info_span!(APP_NAME))
+ .instrument(info_span!(COLLECTD_MAPPER_NAME))
.await?;
Ok(())
diff --git a/crates/core/tedge_mapper/src/collectd/monitor.rs b/crates/core/tedge_mapper/src/collectd/monitor.rs
index c804deb5..93ea68cd 100644
--- a/crates/core/tedge_mapper/src/collectd/monitor.rs
+++ b/crates/core/tedge_mapper/src/collectd/monitor.rs
@@ -18,7 +18,7 @@ pub struct DeviceMonitorConfig {
host: String,
port: u16,
mqtt_client_id: &'static str,
- mqtt_source_topic: &'static str,
+ pub mqtt_source_topic: &'static str,
mqtt_target_topic: &'static str,
batching_window: u32,
maximum_message_delay: u32,
diff --git a/crates/core/tedge_mapper/src/core/component.rs b/crates/core/tedge_mapper/src/core/component.rs
index 6e5fedc6..786c78c8 100644
--- a/crates/core/tedge_mapper/src/core/component.rs
+++ b/crates/core/tedge_mapper/src/core/component.rs
@@ -1,7 +1,38 @@
use async_trait::async_trait;
-use tedge_config::TEdgeConfig;
+use mqtt_channel::TopicFilter;
+use tedge_config::{
+ ConfigRepository, ConfigSettingAccessor, MqttBindAddressSetting, MqttPortSetting, TEdgeConfig,
+};
+use tracing::info;
#[async_trait]
-pub trait TEdgeComponent {
+pub trait TEdgeComponent: Sync + Send {
+ fn session_name(&self) -> &str;
async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error>;
+ async fn init(&self) -> Result<(), anyhow::Error>;
+ async fn init_session(&self, mqtt_topics: TopicFilter) -> Result<(), anyhow::Error> {
+ mqtt_channel::init_session(&self.get_mqtt_config()?.with_subscriptions(mqtt_topics))
+ .await?;
+ Ok(())
+ }
+
+ async fn clear_session(&self) -> Result<(), anyhow::Error> {
+ info!("Clear {} session", self.session_name());
+ mqtt_channel::clear_session(&self.get_mqtt_config()?).await?;
+ Ok(())
+ }
+
+ fn get_mqtt_config(&self) -> Result<mqtt_channel::Config, anyhow::Error> {
+ let config_repository =
+ tedge_config::TEdgeConfigRepository::new(tedge_config::TEdgeConfigLocation::default());
+ let tedge_config = config_repository.load()?;
+
+ let mqtt_config = mqtt_channel::Config::default()
+ .with_host(tedge_config.query(MqttBindAddressSetting)?.to_string())
+ .with_port(tedge_config.query(MqttPortSetting)?.into())
+ .with_session_name(self.session_name())
+ .with_clean_session(false);
+
+ Ok(mqtt_config)
+ }
}
diff --git a/crates/core/tedge_mapper/src/main.rs b/crates/core/tedge_mapper/src/main.rs
index d6de0301..aabcff27 100644
--- a/crates/core/tedge_mapper/src/main.rs
+++ b/crates/core/tedge_mapper/src/main.rs
@@ -75,26 +75,24 @@ impl fmt::Display for MapperName {
#[tokio::main]
async fn main() -> anyhow::Result<()> {
- let mapper = MapperOpt::parse();
- tedge_utils::logging::initialise_tracing_subscriber(mapper.debug);
+ let mapper_opt = MapperOpt::parse();
+ tedge_utils::logging::initialise_tracing_subscriber(mapper_opt.debug);
- let component = lookup_component(&mapper.name);
+ let component = lookup_component(&mapper_opt.name);
let tedge_config_location =
- tedge_config::TEdgeConfigLocation::from_custom_root(&mapper.config_dir);
+ tedge_config::TEdgeConfigLocation::from_custom_root(&mapper_opt.config_dir);
let config = tedge_config::TEdgeConfigRepository::new(tedge_config_location.clone()).load()?;
// Run only one instance of a mapper
let _flock = check_another_instance_is_not_running(
- &mapper.name.to_string(),
+ &mapper_opt.name.to_string(),
&config.query(RunPathDefaultSetting)?.into(),
)?;
- if mapper.init {
- let mut mapper = CumulocityMapper::new();
- mapper.init_session().await
- } else if mapper.clear {
- let mut mapper = CumulocityMapper::new();
- mapper.clear_session().await
+ if mapper_opt.init {
+ component.init().await
+ } else if mapper_opt.clear {
+ component.clear_session().await
} else {
component.start(config).await
}