summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_mapper/src
diff options
context:
space:
mode:
authorPradeepKiruvale <pradeepkumar.kj@softwareag.com>2022-03-01 09:41:29 +0530
committerGitHub <noreply@github.com>2022-03-01 09:41:29 +0530
commit6a1f0d72c384b4e647711f483eba992b8095b6f4 (patch)
treeb7531e80c61dc13b2fa38949309aa64dc068e0ff /crates/core/tedge_mapper/src
parent3e52543e1d63a333098eba46361d80218dbd6868 (diff)
[823] Configurable mqtt bind address (#929)
* [823] add mqtt.bind_address option * ipaddress instead of string
Diffstat (limited to 'crates/core/tedge_mapper/src')
-rw-r--r--crates/core/tedge_mapper/src/az/mapper.rs5
-rw-r--r--crates/core/tedge_mapper/src/c8y/mapper.rs40
-rw-r--r--crates/core/tedge_mapper/src/c8y/tests.rs9
-rw-r--r--crates/core/tedge_mapper/src/collectd/mapper.rs7
-rw-r--r--crates/core/tedge_mapper/src/collectd/monitor.rs10
-rw-r--r--crates/core/tedge_mapper/src/core/mapper.rs4
6 files changed, 51 insertions, 24 deletions
diff --git a/crates/core/tedge_mapper/src/az/mapper.rs b/crates/core/tedge_mapper/src/az/mapper.rs
index e9c0b7ee..542e6c1c 100644
--- a/crates/core/tedge_mapper/src/az/mapper.rs
+++ b/crates/core/tedge_mapper/src/az/mapper.rs
@@ -5,7 +5,7 @@ use crate::{
use async_trait::async_trait;
use clock::WallClock;
-use tedge_config::{AzureMapperTimestamp, TEdgeConfig};
+use tedge_config::{AzureMapperTimestamp, MqttBindAddressSetting, TEdgeConfig};
use tedge_config::{ConfigSettingAccessor, MqttPortSetting};
use tracing::{info_span, Instrument};
@@ -24,12 +24,13 @@ impl TEdgeComponent for AzureMapper {
async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> {
let add_timestamp = tedge_config.query(AzureMapperTimestamp)?.is_set();
let mqtt_port = tedge_config.query(MqttPortSetting)?.into();
+ let mqtt_host = tedge_config.query(MqttBindAddressSetting)?.to_string();
let clock = Box::new(WallClock);
let size_threshold = SizeThreshold(255 * 1024);
let converter = Box::new(AzureConverter::new(add_timestamp, clock, size_threshold));
- let mut mapper = create_mapper(AZURE_MAPPER_NAME, mqtt_port, converter).await?;
+ let mut mapper = create_mapper(AZURE_MAPPER_NAME, mqtt_host, mqtt_port, converter).await?;
mapper
.run()
diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs
index a5df4d0b..def4808c 100644
--- a/crates/core/tedge_mapper/src/c8y/mapper.rs
+++ b/crates/core/tedge_mapper/src/c8y/mapper.rs
@@ -7,9 +7,10 @@ use agent_interface::topic::ResponseTopic;
use async_trait::async_trait;
use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy};
use c8y_smartrest::operations::Operations;
-use mqtt_channel::{Config, TopicFilter};
+use mqtt_channel::TopicFilter;
use tedge_config::{
- ConfigSettingAccessor, DeviceIdSetting, DeviceTypeSetting, MqttPortSetting, TEdgeConfig,
+ ConfigRepository, ConfigSettingAccessor, DeviceIdSetting, DeviceTypeSetting,
+ MqttBindAddressSetting, MqttPortSetting, TEdgeConfig,
};
use tracing::{info, info_span, Instrument};
@@ -39,27 +40,32 @@ impl CumulocityMapper {
}
pub async fn init_session(&mut self) -> Result<(), anyhow::Error> {
- info!("Initialize tedge sm mapper session");
- let operations = Operations::try_new("/etc/tedge/operations", "c8y")?;
- let mqtt_topic = Self::subscriptions(&operations)?;
- let config = Config::default()
- .with_session_name(CUMULOCITY_MAPPER_NAME)
- .with_clean_session(false)
- .with_subscriptions(mqtt_topic);
- mqtt_channel::init_session(&config).await?;
+ info!("Initialize tedge mapper session");
+ mqtt_channel::init_session(&self.get_mqtt_config()?).await?;
Ok(())
}
pub async fn clear_session(&mut self) -> Result<(), anyhow::Error> {
- info!("Clear tedge sm mapper session");
+ info!("Clear tedge mapper session");
+ mqtt_channel::clear_session(&self.get_mqtt_config()?).await?;
+ Ok(())
+ }
+
+ fn get_mqtt_config(&mut self) -> Result<mqtt_channel::Config, anyhow::Error> {
let operations = Operations::try_new("/etc/tedge/operations", "c8y")?;
let mqtt_topic = Self::subscriptions(&operations)?;
- let config = Config::default()
+ let config_repository =
+ tedge_config::TEdgeConfigRepository::new(tedge_config::TEdgeConfigLocation::default());
+ let tedge_config = config_repository.load()?;
+
+ let mqtt_config = mqtt_channel::Config::default()
+ .with_host(tedge_config.query(MqttBindAddressSetting)?.to_string())
+ .with_port(tedge_config.query(MqttPortSetting)?.into())
.with_session_name(CUMULOCITY_MAPPER_NAME)
- .with_clean_session(true)
+ .with_clean_session(false)
.with_subscriptions(mqtt_topic);
- mqtt_channel::clear_session(&config).await?;
- Ok(())
+
+ Ok(mqtt_config)
}
}
@@ -74,6 +80,7 @@ impl TEdgeComponent for CumulocityMapper {
let device_name = tedge_config.query(DeviceIdSetting)?;
let device_type = tedge_config.query(DeviceTypeSetting)?;
let mqtt_port = tedge_config.query(MqttPortSetting)?.into();
+ let mqtt_host = tedge_config.query(MqttBindAddressSetting)?.to_string();
let converter = Box::new(CumulocityConverter::new(
size_threshold,
@@ -83,7 +90,8 @@ impl TEdgeComponent for CumulocityMapper {
http_proxy,
));
- let mut mapper = create_mapper(CUMULOCITY_MAPPER_NAME, mqtt_port, converter).await?;
+ let mut mapper =
+ create_mapper(CUMULOCITY_MAPPER_NAME, mqtt_host, mqtt_port, converter).await?;
mapper
.run()
diff --git a/crates/core/tedge_mapper/src/c8y/tests.rs b/crates/core/tedge_mapper/src/c8y/tests.rs
index 7cbed97a..04cf07cc 100644
--- a/crates/core/tedge_mapper/src/c8y/tests.rs
+++ b/crates/core/tedge_mapper/src/c8y/tests.rs
@@ -24,6 +24,7 @@ use tokio::task::JoinHandle;
use super::converter::{get_child_id_from_topic, CumulocityConverter};
const TEST_TIMEOUT_MS: Duration = Duration::from_millis(5000);
+const MQTT_HOST: &str = "127.0.0.1";
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
@@ -796,7 +797,13 @@ impl C8YHttpProxy for FakeC8YHttpProxy {
async fn start_c8y_mapper(mqtt_port: u16) -> Result<JoinHandle<()>, anyhow::Error> {
let converter = create_c8y_converter();
- let mut mapper = create_mapper("c8y-mapper-test", mqtt_port, Box::new(converter)).await?;
+ let mut mapper = create_mapper(
+ "c8y-mapper-test",
+ MQTT_HOST.to_string(),
+ mqtt_port,
+ Box::new(converter),
+ )
+ .await?;
let mapper_task = tokio::spawn(async move {
let _ = mapper.run().await;
diff --git a/crates/core/tedge_mapper/src/collectd/mapper.rs b/crates/core/tedge_mapper/src/collectd/mapper.rs
index aac2b087..abae07af 100644
--- a/crates/core/tedge_mapper/src/collectd/mapper.rs
+++ b/crates/core/tedge_mapper/src/collectd/mapper.rs
@@ -3,7 +3,7 @@ use crate::{
core::component::TEdgeComponent,
};
use async_trait::async_trait;
-use tedge_config::{ConfigSettingAccessor, MqttPortSetting, TEdgeConfig};
+use tedge_config::{ConfigSettingAccessor, MqttBindAddressSetting, MqttPortSetting, TEdgeConfig};
use tracing::{info_span, Instrument};
const APP_NAME: &str = "tedge-mapper-collectd";
@@ -20,8 +20,11 @@ impl CollectdMapper {
impl TEdgeComponent for CollectdMapper {
async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> {
let mqtt_port = tedge_config.query(MqttPortSetting)?.into();
+ let mqtt_host = tedge_config.query(MqttBindAddressSetting)?.to_string();
- let device_monitor_config = DeviceMonitorConfig::default().with_port(mqtt_port);
+ let device_monitor_config = DeviceMonitorConfig::default()
+ .with_port(mqtt_port)
+ .with_host(mqtt_host);
let device_monitor = DeviceMonitor::new(device_monitor_config);
device_monitor
diff --git a/crates/core/tedge_mapper/src/collectd/monitor.rs b/crates/core/tedge_mapper/src/collectd/monitor.rs
index b16f2452..c804deb5 100644
--- a/crates/core/tedge_mapper/src/collectd/monitor.rs
+++ b/crates/core/tedge_mapper/src/collectd/monitor.rs
@@ -15,7 +15,7 @@ const DEFAULT_MQTT_TARGET_TOPIC: &str = "tedge/measurements";
#[derive(Debug)]
pub struct DeviceMonitorConfig {
- host: &'static str,
+ host: String,
port: u16,
mqtt_client_id: &'static str,
mqtt_source_topic: &'static str,
@@ -28,7 +28,7 @@ pub struct DeviceMonitorConfig {
impl Default for DeviceMonitorConfig {
fn default() -> Self {
Self {
- host: DEFAULT_HOST,
+ host: DEFAULT_HOST.to_string(),
port: DEFAULT_PORT,
mqtt_client_id: DEFAULT_MQTT_CLIENT_ID,
mqtt_source_topic: DEFAULT_MQTT_SOURCE_TOPIC,
@@ -44,6 +44,10 @@ impl DeviceMonitorConfig {
pub fn with_port(self, port: u16) -> Self {
Self { port, ..self }
}
+
+ pub fn with_host(self, host: String) -> Self {
+ Self { host, ..self }
+ }
}
#[derive(Debug)]
@@ -63,7 +67,7 @@ impl DeviceMonitor {
let input_topic = TopicFilter::new(self.device_monitor_config.mqtt_source_topic)?
.with_qos(QoS::AtMostOnce);
let mqtt_config = mqtt_channel::Config::new(
- self.device_monitor_config.host,
+ self.device_monitor_config.host.to_string(),
self.device_monitor_config.port,
)
.with_session_name(self.device_monitor_config.mqtt_client_id)
diff --git a/crates/core/tedge_mapper/src/core/mapper.rs b/crates/core/tedge_mapper/src/core/mapper.rs
index 7f66ed62..f0691397 100644
--- a/crates/core/tedge_mapper/src/core/mapper.rs
+++ b/crates/core/tedge_mapper/src/core/mapper.rs
@@ -12,6 +12,7 @@ const SYNC_WINDOW: Duration = Duration::from_secs(3);
pub async fn create_mapper(
app_name: &str,
+ mqtt_host: String,
mqtt_port: u16,
converter: Box<dyn Converter<Error = ConversionError>>,
) -> Result<Mapper, anyhow::Error> {
@@ -20,6 +21,7 @@ pub async fn create_mapper(
let mapper_config = converter.get_mapper_config();
let mqtt_client = Connection::new(&mqtt_config(
app_name,
+ &mqtt_host,
mqtt_port,
mapper_config.in_topic_filter.clone(),
)?)
@@ -36,10 +38,12 @@ pub async fn create_mapper(
pub fn mqtt_config(
name: &str,
+ host: &str,
port: u16,
topics: TopicFilter,
) -> Result<mqtt_channel::Config, anyhow::Error> {
Ok(mqtt_channel::Config::default()
+ .with_host(host)
.with_port(port)
.with_session_name(name)
.with_subscriptions(topics)