summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <matthias.beyer@ifm.com>2022-08-12 09:58:30 +0200
committerMatthias Beyer <matthias.beyer@ifm.com>2022-08-12 10:34:18 +0200
commitfe104f9b77544f58136600b7598745b56fe25683 (patch)
tree87546251a8f6c711415bd3a139f712e04fb093b5
parent8a89c5c9229dd5a0b7b37f5154e8da025a8be1ad (diff)
Remove global mqtt dummy server
Before this patch there was one in-process mqtt server used for running all tests. Tests had to be run in sequence because of this, adding up to a huge test time. This patch makes each test have its own broker, on an individual port, to be able to run all tests in parallel. Signed-off-by: Matthias Beyer <matthias.beyer@ifm.com>
-rw-r--r--crates/common/mqtt_channel/src/tests.rs41
-rw-r--r--crates/core/tedge/tests/mqtt.rs2
-rw-r--r--crates/core/tedge_mapper/src/c8y/mapper.rs4
-rw-r--r--crates/core/tedge_mapper/src/c8y/tests.rs30
-rw-r--r--crates/core/tedge_mapper/src/core/mapper.rs6
-rw-r--r--crates/tests/mqtt_tests/src/lib.rs1
-rw-r--r--crates/tests/mqtt_tests/src/test_mqtt_server.rs9
-rw-r--r--plugins/c8y_configuration_plugin/src/main.rs3
-rw-r--r--plugins/c8y_configuration_plugin/src/upload.rs3
9 files changed, 36 insertions, 63 deletions
diff --git a/crates/common/mqtt_channel/src/tests.rs b/crates/common/mqtt_channel/src/tests.rs
index fb30eac7..fe873652 100644
--- a/crates/common/mqtt_channel/src/tests.rs
+++ b/crates/common/mqtt_channel/src/tests.rs
@@ -2,17 +2,16 @@
mod tests {
use crate::*;
use futures::{SinkExt, StreamExt};
- use serial_test::serial;
+ use mqtt_tests::test_mqtt_server::MqttProcessHandler;
use std::convert::TryInto;
use std::time::Duration;
const TIMEOUT: Duration = Duration::from_millis(1000);
#[tokio::test]
- #[serial]
async fn subscribing_to_messages() -> Result<(), anyhow::Error> {
// Given an MQTT broker
- let broker = mqtt_tests::test_mqtt_broker();
+ let broker = MqttProcessHandler::new(55551);
let mqtt_config = Config::default().with_port(broker.port);
// A client subscribes to a topic on connect
@@ -66,10 +65,9 @@ mod tests {
}
#[tokio::test]
- #[serial]
async fn subscribing_to_many_topics() -> Result<(), anyhow::Error> {
// Given an MQTT broker
- let broker = mqtt_tests::test_mqtt_broker();
+ let broker = MqttProcessHandler::new(55552);
let mqtt_config = Config::default().with_port(broker.port);
// A client can subscribe to many topics
@@ -121,10 +119,9 @@ mod tests {
}
#[tokio::test]
- #[serial]
async fn publishing_messages() -> Result<(), anyhow::Error> {
// Given an MQTT broker
- let broker = mqtt_tests::test_mqtt_broker();
+ let broker = MqttProcessHandler::new(55553);
let mqtt_config = Config::default().with_port(broker.port);
let mut all_messages = broker.messages_published_on("#").await;
@@ -156,10 +153,9 @@ mod tests {
}
#[tokio::test]
- #[serial]
async fn implementing_a_message_mapper() -> Result<(), anyhow::Error> {
// Given an MQTT broker
- let broker = mqtt_tests::test_mqtt_broker();
+ let broker = MqttProcessHandler::new(55554);
let mqtt_config = Config::default().with_port(broker.port);
// and an MQTT connection with input and output topics
@@ -203,10 +199,9 @@ mod tests {
}
#[tokio::test]
- #[serial]
async fn receiving_messages_while_not_connected() -> Result<(), anyhow::Error> {
// Given an MQTT broker
- let broker = mqtt_tests::test_mqtt_broker();
+ let broker = MqttProcessHandler::new(55555);
let mqtt_config = Config::default().with_port(broker.port);
// A client that connects with a well-known session name, subscribing to some topic.
@@ -246,7 +241,6 @@ mod tests {
}
#[tokio::test]
- #[serial]
async fn testing_an_mqtt_client_without_mqtt() -> Result<(), anyhow::Error> {
// Given an mqtt client
async fn run(mut input: impl SubChannel, mut output: impl PubChannel) {
@@ -281,7 +275,7 @@ mod tests {
assert_eq!(expected, output.collect().await);
// This very same client can be tested with an MQTT broker
- let broker = mqtt_tests::test_mqtt_broker();
+ let broker = MqttProcessHandler::new(55556);
let mqtt_config = Config::default().with_port(broker.port);
let mut out_messages = broker.messages_published_on("out/topic").await;
@@ -307,10 +301,9 @@ mod tests {
}
#[tokio::test]
- #[serial]
async fn creating_a_session() -> Result<(), anyhow::Error> {
// Given an MQTT broker
- let broker = mqtt_tests::test_mqtt_broker();
+ let broker = MqttProcessHandler::new(55557);
let mqtt_config = Config::default().with_port(broker.port);
// Given an MQTT config with a well-known session name
@@ -354,9 +347,8 @@ mod tests {
}
#[tokio::test]
- #[serial]
async fn a_session_must_have_a_name() {
- let broker = mqtt_tests::test_mqtt_broker();
+ let broker = MqttProcessHandler::new(55558);
let mqtt_config = Config::default().with_port(broker.port);
let result = init_session(&mqtt_config).await;
@@ -365,9 +357,8 @@ mod tests {
}
#[tokio::test]
- #[serial]
async fn a_named_session_must_not_set_clean_session() {
- let broker = mqtt_tests::test_mqtt_broker();
+ let broker = MqttProcessHandler::new(55559);
let mqtt_config = Config::default()
.with_port(broker.port)
.with_session_name("useless name")
@@ -379,10 +370,9 @@ mod tests {
}
#[tokio::test]
- #[serial]
async fn cleaning_a_session() -> Result<(), anyhow::Error> {
// Given an MQTT broker
- let broker = mqtt_tests::test_mqtt_broker();
+ let broker = MqttProcessHandler::new(55560);
let mqtt_config = Config::default().with_port(broker.port);
// Given an MQTT config with a well-known session name
@@ -427,9 +417,8 @@ mod tests {
}
#[tokio::test]
- #[serial]
async fn to_be_cleared_a_session_must_have_a_name() {
- let broker = mqtt_tests::test_mqtt_broker();
+ let broker = MqttProcessHandler::new(55561);
let mqtt_config = Config::default().with_port(broker.port);
let result = clear_session(&mqtt_config).await;
@@ -438,9 +427,8 @@ mod tests {
}
#[tokio::test]
- #[serial]
async fn subscription_failures() {
- let broker = mqtt_tests::test_mqtt_broker();
+ let broker = MqttProcessHandler::new(55562);
let mqtt_config = Config::default().with_port(broker.port);
let topic = TopicFilter::new_unchecked("test/topic");
@@ -454,9 +442,8 @@ mod tests {
}
#[tokio::test]
- #[serial]
async fn ensure_that_all_messages_are_sent_before_disconnect() -> Result<(), anyhow::Error> {
- let broker = mqtt_tests::test_mqtt_broker();
+ let broker = MqttProcessHandler::new(55563);
let topic = "data/topic";
let mut messages = broker.messages_published_on(topic).await;
diff --git a/crates/core/tedge/tests/mqtt.rs b/crates/core/tedge/tests/mqtt.rs
index b0e438ad..457c3557 100644
--- a/crates/core/tedge/tests/mqtt.rs
+++ b/crates/core/tedge/tests/mqtt.rs
@@ -29,7 +29,7 @@ mod tests {
#[test_case(None)]
#[tokio::test]
async fn test_cli_pub_basic(qos: Option<&str>) -> Result<(), anyhow::Error> {
- let broker = mqtt_tests::test_mqtt_broker();
+ let broker = mqtt_tests::test_mqtt_server::MqttProcessHandler::new(55590);
let tmpfile = make_config(broker.port)?;
let mut messages = broker.messages_published_on("topic").await;
diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs
index 502bbc10..ba6508c8 100644
--- a/crates/core/tedge_mapper/src/c8y/mapper.rs
+++ b/crates/core/tedge_mapper/src/c8y/mapper.rs
@@ -128,7 +128,7 @@ mod tests {
use c8y_api::http_proxy::MockC8yJwtTokenRetriever;
use c8y_smartrest::smartrest_deserializer::SmartRestJwtResponse;
use mockito::mock;
- use mqtt_tests::{assert_received_all_expected, test_mqtt_broker};
+ use mqtt_tests::assert_received_all_expected;
use serde_json::json;
use std::time::Duration;
use tedge_test_utils::fs::TempTedgeDir;
@@ -202,7 +202,7 @@ mod tests {
.unwrap(),
);
- let broker = test_mqtt_broker();
+ let broker = mqtt_tests::test_mqtt_server::MqttProcessHandler::new(55580);
let mut mapper = create_mapper(
CUMULOCITY_MAPPER_NAME_TEST,
diff --git a/crates/core/tedge_mapper/src/c8y/tests.rs b/crates/core/tedge_mapper/src/c8y/tests.rs
index 0413c33a..2b98c2bf 100644
--- a/crates/core/tedge_mapper/src/c8y/tests.rs
+++ b/crates/core/tedge_mapper/src/c8y/tests.rs
@@ -32,7 +32,7 @@ const MQTT_HOST: &str = "127.0.0.1";
#[serial]
async fn mapper_publishes_a_software_list_request() {
// The test assures the mapper publishes request for software list on `tedge/commands/req/software/list`.
- let broker = mqtt_tests::test_mqtt_broker();
+ let broker = MqttProcessHandler::new(55600);
let mut messages = broker
.messages_published_on("tedge/commands/req/software/list")
@@ -51,7 +51,7 @@ async fn mapper_publishes_a_software_list_request() {
#[serial]
async fn mapper_publishes_a_supported_operation_and_a_pending_operations_onto_c8y_topic() {
// The test assures the mapper publishes smartrest messages 114 and 500 on `c8y/s/us` which shall be send over to the cloud if bridge connection exists.
- let broker = mqtt_tests::test_mqtt_broker();
+ let broker = MqttProcessHandler::new(55601);
let mut messages = broker.messages_published_on("c8y/s/us").await;
// Start SM Mapper
@@ -68,7 +68,7 @@ async fn mapper_publishes_a_supported_operation_and_a_pending_operations_onto_c8
async fn mapper_publishes_software_update_request() {
// The test assures SM Mapper correctly receives software update request smartrest message on `c8y/s/ds`
// and converts it to thin-edge json message published on `tedge/commands/req/software/update`.
- let broker = mqtt_tests::test_mqtt_broker();
+ let broker = MqttProcessHandler::new(55602);
let mut messages = broker
.messages_published_on("tedge/commands/req/software/update")
.await;
@@ -77,7 +77,7 @@ async fn mapper_publishes_software_update_request() {
// Prepare and publish a software update smartrest request on `c8y/s/ds`.
let smartrest = r#"528,external_id,nodered,1.0.0::debian,,install"#;
let _ = broker.publish("c8y/s/ds", smartrest).await.unwrap();
- let _ = publish_a_fake_jwt_token(broker).await;
+ let _ = publish_a_fake_jwt_token(&broker).await;
let expected_update_list = r#"
"updateList": [
@@ -108,13 +108,13 @@ async fn mapper_publishes_software_update_request() {
async fn mapper_publishes_software_update_status_onto_c8y_topic() {
// The test assures SM Mapper correctly receives software update response message on `tedge/commands/res/software/update`
// and publishes status of the operation `501` on `c8y/s/us`
- let broker = mqtt_tests::test_mqtt_broker();
+ let broker = MqttProcessHandler::new(55603);
let mut messages = broker.messages_published_on("c8y/s/us").await;
// Start SM Mapper
let (_tmp_dir, sm_mapper) = start_c8y_mapper(broker.port).await.unwrap();
- let _ = publish_a_fake_jwt_token(broker).await;
+ let _ = publish_a_fake_jwt_token(&broker).await;
// Prepare and publish a software update status response message `executing` on `tedge/commands/res/software/update`.
let json_response = r#"{
@@ -164,12 +164,12 @@ async fn mapper_publishes_software_update_status_onto_c8y_topic() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
async fn mapper_publishes_software_update_failed_status_onto_c8y_topic() {
- let broker = mqtt_tests::test_mqtt_broker();
+ let broker = MqttProcessHandler::new(55604);
let mut messages = broker.messages_published_on("c8y/s/us").await;
// Start SM Mapper
let (_tmp_dir, sm_mapper) = start_c8y_mapper(broker.port).await.unwrap();
- let _ = publish_a_fake_jwt_token(broker).await;
+ let _ = publish_a_fake_jwt_token(&broker).await;
// The agent publish an error
let json_response = r#"
@@ -213,7 +213,7 @@ async fn mapper_publishes_software_update_failed_status_onto_c8y_topic() {
async fn mapper_fails_during_sw_update_recovers_and_process_response() -> Result<(), anyhow::Error>
{
// The test assures recovery and processing of messages by the SM-Mapper when it fails in the middle of the operation.
- let broker = mqtt_tests::test_mqtt_broker();
+ let broker = MqttProcessHandler::new(55605);
// When a software update request message is received on `c8y/s/ds` by the sm mapper,
// converts it to thin-edge json message, publishes a request message on `tedge/commands/req/software/update`.
@@ -236,7 +236,7 @@ async fn mapper_fails_during_sw_update_recovers_and_process_response() -> Result
// Prepare and publish a software update smartrest request on `c8y/s/ds`.
let smartrest = r#"528,external_id,nodered,1.0.0::debian,,install"#;
let _ = broker.publish("c8y/s/ds", smartrest).await.unwrap();
- let _ = publish_a_fake_jwt_token(broker).await;
+ let _ = publish_a_fake_jwt_token(&broker).await;
let expected_update_list = r#"
"updateList": [
@@ -311,7 +311,7 @@ async fn mapper_publishes_software_update_request_with_wrong_action() {
// Then SM Mapper publishes an operation status message as failed `502,c8y_SoftwareUpdate,Action remove is not recognized. It must be install or delete.` on `c8/s/us`.
// Then the subscriber that subscribed for messages on `c8/s/us` receives these messages and verifies them.
- let broker = mqtt_tests::test_mqtt_broker();
+ let broker = MqttProcessHandler::new(55606);
// Create a subscriber to receive messages on `c8y/s/us` topic.
let mut messages = broker.messages_published_on("c8y/s/us").await;
@@ -334,7 +334,7 @@ async fn mapper_publishes_software_update_request_with_wrong_action() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
async fn c8y_mapper_alarm_mapping_to_smartrest() {
- let broker = mqtt_tests::test_mqtt_broker();
+ let broker = MqttProcessHandler::new(55607);
let mut messages = broker.messages_published_on("c8y/s/us").await;
@@ -376,7 +376,7 @@ async fn c8y_mapper_alarm_mapping_to_smartrest() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
async fn c8y_mapper_child_alarm_mapping_to_smartrest() {
- let broker = mqtt_tests::test_mqtt_broker();
+ let broker = MqttProcessHandler::new(55608);
let mut messages = broker
.messages_published_on("c8y/s/us/external_sensor")
@@ -430,7 +430,7 @@ async fn c8y_mapper_child_alarm_mapping_to_smartrest() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
async fn c8y_mapper_syncs_pending_alarms_on_startup() {
- let broker = mqtt_tests::test_mqtt_broker();
+ let broker = MqttProcessHandler::new(55609);
let mut messages = broker.messages_published_on("c8y/s/us").await;
@@ -520,7 +520,7 @@ async fn c8y_mapper_syncs_pending_alarms_on_startup() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
async fn c8y_mapper_syncs_pending_child_alarms_on_startup() {
- let broker = mqtt_tests::test_mqtt_broker();
+ let broker = MqttProcessHandler::new(55610);
let mut messages = broker
.messages_published_on("c8y/s/us/external_sensor")
diff --git a/crates/core/tedge_mapper/src/core/mapper.rs b/crates/core/tedge_mapper/src/core/mapper.rs
index f103126a..049c1e4b 100644
--- a/crates/core/tedge_mapper/src/core/mapper.rs
+++ b/crates/core/tedge_mapper/src/core/mapper.rs
@@ -185,10 +185,9 @@ mod tests {
use tokio::time::sleep;
#[tokio::test]
- #[serial_test::serial]
async fn a_valid_input_leads_to_a_translated_output() -> Result<(), anyhow::Error> {
// Given an MQTT broker
- let broker = mqtt_tests::test_mqtt_broker();
+ let broker = mqtt_tests::test_mqtt_server::MqttProcessHandler::new(55800);
// Given a mapper
let name = "mapper_under_test";
@@ -231,10 +230,9 @@ mod tests {
#[cfg(test)]
use serde_json::json;
#[tokio::test]
- #[serial_test::serial]
async fn health_check() -> Result<(), anyhow::Error> {
// Given an MQTT broker
- let broker = mqtt_tests::test_mqtt_broker();
+ let broker = mqtt_tests::test_mqtt_server::MqttProcessHandler::new(55801);
// Given a mapper
let name = "mapper_under_test";
diff --git a/crates/tests/mqtt_tests/src/lib.rs b/crates/tests/mqtt_tests/src/lib.rs
index c51498a9..b0fe83ec 100644
--- a/crates/tests/mqtt_tests/src/lib.rs
+++ b/crates/tests/mqtt_tests/src/lib.rs
@@ -6,4 +6,3 @@ pub mod with_timeout;
pub use futures::{SinkExt, StreamExt};
pub use message_streams::*;
pub use test_mqtt_client::{assert_received, assert_received_all_expected, publish};
-pub use test_mqtt_server::test_mqtt_broker;
diff --git a/crates/tests/mqtt_tests/src/test_mqtt_server.rs b/crates/tests/mqtt_tests/src/test_mqtt_server.rs
index 675e715a..81e30508 100644
--- a/crates/tests/mqtt_tests/src/test_mqtt_server.rs
+++ b/crates/tests/mqtt_tests/src/test_mqtt_server.rs
@@ -6,17 +6,8 @@ use std::{
use futures::channel::mpsc::UnboundedReceiver;
use librumqttd::{Broker, Config, ConnectionSettings, ConsoleSettings, ServerSettings};
-use once_cell::sync::Lazy;
use rumqttc::QoS;
-const MQTT_TEST_PORT: u16 = 55555;
-
-static SERVER: Lazy<MqttProcessHandler> = Lazy::new(|| MqttProcessHandler::new(MQTT_TEST_PORT));
-
-pub fn test_mqtt_broker() -> &'static MqttProcessHandler {
- Lazy::force(&SERVER)
-}
-
pub struct MqttProcessHandler {
pub port: u16,
}
diff --git a/plugins/c8y_configuration_plugin/src/main.rs b/plugins/c8y_configuration_plugin/src/main.rs
index 6f9ddf7b..c6b35bc4 100644
--- a/plugins/c8y_configuration_plugin/src/main.rs
+++ b/plugins/c8y_configuration_plugin/src/main.rs
@@ -319,12 +319,11 @@ mod tests {
const TEST_TIMEOUT_MS: Duration = Duration::from_millis(5000);
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
- #[serial_test::serial]
async fn test_message_dispatch() -> anyhow::Result<()> {
let test_config_path = "/some/test/config";
let test_config_type = "c8y-configuration-plugin";
- let broker = mqtt_tests::test_mqtt_broker();
+ let broker = mqtt_tests::test_mqtt_server::MqttProcessHandler::new(55570);
let mut messages = broker.messages_published_on("c8y/s/us").await;
diff --git a/plugins/c8y_configuration_plugin/src/upload.rs b/plugins/c8y_configuration_plugin/src/upload.rs
index 957cbb9c..57de32e6 100644
--- a/plugins/c8y_configuration_plugin/src/upload.rs
+++ b/plugins/c8y_configuration_plugin/src/upload.rs
@@ -166,11 +166,10 @@ mod tests {
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
- #[serial_test::serial]
async fn test_handle_config_upload_request() -> anyhow::Result<()> {
let config_path = Path::new("/some/test/config");
- let broker = mqtt_tests::test_mqtt_broker();
+ let broker = mqtt_tests::test_mqtt_server::MqttProcessHandler::new(55700);
let mqtt_config = mqtt_channel::Config::default()
.with_port(broker.port)
.with_subscriptions(mqtt_channel::TopicFilter::new_unchecked(