From 6a1f0d72c384b4e647711f483eba992b8095b6f4 Mon Sep 17 00:00:00 2001 From: PradeepKiruvale Date: Tue, 1 Mar 2022 09:41:29 +0530 Subject: [823] Configurable mqtt bind address (#929) * [823] add mqtt.bind_address option * ipaddress instead of string --- crates/core/c8y_api/src/http_proxy.rs | 5 ++- crates/core/tedge/src/cli/config/config_key.rs | 1 + crates/core/tedge/src/cli/connect/command.rs | 26 +++++++++----- .../src/cli/connect/common_mosquitto_config.rs | 9 ++--- crates/core/tedge/src/cli/connect/jwt_token.rs | 6 ++-- crates/core/tedge/src/cli/mqtt/cli.rs | 10 +++--- crates/core/tedge_agent/src/agent.rs | 6 ++-- crates/core/tedge_mapper/src/az/mapper.rs | 5 +-- crates/core/tedge_mapper/src/c8y/mapper.rs | 40 +++++++++++++--------- crates/core/tedge_mapper/src/c8y/tests.rs | 9 ++++- crates/core/tedge_mapper/src/collectd/mapper.rs | 7 ++-- crates/core/tedge_mapper/src/collectd/monitor.rs | 10 ++++-- crates/core/tedge_mapper/src/core/mapper.rs | 4 +++ 13 files changed, 91 insertions(+), 47 deletions(-) (limited to 'crates/core') diff --git a/crates/core/c8y_api/src/http_proxy.rs b/crates/core/c8y_api/src/http_proxy.rs index 4c6dff98..d8047e53 100644 --- a/crates/core/c8y_api/src/http_proxy.rs +++ b/crates/core/c8y_api/src/http_proxy.rs @@ -11,7 +11,7 @@ use reqwest::Url; use std::time::Duration; use tedge_config::{ C8yUrlSetting, ConfigSettingAccessor, ConfigSettingAccessorStringExt, DeviceIdSetting, - MqttPortSetting, TEdgeConfig, + MqttBindAddressSetting, MqttPortSetting, TEdgeConfig, }; use time::{format_description, OffsetDateTime}; @@ -210,11 +210,14 @@ impl JwtAuthHttpProxy { let http_con = reqwest::ClientBuilder::new().build()?; let mqtt_port = tedge_config.query(MqttPortSetting)?.into(); + let mqtt_host = tedge_config.query(MqttBindAddressSetting)?.to_string(); let topic = TopicFilter::new("c8y/s/dat")?; let mqtt_config = mqtt_channel::Config::default() .with_port(mqtt_port) .with_clean_session(true) + .with_host(mqtt_host) .with_subscriptions(topic); + let mut mqtt_con = Connection::new(&mqtt_config).await?; // Ignore errors on this connection diff --git a/crates/core/tedge/src/cli/config/config_key.rs b/crates/core/tedge/src/cli/config/config_key.rs index 1b0fcef3..e6b9f5e6 100644 --- a/crates/core/tedge/src/cli/config/config_key.rs +++ b/crates/core/tedge/src/cli/config/config_key.rs @@ -48,6 +48,7 @@ impl ConfigKey { config_key!(AzureUrlSetting), config_key!(AzureRootCertPathSetting), config_key!(AzureMapperTimestamp), + config_key!(MqttBindAddressSetting), config_key!(MqttPortSetting), config_key!(MqttExternalPortSetting), config_key!(MqttExternalBindAddressSetting), diff --git a/crates/core/tedge/src/cli/connect/command.rs b/crates/core/tedge/src/cli/connect/command.rs index 253fd813..30682ddd 100644 --- a/crates/core/tedge/src/cli/connect/command.rs +++ b/crates/core/tedge/src/cli/connect/command.rs @@ -11,7 +11,6 @@ use tedge_users::UserManager; use tedge_utils::paths::{create_directories, ok_if_not_found, DraftFile}; use which::which; -pub(crate) const DEFAULT_HOST: &str = "localhost"; const WAIT_FOR_CHECK_SECONDS: u64 = 2; const C8Y_CONFIG_FILENAME: &str = "c8y-bridge.conf"; const AZURE_CONFIG_FILENAME: &str = "az-bridge.conf"; @@ -104,10 +103,16 @@ impl Command for ConnectCommand { let updated_mosquitto_config = self .common_mosquitto_config .clone() - .with_internal_opts(config.query(MqttPortSetting)?.into()) + .with_internal_opts( + config.query(MqttPortSetting)?.into(), + config.query(MqttBindAddressSetting)?.to_string(), + ) .with_external_opts( config.query(MqttExternalPortSetting).ok().map(|x| x.into()), - config.query(MqttExternalBindAddressSetting).ok(), + config + .query(MqttExternalBindAddressSetting) + .ok() + .map(|x| x.to_string()), config.query(MqttExternalBindInterfaceSetting).ok(), config .query(MqttExternalCAPathSetting) @@ -164,6 +169,7 @@ impl Command for ConnectCommand { check_connected_c8y_tenant_as_configured( &config.query_string(C8yUrlSetting)?, config.query(MqttPortSetting)?.into(), + config.query(MqttBindAddressSetting)?.to_string(), ); enable_software_management(&bridge_config, self.service_manager.as_ref()); } @@ -206,13 +212,14 @@ impl ConnectCommand { fn check_connection(&self, config: &TEdgeConfig) -> Result { let port = config.query(MqttPortSetting)?.into(); + let host = config.query(MqttBindAddressSetting)?.to_string(); println!( "Sending packets to check connection. This may take up to {} seconds.\n", WAIT_FOR_CHECK_SECONDS ); match self.cloud { - Cloud::Azure => check_device_status_azure(port), + Cloud::Azure => check_device_status_azure(port, host), Cloud::C8y => check_device_status_c8y(config), } } @@ -250,9 +257,10 @@ fn check_device_status_c8y(tedge_config: &TEdgeConfig) -> Result Result Result { +fn check_device_status_azure(port: u16, host: String) -> Result { const AZURE_TOPIC_DEVICE_TWIN_DOWNSTREAM: &str = r##"az/twin/res/#"##; const AZURE_TOPIC_DEVICE_TWIN_UPSTREAM: &str = r#"az/twin/GET/?$rid=1"#; const CLIENT_ID: &str = "check_connection_az"; const REGISTRATION_PAYLOAD: &[u8] = b""; const REGISTRATION_OK: &str = "200"; - let mut options = MqttOptions::new(CLIENT_ID, DEFAULT_HOST, port); + let mut options = MqttOptions::new(CLIENT_ID, host, port); options.set_keep_alive(RESPONSE_TIMEOUT); let (mut client, mut connection) = rumqttc::Client::new(options, 10); @@ -549,8 +557,8 @@ fn get_common_mosquitto_config_file_path( } // To confirm the connected c8y tenant is the one that user configured. -fn check_connected_c8y_tenant_as_configured(configured_url: &str, port: u16) { - match get_connected_c8y_url(port) { +fn check_connected_c8y_tenant_as_configured(configured_url: &str, port: u16, host: String) { + match get_connected_c8y_url(port, host) { Ok(url) if url == configured_url => {} Ok(url) => println!( "Warning: Connecting to {}, but the configured URL is {}.\n\ diff --git a/crates/core/tedge/src/cli/connect/common_mosquitto_config.rs b/crates/core/tedge/src/cli/connect/common_mosquitto_config.rs index 7fd9a28b..68af7d00 100644 --- a/crates/core/tedge/src/cli/connect/common_mosquitto_config.rs +++ b/crates/core/tedge/src/cli/connect/common_mosquitto_config.rs @@ -118,9 +118,10 @@ impl CommonMosquittoConfig { Ok(()) } - pub fn with_internal_opts(self, port: u16) -> Self { + pub fn with_internal_opts(self, port: u16, bind_address: String) -> Self { let internal_listener = ListenerConfig { port: Some(port), + bind_address: Some(bind_address), ..self.internal_listener }; Self { @@ -198,10 +199,10 @@ fn test_serialize() -> anyhow::Result<()> { fn test_serialize_with_opts() -> anyhow::Result<()> { let common_mosquitto_config = CommonMosquittoConfig::default(); let mosquitto_config_with_opts = common_mosquitto_config - .with_internal_opts(1234) + .with_internal_opts(1234, "1.2.3.4".into()) .with_external_opts( Some(2345), - Some("0.0.0.0".into()), + Some("0.0.0.0".to_string()), Some("wlan0".into()), Some("/etc/ssl/certs".into()), Some("cert.pem".into()), @@ -227,7 +228,7 @@ fn test_serialize_with_opts() -> anyhow::Result<()> { "log_type subscribe\n", "log_type unsubscribe\n", "message_size_limit 268435455\n", - "listener 1234 localhost\n", + "listener 1234 1.2.3.4\n", "allow_anonymous true\n", "require_certificate false\n", "listener 2345 0.0.0.0\n", diff --git a/crates/core/tedge/src/cli/connect/jwt_token.rs b/crates/core/tedge/src/cli/connect/jwt_token.rs index b81e26ed..afc86434 100644 --- a/crates/core/tedge/src/cli/connect/jwt_token.rs +++ b/crates/core/tedge/src/cli/connect/jwt_token.rs @@ -1,13 +1,13 @@ -use crate::cli::connect::{ConnectError, DEFAULT_HOST, RESPONSE_TIMEOUT}; +use crate::cli::connect::{ConnectError, RESPONSE_TIMEOUT}; use rumqttc::QoS::AtLeastOnce; use rumqttc::{Event, Incoming, MqttOptions, Outgoing, Packet}; -pub(crate) fn get_connected_c8y_url(port: u16) -> Result { +pub(crate) fn get_connected_c8y_url(port: u16, host: String) -> Result { const C8Y_TOPIC_BUILTIN_JWT_TOKEN_UPSTREAM: &str = "c8y/s/uat"; const C8Y_TOPIC_BUILTIN_JWT_TOKEN_DOWNSTREAM: &str = "c8y/s/dat"; const CLIENT_ID: &str = "get_jwt_token_c8y"; - let mut options = MqttOptions::new(CLIENT_ID, DEFAULT_HOST, port); + let mut options = MqttOptions::new(CLIENT_ID, host, port); options.set_keep_alive(RESPONSE_TIMEOUT); let (mut client, mut connection) = rumqttc::Client::new(options, 10); diff --git a/crates/core/tedge/src/cli/mqtt/cli.rs b/crates/core/tedge/src/cli/mqtt/cli.rs index f6878a78..ce38abeb 100644 --- a/crates/core/tedge/src/cli/mqtt/cli.rs +++ b/crates/core/tedge/src/cli/mqtt/cli.rs @@ -4,7 +4,6 @@ use rumqttc::QoS; use std::time::Duration; use tedge_config::*; -const DEFAULT_HOST: &str = "localhost"; const PUB_CLIENT_PREFIX: &str = "tedge-pub"; const SUB_CLIENT_PREFIX: &str = "tedge-sub"; const DISCONNECT_TIMEOUT: Duration = Duration::from_secs(2); @@ -41,7 +40,10 @@ pub enum TEdgeMqttCli { impl BuildCommand for TEdgeMqttCli { fn build_command(self, context: BuildContext) -> Result, crate::ConfigError> { let port = context.config_repository.load()?.query(MqttPortSetting)?; - + let host = context + .config_repository + .load()? + .query(MqttBindAddressSetting)?; let cmd = { match self { TEdgeMqttCli::Pub { @@ -50,7 +52,7 @@ impl BuildCommand for TEdgeMqttCli { qos, retain, } => MqttPublishCommand { - host: DEFAULT_HOST.to_string(), + host: host.to_string(), port: port.into(), topic, message, @@ -65,7 +67,7 @@ impl BuildCommand for TEdgeMqttCli { qos, hide_topic, } => MqttSubscribeCommand { - host: DEFAULT_HOST.to_string(), + host: host.to_string(), port: port.into(), topic, qos, diff --git a/crates/core/tedge_agent/src/agent.rs b/crates/core/tedge_agent/src/agent.rs index d1285c22..aeabfc6a 100644 --- a/crates/core/tedge_agent/src/agent.rs +++ b/crates/core/tedge_agent/src/agent.rs @@ -17,8 +17,9 @@ use mqtt_channel::{Connection, Message, PubChannel, StreamExt, SubChannel, Topic use plugin_sm::plugin_manager::{ExternalPlugins, Plugins}; use std::{convert::TryInto, fmt::Debug, path::PathBuf, sync::Arc}; use tedge_config::{ - ConfigRepository, ConfigSettingAccessor, ConfigSettingAccessorStringExt, MqttPortSetting, - SoftwarePluginDefaultSetting, TEdgeConfigLocation, TmpPathDefaultSetting, + ConfigRepository, ConfigSettingAccessor, ConfigSettingAccessorStringExt, + MqttBindAddressSetting, MqttPortSetting, SoftwarePluginDefaultSetting, TEdgeConfigLocation, + TmpPathDefaultSetting, }; use tokio::sync::Mutex; use tracing::{debug, error, info, instrument}; @@ -107,6 +108,7 @@ impl SmAgentConfig { let tedge_config = config_repository.load()?; let mqtt_config = mqtt_channel::Config::default() + .with_host(tedge_config.query(MqttBindAddressSetting)?.to_string()) .with_port(tedge_config.query(MqttPortSetting)?.into()) .with_max_packet_size(10 * 1024 * 1024); diff --git a/crates/core/tedge_mapper/src/az/mapper.rs b/crates/core/tedge_mapper/src/az/mapper.rs index e9c0b7ee..542e6c1c 100644 --- a/crates/core/tedge_mapper/src/az/mapper.rs +++ b/crates/core/tedge_mapper/src/az/mapper.rs @@ -5,7 +5,7 @@ use crate::{ use async_trait::async_trait; use clock::WallClock; -use tedge_config::{AzureMapperTimestamp, TEdgeConfig}; +use tedge_config::{AzureMapperTimestamp, MqttBindAddressSetting, TEdgeConfig}; use tedge_config::{ConfigSettingAccessor, MqttPortSetting}; use tracing::{info_span, Instrument}; @@ -24,12 +24,13 @@ impl TEdgeComponent for AzureMapper { async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> { let add_timestamp = tedge_config.query(AzureMapperTimestamp)?.is_set(); let mqtt_port = tedge_config.query(MqttPortSetting)?.into(); + let mqtt_host = tedge_config.query(MqttBindAddressSetting)?.to_string(); let clock = Box::new(WallClock); let size_threshold = SizeThreshold(255 * 1024); let converter = Box::new(AzureConverter::new(add_timestamp, clock, size_threshold)); - let mut mapper = create_mapper(AZURE_MAPPER_NAME, mqtt_port, converter).await?; + let mut mapper = create_mapper(AZURE_MAPPER_NAME, mqtt_host, mqtt_port, converter).await?; mapper .run() diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs index a5df4d0b..def4808c 100644 --- a/crates/core/tedge_mapper/src/c8y/mapper.rs +++ b/crates/core/tedge_mapper/src/c8y/mapper.rs @@ -7,9 +7,10 @@ use agent_interface::topic::ResponseTopic; use async_trait::async_trait; use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy}; use c8y_smartrest::operations::Operations; -use mqtt_channel::{Config, TopicFilter}; +use mqtt_channel::TopicFilter; use tedge_config::{ - ConfigSettingAccessor, DeviceIdSetting, DeviceTypeSetting, MqttPortSetting, TEdgeConfig, + ConfigRepository, ConfigSettingAccessor, DeviceIdSetting, DeviceTypeSetting, + MqttBindAddressSetting, MqttPortSetting, TEdgeConfig, }; use tracing::{info, info_span, Instrument}; @@ -39,27 +40,32 @@ impl CumulocityMapper { } pub async fn init_session(&mut self) -> Result<(), anyhow::Error> { - info!("Initialize tedge sm mapper session"); - let operations = Operations::try_new("/etc/tedge/operations", "c8y")?; - let mqtt_topic = Self::subscriptions(&operations)?; - let config = Config::default() - .with_session_name(CUMULOCITY_MAPPER_NAME) - .with_clean_session(false) - .with_subscriptions(mqtt_topic); - mqtt_channel::init_session(&config).await?; + info!("Initialize tedge mapper session"); + mqtt_channel::init_session(&self.get_mqtt_config()?).await?; Ok(()) } pub async fn clear_session(&mut self) -> Result<(), anyhow::Error> { - info!("Clear tedge sm mapper session"); + info!("Clear tedge mapper session"); + mqtt_channel::clear_session(&self.get_mqtt_config()?).await?; + Ok(()) + } + + fn get_mqtt_config(&mut self) -> Result { let operations = Operations::try_new("/etc/tedge/operations", "c8y")?; let mqtt_topic = Self::subscriptions(&operations)?; - let config = Config::default() + let config_repository = + tedge_config::TEdgeConfigRepository::new(tedge_config::TEdgeConfigLocation::default()); + let tedge_config = config_repository.load()?; + + let mqtt_config = mqtt_channel::Config::default() + .with_host(tedge_config.query(MqttBindAddressSetting)?.to_string()) + .with_port(tedge_config.query(MqttPortSetting)?.into()) .with_session_name(CUMULOCITY_MAPPER_NAME) - .with_clean_session(true) + .with_clean_session(false) .with_subscriptions(mqtt_topic); - mqtt_channel::clear_session(&config).await?; - Ok(()) + + Ok(mqtt_config) } } @@ -74,6 +80,7 @@ impl TEdgeComponent for CumulocityMapper { let device_name = tedge_config.query(DeviceIdSetting)?; let device_type = tedge_config.query(DeviceTypeSetting)?; let mqtt_port = tedge_config.query(MqttPortSetting)?.into(); + let mqtt_host = tedge_config.query(MqttBindAddressSetting)?.to_string(); let converter = Box::new(CumulocityConverter::new( size_threshold, @@ -83,7 +90,8 @@ impl TEdgeComponent for CumulocityMapper { http_proxy, )); - let mut mapper = create_mapper(CUMULOCITY_MAPPER_NAME, mqtt_port, converter).await?; + let mut mapper = + create_mapper(CUMULOCITY_MAPPER_NAME, mqtt_host, mqtt_port, converter).await?; mapper .run() diff --git a/crates/core/tedge_mapper/src/c8y/tests.rs b/crates/core/tedge_mapper/src/c8y/tests.rs index 7cbed97a..04cf07cc 100644 --- a/crates/core/tedge_mapper/src/c8y/tests.rs +++ b/crates/core/tedge_mapper/src/c8y/tests.rs @@ -24,6 +24,7 @@ use tokio::task::JoinHandle; use super::converter::{get_child_id_from_topic, CumulocityConverter}; const TEST_TIMEOUT_MS: Duration = Duration::from_millis(5000); +const MQTT_HOST: &str = "127.0.0.1"; #[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[serial] @@ -796,7 +797,13 @@ impl C8YHttpProxy for FakeC8YHttpProxy { async fn start_c8y_mapper(mqtt_port: u16) -> Result, anyhow::Error> { let converter = create_c8y_converter(); - let mut mapper = create_mapper("c8y-mapper-test", mqtt_port, Box::new(converter)).await?; + let mut mapper = create_mapper( + "c8y-mapper-test", + MQTT_HOST.to_string(), + mqtt_port, + Box::new(converter), + ) + .await?; let mapper_task = tokio::spawn(async move { let _ = mapper.run().await; diff --git a/crates/core/tedge_mapper/src/collectd/mapper.rs b/crates/core/tedge_mapper/src/collectd/mapper.rs index aac2b087..abae07af 100644 --- a/crates/core/tedge_mapper/src/collectd/mapper.rs +++ b/crates/core/tedge_mapper/src/collectd/mapper.rs @@ -3,7 +3,7 @@ use crate::{ core::component::TEdgeComponent, }; use async_trait::async_trait; -use tedge_config::{ConfigSettingAccessor, MqttPortSetting, TEdgeConfig}; +use tedge_config::{ConfigSettingAccessor, MqttBindAddressSetting, MqttPortSetting, TEdgeConfig}; use tracing::{info_span, Instrument}; const APP_NAME: &str = "tedge-mapper-collectd"; @@ -20,8 +20,11 @@ impl CollectdMapper { impl TEdgeComponent for CollectdMapper { async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> { let mqtt_port = tedge_config.query(MqttPortSetting)?.into(); + let mqtt_host = tedge_config.query(MqttBindAddressSetting)?.to_string(); - let device_monitor_config = DeviceMonitorConfig::default().with_port(mqtt_port); + let device_monitor_config = DeviceMonitorConfig::default() + .with_port(mqtt_port) + .with_host(mqtt_host); let device_monitor = DeviceMonitor::new(device_monitor_config); device_monitor diff --git a/crates/core/tedge_mapper/src/collectd/monitor.rs b/crates/core/tedge_mapper/src/collectd/monitor.rs index b16f2452..c804deb5 100644 --- a/crates/core/tedge_mapper/src/collectd/monitor.rs +++ b/crates/core/tedge_mapper/src/collectd/monitor.rs @@ -15,7 +15,7 @@ const DEFAULT_MQTT_TARGET_TOPIC: &str = "tedge/measurements"; #[derive(Debug)] pub struct DeviceMonitorConfig { - host: &'static str, + host: String, port: u16, mqtt_client_id: &'static str, mqtt_source_topic: &'static str, @@ -28,7 +28,7 @@ pub struct DeviceMonitorConfig { impl Default for DeviceMonitorConfig { fn default() -> Self { Self { - host: DEFAULT_HOST, + host: DEFAULT_HOST.to_string(), port: DEFAULT_PORT, mqtt_client_id: DEFAULT_MQTT_CLIENT_ID, mqtt_source_topic: DEFAULT_MQTT_SOURCE_TOPIC, @@ -44,6 +44,10 @@ impl DeviceMonitorConfig { pub fn with_port(self, port: u16) -> Self { Self { port, ..self } } + + pub fn with_host(self, host: String) -> Self { + Self { host, ..self } + } } #[derive(Debug)] @@ -63,7 +67,7 @@ impl DeviceMonitor { let input_topic = TopicFilter::new(self.device_monitor_config.mqtt_source_topic)? .with_qos(QoS::AtMostOnce); let mqtt_config = mqtt_channel::Config::new( - self.device_monitor_config.host, + self.device_monitor_config.host.to_string(), self.device_monitor_config.port, ) .with_session_name(self.device_monitor_config.mqtt_client_id) diff --git a/crates/core/tedge_mapper/src/core/mapper.rs b/crates/core/tedge_mapper/src/core/mapper.rs index 7f66ed62..f0691397 100644 --- a/crates/core/tedge_mapper/src/core/mapper.rs +++ b/crates/core/tedge_mapper/src/core/mapper.rs @@ -12,6 +12,7 @@ const SYNC_WINDOW: Duration = Duration::from_secs(3); pub async fn create_mapper( app_name: &str, + mqtt_host: String, mqtt_port: u16, converter: Box>, ) -> Result { @@ -20,6 +21,7 @@ pub async fn create_mapper( let mapper_config = converter.get_mapper_config(); let mqtt_client = Connection::new(&mqtt_config( app_name, + &mqtt_host, mqtt_port, mapper_config.in_topic_filter.clone(), )?) @@ -36,10 +38,12 @@ pub async fn create_mapper( pub fn mqtt_config( name: &str, + host: &str, port: u16, topics: TopicFilter, ) -> Result { Ok(mqtt_channel::Config::default() + .with_host(host) .with_port(port) .with_session_name(name) .with_subscriptions(topics) -- cgit v1.2.3