summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.gitignore2
-rw-r--r--crates/common/tedge_config/src/models/ipaddress.rs80
-rw-r--r--crates/common/tedge_config/src/models/mod.rs3
-rw-r--r--crates/common/tedge_config/src/settings.rs15
-rw-r--r--crates/common/tedge_config/src/tedge_config.rs29
-rw-r--r--crates/common/tedge_config/src/tedge_config_defaults.rs6
-rw-r--r--crates/common/tedge_config/src/tedge_config_dto.rs3
-rw-r--r--crates/common/tedge_config/tests/test_tedge_config.rs52
-rw-r--r--crates/core/c8y_api/src/http_proxy.rs5
-rw-r--r--crates/core/tedge/src/cli/config/config_key.rs1
-rw-r--r--crates/core/tedge/src/cli/connect/command.rs26
-rw-r--r--crates/core/tedge/src/cli/connect/common_mosquitto_config.rs9
-rw-r--r--crates/core/tedge/src/cli/connect/jwt_token.rs6
-rw-r--r--crates/core/tedge/src/cli/mqtt/cli.rs10
-rw-r--r--crates/core/tedge_agent/src/agent.rs6
-rw-r--r--crates/core/tedge_mapper/src/az/mapper.rs5
-rw-r--r--crates/core/tedge_mapper/src/c8y/mapper.rs40
-rw-r--r--crates/core/tedge_mapper/src/c8y/tests.rs9
-rw-r--r--crates/core/tedge_mapper/src/collectd/mapper.rs7
-rw-r--r--crates/core/tedge_mapper/src/collectd/monitor.rs10
-rw-r--r--crates/core/tedge_mapper/src/core/mapper.rs4
-rw-r--r--docs/src/howto-guides/008_config_local_mqtt_bind_address_and_port.md107
-rw-r--r--docs/src/howto-guides/008_config_local_mqtt_port.md91
23 files changed, 377 insertions, 149 deletions
diff --git a/.gitignore b/.gitignore
index 398292d6..6d7fe0c4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,7 +6,7 @@
**/*.rs.bk
#.lock file
-mapper/cumulocity/c8y_translator_lib/fuzz/Cargo.lock
+crates/core/c8y_translator/fuzz/Cargo.lock
.idea/
.vscode/
diff --git a/crates/common/tedge_config/src/models/ipaddress.rs b/crates/common/tedge_config/src/models/ipaddress.rs
new file mode 100644
index 00000000..3c0edc4a
--- /dev/null
+++ b/crates/common/tedge_config/src/models/ipaddress.rs
@@ -0,0 +1,80 @@
+use serde::{Deserialize, Serialize};
+use std::convert::{TryFrom, TryInto};
+use std::fmt;
+use std::net::IpAddr;
+
+#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
+pub struct IpAddress(pub IpAddr);
+
+#[derive(thiserror::Error, Debug)]
+#[error("Invalid ip address: '{input}'.")]
+pub struct InvalidIpAddress {
+ input: String,
+}
+
+impl fmt::Display for IpAddress {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ write!(f, "{:?}", self.0)
+ }
+}
+
+impl Default for IpAddress {
+ fn default() -> Self {
+ IpAddress(IpAddr::V4(Ipv4Addr::LOCALHOST))
+ }
+}
+
+impl TryFrom<String> for IpAddress {
+ type Error = InvalidIpAddress;
+
+ fn try_from(input: String) -> Result<Self, Self::Error> {
+ input
+ .parse::<IpAddr>()
+ .map_err(|_| InvalidIpAddress { input })
+ .map(IpAddress)
+ }
+}
+
+impl TryInto<String> for IpAddress {
+ type Error = std::convert::Infallible;
+
+ fn try_into(self) -> Result<String, Self::Error> {
+ Ok(self.to_string())
+ }
+}
+
+impl From<IpAddress> for IpAddr {
+ fn from(val: IpAddress) -> Self {
+ val.0
+ }
+}
+
+#[cfg(test)]
+use assert_matches::*;
+use std::net::Ipv4Addr;
+use std::net::Ipv6Addr;
+
+#[test]
+fn conversion_from_valid_ipv4_succeeds() {
+ let _loh: IpAddress = IpAddress::try_from("127.0.0.1".to_string()).unwrap();
+ assert_matches!(Ipv4Addr::LOCALHOST, _loh);
+}
+
+#[test]
+fn conversion_from_valid_ipv6_succeeds() {
+ let _loh: IpAddress = IpAddress::try_from("::1".to_string()).unwrap();
+ assert_matches!(Ipv6Addr::LOCALHOST, _loh);
+}
+
+#[test]
+fn conversion_from_longer_integer_fails() {
+ assert_matches!(
+ IpAddress::try_from("66000".to_string()),
+ Err(InvalidIpAddress { .. })
+ );
+}
+
+#[test]
+fn conversion_from_ip_to_string() {
+ assert_matches!(TryInto::<String>::try_into(IpAddress(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)))), Ok(ip_str) if ip_str == "::1");
+}
diff --git a/crates/common/tedge_config/src/models/mod.rs b/crates/common/tedge_config/src/models/mod.rs
index e74f756c..4044168d 100644
--- a/crates/common/tedge_config/src/models/mod.rs
+++ b/crates/common/tedge_config/src/models/mod.rs
@@ -1,5 +1,6 @@
pub mod connect_url;
pub mod file_path;
pub mod flag;
+pub mod ipaddress;
pub mod port;
-pub use self::{connect_url::*, file_path::*, flag::*, port::*};
+pub use self::{connect_url::*, file_path::*, flag::*, ipaddress::*, port::*};
diff --git a/crates/common/tedge_config/src/settings.rs b/crates/common/tedge_config/src/settings.rs
index d7b8c0d7..7ff28af4 100644
--- a/crates/common/tedge_config/src/settings.rs
+++ b/crates/common/tedge_config/src/settings.rs
@@ -172,6 +172,19 @@ impl ConfigSetting for MqttPortSetting {
type Value = Port;
}
+pub struct MqttBindAddressSetting;
+
+impl ConfigSetting for MqttBindAddressSetting {
+ const KEY: &'static str = "mqtt.bind_address";
+
+ const DESCRIPTION: &'static str = concat!(
+ "Mqtt bind address, which is used by the mqtt clients to publish or subscribe. ",
+ "Example: 127.0.0.1"
+ );
+
+ type Value = IpAddress;
+}
+
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub struct MqttExternalPortSetting;
@@ -197,7 +210,7 @@ impl ConfigSetting for MqttExternalBindAddressSetting {
"Example: 0.0.0.0"
);
- type Value = String;
+ type Value = IpAddress;
}
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
diff --git a/crates/common/tedge_config/src/tedge_config.rs b/crates/common/tedge_config/src/tedge_config.rs
index ab689ae4..2b7112c9 100644
--- a/crates/common/tedge_config/src/tedge_config.rs
+++ b/crates/common/tedge_config/src/tedge_config.rs
@@ -280,6 +280,31 @@ impl ConfigSettingAccessor<MqttPortSetting> for TEdgeConfig {
}
}
+impl ConfigSettingAccessor<MqttBindAddressSetting> for TEdgeConfig {
+ fn query(&self, _setting: MqttBindAddressSetting) -> ConfigSettingResult<IpAddress> {
+ Ok(self
+ .data
+ .mqtt
+ .bind_address
+ .clone()
+ .unwrap_or_else(|| self.config_defaults.default_mqtt_bind_address.clone()))
+ }
+
+ fn update(
+ &mut self,
+ _setting: MqttBindAddressSetting,
+ value: IpAddress,
+ ) -> ConfigSettingResult<()> {
+ self.data.mqtt.bind_address = Some(value);
+ Ok(())
+ }
+
+ fn unset(&mut self, _setting: MqttBindAddressSetting) -> ConfigSettingResult<()> {
+ self.data.mqtt.bind_address = None;
+ Ok(())
+ }
+}
+
impl ConfigSettingAccessor<MqttExternalPortSetting> for TEdgeConfig {
fn query(&self, _setting: MqttExternalPortSetting) -> ConfigSettingResult<Port> {
self.data
@@ -307,7 +332,7 @@ impl ConfigSettingAccessor<MqttExternalPortSetting> for TEdgeConfig {
}
impl ConfigSettingAccessor<MqttExternalBindAddressSetting> for TEdgeConfig {
- fn query(&self, _setting: MqttExternalBindAddressSetting) -> ConfigSettingResult<String> {
+ fn query(&self, _setting: MqttExternalBindAddressSetting) -> ConfigSettingResult<IpAddress> {
self.data
.mqtt
.external_bind_address
@@ -320,7 +345,7 @@ impl ConfigSettingAccessor<MqttExternalBindAddressSetting> for TEdgeConfig {
fn update(
&mut self,
_setting: MqttExternalBindAddressSetting,
- value: String,
+ value: IpAddress,
) -> ConfigSettingResult<()> {
self.data.mqtt.external_bind_address = Some(value);
Ok(())
diff --git a/crates/common/tedge_config/src/tedge_config_defaults.rs b/crates/common/tedge_config/src/tedge_config_defaults.rs
index 3c7cea92..e1f3f058 100644
--- a/crates/common/tedge_config/src/tedge_config_defaults.rs
+++ b/crates/common/tedge_config/src/tedge_config_defaults.rs
@@ -1,4 +1,5 @@
use crate::models::FilePath;
+use crate::IpAddress;
use crate::TEdgeConfigLocation;
use crate::{Flag, Port};
use std::path::Path;
@@ -45,6 +46,9 @@ pub struct TEdgeConfigDefaults {
/// Default device type
pub default_device_type: String,
+
+ /// Default bind address
+ pub default_mqtt_bind_address: IpAddress,
}
impl From<&TEdgeConfigLocation> for TEdgeConfigDefaults {
@@ -68,6 +72,7 @@ impl From<&TEdgeConfigLocation> for TEdgeConfigDefaults {
default_mqtt_port: Port(DEFAULT_PORT),
default_tmp_path: tmp_path.into(),
default_device_type: DEFAULT_DEVICE_TYPE.into(),
+ default_mqtt_bind_address: IpAddress::default(),
}
}
}
@@ -92,6 +97,7 @@ fn test_from_tedge_config_location() {
default_mqtt_port: Port(DEFAULT_PORT),
default_tmp_path: FilePath::from("/tmp"),
default_device_type: DEFAULT_DEVICE_TYPE.into(),
+ default_mqtt_bind_address: IpAddress::default(),
}
);
}
diff --git a/crates/common/tedge_config/src/tedge_config_dto.rs b/crates/common/tedge_config/src/tedge_config_dto.rs
index ae9cd15f..6756718f 100644
--- a/crates/common/tedge_config/src/tedge_config_dto.rs
+++ b/crates/common/tedge_config/src/tedge_config_dto.rs
@@ -81,8 +81,9 @@ pub(crate) struct AzureConfigDto {
#[serde(deny_unknown_fields)]
pub(crate) struct MqttConfigDto {
pub(crate) port: Option<u16>,
+ pub(crate) bind_address: Option<IpAddress>,
pub(crate) external_port: Option<u16>,
- pub(crate) external_bind_address: Option<String>,
+ pub(crate) external_bind_address: Option<IpAddress>,
pub(crate) external_bind_interface: Option<String>,
pub(crate) external_capath: Option<FilePath>,
pub(crate) external_certfile: Option<FilePath>,
diff --git a/crates/common/tedge_config/tests/test_tedge_config.rs b/crates/common/tedge_config/tests/test_tedge_config.rs
index 26118adf..bfa926a2 100644
--- a/crates/common/tedge_config/tests/test_tedge_config.rs
+++ b/crates/common/tedge_config/tests/test_tedge_config.rs
@@ -1,6 +1,7 @@
use assert_matches::assert_matches;
use std::convert::TryFrom;
use std::io::Write;
+use std::net::{IpAddr, Ipv4Addr};
use tedge_config::*;
use tempfile::TempDir;
@@ -30,6 +31,7 @@ external_bind_interface = "wlan0"
external_capath = "ca.pem"
external_certfile = "cert.pem"
external_keyfile = "key.pem"
+bind_address = "0.0.0.0"
[tmp]
path = "/some/value"
@@ -75,8 +77,8 @@ path = "/some/value"
assert_eq!(config.query(MqttExternalPortSetting)?, Port(2345));
assert_eq!(
- config.query(MqttExternalBindAddressSetting)?.as_str(),
- "0.0.0.0"
+ config.query(MqttExternalBindAddressSetting)?,
+ IpAddress::try_from("0.0.0.0".to_string()).unwrap()
);
assert_eq!(
@@ -103,6 +105,12 @@ path = "/some/value"
config.query(TmpPathDefaultSetting)?,
FilePath::from("/some/value")
);
+
+ assert_eq!(
+ config.query(MqttBindAddressSetting)?,
+ IpAddress::try_from("0.0.0.0".to_string()).unwrap()
+ );
+
Ok(())
}
@@ -124,6 +132,7 @@ mapper_timestamp = false
[mqtt]
port = 1883
+bind_address = "0.0.0.0"
"#;
let (_tempdir, config_location) = create_temp_tedge_config(toml_conf)?;
@@ -139,11 +148,12 @@ port = 1883
let updated_azure_url = "OtherAzure.azure-devices.net";
let updated_mqtt_port = Port(2345);
let updated_mqtt_external_port = Port(3456);
- let updated_mqtt_external_bind_address = "localhost";
+ let updated_mqtt_external_bind_address = IpAddress::default();
let updated_mqtt_external_bind_interface = "eth0";
let updated_mqtt_external_capath = "/some/path";
let updated_mqtt_external_certfile = "cert.pem";
let updated_mqtt_external_keyfile = "key.pem";
+ let updated_mqtt_bind_address = IpAddress(std::net::IpAddr::V4(Ipv4Addr::LOCALHOST));
{
let mut config = config_repo.load()?;
@@ -187,8 +197,9 @@ port = 1883
config.update(MqttExternalPortSetting, updated_mqtt_external_port)?;
config.update(
MqttExternalBindAddressSetting,
- updated_mqtt_external_bind_address.to_string(),
+ updated_mqtt_external_bind_address.clone(),
)?;
+
config.update(
MqttExternalBindInterfaceSetting,
updated_mqtt_external_bind_interface.to_string(),
@@ -205,6 +216,7 @@ port = 1883
MqttExternalKeyfileSetting,
FilePath::from(updated_mqtt_external_keyfile),
)?;
+ config.update(MqttBindAddressSetting, updated_mqtt_bind_address.clone())?;
config_repo.store(&config)?;
}
@@ -240,7 +252,7 @@ port = 1883
updated_mqtt_external_port
);
assert_eq!(
- config.query(MqttExternalBindAddressSetting)?.as_str(),
+ config.query(MqttExternalBindAddressSetting)?,
updated_mqtt_external_bind_address
);
assert_eq!(
@@ -259,6 +271,10 @@ port = 1883
config.query(MqttExternalKeyfileSetting)?,
FilePath::from(updated_mqtt_external_keyfile)
);
+ assert_eq!(
+ config.query(MqttBindAddressSetting)?,
+ updated_mqtt_bind_address
+ );
}
Ok(())
@@ -306,6 +322,10 @@ fn test_parse_config_with_only_device_configuration() -> Result<(), TEdgeConfigE
assert_eq!(config.query(AzureMapperTimestamp)?, Flag(true));
assert_eq!(config.query(MqttPortSetting)?, Port(1883));
+ assert_eq!(
+ config.query(MqttBindAddressSetting)?,
+ IpAddress::try_from("127.0.0.1".to_string()).unwrap()
+ );
Ok(())
}
@@ -354,6 +374,10 @@ url = "your-tenant.cumulocity.com"
assert_eq!(config.query(AzureMapperTimestamp)?, Flag(true));
assert_eq!(config.query(MqttPortSetting)?, Port(1883));
+ assert_eq!(
+ config.query(MqttBindAddressSetting)?,
+ IpAddress(IpAddr::V4(Ipv4Addr::LOCALHOST))
+ );
Ok(())
}
@@ -402,6 +426,10 @@ url = "MyAzure.azure-devices.net"
assert_eq!(config.query(AzureMapperTimestamp)?, Flag(true));
assert_eq!(config.query(MqttPortSetting)?, Port(1883));
+ assert_eq!(
+ config.query(MqttBindAddressSetting)?,
+ IpAddress(IpAddr::V4(Ipv4Addr::LOCALHOST))
+ );
Ok(())
}
@@ -410,6 +438,7 @@ fn test_parse_config_with_only_mqtt_configuration() -> Result<(), TEdgeConfigErr
let toml_conf = r#"
[mqtt]
port = 2222
+bind_address = "1.2.3.4"
"#;
let (_tempdir, config_location) = create_temp_tedge_config(toml_conf)?;
@@ -447,6 +476,10 @@ port = 2222
assert_eq!(config.query(AzureMapperTimestamp)?, Flag(true));
assert_eq!(config.query(MqttPortSetting)?, Port(2222));
+ assert_eq!(
+ config.query(MqttBindAddressSetting)?,
+ IpAddress::try_from("1.2.3.4".to_string()).unwrap()
+ );
Ok(())
}
@@ -564,6 +597,10 @@ fn test_parse_config_empty_file() -> Result<(), TEdgeConfigError> {
assert_eq!(config.query(AzureMapperTimestamp)?, Flag(true));
assert_eq!(config.query(MqttPortSetting)?, Port(1883));
+ assert_eq!(
+ config.query(MqttBindAddressSetting)?,
+ IpAddress(IpAddr::V4(Ipv4Addr::LOCALHOST))
+ );
Ok(())
}
@@ -596,6 +633,10 @@ fn test_parse_config_no_config_file() -> Result<(), TEdgeConfigError> {
assert_eq!(config.query(AzureMapperTimestamp)?, Flag(true));
assert_eq!(config.query(MqttPortSetting)?, Port(1883));
+ assert_eq!(
+ config.query(MqttBindAddressSetting)?,
+ IpAddress(IpAddr::V4(Ipv4Addr::LOCALHOST))
+ );
Ok(())
}
@@ -858,6 +899,7 @@ fn dummy_tedge_config_defaults() -> TEdgeConfigDefaults {
default_mqtt_port: Port(1883),
default_tmp_path: FilePath::from("/tmp"),
default_device_type: String::from("test"),
+ default_mqtt_bind_address: IpAddress(IpAddr::V4(Ipv4Addr::LOCALHOST)),
}
}
diff --git a/crates/core/c8y_api/src/http_proxy.rs b/crates/core/c8y_api/src/http_proxy.rs
index 4c6dff98..d8047e53 100644
--- a/crates/core/c8y_api/src/http_proxy.rs
+++ b/crates/core/c8y_api/src/http_proxy.rs
@@ -11,7 +11,7 @@ use reqwest::Url;
use std::time::Duration;
use tedge_config::{
C8yUrlSetting, ConfigSettingAccessor, ConfigSettingAccessorStringExt, DeviceIdSetting,
- MqttPortSetting, TEdgeConfig,
+ MqttBindAddressSetting, MqttPortSetting, TEdgeConfig,
};
use time::{format_description, OffsetDateTime};
@@ -210,11 +210,14 @@ impl JwtAuthHttpProxy {
let http_con = reqwest::ClientBuilder::new().build()?;
let mqtt_port = tedge_config.query(MqttPortSetting)?.into();
+ let mqtt_host = tedge_config.query(MqttBindAddressSetting)?.to_string();
let topic = TopicFilter::new("c8y/s/dat")?;
let mqtt_config = mqtt_channel::Config::default()
.with_port(mqtt_port)
.with_clean_session(true)
+ .with_host(mqtt_host)
.with_subscriptions(topic);
+
let mut mqtt_con = Connection::new(&mqtt_config).await?;
// Ignore errors on this connection
diff --git a/crates/core/tedge/src/cli/config/config_key.rs b/crates/core/tedge/src/cli/config/config_key.rs
index 1b0fcef3..e6b9f5e6 100644
--- a/crates/core/tedge/src/cli/config/config_key.rs
+++ b/crates/core/tedge/src/cli/config/config_key.rs
@@ -48,6 +48,7 @@ impl ConfigKey {
config_key!(AzureUrlSetting),
config_key!(AzureRootCertPathSetting),
config_key!(AzureMapperTimestamp),
+ config_key!(MqttBindAddressSetting),
config_key!(MqttPortSetting),
config_key!(MqttExternalPortSetting),
config_key!(MqttExternalBindAddressSetting),
diff --git a/crates/core/tedge/src/cli/connect/command.rs b/crates/core/tedge/src/cli/connect/command.rs
index 253fd813..30682ddd 100644
--- a/crates/core/tedge/src/cli/connect/command.rs
+++ b/crates/core/tedge/src/cli/connect/command.rs
@@ -11,7 +11,6 @@ use tedge_users::UserManager;
use tedge_utils::paths::{create_directories, ok_if_not_found, DraftFile};
use which::which;
-pub(crate) const DEFAULT_HOST: &str = "localhost";
const WAIT_FOR_CHECK_SECONDS: u64 = 2;
const C8Y_CONFIG_FILENAME: &str = "c8y-bridge.conf";
const AZURE_CONFIG_FILENAME: &str = "az-bridge.conf";
@@ -104,10 +103,16 @@ impl Command for ConnectCommand {
let updated_mosquitto_config = self
.common_mosquitto_config
.clone()
- .with_internal_opts(config.query(MqttPortSetting)?.into())
+ .with_internal_opts(
+ config.query(MqttPortSetting)?.into(),
+ config.query(MqttBindAddressSetting)?.to_string(),
+ )
.with_external_opts(
config.query(MqttExternalPortSetting).ok().map(|x| x.into()),
- config.query(MqttExternalBindAddressSetting).ok(),
+ config
+ .query(MqttExternalBindAddressSetting)
+ .ok()
+ .map(|x| x.to_string()),
config.query(MqttExternalBindInterfaceSetting).ok(),
config
.query(MqttExternalCAPathSetting)
@@ -164,6 +169,7 @@ impl Command for ConnectCommand {
check_connected_c8y_tenant_as_configured(
&config.query_string(C8yUrlSetting)?,
config.query(MqttPortSetting)?.into(),
+ config.query(MqttBindAddressSetting)?.to_string(),
);
enable_software_management(&bridge_config, self.service_manager.as_ref());
}
@@ -206,13 +212,14 @@ impl ConnectCommand {
fn check_connection(&self, config: &TEdgeConfig) -> Result<DeviceStatus, ConnectError> {
let port = config.query(MqttPortSetting)?.into();
+ let host = config.query(MqttBindAddressSetting)?.to_string();
println!(
"Sending packets to check connection. This may take up to {} seconds.\n",
WAIT_FOR_CHECK_SECONDS
);
match self.cloud {
- Cloud::Azure => check_device_status_azure(port),
+ Cloud::Azure => check_device_status_azure(port, host),
Cloud::C8y => check_device_status_c8y(config),
}
}
@@ -250,9 +257,10 @@ fn check_device_status_c8y(tedge_config: &TEdgeConfig) -> Result<DeviceStatus, C
let mut options = MqttOptions::new(
CLIENT_ID,
- DEFAULT_HOST,
+ tedge_config.query(MqttBindAddressSetting)?.to_string(),
tedge_config.query(MqttPortSetting)?.into(),
);
+
options.set_keep_alive(RESPONSE_TIMEOUT);
let (mut client, mut connection) = rumqttc::Client::new(options, 10);
@@ -310,14 +318,14 @@ fn check_device_status_c8y(tedge_config: &TEdgeConfig) -> Result<DeviceStatus, C
// Empty payload will be published to az/$iothub/twin/GET/?$rid=1, here 1 is request ID.
// The result will be published by the iothub on the az/$iothub/twin/res/{status}/?$rid={request id}.
// Here if the status is 200 then it's success.
-fn check_device_status_azure(port: u16) -> Result<DeviceStatus, ConnectError> {
+fn check_device_status_azure(port: u16, host: String) -> Result<DeviceStatus, ConnectError> {
const AZURE_TOPIC_DEVICE_TWIN_DOWNSTREAM: &str = r##"az/twin/res/#"##;
const AZURE_TOPIC_DEVICE_TWIN_UPSTREAM: &str = r#"az/twin/GET/?$rid=1"#;
const CLIENT_ID: &str = "check_connection_az";
const REGISTRATION_PAYLOAD: &[u8] = b"";
const REGISTRATION_OK: &str = "200";
- let mut options = MqttOptions::new(CLIENT_ID, DEFAULT_HOST, port);
+ let mut options = MqttOptions::new(CLIENT_ID, host, port);
options.set_keep_alive(RESPONSE_TIMEOUT);
let (mut client, mut connection) = rumqttc::Client::new(options, 10);
@@ -549,8 +557,8 @@ fn get_common_mosquitto_config_file_path(
}
// To confirm the connected c8y tenant is the one that user configured.
-fn check_connected_c8y_tenant_as_configured(configured_url: &str, port: u16) {
- match get_connected_c8y_url(port) {
+fn check_connected_c8y_tenant_as_configured(configured_url: &str, port: u16, host: String) {
+ match get_connected_c8y_url(port, host) {
Ok(url) if url == configured_url => {}
Ok(url) => println!(
"Warning: Connecting to {}, but the configured URL is {}.\n\
diff --git a/crates/core/tedge/src/cli/connect/common_mosquitto_config.rs b/crates/core/tedge/src/cli/connect/common_mosquitto_config.rs
index 7fd9a28b..68af7d00 100644
--- a/crates/core/tedge/src/cli/connect/common_mosquitto_config.rs
+++ b/crates/core/tedge/src/cli/connect/common_mosquitto_config.rs
@@ -118,9 +118,10 @@ impl CommonMosquittoConfig {
Ok(())
}
- pub fn with_internal_opts(self, port: u16) -> Self {
+ pub fn with_internal_opts(self, port: u16, bind_address: String) -> Self {
let internal_listener = ListenerConfig {
port: Some(port),
+ bind_address: Some(bind_address),
..self.internal_listener
};
Self {
@@ -198,10 +199,10 @@ fn test_serialize() -> anyhow::Result<()> {
fn test_serialize_with_opts() -> anyhow::Result<()> {
let common_mosquitto_config = CommonMosquittoConfig::default();
let mosquitto_config_with_opts = common_mosquitto_config
- .with_internal_opts(1234)
+ .with_internal_opts(1234, "1.2.3.4".into())
.with_external_opts(
Some(2345),
- Some("0.0.0.0".into()),
+ Some("0.0.0.0".to_string()),
Some("wlan0".into()),
Some("/etc/ssl/certs".into()),
Some("cert.pem".into()),
@@ -227,7 +228,7 @@ fn test_serialize_with_opts() -> anyhow::Result<()> {
"log_type subscribe\n",
"log_type unsubscribe\n",
"message_size_limit 268435455\n",
- "listener 1234 localhost\n",
+ "listener 1234 1.2.3.4\n",
"allow_anonymous true\n",
"require_certificate false\n",
"listener 2345 0.0.0.0\n",
diff --git a/crates/core/tedge/src/cli/connect/jwt_token.rs b/crates/core/tedge/src/cli/connect/jwt_token.rs
index b81e26ed..afc86434 100644
--- a/crates/core/tedge/src/cli/connect/jwt_token.rs
+++ b/crates/core/tedge/src/cli/connect/jwt_token.rs
@@ -1,13 +1,13 @@
-use crate::cli::connect::{ConnectError, DEFAULT_HOST, RESPONSE_TIMEOUT};
+use crate::cli::connect::{ConnectError, RESPONSE_TIMEOUT};
use rumqttc::QoS::AtLeastOnce;
use rumqttc::{Event, Incoming, MqttOptions, Outgoing, Packet};
-pub(crate) fn get_connected_c8y_url(port: u16) -> Result<String, ConnectError> {
+pub(crate) fn get_connected_c8y_url(port: u16, host: String) -> Result<String, ConnectError> {
const C8Y_TOPIC_BUILTIN_JWT_TOKEN_UPSTREAM: &str = "c8y/s/uat";
const C8Y_TOPIC_BUILTIN_JWT_TOKEN_DOWNSTREAM: &str = "c8y/s/dat";
const CLIENT_ID: &str = "get_jwt_token_c8y";
- let mut options = MqttOptions::new(CLIENT_ID, DEFAULT_HOST, port);
+ let mut options = MqttOptions::new(CLIENT_ID, host, port);
options.set_keep_alive(RESPONSE_TIMEOUT);
let (mut client, mut connection) = rumqttc::Client::new(options, 10);
diff --git a/crates/core/tedge/src/cli/mqtt/cli.rs b/crates/core/tedge/src/cli/mqtt/cli.rs
index f6878a78..ce38abeb 100644
--- a/crates/core/tedge/src/cli/mqtt/cli.rs
+++ b/crates/core/tedge/src/cli/mqtt/cli.rs
@@ -4,7 +4,6 @@ use rumqttc::QoS;
use std::time::Duration;
use tedge_config::*;
-const DEFAULT_HOST: &str = "localhost";
const PUB_CLIENT_PREFIX: &str = "tedge-pub";
const SUB_CLIENT_PREFIX: &str = "tedge-sub";
const DISCONNECT_TIMEOUT: Duration = Duration::from_secs(2);
@@ -41,7 +40,10 @@ pub enum TEdgeMqttCli {
impl BuildCommand for TEdgeMqttCli {
fn build_command(self, context: BuildContext) -> Result<Box<dyn Command>, crate::ConfigError> {
let port = context.config_repository.load()?.query(MqttPortSetting)?;
-
+ let host = context
+ .config_repository
+ .load()?
+ .query(MqttBindAddressSetting)?;
let cmd = {
match self {
TEdgeMqttCli::Pub {
@@ -50,7 +52,7 @@ impl BuildCommand for TEdgeMqttCli {
qos,
retain,
} => MqttPublishCommand {
- host: DEFAULT_HOST.to_string(),
+ host: host.to_string(),
port: port.into(),
topic,
message,
@@ -65,7 +67,7 @@ impl BuildCommand for TEdgeMqttCli {
qos,
hide_topic,
} => MqttSubscribeCommand {
- host: DEFAULT_HOST.to_string(),
+ host: host.to_string(),
port: port.into(),
topic,
qos,
diff --git a/crates/core/tedge_agent/src/agent.rs b/crates/core/tedge_agent/src/agent.rs
index d1285c22..aeabfc6a 100644
--- a/crates/core/tedge_agent/src/agent.rs
+++ b/crates/core/tedge_agent/src/agent.rs
@@ -17,8 +17,9 @@ use mqtt_channel::{Connection, Message, PubChannel, StreamExt, SubChannel, Topic
use plugin_sm::plugin_manager::{ExternalPlugins, Plugins};
use std::{convert::TryInto, fmt::Debug, path::PathBuf, sync::Arc};
use tedge_config::{
- ConfigRepository, ConfigSettingAccessor, ConfigSettingAccessorStringExt, MqttPortSetting,
- SoftwarePluginDefaultSetting, TEdgeConfigLocation, TmpPathDefaultSetting,
+ ConfigRepository, ConfigSettingAccessor, ConfigSettingAccessorStringExt,
+ MqttBindAddressSetting, MqttPortSetting, SoftwarePluginDefaultSetting, TEdgeConfigLocation,
+ TmpPathDefaultSetting,
};
use tokio::sync::Mutex;
use tracing::{debug, error, info, instrument};
@@ -107,6 +108,7 @@ impl SmAgentConfig {
let tedge_config = config_repository.load()?;
let mqtt_config = mqtt_channel::Config::default()
+ .with_host(tedge_config.query(MqttBindAddressSetting)?.to_string