diff options
author | Matthias Beyer <matthias.beyer@ifm.com> | 2022-08-12 11:14:28 +0200 |
---|---|---|
committer | Matthias Beyer <matthias.beyer@ifm.com> | 2022-08-12 11:14:28 +0200 |
commit | 6c6836a92969f72d1141df63789f0efd7c0cdf78 (patch) | |
tree | 04125b2537df65a441a5beae0a6e4332c3afd1f5 | |
parent | fe104f9b77544f58136600b7598745b56fe25683 (diff) |
Make console port configurable in test broker instance
This patch makes the port for the broker console configurable in the
test helper broker type and calls it with a different port number in
each test.
Signed-off-by: Matthias Beyer <matthias.beyer@ifm.com>
-rw-r--r-- | crates/common/mqtt_channel/src/tests.rs | 26 | ||||
-rw-r--r-- | crates/core/tedge/tests/mqtt.rs | 2 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/mapper.rs | 2 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/tests.rs | 22 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/core/mapper.rs | 4 | ||||
-rw-r--r-- | crates/tests/mqtt_tests/src/test_mqtt_server.rs | 12 | ||||
-rw-r--r-- | plugins/c8y_configuration_plugin/src/main.rs | 2 | ||||
-rw-r--r-- | plugins/c8y_configuration_plugin/src/upload.rs | 2 |
8 files changed, 36 insertions, 36 deletions
diff --git a/crates/common/mqtt_channel/src/tests.rs b/crates/common/mqtt_channel/src/tests.rs index fe873652..e319dd1b 100644 --- a/crates/common/mqtt_channel/src/tests.rs +++ b/crates/common/mqtt_channel/src/tests.rs @@ -11,7 +11,7 @@ mod tests { #[tokio::test] async fn subscribing_to_messages() -> Result<(), anyhow::Error> { // Given an MQTT broker - let broker = MqttProcessHandler::new(55551); + let broker = MqttProcessHandler::new(55551, 3551); let mqtt_config = Config::default().with_port(broker.port); // A client subscribes to a topic on connect @@ -67,7 +67,7 @@ mod tests { #[tokio::test] async fn subscribing_to_many_topics() -> Result<(), anyhow::Error> { // Given an MQTT broker - let broker = MqttProcessHandler::new(55552); + let broker = MqttProcessHandler::new(55552, 3552); let mqtt_config = Config::default().with_port(broker.port); // A client can subscribe to many topics @@ -121,7 +121,7 @@ mod tests { #[tokio::test] async fn publishing_messages() -> Result<(), anyhow::Error> { // Given an MQTT broker - let broker = MqttProcessHandler::new(55553); + let broker = MqttProcessHandler::new(55553, 3553); let mqtt_config = Config::default().with_port(broker.port); let mut all_messages = broker.messages_published_on("#").await; @@ -155,7 +155,7 @@ mod tests { #[tokio::test] async fn implementing_a_message_mapper() -> Result<(), anyhow::Error> { // Given an MQTT broker - let broker = MqttProcessHandler::new(55554); + let broker = MqttProcessHandler::new(55554, 3554); let mqtt_config = Config::default().with_port(broker.port); // and an MQTT connection with input and output topics @@ -201,7 +201,7 @@ mod tests { #[tokio::test] async fn receiving_messages_while_not_connected() -> Result<(), anyhow::Error> { // Given an MQTT broker - let broker = MqttProcessHandler::new(55555); + let broker = MqttProcessHandler::new(55555, 3555); let mqtt_config = Config::default().with_port(broker.port); // A client that connects with a well-known session name, subscribing to some topic. @@ -275,7 +275,7 @@ mod tests { assert_eq!(expected, output.collect().await); // This very same client can be tested with an MQTT broker - let broker = MqttProcessHandler::new(55556); + let broker = MqttProcessHandler::new(55556, 3556); let mqtt_config = Config::default().with_port(broker.port); let mut out_messages = broker.messages_published_on("out/topic").await; @@ -303,7 +303,7 @@ mod tests { #[tokio::test] async fn creating_a_session() -> Result<(), anyhow::Error> { // Given an MQTT broker - let broker = MqttProcessHandler::new(55557); + let broker = MqttProcessHandler::new(55557, 3557); let mqtt_config = Config::default().with_port(broker.port); // Given an MQTT config with a well-known session name @@ -348,7 +348,7 @@ mod tests { #[tokio::test] async fn a_session_must_have_a_name() { - let broker = MqttProcessHandler::new(55558); + let broker = MqttProcessHandler::new(55558, 3558); let mqtt_config = Config::default().with_port(broker.port); let result = init_session(&mqtt_config).await; @@ -358,7 +358,7 @@ mod tests { #[tokio::test] async fn a_named_session_must_not_set_clean_session() { - let broker = MqttProcessHandler::new(55559); + let broker = MqttProcessHandler::new(55559, 3559); let mqtt_config = Config::default() .with_port(broker.port) .with_session_name("useless name") @@ -372,7 +372,7 @@ mod tests { #[tokio::test] async fn cleaning_a_session() -> Result<(), anyhow::Error> { // Given an MQTT broker - let broker = MqttProcessHandler::new(55560); + let broker = MqttProcessHandler::new(55560, 3560); let mqtt_config = Config::default().with_port(broker.port); // Given an MQTT config with a well-known session name @@ -418,7 +418,7 @@ mod tests { #[tokio::test] async fn to_be_cleared_a_session_must_have_a_name() { - let broker = MqttProcessHandler::new(55561); + let broker = MqttProcessHandler::new(55561, 3561); let mqtt_config = Config::default().with_port(broker.port); let result = clear_session(&mqtt_config).await; @@ -428,7 +428,7 @@ mod tests { #[tokio::test] async fn subscription_failures() { - let broker = MqttProcessHandler::new(55562); + let broker = MqttProcessHandler::new(55562, 3562); let mqtt_config = Config::default().with_port(broker.port); let topic = TopicFilter::new_unchecked("test/topic"); @@ -443,7 +443,7 @@ mod tests { #[tokio::test] async fn ensure_that_all_messages_are_sent_before_disconnect() -> Result<(), anyhow::Error> { - let broker = MqttProcessHandler::new(55563); + let broker = MqttProcessHandler::new(55563, 3563); let topic = "data/topic"; let mut messages = broker.messages_published_on(topic).await; diff --git a/crates/core/tedge/tests/mqtt.rs b/crates/core/tedge/tests/mqtt.rs index 457c3557..79ecaaec 100644 --- a/crates/core/tedge/tests/mqtt.rs +++ b/crates/core/tedge/tests/mqtt.rs @@ -29,7 +29,7 @@ mod tests { #[test_case(None)] #[tokio::test] async fn test_cli_pub_basic(qos: Option<&str>) -> Result<(), anyhow::Error> { - let broker = mqtt_tests::test_mqtt_server::MqttProcessHandler::new(55590); + let broker = mqtt_tests::test_mqtt_server::MqttProcessHandler::new(55590, 3590); let tmpfile = make_config(broker.port)?; let mut messages = broker.messages_published_on("topic").await; diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs index ba6508c8..2d5695dc 100644 --- a/crates/core/tedge_mapper/src/c8y/mapper.rs +++ b/crates/core/tedge_mapper/src/c8y/mapper.rs @@ -202,7 +202,7 @@ mod tests { .unwrap(), ); - let broker = mqtt_tests::test_mqtt_server::MqttProcessHandler::new(55580); + let broker = mqtt_tests::test_mqtt_server::MqttProcessHandler::new(55580, 3580); let mut mapper = create_mapper( CUMULOCITY_MAPPER_NAME_TEST, diff --git a/crates/core/tedge_mapper/src/c8y/tests.rs b/crates/core/tedge_mapper/src/c8y/tests.rs index 2b98c2bf..2f0b0db2 100644 --- a/crates/core/tedge_mapper/src/c8y/tests.rs +++ b/crates/core/tedge_mapper/src/c8y/tests.rs @@ -32,7 +32,7 @@ const MQTT_HOST: &str = "127.0.0.1"; #[serial] async fn mapper_publishes_a_software_list_request() { // The test assures the mapper publishes request for software list on `tedge/commands/req/software/list`. - let broker = MqttProcessHandler::new(55600); + let broker = MqttProcessHandler::new(55600, 3600); let mut messages = broker .messages_published_on("tedge/commands/req/software/list") @@ -51,7 +51,7 @@ async fn mapper_publishes_a_software_list_request() { #[serial] async fn mapper_publishes_a_supported_operation_and_a_pending_operations_onto_c8y_topic() { // The test assures the mapper publishes smartrest messages 114 and 500 on `c8y/s/us` which shall be send over to the cloud if bridge connection exists. - let broker = MqttProcessHandler::new(55601); + let broker = MqttProcessHandler::new(55601, 3601); let mut messages = broker.messages_published_on("c8y/s/us").await; // Start SM Mapper @@ -68,7 +68,7 @@ async fn mapper_publishes_a_supported_operation_and_a_pending_operations_onto_c8 async fn mapper_publishes_software_update_request() { // The test assures SM Mapper correctly receives software update request smartrest message on `c8y/s/ds` // and converts it to thin-edge json message published on `tedge/commands/req/software/update`. - let broker = MqttProcessHandler::new(55602); + let broker = MqttProcessHandler::new(55602, 3602); let mut messages = broker .messages_published_on("tedge/commands/req/software/update") .await; @@ -108,7 +108,7 @@ async fn mapper_publishes_software_update_request() { async fn mapper_publishes_software_update_status_onto_c8y_topic() { // The test assures SM Mapper correctly receives software update response message on `tedge/commands/res/software/update` // and publishes status of the operation `501` on `c8y/s/us` - let broker = MqttProcessHandler::new(55603); + let broker = MqttProcessHandler::new(55603, 3603); let mut messages = broker.messages_published_on("c8y/s/us").await; @@ -164,7 +164,7 @@ async fn mapper_publishes_software_update_status_onto_c8y_topic() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[serial] async fn mapper_publishes_software_update_failed_status_onto_c8y_topic() { - let broker = MqttProcessHandler::new(55604); + let broker = MqttProcessHandler::new(55604, 3604); let mut messages = broker.messages_published_on("c8y/s/us").await; // Start SM Mapper @@ -213,7 +213,7 @@ async fn mapper_publishes_software_update_failed_status_onto_c8y_topic() { async fn mapper_fails_during_sw_update_recovers_and_process_response() -> Result<(), anyhow::Error> { // The test assures recovery and processing of messages by the SM-Mapper when it fails in the middle of the operation. - let broker = MqttProcessHandler::new(55605); + let broker = MqttProcessHandler::new(55605, 3605); // When a software update request message is received on `c8y/s/ds` by the sm mapper, // converts it to thin-edge json message, publishes a request message on `tedge/commands/req/software/update`. @@ -311,7 +311,7 @@ async fn mapper_publishes_software_update_request_with_wrong_action() { // Then SM Mapper publishes an operation status message as failed `502,c8y_SoftwareUpdate,Action remove is not recognized. It must be install or delete.` on `c8/s/us`. // Then the subscriber that subscribed for messages on `c8/s/us` receives these messages and verifies them. - let broker = MqttProcessHandler::new(55606); + let broker = MqttProcessHandler::new(55606, 3606); // Create a subscriber to receive messages on `c8y/s/us` topic. let mut messages = broker.messages_published_on("c8y/s/us").await; @@ -334,7 +334,7 @@ async fn mapper_publishes_software_update_request_with_wrong_action() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[serial] async fn c8y_mapper_alarm_mapping_to_smartrest() { - let broker = MqttProcessHandler::new(55607); + let broker = MqttProcessHandler::new(55607, 3607); let mut messages = broker.messages_published_on("c8y/s/us").await; @@ -376,7 +376,7 @@ async fn c8y_mapper_alarm_mapping_to_smartrest() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[serial] async fn c8y_mapper_child_alarm_mapping_to_smartrest() { - let broker = MqttProcessHandler::new(55608); + let broker = MqttProcessHandler::new(55608, 3608); let mut messages = broker .messages_published_on("c8y/s/us/external_sensor") @@ -430,7 +430,7 @@ async fn c8y_mapper_child_alarm_mapping_to_smartrest() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[serial] async fn c8y_mapper_syncs_pending_alarms_on_startup() { - let broker = MqttProcessHandler::new(55609); + let broker = MqttProcessHandler::new(55609, 3609); let mut messages = broker.messages_published_on("c8y/s/us").await; @@ -520,7 +520,7 @@ async fn c8y_mapper_syncs_pending_alarms_on_startup() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[serial] async fn c8y_mapper_syncs_pending_child_alarms_on_startup() { - let broker = MqttProcessHandler::new(55610); + let broker = MqttProcessHandler::new(55610, 3610); let mut messages = broker .messages_published_on("c8y/s/us/external_sensor") diff --git a/crates/core/tedge_mapper/src/core/mapper.rs b/crates/core/tedge_mapper/src/core/mapper.rs index 049c1e4b..e9cf3a4e 100644 --- a/crates/core/tedge_mapper/src/core/mapper.rs +++ b/crates/core/tedge_mapper/src/core/mapper.rs @@ -187,7 +187,7 @@ mod tests { #[tokio::test] async fn a_valid_input_leads_to_a_translated_output() -> Result<(), anyhow::Error> { // Given an MQTT broker - let broker = mqtt_tests::test_mqtt_server::MqttProcessHandler::new(55800); + let broker = mqtt_tests::test_mqtt_server::MqttProcessHandler::new(55800, 3800); // Given a mapper let name = "mapper_under_test"; @@ -232,7 +232,7 @@ mod tests { #[tokio::test] async fn health_check() -> Result<(), anyhow::Error> { // Given an MQTT broker - let broker = mqtt_tests::test_mqtt_server::MqttProcessHandler::new(55801); + let broker = mqtt_tests::test_mqtt_server::MqttProcessHandler::new(55801, 3801); // Given a mapper let name = "mapper_under_test"; diff --git a/crates/tests/mqtt_tests/src/test_mqtt_server.rs b/crates/tests/mqtt_tests/src/test_mqtt_server.rs index 81e30508..7db0505d 100644 --- a/crates/tests/mqtt_tests/src/test_mqtt_server.rs +++ b/crates/tests/mqtt_tests/src/test_mqtt_server.rs @@ -13,8 +13,8 @@ pub struct MqttProcessHandler { } impl MqttProcessHandler { - pub fn new(port: u16) -> MqttProcessHandler { - spawn_broker(port); + pub fn new(port: u16, console_port: u16) -> MqttProcessHandler { + spawn_broker(port, console_port); MqttProcessHandler { port } } @@ -61,8 +61,8 @@ impl MqttProcessHandler { } } -fn spawn_broker(port: u16) { - let config = get_rumqttd_config(port); +fn spawn_broker(port: u16, console_port: u16) { + let config = get_rumqttd_config(port, console_port); let mut broker = Broker::new(config); let mut tx = broker.link("localclient").unwrap(); @@ -101,7 +101,7 @@ fn spawn_broker(port: u16) { }); } -fn get_rumqttd_config(port: u16) -> Config { +fn get_rumqttd_config(port: u16, console_port: u16) -> Config { let router_config = librumqttd::rumqttlog::Config { id: 0, dir: "/tmp/rumqttd".into(), @@ -131,7 +131,7 @@ fn get_rumqttd_config(port: u16) -> Config { servers.insert("1".to_string(), server_config); let console_settings = ConsoleSettings { - listen: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 3030)), + listen: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), console_port)), }; librumqttd::Config { diff --git a/plugins/c8y_configuration_plugin/src/main.rs b/plugins/c8y_configuration_plugin/src/main.rs index c6b35bc4..60e76d38 100644 --- a/plugins/c8y_configuration_plugin/src/main.rs +++ b/plugins/c8y_configuration_plugin/src/main.rs @@ -323,7 +323,7 @@ mod tests { let test_config_path = "/some/test/config"; let test_config_type = "c8y-configuration-plugin"; - let broker = mqtt_tests::test_mqtt_server::MqttProcessHandler::new(55570); + let broker = mqtt_tests::test_mqtt_server::MqttProcessHandler::new(55570, 3570); let mut messages = broker.messages_published_on("c8y/s/us").await; diff --git a/plugins/c8y_configuration_plugin/src/upload.rs b/plugins/c8y_configuration_plugin/src/upload.rs index 57de32e6..8f2424a3 100644 --- a/plugins/c8y_configuration_plugin/src/upload.rs +++ b/plugins/c8y_configuration_plugin/src/upload.rs @@ -169,7 +169,7 @@ mod tests { async fn test_handle_config_upload_request() -> anyhow::Result<()> { let config_path = Path::new("/some/test/config"); - let broker = mqtt_tests::test_mqtt_server::MqttProcessHandler::new(55700); + let broker = mqtt_tests::test_mqtt_server::MqttProcessHandler::new(55700, 3700); let mqtt_config = mqtt_channel::Config::default() .with_port(broker.port) .with_subscriptions(mqtt_channel::TopicFilter::new_unchecked( |