From 6a1f0d72c384b4e647711f483eba992b8095b6f4 Mon Sep 17 00:00:00 2001 From: PradeepKiruvale Date: Tue, 1 Mar 2022 09:41:29 +0530 Subject: [823] Configurable mqtt bind address (#929) * [823] add mqtt.bind_address option * ipaddress instead of string --- crates/core/tedge_mapper/src/az/mapper.rs | 5 +-- crates/core/tedge_mapper/src/c8y/mapper.rs | 40 ++++++++++++++---------- crates/core/tedge_mapper/src/c8y/tests.rs | 9 +++++- crates/core/tedge_mapper/src/collectd/mapper.rs | 7 +++-- crates/core/tedge_mapper/src/collectd/monitor.rs | 10 ++++-- crates/core/tedge_mapper/src/core/mapper.rs | 4 +++ 6 files changed, 51 insertions(+), 24 deletions(-) (limited to 'crates/core/tedge_mapper/src') diff --git a/crates/core/tedge_mapper/src/az/mapper.rs b/crates/core/tedge_mapper/src/az/mapper.rs index e9c0b7ee..542e6c1c 100644 --- a/crates/core/tedge_mapper/src/az/mapper.rs +++ b/crates/core/tedge_mapper/src/az/mapper.rs @@ -5,7 +5,7 @@ use crate::{ use async_trait::async_trait; use clock::WallClock; -use tedge_config::{AzureMapperTimestamp, TEdgeConfig}; +use tedge_config::{AzureMapperTimestamp, MqttBindAddressSetting, TEdgeConfig}; use tedge_config::{ConfigSettingAccessor, MqttPortSetting}; use tracing::{info_span, Instrument}; @@ -24,12 +24,13 @@ impl TEdgeComponent for AzureMapper { async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> { let add_timestamp = tedge_config.query(AzureMapperTimestamp)?.is_set(); let mqtt_port = tedge_config.query(MqttPortSetting)?.into(); + let mqtt_host = tedge_config.query(MqttBindAddressSetting)?.to_string(); let clock = Box::new(WallClock); let size_threshold = SizeThreshold(255 * 1024); let converter = Box::new(AzureConverter::new(add_timestamp, clock, size_threshold)); - let mut mapper = create_mapper(AZURE_MAPPER_NAME, mqtt_port, converter).await?; + let mut mapper = create_mapper(AZURE_MAPPER_NAME, mqtt_host, mqtt_port, converter).await?; mapper .run() diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs index a5df4d0b..def4808c 100644 --- a/crates/core/tedge_mapper/src/c8y/mapper.rs +++ b/crates/core/tedge_mapper/src/c8y/mapper.rs @@ -7,9 +7,10 @@ use agent_interface::topic::ResponseTopic; use async_trait::async_trait; use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy}; use c8y_smartrest::operations::Operations; -use mqtt_channel::{Config, TopicFilter}; +use mqtt_channel::TopicFilter; use tedge_config::{ - ConfigSettingAccessor, DeviceIdSetting, DeviceTypeSetting, MqttPortSetting, TEdgeConfig, + ConfigRepository, ConfigSettingAccessor, DeviceIdSetting, DeviceTypeSetting, + MqttBindAddressSetting, MqttPortSetting, TEdgeConfig, }; use tracing::{info, info_span, Instrument}; @@ -39,27 +40,32 @@ impl CumulocityMapper { } pub async fn init_session(&mut self) -> Result<(), anyhow::Error> { - info!("Initialize tedge sm mapper session"); - let operations = Operations::try_new("/etc/tedge/operations", "c8y")?; - let mqtt_topic = Self::subscriptions(&operations)?; - let config = Config::default() - .with_session_name(CUMULOCITY_MAPPER_NAME) - .with_clean_session(false) - .with_subscriptions(mqtt_topic); - mqtt_channel::init_session(&config).await?; + info!("Initialize tedge mapper session"); + mqtt_channel::init_session(&self.get_mqtt_config()?).await?; Ok(()) } pub async fn clear_session(&mut self) -> Result<(), anyhow::Error> { - info!("Clear tedge sm mapper session"); + info!("Clear tedge mapper session"); + mqtt_channel::clear_session(&self.get_mqtt_config()?).await?; + Ok(()) + } + + fn get_mqtt_config(&mut self) -> Result { let operations = Operations::try_new("/etc/tedge/operations", "c8y")?; let mqtt_topic = Self::subscriptions(&operations)?; - let config = Config::default() + let config_repository = + tedge_config::TEdgeConfigRepository::new(tedge_config::TEdgeConfigLocation::default()); + let tedge_config = config_repository.load()?; + + let mqtt_config = mqtt_channel::Config::default() + .with_host(tedge_config.query(MqttBindAddressSetting)?.to_string()) + .with_port(tedge_config.query(MqttPortSetting)?.into()) .with_session_name(CUMULOCITY_MAPPER_NAME) - .with_clean_session(true) + .with_clean_session(false) .with_subscriptions(mqtt_topic); - mqtt_channel::clear_session(&config).await?; - Ok(()) + + Ok(mqtt_config) } } @@ -74,6 +80,7 @@ impl TEdgeComponent for CumulocityMapper { let device_name = tedge_config.query(DeviceIdSetting)?; let device_type = tedge_config.query(DeviceTypeSetting)?; let mqtt_port = tedge_config.query(MqttPortSetting)?.into(); + let mqtt_host = tedge_config.query(MqttBindAddressSetting)?.to_string(); let converter = Box::new(CumulocityConverter::new( size_threshold, @@ -83,7 +90,8 @@ impl TEdgeComponent for CumulocityMapper { http_proxy, )); - let mut mapper = create_mapper(CUMULOCITY_MAPPER_NAME, mqtt_port, converter).await?; + let mut mapper = + create_mapper(CUMULOCITY_MAPPER_NAME, mqtt_host, mqtt_port, converter).await?; mapper .run() diff --git a/crates/core/tedge_mapper/src/c8y/tests.rs b/crates/core/tedge_mapper/src/c8y/tests.rs index 7cbed97a..04cf07cc 100644 --- a/crates/core/tedge_mapper/src/c8y/tests.rs +++ b/crates/core/tedge_mapper/src/c8y/tests.rs @@ -24,6 +24,7 @@ use tokio::task::JoinHandle; use super::converter::{get_child_id_from_topic, CumulocityConverter}; const TEST_TIMEOUT_MS: Duration = Duration::from_millis(5000); +const MQTT_HOST: &str = "127.0.0.1"; #[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[serial] @@ -796,7 +797,13 @@ impl C8YHttpProxy for FakeC8YHttpProxy { async fn start_c8y_mapper(mqtt_port: u16) -> Result, anyhow::Error> { let converter = create_c8y_converter(); - let mut mapper = create_mapper("c8y-mapper-test", mqtt_port, Box::new(converter)).await?; + let mut mapper = create_mapper( + "c8y-mapper-test", + MQTT_HOST.to_string(), + mqtt_port, + Box::new(converter), + ) + .await?; let mapper_task = tokio::spawn(async move { let _ = mapper.run().await; diff --git a/crates/core/tedge_mapper/src/collectd/mapper.rs b/crates/core/tedge_mapper/src/collectd/mapper.rs index aac2b087..abae07af 100644 --- a/crates/core/tedge_mapper/src/collectd/mapper.rs +++ b/crates/core/tedge_mapper/src/collectd/mapper.rs @@ -3,7 +3,7 @@ use crate::{ core::component::TEdgeComponent, }; use async_trait::async_trait; -use tedge_config::{ConfigSettingAccessor, MqttPortSetting, TEdgeConfig}; +use tedge_config::{ConfigSettingAccessor, MqttBindAddressSetting, MqttPortSetting, TEdgeConfig}; use tracing::{info_span, Instrument}; const APP_NAME: &str = "tedge-mapper-collectd"; @@ -20,8 +20,11 @@ impl CollectdMapper { impl TEdgeComponent for CollectdMapper { async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> { let mqtt_port = tedge_config.query(MqttPortSetting)?.into(); + let mqtt_host = tedge_config.query(MqttBindAddressSetting)?.to_string(); - let device_monitor_config = DeviceMonitorConfig::default().with_port(mqtt_port); + let device_monitor_config = DeviceMonitorConfig::default() + .with_port(mqtt_port) + .with_host(mqtt_host); let device_monitor = DeviceMonitor::new(device_monitor_config); device_monitor diff --git a/crates/core/tedge_mapper/src/collectd/monitor.rs b/crates/core/tedge_mapper/src/collectd/monitor.rs index b16f2452..c804deb5 100644 --- a/crates/core/tedge_mapper/src/collectd/monitor.rs +++ b/crates/core/tedge_mapper/src/collectd/monitor.rs @@ -15,7 +15,7 @@ const DEFAULT_MQTT_TARGET_TOPIC: &str = "tedge/measurements"; #[derive(Debug)] pub struct DeviceMonitorConfig { - host: &'static str, + host: String, port: u16, mqtt_client_id: &'static str, mqtt_source_topic: &'static str, @@ -28,7 +28,7 @@ pub struct DeviceMonitorConfig { impl Default for DeviceMonitorConfig { fn default() -> Self { Self { - host: DEFAULT_HOST, + host: DEFAULT_HOST.to_string(), port: DEFAULT_PORT, mqtt_client_id: DEFAULT_MQTT_CLIENT_ID, mqtt_source_topic: DEFAULT_MQTT_SOURCE_TOPIC, @@ -44,6 +44,10 @@ impl DeviceMonitorConfig { pub fn with_port(self, port: u16) -> Self { Self { port, ..self } } + + pub fn with_host(self, host: String) -> Self { + Self { host, ..self } + } } #[derive(Debug)] @@ -63,7 +67,7 @@ impl DeviceMonitor { let input_topic = TopicFilter::new(self.device_monitor_config.mqtt_source_topic)? .with_qos(QoS::AtMostOnce); let mqtt_config = mqtt_channel::Config::new( - self.device_monitor_config.host, + self.device_monitor_config.host.to_string(), self.device_monitor_config.port, ) .with_session_name(self.device_monitor_config.mqtt_client_id) diff --git a/crates/core/tedge_mapper/src/core/mapper.rs b/crates/core/tedge_mapper/src/core/mapper.rs index 7f66ed62..f0691397 100644 --- a/crates/core/tedge_mapper/src/core/mapper.rs +++ b/crates/core/tedge_mapper/src/core/mapper.rs @@ -12,6 +12,7 @@ const SYNC_WINDOW: Duration = Duration::from_secs(3); pub async fn create_mapper( app_name: &str, + mqtt_host: String, mqtt_port: u16, converter: Box>, ) -> Result { @@ -20,6 +21,7 @@ pub async fn create_mapper( let mapper_config = converter.get_mapper_config(); let mqtt_client = Connection::new(&mqtt_config( app_name, + &mqtt_host, mqtt_port, mapper_config.in_topic_filter.clone(), )?) @@ -36,10 +38,12 @@ pub async fn create_mapper( pub fn mqtt_config( name: &str, + host: &str, port: u16, topics: TopicFilter, ) -> Result { Ok(mqtt_channel::Config::default() + .with_host(host) .with_port(port) .with_session_name(name) .with_subscriptions(topics) -- cgit v1.2.3