summaryrefslogtreecommitdiffstats
path: root/crates/core
diff options
context:
space:
mode:
authorPradeepKiruvale <pradeepkumar.kj@softwareag.com>2022-03-01 09:41:29 +0530
committerGitHub <noreply@github.com>2022-03-01 09:41:29 +0530
commit6a1f0d72c384b4e647711f483eba992b8095b6f4 (patch)
treeb7531e80c61dc13b2fa38949309aa64dc068e0ff /crates/core
parent3e52543e1d63a333098eba46361d80218dbd6868 (diff)
[823] Configurable mqtt bind address (#929)
* [823] add mqtt.bind_address option * ipaddress instead of string
Diffstat (limited to 'crates/core')
-rw-r--r--crates/core/c8y_api/src/http_proxy.rs5
-rw-r--r--crates/core/tedge/src/cli/config/config_key.rs1
-rw-r--r--crates/core/tedge/src/cli/connect/command.rs26
-rw-r--r--crates/core/tedge/src/cli/connect/common_mosquitto_config.rs9
-rw-r--r--crates/core/tedge/src/cli/connect/jwt_token.rs6
-rw-r--r--crates/core/tedge/src/cli/mqtt/cli.rs10
-rw-r--r--crates/core/tedge_agent/src/agent.rs6
-rw-r--r--crates/core/tedge_mapper/src/az/mapper.rs5
-rw-r--r--crates/core/tedge_mapper/src/c8y/mapper.rs40
-rw-r--r--crates/core/tedge_mapper/src/c8y/tests.rs9
-rw-r--r--crates/core/tedge_mapper/src/collectd/mapper.rs7
-rw-r--r--crates/core/tedge_mapper/src/collectd/monitor.rs10
-rw-r--r--crates/core/tedge_mapper/src/core/mapper.rs4
13 files changed, 91 insertions, 47 deletions
diff --git a/crates/core/c8y_api/src/http_proxy.rs b/crates/core/c8y_api/src/http_proxy.rs
index 4c6dff98..d8047e53 100644
--- a/crates/core/c8y_api/src/http_proxy.rs
+++ b/crates/core/c8y_api/src/http_proxy.rs
@@ -11,7 +11,7 @@ use reqwest::Url;
use std::time::Duration;
use tedge_config::{
C8yUrlSetting, ConfigSettingAccessor, ConfigSettingAccessorStringExt, DeviceIdSetting,
- MqttPortSetting, TEdgeConfig,
+ MqttBindAddressSetting, MqttPortSetting, TEdgeConfig,
};
use time::{format_description, OffsetDateTime};
@@ -210,11 +210,14 @@ impl JwtAuthHttpProxy {
let http_con = reqwest::ClientBuilder::new().build()?;
let mqtt_port = tedge_config.query(MqttPortSetting)?.into();
+ let mqtt_host = tedge_config.query(MqttBindAddressSetting)?.to_string();
let topic = TopicFilter::new("c8y/s/dat")?;
let mqtt_config = mqtt_channel::Config::default()
.with_port(mqtt_port)
.with_clean_session(true)
+ .with_host(mqtt_host)
.with_subscriptions(topic);
+
let mut mqtt_con = Connection::new(&mqtt_config).await?;
// Ignore errors on this connection
diff --git a/crates/core/tedge/src/cli/config/config_key.rs b/crates/core/tedge/src/cli/config/config_key.rs
index 1b0fcef3..e6b9f5e6 100644
--- a/crates/core/tedge/src/cli/config/config_key.rs
+++ b/crates/core/tedge/src/cli/config/config_key.rs
@@ -48,6 +48,7 @@ impl ConfigKey {
config_key!(AzureUrlSetting),
config_key!(AzureRootCertPathSetting),
config_key!(AzureMapperTimestamp),
+ config_key!(MqttBindAddressSetting),
config_key!(MqttPortSetting),
config_key!(MqttExternalPortSetting),
config_key!(MqttExternalBindAddressSetting),
diff --git a/crates/core/tedge/src/cli/connect/command.rs b/crates/core/tedge/src/cli/connect/command.rs
index 253fd813..30682ddd 100644
--- a/crates/core/tedge/src/cli/connect/command.rs
+++ b/crates/core/tedge/src/cli/connect/command.rs
@@ -11,7 +11,6 @@ use tedge_users::UserManager;
use tedge_utils::paths::{create_directories, ok_if_not_found, DraftFile};
use which::which;
-pub(crate) const DEFAULT_HOST: &str = "localhost";
const WAIT_FOR_CHECK_SECONDS: u64 = 2;
const C8Y_CONFIG_FILENAME: &str = "c8y-bridge.conf";
const AZURE_CONFIG_FILENAME: &str = "az-bridge.conf";
@@ -104,10 +103,16 @@ impl Command for ConnectCommand {
let updated_mosquitto_config = self
.common_mosquitto_config
.clone()
- .with_internal_opts(config.query(MqttPortSetting)?.into())
+ .with_internal_opts(
+ config.query(MqttPortSetting)?.into(),
+ config.query(MqttBindAddressSetting)?.to_string(),
+ )
.with_external_opts(
config.query(MqttExternalPortSetting).ok().map(|x| x.into()),
- config.query(MqttExternalBindAddressSetting).ok(),
+ config
+ .query(MqttExternalBindAddressSetting)
+ .ok()
+ .map(|x| x.to_string()),
config.query(MqttExternalBindInterfaceSetting).ok(),
config
.query(MqttExternalCAPathSetting)
@@ -164,6 +169,7 @@ impl Command for ConnectCommand {
check_connected_c8y_tenant_as_configured(
&config.query_string(C8yUrlSetting)?,
config.query(MqttPortSetting)?.into(),
+ config.query(MqttBindAddressSetting)?.to_string(),
);
enable_software_management(&bridge_config, self.service_manager.as_ref());
}
@@ -206,13 +212,14 @@ impl ConnectCommand {
fn check_connection(&self, config: &TEdgeConfig) -> Result<DeviceStatus, ConnectError> {
let port = config.query(MqttPortSetting)?.into();
+ let host = config.query(MqttBindAddressSetting)?.to_string();
println!(
"Sending packets to check connection. This may take up to {} seconds.\n",
WAIT_FOR_CHECK_SECONDS
);
match self.cloud {
- Cloud::Azure => check_device_status_azure(port),
+ Cloud::Azure => check_device_status_azure(port, host),
Cloud::C8y => check_device_status_c8y(config),
}
}
@@ -250,9 +257,10 @@ fn check_device_status_c8y(tedge_config: &TEdgeConfig) -> Result<DeviceStatus, C
let mut options = MqttOptions::new(
CLIENT_ID,
- DEFAULT_HOST,
+ tedge_config.query(MqttBindAddressSetting)?.to_string(),
tedge_config.query(MqttPortSetting)?.into(),
);
+
options.set_keep_alive(RESPONSE_TIMEOUT);
let (mut client, mut connection) = rumqttc::Client::new(options, 10);
@@ -310,14 +318,14 @@ fn check_device_status_c8y(tedge_config: &TEdgeConfig) -> Result<DeviceStatus, C
// Empty payload will be published to az/$iothub/twin/GET/?$rid=1, here 1 is request ID.
// The result will be published by the iothub on the az/$iothub/twin/res/{status}/?$rid={request id}.
// Here if the status is 200 then it's success.
-fn check_device_status_azure(port: u16) -> Result<DeviceStatus, ConnectError> {
+fn check_device_status_azure(port: u16, host: String) -> Result<DeviceStatus, ConnectError> {
const AZURE_TOPIC_DEVICE_TWIN_DOWNSTREAM: &str = r##"az/twin/res/#"##;
const AZURE_TOPIC_DEVICE_TWIN_UPSTREAM: &str = r#"az/twin/GET/?$rid=1"#;
const CLIENT_ID: &str = "check_connection_az";
const REGISTRATION_PAYLOAD: &[u8] = b"";
const REGISTRATION_OK: &str = "200";
- let mut options = MqttOptions::new(CLIENT_ID, DEFAULT_HOST, port);
+ let mut options = MqttOptions::new(CLIENT_ID, host, port);
options.set_keep_alive(RESPONSE_TIMEOUT);
let (mut client, mut connection) = rumqttc::Client::new(options, 10);
@@ -549,8 +557,8 @@ fn get_common_mosquitto_config_file_path(
}
// To confirm the connected c8y tenant is the one that user configured.
-fn check_connected_c8y_tenant_as_configured(configured_url: &str, port: u16) {
- match get_connected_c8y_url(port) {
+fn check_connected_c8y_tenant_as_configured(configured_url: &str, port: u16, host: String) {
+ match get_connected_c8y_url(port, host) {
Ok(url) if url == configured_url => {}
Ok(url) => println!(
"Warning: Connecting to {}, but the configured URL is {}.\n\
diff --git a/crates/core/tedge/src/cli/connect/common_mosquitto_config.rs b/crates/core/tedge/src/cli/connect/common_mosquitto_config.rs
index 7fd9a28b..68af7d00 100644
--- a/crates/core/tedge/src/cli/connect/common_mosquitto_config.rs
+++ b/crates/core/tedge/src/cli/connect/common_mosquitto_config.rs
@@ -118,9 +118,10 @@ impl CommonMosquittoConfig {
Ok(())
}
- pub fn with_internal_opts(self, port: u16) -> Self {
+ pub fn with_internal_opts(self, port: u16, bind_address: String) -> Self {
let internal_listener = ListenerConfig {
port: Some(port),
+ bind_address: Some(bind_address),
..self.internal_listener
};
Self {
@@ -198,10 +199,10 @@ fn test_serialize() -> anyhow::Result<()> {
fn test_serialize_with_opts() -> anyhow::Result<()> {
let common_mosquitto_config = CommonMosquittoConfig::default();
let mosquitto_config_with_opts = common_mosquitto_config
- .with_internal_opts(1234)
+ .with_internal_opts(1234, "1.2.3.4".into())
.with_external_opts(
Some(2345),
- Some("0.0.0.0".into()),
+ Some("0.0.0.0".to_string()),
Some("wlan0".into()),
Some("/etc/ssl/certs".into()),
Some("cert.pem".into()),
@@ -227,7 +228,7 @@ fn test_serialize_with_opts() -> anyhow::Result<()> {
"log_type subscribe\n",
"log_type unsubscribe\n",
"message_size_limit 268435455\n",
- "listener 1234 localhost\n",
+ "listener 1234 1.2.3.4\n",
"allow_anonymous true\n",
"require_certificate false\n",
"listener 2345 0.0.0.0\n",
diff --git a/crates/core/tedge/src/cli/connect/jwt_token.rs b/crates/core/tedge/src/cli/connect/jwt_token.rs
index b81e26ed..afc86434 100644
--- a/crates/core/tedge/src/cli/connect/jwt_token.rs
+++ b/crates/core/tedge/src/cli/connect/jwt_token.rs
@@ -1,13 +1,13 @@
-use crate::cli::connect::{ConnectError, DEFAULT_HOST, RESPONSE_TIMEOUT};
+use crate::cli::connect::{ConnectError, RESPONSE_TIMEOUT};
use rumqttc::QoS::AtLeastOnce;
use rumqttc::{Event, Incoming, MqttOptions, Outgoing, Packet};
-pub(crate) fn get_connected_c8y_url(port: u16) -> Result<String, ConnectError> {
+pub(crate) fn get_connected_c8y_url(port: u16, host: String) -> Result<String, ConnectError> {
const C8Y_TOPIC_BUILTIN_JWT_TOKEN_UPSTREAM: &str = "c8y/s/uat";
const C8Y_TOPIC_BUILTIN_JWT_TOKEN_DOWNSTREAM: &str = "c8y/s/dat";
const CLIENT_ID: &str = "get_jwt_token_c8y";
- let mut options = MqttOptions::new(CLIENT_ID, DEFAULT_HOST, port);
+ let mut options = MqttOptions::new(CLIENT_ID, host, port);
options.set_keep_alive(RESPONSE_TIMEOUT);
let (mut client, mut connection) = rumqttc::Client::new(options, 10);
diff --git a/crates/core/tedge/src/cli/mqtt/cli.rs b/crates/core/tedge/src/cli/mqtt/cli.rs
index f6878a78..ce38abeb 100644
--- a/crates/core/tedge/src/cli/mqtt/cli.rs
+++ b/crates/core/tedge/src/cli/mqtt/cli.rs
@@ -4,7 +4,6 @@ use rumqttc::QoS;
use std::time::Duration;
use tedge_config::*;
-const DEFAULT_HOST: &str = "localhost";
const PUB_CLIENT_PREFIX: &str = "tedge-pub";
const SUB_CLIENT_PREFIX: &str = "tedge-sub";
const DISCONNECT_TIMEOUT: Duration = Duration::from_secs(2);
@@ -41,7 +40,10 @@ pub enum TEdgeMqttCli {
impl BuildCommand for TEdgeMqttCli {
fn build_command(self, context: BuildContext) -> Result<Box<dyn Command>, crate::ConfigError> {
let port = context.config_repository.load()?.query(MqttPortSetting)?;
-
+ let host = context
+ .config_repository
+ .load()?
+ .query(MqttBindAddressSetting)?;
let cmd = {
match self {
TEdgeMqttCli::Pub {
@@ -50,7 +52,7 @@ impl BuildCommand for TEdgeMqttCli {
qos,
retain,
} => MqttPublishCommand {
- host: DEFAULT_HOST.to_string(),
+ host: host.to_string(),
port: port.into(),
topic,
message,
@@ -65,7 +67,7 @@ impl BuildCommand for TEdgeMqttCli {
qos,
hide_topic,
} => MqttSubscribeCommand {
- host: DEFAULT_HOST.to_string(),
+ host: host.to_string(),
port: port.into(),
topic,
qos,
diff --git a/crates/core/tedge_agent/src/agent.rs b/crates/core/tedge_agent/src/agent.rs
index d1285c22..aeabfc6a 100644
--- a/crates/core/tedge_agent/src/agent.rs
+++ b/crates/core/tedge_agent/src/agent.rs
@@ -17,8 +17,9 @@ use mqtt_channel::{Connection, Message, PubChannel, StreamExt, SubChannel, Topic
use plugin_sm::plugin_manager::{ExternalPlugins, Plugins};
use std::{convert::TryInto, fmt::Debug, path::PathBuf, sync::Arc};
use tedge_config::{
- ConfigRepository, ConfigSettingAccessor, ConfigSettingAccessorStringExt, MqttPortSetting,
- SoftwarePluginDefaultSetting, TEdgeConfigLocation, TmpPathDefaultSetting,
+ ConfigRepository, ConfigSettingAccessor, ConfigSettingAccessorStringExt,
+ MqttBindAddressSetting, MqttPortSetting, SoftwarePluginDefaultSetting, TEdgeConfigLocation,
+ TmpPathDefaultSetting,
};
use tokio::sync::Mutex;
use tracing::{debug, error, info, instrument};
@@ -107,6 +108,7 @@ impl SmAgentConfig {
let tedge_config = config_repository.load()?;
let mqtt_config = mqtt_channel::Config::default()
+ .with_host(tedge_config.query(MqttBindAddressSetting)?.to_string())
.with_port(tedge_config.query(MqttPortSetting)?.into())
.with_max_packet_size(10 * 1024 * 1024);
diff --git a/crates/core/tedge_mapper/src/az/mapper.rs b/crates/core/tedge_mapper/src/az/mapper.rs
index e9c0b7ee..542e6c1c 100644
--- a/crates/core/tedge_mapper/src/az/mapper.rs
+++ b/crates/core/tedge_mapper/src/az/mapper.rs
@@ -5,7 +5,7 @@ use crate::{
use async_trait::async_trait;
use clock::WallClock;
-use tedge_config::{AzureMapperTimestamp, TEdgeConfig};
+use tedge_config::{AzureMapperTimestamp, MqttBindAddressSetting, TEdgeConfig};
use tedge_config::{ConfigSettingAccessor, MqttPortSetting};
use tracing::{info_span, Instrument};
@@ -24,12 +24,13 @@ impl TEdgeComponent for AzureMapper {
async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> {
let add_timestamp = tedge_config.query(AzureMapperTimestamp)?.is_set();
let mqtt_port = tedge_config.query(MqttPortSetting)?.into();
+ let mqtt_host = tedge_config.query(MqttBindAddressSetting)?.to_string();
let clock = Box::new(WallClock);
let size_threshold = SizeThreshold(255 * 1024);
let converter = Box::new(AzureConverter::new(add_timestamp, clock, size_threshold));
- let mut mapper = create_mapper(AZURE_MAPPER_NAME, mqtt_port, converter).await?;
+ let mut mapper = create_mapper(AZURE_MAPPER_NAME, mqtt_host, mqtt_port, converter).await?;
mapper
.run()
diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs
index a5df4d0b..def4808c 100644
--- a/crates/core/tedge_mapper/src/c8y/mapper.rs
+++ b/crates/core/tedge_mapper/src/c8y/mapper.rs
@@ -7,9 +7,10 @@ use agent_interface::topic::ResponseTopic;
use async_trait::async_trait;
use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy};
use c8y_smartrest::operations::Operations;
-use mqtt_channel::{Config, TopicFilter};
+use mqtt_channel::TopicFilter;
use tedge_config::{
- ConfigSettingAccessor, DeviceIdSetting, DeviceTypeSetting, MqttPortSetting, TEdgeConfig,
+ ConfigRepository, ConfigSettingAccessor, DeviceIdSetting, DeviceTypeSetting,
+ MqttBindAddressSetting, MqttPortSetting, TEdgeConfig,
};
use tracing::{info, info_span, Instrument};
@@ -39,27 +40,32 @@ impl CumulocityMapper {
}
pub async fn init_session(&mut self) -> Result<(), anyhow::Error> {
- info!("Initialize tedge sm mapper session");
- let operations = Operations::try_new("/etc/tedge/operations", "c8y")?;
- let mqtt_topic = Self::subscriptions(&operations)?;
- let config = Config::default()
- .with_session_name(CUMULOCITY_MAPPER_NAME)
- .with_clean_session(false)
- .with_subscriptions(mqtt_topic);
- mqtt_channel::init_session(&config).await?;
+ info!("Initialize tedge mapper session");
+ mqtt_channel::init_session(&self.get_mqtt_config()?).await?;
Ok(())
}
pub async fn clear_session(&mut self) -> Result<(), anyhow::Error> {
- info!("Clear tedge sm mapper session");
+ info!("Clear tedge mapper session");
+ mqtt_channel::clear_session(&self.get_mqtt_config()?).await?;
+ Ok(())
+ }
+
+ fn get_mqtt_config(&mut self) -> Result<mqtt_channel::Config, anyhow::Error> {
let operations = Operations::try_new("/etc/tedge/operations", "c8y")?;
let mqtt_topic = Self::subscriptions(&operations)?;
- let config = Config::default()
+ let config_repository =
+ tedge_config::TEdgeConfigRepository::new(tedge_config::TEdgeConfigLocation::default());
+ let tedge_config = config_repository.load()?;
+
+ let mqtt_config = mqtt_channel::Config::default()
+ .with_host(tedge_config.query(MqttBindAddressSetting)?.to_string())
+ .with_port(tedge_config.query(MqttPortSetting)?.into())
.with_session_name(CUMULOCITY_MAPPER_NAME)
- .with_clean_session(true)
+ .with_clean_session(false)
.with_subscriptions(mqtt_topic);
- mqtt_channel::clear_session(&config).await?;
- Ok(())
+
+ Ok(mqtt_config)
}
}
@@ -74,6 +80,7 @@ impl TEdgeComponent for CumulocityMapper {
let device_name = tedge_config.query(DeviceIdSetting)?;
let device_type = tedge_config.query(DeviceTypeSetting)?;
let mqtt_port = tedge_config.query(MqttPortSetting)?.into();
+ let mqtt_host = tedge_config.query(MqttBindAddressSetting)?.to_string();
let converter = Box::new(CumulocityConverter::new(
size_threshold,
@@ -83,7 +90,8 @@ impl TEdgeComponent for CumulocityMapper {
http_proxy,
));
- let mut mapper = create_mapper(CUMULOCITY_MAPPER_NAME, mqtt_port, converter).await?;
+ let mut mapper =
+ create_mapper(CUMULOCITY_MAPPER_NAME, mqtt_host, mqtt_port, converter).await?;
mapper
.run()
diff --git a/crates/core/tedge_mapper/src/c8y/tests.rs b/crates/core/tedge_mapper/src/c8y/tests.rs
index 7cbed97a..04cf07cc 100644
--- a/crates/core/tedge_mapper/src/c8y/tests.rs
+++ b/crates/core/tedge_mapper/src/c8y/tests.rs
@@ -24,6 +24,7 @@ use tokio::task::JoinHandle;
use super::converter::{get_child_id_from_topic, CumulocityConverter};
const TEST_TIMEOUT_MS: Duration = Duration::from_millis(5000);
+const MQTT_HOST: &str = "127.0.0.1";
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
@@ -796,7 +797,13 @@ impl C8YHttpProxy for FakeC8YHttpProxy {
async fn start_c8y_mapper(mqtt_port: u16) -> Result<JoinHandle<()>, anyhow::Error> {
let converter = create_c8y_converter();
- let mut mapper = create_mapper("c8y-mapper-test", mqtt_port, Box::new(converter)).await?;
+ let mut mapper = create_mapper(
+ "c8y-mapper-test",
+ MQTT_HOST.to_string(),
+ mqtt_port,
+ Box::new(converter),
+ )
+ .await?;
let mapper_task = tokio::spawn(async move {
let _ = mapper.run().await;
diff --git a/crates/core/tedge_mapper/src/collectd/mapper.rs b/crates/core/tedge_mapper/src/collectd/mapper.rs
index aac2b087..abae07af 100644
--- a/crates/core/tedge_mapper/src/collectd/mapper.rs
+++ b/crates/core/tedge_mapper/src/collectd/mapper.rs
@@ -3,7 +3,7 @@ use crate::{
core::component::TEdgeComponent,
};
use async_trait::async_trait;
-use tedge_config::{ConfigSettingAccessor, MqttPortSetting, TEdgeConfig};
+use tedge_config::{ConfigSettingAccessor, MqttBindAddressSetting, MqttPortSetting, TEdgeConfig};
use tracing::{info_span, Instrument};
const APP_NAME: &str = "tedge-mapper-collectd";
@@ -20,8 +20,11 @@ impl CollectdMapper {
impl TEdgeComponent for CollectdMapper {
async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> {
let mqtt_port = tedge_config.query(MqttPortSetting)?.into();
+ let mqtt_host = tedge_config.query(MqttBindAddressSetting)?.to_string();
- let device_monitor_config = DeviceMonitorConfig::default().with_port(mqtt_port);
+ let device_monitor_config = DeviceMonitorConfig::default()
+ .with_port(mqtt_port)
+ .with_host(mqtt_host);
let device_monitor = DeviceMonitor::new(device_monitor_config);
device_monitor
diff --git a/crates/core/tedge_mapper/src/collectd/monitor.rs b/crates/core/tedge_mapper/src/collectd/monitor.rs
index b16f2452..c804deb5 100644
--- a/crates/core/tedge_mapper/src/collectd/monitor.rs
+++ b/crates/core/tedge_mapper/src/collectd/monitor.rs
@@ -15,7 +15,7 @@ const DEFAULT_MQTT_TARGET_TOPIC: &str = "tedge/measurements";
#[derive(Debug)]
pub struct DeviceMonitorConfig {
- host: &'static str,
+ host: String,
port: u16,
mqtt_client_id: &'static str,
mqtt_source_topic: &'static str,
@@ -28,7 +28,7 @@ pub struct DeviceMonitorConfig {
impl Default for DeviceMonitorConfig {
fn default() -> Self {
Self {
- host: DEFAULT_HOST,
+ host: DEFAULT_HOST.to_string(),
port: DEFAULT_PORT,
mqtt_client_id: DEFAULT_MQTT_CLIENT_ID,
mqtt_source_topic: DEFAULT_MQTT_SOURCE_TOPIC,
@@ -44,6 +44,10 @@ impl DeviceMonitorConfig {
pub fn with_port(self, port: u16) -> Self {
Self { port, ..self }
}
+
+ pub fn with_host(self, host: String) -> Self {
+ Self { host, ..self }
+ }
}
#[derive(Debug)]
@@ -63,7 +67,7 @@ impl DeviceMonitor {
let input_topic = TopicFilter::new(self.device_monitor_config.mqtt_source_topic)?
.with_qos(QoS::AtMostOnce);
let mqtt_config = mqtt_channel::Config::new(
- self.device_monitor_config.host,
+ self.device_monitor_config.host.to_string(),
self.device_monitor_config.port,
)
.with_session_name(self.device_monitor_config.mqtt_client_id)
diff --git a/crates/core/tedge_mapper/src/core/mapper.rs b/crates/core/tedge_mapper/src/core/mapper.rs
index 7f66ed62..f0691397 100644
--- a/crates/core/tedge_mapper/src/core/mapper.rs
+++ b/crates/core/tedge_mapper/src/core/mapper.rs
@@ -12,6 +12,7 @@ const SYNC_WINDOW: Duration = Duration::from_secs(3);
pub async fn create_mapper(
app_name: &str,
+ mqtt_host: String,
mqtt_port: u16,
converter: Box<dyn Converter<Error = ConversionError>>,
) -> Result<Mapper, anyhow::Error> {
@@ -20,6 +21,7 @@ pub async fn create_mapper(
let mapper_config = converter.get_mapper_config();
let mqtt_client = Connection::new(&mqtt_config(
app_name,
+ &mqtt_host,
mqtt_port,
mapper_config.in_topic_filter.clone(),
)?)
@@ -36,10 +38,12 @@ pub async fn create_mapper(
pub fn mqtt_config(
name: &str,
+ host: &str,
port: u16,
topics: TopicFilter,
) -> Result<mqtt_channel::Config, anyhow::Error> {
Ok(mqtt_channel::Config::default()
+ .with_host(host)
.with_port(port)
.with_session_name(name)
.with_subscriptions(topics)