diff options
23 files changed, 377 insertions, 149 deletions
@@ -6,7 +6,7 @@ **/*.rs.bk #.lock file -mapper/cumulocity/c8y_translator_lib/fuzz/Cargo.lock +crates/core/c8y_translator/fuzz/Cargo.lock .idea/ .vscode/ diff --git a/crates/common/tedge_config/src/models/ipaddress.rs b/crates/common/tedge_config/src/models/ipaddress.rs new file mode 100644 index 00000000..3c0edc4a --- /dev/null +++ b/crates/common/tedge_config/src/models/ipaddress.rs @@ -0,0 +1,80 @@ +use serde::{Deserialize, Serialize}; +use std::convert::{TryFrom, TryInto}; +use std::fmt; +use std::net::IpAddr; + +#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] +pub struct IpAddress(pub IpAddr); + +#[derive(thiserror::Error, Debug)] +#[error("Invalid ip address: '{input}'.")] +pub struct InvalidIpAddress { + input: String, +} + +impl fmt::Display for IpAddress { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{:?}", self.0) + } +} + +impl Default for IpAddress { + fn default() -> Self { + IpAddress(IpAddr::V4(Ipv4Addr::LOCALHOST)) + } +} + +impl TryFrom<String> for IpAddress { + type Error = InvalidIpAddress; + + fn try_from(input: String) -> Result<Self, Self::Error> { + input + .parse::<IpAddr>() + .map_err(|_| InvalidIpAddress { input }) + .map(IpAddress) + } +} + +impl TryInto<String> for IpAddress { + type Error = std::convert::Infallible; + + fn try_into(self) -> Result<String, Self::Error> { + Ok(self.to_string()) + } +} + +impl From<IpAddress> for IpAddr { + fn from(val: IpAddress) -> Self { + val.0 + } +} + +#[cfg(test)] +use assert_matches::*; +use std::net::Ipv4Addr; +use std::net::Ipv6Addr; + +#[test] +fn conversion_from_valid_ipv4_succeeds() { + let _loh: IpAddress = IpAddress::try_from("127.0.0.1".to_string()).unwrap(); + assert_matches!(Ipv4Addr::LOCALHOST, _loh); +} + +#[test] +fn conversion_from_valid_ipv6_succeeds() { + let _loh: IpAddress = IpAddress::try_from("::1".to_string()).unwrap(); + assert_matches!(Ipv6Addr::LOCALHOST, _loh); +} + +#[test] +fn conversion_from_longer_integer_fails() { + assert_matches!( + IpAddress::try_from("66000".to_string()), + Err(InvalidIpAddress { .. }) + ); +} + +#[test] +fn conversion_from_ip_to_string() { + assert_matches!(TryInto::<String>::try_into(IpAddress(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)))), Ok(ip_str) if ip_str == "::1"); +} diff --git a/crates/common/tedge_config/src/models/mod.rs b/crates/common/tedge_config/src/models/mod.rs index e74f756c..4044168d 100644 --- a/crates/common/tedge_config/src/models/mod.rs +++ b/crates/common/tedge_config/src/models/mod.rs @@ -1,5 +1,6 @@ pub mod connect_url; pub mod file_path; pub mod flag; +pub mod ipaddress; pub mod port; -pub use self::{connect_url::*, file_path::*, flag::*, port::*}; +pub use self::{connect_url::*, file_path::*, flag::*, ipaddress::*, port::*}; diff --git a/crates/common/tedge_config/src/settings.rs b/crates/common/tedge_config/src/settings.rs index d7b8c0d7..7ff28af4 100644 --- a/crates/common/tedge_config/src/settings.rs +++ b/crates/common/tedge_config/src/settings.rs @@ -172,6 +172,19 @@ impl ConfigSetting for MqttPortSetting { type Value = Port; } +pub struct MqttBindAddressSetting; + +impl ConfigSetting for MqttBindAddressSetting { + const KEY: &'static str = "mqtt.bind_address"; + + const DESCRIPTION: &'static str = concat!( + "Mqtt bind address, which is used by the mqtt clients to publish or subscribe. ", + "Example: 127.0.0.1" + ); + + type Value = IpAddress; +} + #[derive(Debug, Copy, Clone, Eq, PartialEq)] pub struct MqttExternalPortSetting; @@ -197,7 +210,7 @@ impl ConfigSetting for MqttExternalBindAddressSetting { "Example: 0.0.0.0" ); - type Value = String; + type Value = IpAddress; } #[derive(Debug, Copy, Clone, Eq, PartialEq)] diff --git a/crates/common/tedge_config/src/tedge_config.rs b/crates/common/tedge_config/src/tedge_config.rs index ab689ae4..2b7112c9 100644 --- a/crates/common/tedge_config/src/tedge_config.rs +++ b/crates/common/tedge_config/src/tedge_config.rs @@ -280,6 +280,31 @@ impl ConfigSettingAccessor<MqttPortSetting> for TEdgeConfig { } } +impl ConfigSettingAccessor<MqttBindAddressSetting> for TEdgeConfig { + fn query(&self, _setting: MqttBindAddressSetting) -> ConfigSettingResult<IpAddress> { + Ok(self + .data + .mqtt + .bind_address + .clone() + .unwrap_or_else(|| self.config_defaults.default_mqtt_bind_address.clone())) + } + + fn update( + &mut self, + _setting: MqttBindAddressSetting, + value: IpAddress, + ) -> ConfigSettingResult<()> { + self.data.mqtt.bind_address = Some(value); + Ok(()) + } + + fn unset(&mut self, _setting: MqttBindAddressSetting) -> ConfigSettingResult<()> { + self.data.mqtt.bind_address = None; + Ok(()) + } +} + impl ConfigSettingAccessor<MqttExternalPortSetting> for TEdgeConfig { fn query(&self, _setting: MqttExternalPortSetting) -> ConfigSettingResult<Port> { self.data @@ -307,7 +332,7 @@ impl ConfigSettingAccessor<MqttExternalPortSetting> for TEdgeConfig { } impl ConfigSettingAccessor<MqttExternalBindAddressSetting> for TEdgeConfig { - fn query(&self, _setting: MqttExternalBindAddressSetting) -> ConfigSettingResult<String> { + fn query(&self, _setting: MqttExternalBindAddressSetting) -> ConfigSettingResult<IpAddress> { self.data .mqtt .external_bind_address @@ -320,7 +345,7 @@ impl ConfigSettingAccessor<MqttExternalBindAddressSetting> for TEdgeConfig { fn update( &mut self, _setting: MqttExternalBindAddressSetting, - value: String, + value: IpAddress, ) -> ConfigSettingResult<()> { self.data.mqtt.external_bind_address = Some(value); Ok(()) diff --git a/crates/common/tedge_config/src/tedge_config_defaults.rs b/crates/common/tedge_config/src/tedge_config_defaults.rs index 3c7cea92..e1f3f058 100644 --- a/crates/common/tedge_config/src/tedge_config_defaults.rs +++ b/crates/common/tedge_config/src/tedge_config_defaults.rs @@ -1,4 +1,5 @@ use crate::models::FilePath; +use crate::IpAddress; use crate::TEdgeConfigLocation; use crate::{Flag, Port}; use std::path::Path; @@ -45,6 +46,9 @@ pub struct TEdgeConfigDefaults { /// Default device type pub default_device_type: String, + + /// Default bind address + pub default_mqtt_bind_address: IpAddress, } impl From<&TEdgeConfigLocation> for TEdgeConfigDefaults { @@ -68,6 +72,7 @@ impl From<&TEdgeConfigLocation> for TEdgeConfigDefaults { default_mqtt_port: Port(DEFAULT_PORT), default_tmp_path: tmp_path.into(), default_device_type: DEFAULT_DEVICE_TYPE.into(), + default_mqtt_bind_address: IpAddress::default(), } } } @@ -92,6 +97,7 @@ fn test_from_tedge_config_location() { default_mqtt_port: Port(DEFAULT_PORT), default_tmp_path: FilePath::from("/tmp"), default_device_type: DEFAULT_DEVICE_TYPE.into(), + default_mqtt_bind_address: IpAddress::default(), } ); } diff --git a/crates/common/tedge_config/src/tedge_config_dto.rs b/crates/common/tedge_config/src/tedge_config_dto.rs index ae9cd15f..6756718f 100644 --- a/crates/common/tedge_config/src/tedge_config_dto.rs +++ b/crates/common/tedge_config/src/tedge_config_dto.rs @@ -81,8 +81,9 @@ pub(crate) struct AzureConfigDto { #[serde(deny_unknown_fields)] pub(crate) struct MqttConfigDto { pub(crate) port: Option<u16>, + pub(crate) bind_address: Option<IpAddress>, pub(crate) external_port: Option<u16>, - pub(crate) external_bind_address: Option<String>, + pub(crate) external_bind_address: Option<IpAddress>, pub(crate) external_bind_interface: Option<String>, pub(crate) external_capath: Option<FilePath>, pub(crate) external_certfile: Option<FilePath>, diff --git a/crates/common/tedge_config/tests/test_tedge_config.rs b/crates/common/tedge_config/tests/test_tedge_config.rs index 26118adf..bfa926a2 100644 --- a/crates/common/tedge_config/tests/test_tedge_config.rs +++ b/crates/common/tedge_config/tests/test_tedge_config.rs @@ -1,6 +1,7 @@ use assert_matches::assert_matches; use std::convert::TryFrom; use std::io::Write; +use std::net::{IpAddr, Ipv4Addr}; use tedge_config::*; use tempfile::TempDir; @@ -30,6 +31,7 @@ external_bind_interface = "wlan0" external_capath = "ca.pem" external_certfile = "cert.pem" external_keyfile = "key.pem" +bind_address = "0.0.0.0" [tmp] path = "/some/value" @@ -75,8 +77,8 @@ path = "/some/value" assert_eq!(config.query(MqttExternalPortSetting)?, Port(2345)); assert_eq!( - config.query(MqttExternalBindAddressSetting)?.as_str(), - "0.0.0.0" + config.query(MqttExternalBindAddressSetting)?, + IpAddress::try_from("0.0.0.0".to_string()).unwrap() ); assert_eq!( @@ -103,6 +105,12 @@ path = "/some/value" config.query(TmpPathDefaultSetting)?, FilePath::from("/some/value") ); + + assert_eq!( + config.query(MqttBindAddressSetting)?, + IpAddress::try_from("0.0.0.0".to_string()).unwrap() + ); + Ok(()) } @@ -124,6 +132,7 @@ mapper_timestamp = false [mqtt] port = 1883 +bind_address = "0.0.0.0" "#; let (_tempdir, config_location) = create_temp_tedge_config(toml_conf)?; @@ -139,11 +148,12 @@ port = 1883 let updated_azure_url = "OtherAzure.azure-devices.net"; let updated_mqtt_port = Port(2345); let updated_mqtt_external_port = Port(3456); - let updated_mqtt_external_bind_address = "localhost"; + let updated_mqtt_external_bind_address = IpAddress::default(); let updated_mqtt_external_bind_interface = "eth0"; let updated_mqtt_external_capath = "/some/path"; let updated_mqtt_external_certfile = "cert.pem"; let updated_mqtt_external_keyfile = "key.pem"; + let updated_mqtt_bind_address = IpAddress(std::net::IpAddr::V4(Ipv4Addr::LOCALHOST)); { let mut config = config_repo.load()?; @@ -187,8 +197,9 @@ port = 1883 config.update(MqttExternalPortSetting, updated_mqtt_external_port)?; config.update( MqttExternalBindAddressSetting, - updated_mqtt_external_bind_address.to_string(), + updated_mqtt_external_bind_address.clone(), )?; + config.update( MqttExternalBindInterfaceSetting, updated_mqtt_external_bind_interface.to_string(), @@ -205,6 +216,7 @@ port = 1883 MqttExternalKeyfileSetting, FilePath::from(updated_mqtt_external_keyfile), )?; + config.update(MqttBindAddressSetting, updated_mqtt_bind_address.clone())?; config_repo.store(&config)?; } @@ -240,7 +252,7 @@ port = 1883 updated_mqtt_external_port ); assert_eq!( - config.query(MqttExternalBindAddressSetting)?.as_str(), + config.query(MqttExternalBindAddressSetting)?, updated_mqtt_external_bind_address ); assert_eq!( @@ -259,6 +271,10 @@ port = 1883 config.query(MqttExternalKeyfileSetting)?, FilePath::from(updated_mqtt_external_keyfile) ); + assert_eq!( + config.query(MqttBindAddressSetting)?, + updated_mqtt_bind_address + ); } Ok(()) @@ -306,6 +322,10 @@ fn test_parse_config_with_only_device_configuration() -> Result<(), TEdgeConfigE assert_eq!(config.query(AzureMapperTimestamp)?, Flag(true)); assert_eq!(config.query(MqttPortSetting)?, Port(1883)); + assert_eq!( + config.query(MqttBindAddressSetting)?, + IpAddress::try_from("127.0.0.1".to_string()).unwrap() + ); Ok(()) } @@ -354,6 +374,10 @@ url = "your-tenant.cumulocity.com" assert_eq!(config.query(AzureMapperTimestamp)?, Flag(true)); assert_eq!(config.query(MqttPortSetting)?, Port(1883)); + assert_eq!( + config.query(MqttBindAddressSetting)?, + IpAddress(IpAddr::V4(Ipv4Addr::LOCALHOST)) + ); Ok(()) } @@ -402,6 +426,10 @@ url = "MyAzure.azure-devices.net" assert_eq!(config.query(AzureMapperTimestamp)?, Flag(true)); assert_eq!(config.query(MqttPortSetting)?, Port(1883)); + assert_eq!( + config.query(MqttBindAddressSetting)?, + IpAddress(IpAddr::V4(Ipv4Addr::LOCALHOST)) + ); Ok(()) } @@ -410,6 +438,7 @@ fn test_parse_config_with_only_mqtt_configuration() -> Result<(), TEdgeConfigErr let toml_conf = r#" [mqtt] port = 2222 +bind_address = "1.2.3.4" "#; let (_tempdir, config_location) = create_temp_tedge_config(toml_conf)?; @@ -447,6 +476,10 @@ port = 2222 assert_eq!(config.query(AzureMapperTimestamp)?, Flag(true)); assert_eq!(config.query(MqttPortSetting)?, Port(2222)); + assert_eq!( + config.query(MqttBindAddressSetting)?, + IpAddress::try_from("1.2.3.4".to_string()).unwrap() + ); Ok(()) } @@ -564,6 +597,10 @@ fn test_parse_config_empty_file() -> Result<(), TEdgeConfigError> { assert_eq!(config.query(AzureMapperTimestamp)?, Flag(true)); assert_eq!(config.query(MqttPortSetting)?, Port(1883)); + assert_eq!( + config.query(MqttBindAddressSetting)?, + IpAddress(IpAddr::V4(Ipv4Addr::LOCALHOST)) + ); Ok(()) } @@ -596,6 +633,10 @@ fn test_parse_config_no_config_file() -> Result<(), TEdgeConfigError> { assert_eq!(config.query(AzureMapperTimestamp)?, Flag(true)); assert_eq!(config.query(MqttPortSetting)?, Port(1883)); + assert_eq!( + config.query(MqttBindAddressSetting)?, + IpAddress(IpAddr::V4(Ipv4Addr::LOCALHOST)) + ); Ok(()) } @@ -858,6 +899,7 @@ fn dummy_tedge_config_defaults() -> TEdgeConfigDefaults { default_mqtt_port: Port(1883), default_tmp_path: FilePath::from("/tmp"), default_device_type: String::from("test"), + default_mqtt_bind_address: IpAddress(IpAddr::V4(Ipv4Addr::LOCALHOST)), } } diff --git a/crates/core/c8y_api/src/http_proxy.rs b/crates/core/c8y_api/src/http_proxy.rs index 4c6dff98..d8047e53 100644 --- a/crates/core/c8y_api/src/http_proxy.rs +++ b/crates/core/c8y_api/src/http_proxy.rs @@ -11,7 +11,7 @@ use reqwest::Url; use std::time::Duration; use tedge_config::{ C8yUrlSetting, ConfigSettingAccessor, ConfigSettingAccessorStringExt, DeviceIdSetting, - MqttPortSetting, TEdgeConfig, + MqttBindAddressSetting, MqttPortSetting, TEdgeConfig, }; use time::{format_description, OffsetDateTime}; @@ -210,11 +210,14 @@ impl JwtAuthHttpProxy { let http_con = reqwest::ClientBuilder::new().build()?; let mqtt_port = tedge_config.query(MqttPortSetting)?.into(); + let mqtt_host = tedge_config.query(MqttBindAddressSetting)?.to_string(); let topic = TopicFilter::new("c8y/s/dat")?; let mqtt_config = mqtt_channel::Config::default() .with_port(mqtt_port) .with_clean_session(true) + .with_host(mqtt_host) .with_subscriptions(topic); + let mut mqtt_con = Connection::new(&mqtt_config).await?; // Ignore errors on this connection diff --git a/crates/core/tedge/src/cli/config/config_key.rs b/crates/core/tedge/src/cli/config/config_key.rs index 1b0fcef3..e6b9f5e6 100644 --- a/crates/core/tedge/src/cli/config/config_key.rs +++ b/crates/core/tedge/src/cli/config/config_key.rs @@ -48,6 +48,7 @@ impl ConfigKey { config_key!(AzureUrlSetting), config_key!(AzureRootCertPathSetting), config_key!(AzureMapperTimestamp), + config_key!(MqttBindAddressSetting), config_key!(MqttPortSetting), config_key!(MqttExternalPortSetting), config_key!(MqttExternalBindAddressSetting), diff --git a/crates/core/tedge/src/cli/connect/command.rs b/crates/core/tedge/src/cli/connect/command.rs index 253fd813..30682ddd 100644 --- a/crates/core/tedge/src/cli/connect/command.rs +++ b/crates/core/tedge/src/cli/connect/command.rs @@ -11,7 +11,6 @@ use tedge_users::UserManager; use tedge_utils::paths::{create_directories, ok_if_not_found, DraftFile}; use which::which; -pub(crate) const DEFAULT_HOST: &str = "localhost"; const WAIT_FOR_CHECK_SECONDS: u64 = 2; const C8Y_CONFIG_FILENAME: &str = "c8y-bridge.conf"; const AZURE_CONFIG_FILENAME: &str = "az-bridge.conf"; @@ -104,10 +103,16 @@ impl Command for ConnectCommand { let updated_mosquitto_config = self .common_mosquitto_config .clone() - .with_internal_opts(config.query(MqttPortSetting)?.into()) + .with_internal_opts( + config.query(MqttPortSetting)?.into(), + config.query(MqttBindAddressSetting)?.to_string(), + ) .with_external_opts( config.query(MqttExternalPortSetting).ok().map(|x| x.into()), - config.query(MqttExternalBindAddressSetting).ok(), + config + .query(MqttExternalBindAddressSetting) + .ok() + .map(|x| x.to_string()), config.query(MqttExternalBindInterfaceSetting).ok(), config .query(MqttExternalCAPathSetting) @@ -164,6 +169,7 @@ impl Command for ConnectCommand { check_connected_c8y_tenant_as_configured( &config.query_string(C8yUrlSetting)?, config.query(MqttPortSetting)?.into(), + config.query(MqttBindAddressSetting)?.to_string(), ); enable_software_management(&bridge_config, self.service_manager.as_ref()); } @@ -206,13 +212,14 @@ impl ConnectCommand { fn check_connection(&self, config: &TEdgeConfig) -> Result<DeviceStatus, ConnectError> { let port = config.query(MqttPortSetting)?.into(); + let host = config.query(MqttBindAddressSetting)?.to_string(); println!( "Sending packets to check connection. This may take up to {} seconds.\n", WAIT_FOR_CHECK_SECONDS ); match self.cloud { - Cloud::Azure => check_device_status_azure(port), + Cloud::Azure => check_device_status_azure(port, host), Cloud::C8y => check_device_status_c8y(config), } } @@ -250,9 +257,10 @@ fn check_device_status_c8y(tedge_config: &TEdgeConfig) -> Result<DeviceStatus, C let mut options = MqttOptions::new( CLIENT_ID, - DEFAULT_HOST, + tedge_config.query(MqttBindAddressSetting)?.to_string(), tedge_config.query(MqttPortSetting)?.into(), ); + options.set_keep_alive(RESPONSE_TIMEOUT); let (mut client, mut connection) = rumqttc::Client::new(options, 10); @@ -310,14 +318,14 @@ fn check_device_status_c8y(tedge_config: &TEdgeConfig) -> Result<DeviceStatus, C // Empty payload will be published to az/$iothub/twin/GET/?$rid=1, here 1 is request ID. // The result will be published by the iothub on the az/$iothub/twin/res/{status}/?$rid={request id}. // Here if the status is 200 then it's success. -fn check_device_status_azure(port: u16) -> Result<DeviceStatus, ConnectError> { +fn check_device_status_azure(port: u16, host: String) -> Result<DeviceStatus, ConnectError> { const AZURE_TOPIC_DEVICE_TWIN_DOWNSTREAM: &str = r##"az/twin/res/#"##; const AZURE_TOPIC_DEVICE_TWIN_UPSTREAM: &str = r#"az/twin/GET/?$rid=1"#; const CLIENT_ID: &str = "check_connection_az"; const REGISTRATION_PAYLOAD: &[u8] = b""; const REGISTRATION_OK: &str = "200"; - let mut options = MqttOptions::new(CLIENT_ID, DEFAULT_HOST, port); + let mut options = MqttOptions::new(CLIENT_ID, host, port); options.set_keep_alive(RESPONSE_TIMEOUT); let (mut client, mut connection) = rumqttc::Client::new(options, 10); @@ -549,8 +557,8 @@ fn get_common_mosquitto_config_file_path( } // To confirm the connected c8y tenant is the one that user configured. -fn check_connected_c8y_tenant_as_configured(configured_url: &str, port: u16) { - match get_connected_c8y_url(port) { +fn check_connected_c8y_tenant_as_configured(configured_url: &str, port: u16, host: String) { + match get_connected_c8y_url(port, host) { Ok(url) if url == configured_url => {} Ok(url) => println!( "Warning: Connecting to {}, but the configured URL is {}.\n\ diff --git a/crates/core/tedge/src/cli/connect/common_mosquitto_config.rs b/crates/core/tedge/src/cli/connect/common_mosquitto_config.rs index 7fd9a28b..68af7d00 100644 --- a/crates/core/tedge/src/cli/connect/common_mosquitto_config.rs +++ b/crates/core/tedge/src/cli/connect/common_mosquitto_config.rs @@ -118,9 +118,10 @@ impl CommonMosquittoConfig { Ok(()) } - pub fn with_internal_opts(self, port: u16) -> Self { + pub fn with_internal_opts(self, port: u16, bind_address: String) -> Self { let internal_listener = ListenerConfig { port: Some(port), + bind_address: Some(bind_address), ..self.internal_listener }; Self { @@ -198,10 +199,10 @@ fn test_serialize() -> anyhow::Result<()> { fn test_serialize_with_opts() -> anyhow::Result<()> { let common_mosquitto_config = CommonMosquittoConfig::default(); let mosquitto_config_with_opts = common_mosquitto_config - .with_internal_opts(1234) + .with_internal_opts(1234, "1.2.3.4".into()) .with_external_opts( Some(2345), - Some("0.0.0.0".into()), + Some("0.0.0.0".to_string()), Some("wlan0".into()), Some("/etc/ssl/certs".into()), Some("cert.pem".into()), @@ -227,7 +228,7 @@ fn test_serialize_with_opts() -> anyhow::Result<()> { "log_type subscribe\n", "log_type unsubscribe\n", "message_size_limit 268435455\n", - "listener 1234 localhost\n", + "listener 1234 1.2.3.4\n", "allow_anonymous true\n", "require_certificate false\n", "listener 2345 0.0.0.0\n", diff --git a/crates/core/tedge/src/cli/connect/jwt_token.rs b/crates/core/tedge/src/cli/connect/jwt_token.rs index b81e26ed..afc86434 100644 --- a/crates/core/tedge/src/cli/connect/jwt_token.rs +++ b/crates/core/tedge/src/cli/connect/jwt_token.rs @@ -1,13 +1,13 @@ -use crate::cli::connect::{ConnectError, DEFAULT_HOST, RESPONSE_TIMEOUT}; +use crate::cli::connect::{ConnectError, RESPONSE_TIMEOUT}; use rumqttc::QoS::AtLeastOnce; use rumqttc::{Event, Incoming, MqttOptions, Outgoing, Packet}; -pub(crate) fn get_connected_c8y_url(port: u16) -> Result<String, ConnectError> { +pub(crate) fn get_connected_c8y_url(port: u16, host: String) -> Result<String, ConnectError> { const C8Y_TOPIC_BUILTIN_JWT_TOKEN_UPSTREAM: &str = "c8y/s/uat"; const C8Y_TOPIC_BUILTIN_JWT_TOKEN_DOWNSTREAM: &str = "c8y/s/dat"; const CLIENT_ID: &str = "get_jwt_token_c8y"; - let mut options = MqttOptions::new(CLIENT_ID, DEFAULT_HOST, port); + let mut options = MqttOptions::new(CLIENT_ID, host, port); options.set_keep_alive(RESPONSE_TIMEOUT); let (mut client, mut connection) = rumqttc::Client::new(options, 10); diff --git a/crates/core/tedge/src/cli/mqtt/cli.rs b/crates/core/tedge/src/cli/mqtt/cli.rs index f6878a78..ce38abeb 100644 --- a/crates/core/tedge/src/cli/mqtt/cli.rs +++ b/crates/core/tedge/src/cli/mqtt/cli.rs @@ -4,7 +4,6 @@ use rumqttc::QoS; use std::time::Duration; use tedge_config::*; -const DEFAULT_HOST: &str = "localhost"; const PUB_CLIENT_PREFIX: &str = "tedge-pub"; const SUB_CLIENT_PREFIX: &str = "tedge-sub"; const DISCONNECT_TIMEOUT: Duration = Duration::from_secs(2); @@ -41,7 +40,10 @@ pub enum TEdgeMqttCli { impl BuildCommand for TEdgeMqttCli { fn build_command(self, context: BuildContext) -> Result<Box<dyn Command>, crate::ConfigError> { let port = context.config_repository.load()?.query(MqttPortSetting)?; - + let host = context + .config_repository + .load()? + .query(MqttBindAddressSetting)?; let cmd = { match self { TEdgeMqttCli::Pub { @@ -50,7 +52,7 @@ impl BuildCommand for TEdgeMqttCli { qos, retain, } => MqttPublishCommand { - host: DEFAULT_HOST.to_string(), + host: host.to_string(), port: port.into(), topic, message, @@ -65,7 +67,7 @@ impl BuildCommand for TEdgeMqttCli { qos, hide_topic, } => MqttSubscribeCommand { - host: DEFAULT_HOST.to_string(), + host: host.to_string(), port: port.into(), topic, qos, diff --git a/crates/core/tedge_agent/src/agent.rs b/crates/core/tedge_agent/src/agent.rs index d1285c22..aeabfc6a 100644 --- a/crates/core/tedge_agent/src/agent.rs +++ b/crates/core/tedge_agent/src/agent.rs @@ -17,8 +17,9 @@ use mqtt_channel::{Connection, Message, PubChannel, StreamExt, SubChannel, Topic use plugin_sm::plugin_manager::{ExternalPlugins, Plugins}; use std::{convert::TryInto, fmt::Debug, path::PathBuf, sync::Arc}; use tedge_config::{ - ConfigRepository, ConfigSettingAccessor, ConfigSettingAccessorStringExt, MqttPortSetting, - SoftwarePluginDefaultSetting, TEdgeConfigLocation, TmpPathDefaultSetting, + ConfigRepository, ConfigSettingAccessor, ConfigSettingAccessorStringExt, + MqttBindAddressSetting, MqttPortSetting, SoftwarePluginDefaultSetting, TEdgeConfigLocation, + TmpPathDefaultSetting, }; use tokio::sync::Mutex; use tracing::{debug, error, info, instrument}; @@ -107,6 +108,7 @@ impl SmAgentConfig { let tedge_config = config_repository.load()?; let mqtt_config = mqtt_channel::Config::default() + .with_host(tedge_config.query(MqttBindAddressSetting)?.to_string |