diff options
author | Matthias Beyer <matthias.beyer@ifm.com> | 2022-08-12 09:58:30 +0200 |
---|---|---|
committer | Matthias Beyer <matthias.beyer@ifm.com> | 2022-08-12 10:34:18 +0200 |
commit | fe104f9b77544f58136600b7598745b56fe25683 (patch) | |
tree | 87546251a8f6c711415bd3a139f712e04fb093b5 | |
parent | 8a89c5c9229dd5a0b7b37f5154e8da025a8be1ad (diff) |
Remove global mqtt dummy server
Before this patch there was one in-process mqtt server used for running
all tests. Tests had to be run in sequence because of this, adding up to
a huge test time.
This patch makes each test have its own broker, on an individual port,
to be able to run all tests in parallel.
Signed-off-by: Matthias Beyer <matthias.beyer@ifm.com>
-rw-r--r-- | crates/common/mqtt_channel/src/tests.rs | 41 | ||||
-rw-r--r-- | crates/core/tedge/tests/mqtt.rs | 2 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/mapper.rs | 4 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/tests.rs | 30 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/core/mapper.rs | 6 | ||||
-rw-r--r-- | crates/tests/mqtt_tests/src/lib.rs | 1 | ||||
-rw-r--r-- | crates/tests/mqtt_tests/src/test_mqtt_server.rs | 9 | ||||
-rw-r--r-- | plugins/c8y_configuration_plugin/src/main.rs | 3 | ||||
-rw-r--r-- | plugins/c8y_configuration_plugin/src/upload.rs | 3 |
9 files changed, 36 insertions, 63 deletions
diff --git a/crates/common/mqtt_channel/src/tests.rs b/crates/common/mqtt_channel/src/tests.rs index fb30eac7..fe873652 100644 --- a/crates/common/mqtt_channel/src/tests.rs +++ b/crates/common/mqtt_channel/src/tests.rs @@ -2,17 +2,16 @@ mod tests { use crate::*; use futures::{SinkExt, StreamExt}; - use serial_test::serial; + use mqtt_tests::test_mqtt_server::MqttProcessHandler; use std::convert::TryInto; use std::time::Duration; const TIMEOUT: Duration = Duration::from_millis(1000); #[tokio::test] - #[serial] async fn subscribing_to_messages() -> Result<(), anyhow::Error> { // Given an MQTT broker - let broker = mqtt_tests::test_mqtt_broker(); + let broker = MqttProcessHandler::new(55551); let mqtt_config = Config::default().with_port(broker.port); // A client subscribes to a topic on connect @@ -66,10 +65,9 @@ mod tests { } #[tokio::test] - #[serial] async fn subscribing_to_many_topics() -> Result<(), anyhow::Error> { // Given an MQTT broker - let broker = mqtt_tests::test_mqtt_broker(); + let broker = MqttProcessHandler::new(55552); let mqtt_config = Config::default().with_port(broker.port); // A client can subscribe to many topics @@ -121,10 +119,9 @@ mod tests { } #[tokio::test] - #[serial] async fn publishing_messages() -> Result<(), anyhow::Error> { // Given an MQTT broker - let broker = mqtt_tests::test_mqtt_broker(); + let broker = MqttProcessHandler::new(55553); let mqtt_config = Config::default().with_port(broker.port); let mut all_messages = broker.messages_published_on("#").await; @@ -156,10 +153,9 @@ mod tests { } #[tokio::test] - #[serial] async fn implementing_a_message_mapper() -> Result<(), anyhow::Error> { // Given an MQTT broker - let broker = mqtt_tests::test_mqtt_broker(); + let broker = MqttProcessHandler::new(55554); let mqtt_config = Config::default().with_port(broker.port); // and an MQTT connection with input and output topics @@ -203,10 +199,9 @@ mod tests { } #[tokio::test] - #[serial] async fn receiving_messages_while_not_connected() -> Result<(), anyhow::Error> { // Given an MQTT broker - let broker = mqtt_tests::test_mqtt_broker(); + let broker = MqttProcessHandler::new(55555); let mqtt_config = Config::default().with_port(broker.port); // A client that connects with a well-known session name, subscribing to some topic. @@ -246,7 +241,6 @@ mod tests { } #[tokio::test] - #[serial] async fn testing_an_mqtt_client_without_mqtt() -> Result<(), anyhow::Error> { // Given an mqtt client async fn run(mut input: impl SubChannel, mut output: impl PubChannel) { @@ -281,7 +275,7 @@ mod tests { assert_eq!(expected, output.collect().await); // This very same client can be tested with an MQTT broker - let broker = mqtt_tests::test_mqtt_broker(); + let broker = MqttProcessHandler::new(55556); let mqtt_config = Config::default().with_port(broker.port); let mut out_messages = broker.messages_published_on("out/topic").await; @@ -307,10 +301,9 @@ mod tests { } #[tokio::test] - #[serial] async fn creating_a_session() -> Result<(), anyhow::Error> { // Given an MQTT broker - let broker = mqtt_tests::test_mqtt_broker(); + let broker = MqttProcessHandler::new(55557); let mqtt_config = Config::default().with_port(broker.port); // Given an MQTT config with a well-known session name @@ -354,9 +347,8 @@ mod tests { } #[tokio::test] - #[serial] async fn a_session_must_have_a_name() { - let broker = mqtt_tests::test_mqtt_broker(); + let broker = MqttProcessHandler::new(55558); let mqtt_config = Config::default().with_port(broker.port); let result = init_session(&mqtt_config).await; @@ -365,9 +357,8 @@ mod tests { } #[tokio::test] - #[serial] async fn a_named_session_must_not_set_clean_session() { - let broker = mqtt_tests::test_mqtt_broker(); + let broker = MqttProcessHandler::new(55559); let mqtt_config = Config::default() .with_port(broker.port) .with_session_name("useless name") @@ -379,10 +370,9 @@ mod tests { } #[tokio::test] - #[serial] async fn cleaning_a_session() -> Result<(), anyhow::Error> { // Given an MQTT broker - let broker = mqtt_tests::test_mqtt_broker(); + let broker = MqttProcessHandler::new(55560); let mqtt_config = Config::default().with_port(broker.port); // Given an MQTT config with a well-known session name @@ -427,9 +417,8 @@ mod tests { } #[tokio::test] - #[serial] async fn to_be_cleared_a_session_must_have_a_name() { - let broker = mqtt_tests::test_mqtt_broker(); + let broker = MqttProcessHandler::new(55561); let mqtt_config = Config::default().with_port(broker.port); let result = clear_session(&mqtt_config).await; @@ -438,9 +427,8 @@ mod tests { } #[tokio::test] - #[serial] async fn subscription_failures() { - let broker = mqtt_tests::test_mqtt_broker(); + let broker = MqttProcessHandler::new(55562); let mqtt_config = Config::default().with_port(broker.port); let topic = TopicFilter::new_unchecked("test/topic"); @@ -454,9 +442,8 @@ mod tests { } #[tokio::test] - #[serial] async fn ensure_that_all_messages_are_sent_before_disconnect() -> Result<(), anyhow::Error> { - let broker = mqtt_tests::test_mqtt_broker(); + let broker = MqttProcessHandler::new(55563); let topic = "data/topic"; let mut messages = broker.messages_published_on(topic).await; diff --git a/crates/core/tedge/tests/mqtt.rs b/crates/core/tedge/tests/mqtt.rs index b0e438ad..457c3557 100644 --- a/crates/core/tedge/tests/mqtt.rs +++ b/crates/core/tedge/tests/mqtt.rs @@ -29,7 +29,7 @@ mod tests { #[test_case(None)] #[tokio::test] async fn test_cli_pub_basic(qos: Option<&str>) -> Result<(), anyhow::Error> { - let broker = mqtt_tests::test_mqtt_broker(); + let broker = mqtt_tests::test_mqtt_server::MqttProcessHandler::new(55590); let tmpfile = make_config(broker.port)?; let mut messages = broker.messages_published_on("topic").await; diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs index 502bbc10..ba6508c8 100644 --- a/crates/core/tedge_mapper/src/c8y/mapper.rs +++ b/crates/core/tedge_mapper/src/c8y/mapper.rs @@ -128,7 +128,7 @@ mod tests { use c8y_api::http_proxy::MockC8yJwtTokenRetriever; use c8y_smartrest::smartrest_deserializer::SmartRestJwtResponse; use mockito::mock; - use mqtt_tests::{assert_received_all_expected, test_mqtt_broker}; + use mqtt_tests::assert_received_all_expected; use serde_json::json; use std::time::Duration; use tedge_test_utils::fs::TempTedgeDir; @@ -202,7 +202,7 @@ mod tests { .unwrap(), ); - let broker = test_mqtt_broker(); + let broker = mqtt_tests::test_mqtt_server::MqttProcessHandler::new(55580); let mut mapper = create_mapper( CUMULOCITY_MAPPER_NAME_TEST, diff --git a/crates/core/tedge_mapper/src/c8y/tests.rs b/crates/core/tedge_mapper/src/c8y/tests.rs index 0413c33a..2b98c2bf 100644 --- a/crates/core/tedge_mapper/src/c8y/tests.rs +++ b/crates/core/tedge_mapper/src/c8y/tests.rs @@ -32,7 +32,7 @@ const MQTT_HOST: &str = "127.0.0.1"; #[serial] async fn mapper_publishes_a_software_list_request() { // The test assures the mapper publishes request for software list on `tedge/commands/req/software/list`. - let broker = mqtt_tests::test_mqtt_broker(); + let broker = MqttProcessHandler::new(55600); let mut messages = broker .messages_published_on("tedge/commands/req/software/list") @@ -51,7 +51,7 @@ async fn mapper_publishes_a_software_list_request() { #[serial] async fn mapper_publishes_a_supported_operation_and_a_pending_operations_onto_c8y_topic() { // The test assures the mapper publishes smartrest messages 114 and 500 on `c8y/s/us` which shall be send over to the cloud if bridge connection exists. - let broker = mqtt_tests::test_mqtt_broker(); + let broker = MqttProcessHandler::new(55601); let mut messages = broker.messages_published_on("c8y/s/us").await; // Start SM Mapper @@ -68,7 +68,7 @@ async fn mapper_publishes_a_supported_operation_and_a_pending_operations_onto_c8 async fn mapper_publishes_software_update_request() { // The test assures SM Mapper correctly receives software update request smartrest message on `c8y/s/ds` // and converts it to thin-edge json message published on `tedge/commands/req/software/update`. - let broker = mqtt_tests::test_mqtt_broker(); + let broker = MqttProcessHandler::new(55602); let mut messages = broker .messages_published_on("tedge/commands/req/software/update") .await; @@ -77,7 +77,7 @@ async fn mapper_publishes_software_update_request() { // Prepare and publish a software update smartrest request on `c8y/s/ds`. let smartrest = r#"528,external_id,nodered,1.0.0::debian,,install"#; let _ = broker.publish("c8y/s/ds", smartrest).await.unwrap(); - let _ = publish_a_fake_jwt_token(broker).await; + let _ = publish_a_fake_jwt_token(&broker).await; let expected_update_list = r#" "updateList": [ @@ -108,13 +108,13 @@ async fn mapper_publishes_software_update_request() { async fn mapper_publishes_software_update_status_onto_c8y_topic() { // The test assures SM Mapper correctly receives software update response message on `tedge/commands/res/software/update` // and publishes status of the operation `501` on `c8y/s/us` - let broker = mqtt_tests::test_mqtt_broker(); + let broker = MqttProcessHandler::new(55603); let mut messages = broker.messages_published_on("c8y/s/us").await; // Start SM Mapper let (_tmp_dir, sm_mapper) = start_c8y_mapper(broker.port).await.unwrap(); - let _ = publish_a_fake_jwt_token(broker).await; + let _ = publish_a_fake_jwt_token(&broker).await; // Prepare and publish a software update status response message `executing` on `tedge/commands/res/software/update`. let json_response = r#"{ @@ -164,12 +164,12 @@ async fn mapper_publishes_software_update_status_onto_c8y_topic() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[serial] async fn mapper_publishes_software_update_failed_status_onto_c8y_topic() { - let broker = mqtt_tests::test_mqtt_broker(); + let broker = MqttProcessHandler::new(55604); let mut messages = broker.messages_published_on("c8y/s/us").await; // Start SM Mapper let (_tmp_dir, sm_mapper) = start_c8y_mapper(broker.port).await.unwrap(); - let _ = publish_a_fake_jwt_token(broker).await; + let _ = publish_a_fake_jwt_token(&broker).await; // The agent publish an error let json_response = r#" @@ -213,7 +213,7 @@ async fn mapper_publishes_software_update_failed_status_onto_c8y_topic() { async fn mapper_fails_during_sw_update_recovers_and_process_response() -> Result<(), anyhow::Error> { // The test assures recovery and processing of messages by the SM-Mapper when it fails in the middle of the operation. - let broker = mqtt_tests::test_mqtt_broker(); + let broker = MqttProcessHandler::new(55605); // When a software update request message is received on `c8y/s/ds` by the sm mapper, // converts it to thin-edge json message, publishes a request message on `tedge/commands/req/software/update`. @@ -236,7 +236,7 @@ async fn mapper_fails_during_sw_update_recovers_and_process_response() -> Result // Prepare and publish a software update smartrest request on `c8y/s/ds`. let smartrest = r#"528,external_id,nodered,1.0.0::debian,,install"#; let _ = broker.publish("c8y/s/ds", smartrest).await.unwrap(); - let _ = publish_a_fake_jwt_token(broker).await; + let _ = publish_a_fake_jwt_token(&broker).await; let expected_update_list = r#" "updateList": [ @@ -311,7 +311,7 @@ async fn mapper_publishes_software_update_request_with_wrong_action() { // Then SM Mapper publishes an operation status message as failed `502,c8y_SoftwareUpdate,Action remove is not recognized. It must be install or delete.` on `c8/s/us`. // Then the subscriber that subscribed for messages on `c8/s/us` receives these messages and verifies them. - let broker = mqtt_tests::test_mqtt_broker(); + let broker = MqttProcessHandler::new(55606); // Create a subscriber to receive messages on `c8y/s/us` topic. let mut messages = broker.messages_published_on("c8y/s/us").await; @@ -334,7 +334,7 @@ async fn mapper_publishes_software_update_request_with_wrong_action() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[serial] async fn c8y_mapper_alarm_mapping_to_smartrest() { - let broker = mqtt_tests::test_mqtt_broker(); + let broker = MqttProcessHandler::new(55607); let mut messages = broker.messages_published_on("c8y/s/us").await; @@ -376,7 +376,7 @@ async fn c8y_mapper_alarm_mapping_to_smartrest() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[serial] async fn c8y_mapper_child_alarm_mapping_to_smartrest() { - let broker = mqtt_tests::test_mqtt_broker(); + let broker = MqttProcessHandler::new(55608); let mut messages = broker .messages_published_on("c8y/s/us/external_sensor") @@ -430,7 +430,7 @@ async fn c8y_mapper_child_alarm_mapping_to_smartrest() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[serial] async fn c8y_mapper_syncs_pending_alarms_on_startup() { - let broker = mqtt_tests::test_mqtt_broker(); + let broker = MqttProcessHandler::new(55609); let mut messages = broker.messages_published_on("c8y/s/us").await; @@ -520,7 +520,7 @@ async fn c8y_mapper_syncs_pending_alarms_on_startup() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[serial] async fn c8y_mapper_syncs_pending_child_alarms_on_startup() { - let broker = mqtt_tests::test_mqtt_broker(); + let broker = MqttProcessHandler::new(55610); let mut messages = broker .messages_published_on("c8y/s/us/external_sensor") diff --git a/crates/core/tedge_mapper/src/core/mapper.rs b/crates/core/tedge_mapper/src/core/mapper.rs index f103126a..049c1e4b 100644 --- a/crates/core/tedge_mapper/src/core/mapper.rs +++ b/crates/core/tedge_mapper/src/core/mapper.rs @@ -185,10 +185,9 @@ mod tests { use tokio::time::sleep; #[tokio::test] - #[serial_test::serial] async fn a_valid_input_leads_to_a_translated_output() -> Result<(), anyhow::Error> { // Given an MQTT broker - let broker = mqtt_tests::test_mqtt_broker(); + let broker = mqtt_tests::test_mqtt_server::MqttProcessHandler::new(55800); // Given a mapper let name = "mapper_under_test"; @@ -231,10 +230,9 @@ mod tests { #[cfg(test)] use serde_json::json; #[tokio::test] - #[serial_test::serial] async fn health_check() -> Result<(), anyhow::Error> { // Given an MQTT broker - let broker = mqtt_tests::test_mqtt_broker(); + let broker = mqtt_tests::test_mqtt_server::MqttProcessHandler::new(55801); // Given a mapper let name = "mapper_under_test"; diff --git a/crates/tests/mqtt_tests/src/lib.rs b/crates/tests/mqtt_tests/src/lib.rs index c51498a9..b0fe83ec 100644 --- a/crates/tests/mqtt_tests/src/lib.rs +++ b/crates/tests/mqtt_tests/src/lib.rs @@ -6,4 +6,3 @@ pub mod with_timeout; pub use futures::{SinkExt, StreamExt}; pub use message_streams::*; pub use test_mqtt_client::{assert_received, assert_received_all_expected, publish}; -pub use test_mqtt_server::test_mqtt_broker; diff --git a/crates/tests/mqtt_tests/src/test_mqtt_server.rs b/crates/tests/mqtt_tests/src/test_mqtt_server.rs index 675e715a..81e30508 100644 --- a/crates/tests/mqtt_tests/src/test_mqtt_server.rs +++ b/crates/tests/mqtt_tests/src/test_mqtt_server.rs @@ -6,17 +6,8 @@ use std::{ use futures::channel::mpsc::UnboundedReceiver; use librumqttd::{Broker, Config, ConnectionSettings, ConsoleSettings, ServerSettings}; -use once_cell::sync::Lazy; use rumqttc::QoS; -const MQTT_TEST_PORT: u16 = 55555; - -static SERVER: Lazy<MqttProcessHandler> = Lazy::new(|| MqttProcessHandler::new(MQTT_TEST_PORT)); - -pub fn test_mqtt_broker() -> &'static MqttProcessHandler { - Lazy::force(&SERVER) -} - pub struct MqttProcessHandler { pub port: u16, } diff --git a/plugins/c8y_configuration_plugin/src/main.rs b/plugins/c8y_configuration_plugin/src/main.rs index 6f9ddf7b..c6b35bc4 100644 --- a/plugins/c8y_configuration_plugin/src/main.rs +++ b/plugins/c8y_configuration_plugin/src/main.rs @@ -319,12 +319,11 @@ mod tests { const TEST_TIMEOUT_MS: Duration = Duration::from_millis(5000); #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - #[serial_test::serial] async fn test_message_dispatch() -> anyhow::Result<()> { let test_config_path = "/some/test/config"; let test_config_type = "c8y-configuration-plugin"; - let broker = mqtt_tests::test_mqtt_broker(); + let broker = mqtt_tests::test_mqtt_server::MqttProcessHandler::new(55570); let mut messages = broker.messages_published_on("c8y/s/us").await; diff --git a/plugins/c8y_configuration_plugin/src/upload.rs b/plugins/c8y_configuration_plugin/src/upload.rs index 957cbb9c..57de32e6 100644 --- a/plugins/c8y_configuration_plugin/src/upload.rs +++ b/plugins/c8y_configuration_plugin/src/upload.rs @@ -166,11 +166,10 @@ mod tests { } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - #[serial_test::serial] async fn test_handle_config_upload_request() -> anyhow::Result<()> { let config_path = Path::new("/some/test/config"); - let broker = mqtt_tests::test_mqtt_broker(); + let broker = mqtt_tests::test_mqtt_server::MqttProcessHandler::new(55700); let mqtt_config = mqtt_channel::Config::default() .with_port(broker.port) .with_subscriptions(mqtt_channel::TopicFilter::new_unchecked( |