summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <matthias.beyer@ifm.com>2022-08-12 11:14:28 +0200
committerMatthias Beyer <matthias.beyer@ifm.com>2022-08-12 11:14:28 +0200
commit6c6836a92969f72d1141df63789f0efd7c0cdf78 (patch)
tree04125b2537df65a441a5beae0a6e4332c3afd1f5
parentfe104f9b77544f58136600b7598745b56fe25683 (diff)
Make console port configurable in test broker instance
This patch makes the port for the broker console configurable in the test helper broker type and calls it with a different port number in each test. Signed-off-by: Matthias Beyer <matthias.beyer@ifm.com>
-rw-r--r--crates/common/mqtt_channel/src/tests.rs26
-rw-r--r--crates/core/tedge/tests/mqtt.rs2
-rw-r--r--crates/core/tedge_mapper/src/c8y/mapper.rs2
-rw-r--r--crates/core/tedge_mapper/src/c8y/tests.rs22
-rw-r--r--crates/core/tedge_mapper/src/core/mapper.rs4
-rw-r--r--crates/tests/mqtt_tests/src/test_mqtt_server.rs12
-rw-r--r--plugins/c8y_configuration_plugin/src/main.rs2
-rw-r--r--plugins/c8y_configuration_plugin/src/upload.rs2
8 files changed, 36 insertions, 36 deletions
diff --git a/crates/common/mqtt_channel/src/tests.rs b/crates/common/mqtt_channel/src/tests.rs
index fe873652..e319dd1b 100644
--- a/crates/common/mqtt_channel/src/tests.rs
+++ b/crates/common/mqtt_channel/src/tests.rs
@@ -11,7 +11,7 @@ mod tests {
#[tokio::test]
async fn subscribing_to_messages() -> Result<(), anyhow::Error> {
// Given an MQTT broker
- let broker = MqttProcessHandler::new(55551);
+ let broker = MqttProcessHandler::new(55551, 3551);
let mqtt_config = Config::default().with_port(broker.port);
// A client subscribes to a topic on connect
@@ -67,7 +67,7 @@ mod tests {
#[tokio::test]
async fn subscribing_to_many_topics() -> Result<(), anyhow::Error> {
// Given an MQTT broker
- let broker = MqttProcessHandler::new(55552);
+ let broker = MqttProcessHandler::new(55552, 3552);
let mqtt_config = Config::default().with_port(broker.port);
// A client can subscribe to many topics
@@ -121,7 +121,7 @@ mod tests {
#[tokio::test]
async fn publishing_messages() -> Result<(), anyhow::Error> {
// Given an MQTT broker
- let broker = MqttProcessHandler::new(55553);
+ let broker = MqttProcessHandler::new(55553, 3553);
let mqtt_config = Config::default().with_port(broker.port);
let mut all_messages = broker.messages_published_on("#").await;
@@ -155,7 +155,7 @@ mod tests {
#[tokio::test]
async fn implementing_a_message_mapper() -> Result<(), anyhow::Error> {
// Given an MQTT broker
- let broker = MqttProcessHandler::new(55554);
+ let broker = MqttProcessHandler::new(55554, 3554);
let mqtt_config = Config::default().with_port(broker.port);
// and an MQTT connection with input and output topics
@@ -201,7 +201,7 @@ mod tests {
#[tokio::test]
async fn receiving_messages_while_not_connected() -> Result<(), anyhow::Error> {
// Given an MQTT broker
- let broker = MqttProcessHandler::new(55555);
+ let broker = MqttProcessHandler::new(55555, 3555);
let mqtt_config = Config::default().with_port(broker.port);
// A client that connects with a well-known session name, subscribing to some topic.
@@ -275,7 +275,7 @@ mod tests {
assert_eq!(expected, output.collect().await);
// This very same client can be tested with an MQTT broker
- let broker = MqttProcessHandler::new(55556);
+ let broker = MqttProcessHandler::new(55556, 3556);
let mqtt_config = Config::default().with_port(broker.port);
let mut out_messages = broker.messages_published_on("out/topic").await;
@@ -303,7 +303,7 @@ mod tests {
#[tokio::test]
async fn creating_a_session() -> Result<(), anyhow::Error> {
// Given an MQTT broker
- let broker = MqttProcessHandler::new(55557);
+ let broker = MqttProcessHandler::new(55557, 3557);
let mqtt_config = Config::default().with_port(broker.port);
// Given an MQTT config with a well-known session name
@@ -348,7 +348,7 @@ mod tests {
#[tokio::test]
async fn a_session_must_have_a_name() {
- let broker = MqttProcessHandler::new(55558);
+ let broker = MqttProcessHandler::new(55558, 3558);
let mqtt_config = Config::default().with_port(broker.port);
let result = init_session(&mqtt_config).await;
@@ -358,7 +358,7 @@ mod tests {
#[tokio::test]
async fn a_named_session_must_not_set_clean_session() {
- let broker = MqttProcessHandler::new(55559);
+ let broker = MqttProcessHandler::new(55559, 3559);
let mqtt_config = Config::default()
.with_port(broker.port)
.with_session_name("useless name")
@@ -372,7 +372,7 @@ mod tests {
#[tokio::test]
async fn cleaning_a_session() -> Result<(), anyhow::Error> {
// Given an MQTT broker
- let broker = MqttProcessHandler::new(55560);
+ let broker = MqttProcessHandler::new(55560, 3560);
let mqtt_config = Config::default().with_port(broker.port);
// Given an MQTT config with a well-known session name
@@ -418,7 +418,7 @@ mod tests {
#[tokio::test]
async fn to_be_cleared_a_session_must_have_a_name() {
- let broker = MqttProcessHandler::new(55561);
+ let broker = MqttProcessHandler::new(55561, 3561);
let mqtt_config = Config::default().with_port(broker.port);
let result = clear_session(&mqtt_config).await;
@@ -428,7 +428,7 @@ mod tests {
#[tokio::test]
async fn subscription_failures() {
- let broker = MqttProcessHandler::new(55562);
+ let broker = MqttProcessHandler::new(55562, 3562);
let mqtt_config = Config::default().with_port(broker.port);
let topic = TopicFilter::new_unchecked("test/topic");
@@ -443,7 +443,7 @@ mod tests {
#[tokio::test]
async fn ensure_that_all_messages_are_sent_before_disconnect() -> Result<(), anyhow::Error> {
- let broker = MqttProcessHandler::new(55563);
+ let broker = MqttProcessHandler::new(55563, 3563);
let topic = "data/topic";
let mut messages = broker.messages_published_on(topic).await;
diff --git a/crates/core/tedge/tests/mqtt.rs b/crates/core/tedge/tests/mqtt.rs
index 457c3557..79ecaaec 100644
--- a/crates/core/tedge/tests/mqtt.rs
+++ b/crates/core/tedge/tests/mqtt.rs
@@ -29,7 +29,7 @@ mod tests {
#[test_case(None)]
#[tokio::test]
async fn test_cli_pub_basic(qos: Option<&str>) -> Result<(), anyhow::Error> {
- let broker = mqtt_tests::test_mqtt_server::MqttProcessHandler::new(55590);
+ let broker = mqtt_tests::test_mqtt_server::MqttProcessHandler::new(55590, 3590);
let tmpfile = make_config(broker.port)?;
let mut messages = broker.messages_published_on("topic").await;
diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs
index ba6508c8..2d5695dc 100644
--- a/crates/core/tedge_mapper/src/c8y/mapper.rs
+++ b/crates/core/tedge_mapper/src/c8y/mapper.rs
@@ -202,7 +202,7 @@ mod tests {
.unwrap(),
);
- let broker = mqtt_tests::test_mqtt_server::MqttProcessHandler::new(55580);
+ let broker = mqtt_tests::test_mqtt_server::MqttProcessHandler::new(55580, 3580);
let mut mapper = create_mapper(
CUMULOCITY_MAPPER_NAME_TEST,
diff --git a/crates/core/tedge_mapper/src/c8y/tests.rs b/crates/core/tedge_mapper/src/c8y/tests.rs
index 2b98c2bf..2f0b0db2 100644
--- a/crates/core/tedge_mapper/src/c8y/tests.rs
+++ b/crates/core/tedge_mapper/src/c8y/tests.rs
@@ -32,7 +32,7 @@ const MQTT_HOST: &str = "127.0.0.1";
#[serial]
async fn mapper_publishes_a_software_list_request() {
// The test assures the mapper publishes request for software list on `tedge/commands/req/software/list`.
- let broker = MqttProcessHandler::new(55600);
+ let broker = MqttProcessHandler::new(55600, 3600);
let mut messages = broker
.messages_published_on("tedge/commands/req/software/list")
@@ -51,7 +51,7 @@ async fn mapper_publishes_a_software_list_request() {
#[serial]
async fn mapper_publishes_a_supported_operation_and_a_pending_operations_onto_c8y_topic() {
// The test assures the mapper publishes smartrest messages 114 and 500 on `c8y/s/us` which shall be send over to the cloud if bridge connection exists.
- let broker = MqttProcessHandler::new(55601);
+ let broker = MqttProcessHandler::new(55601, 3601);
let mut messages = broker.messages_published_on("c8y/s/us").await;
// Start SM Mapper
@@ -68,7 +68,7 @@ async fn mapper_publishes_a_supported_operation_and_a_pending_operations_onto_c8
async fn mapper_publishes_software_update_request() {
// The test assures SM Mapper correctly receives software update request smartrest message on `c8y/s/ds`
// and converts it to thin-edge json message published on `tedge/commands/req/software/update`.
- let broker = MqttProcessHandler::new(55602);
+ let broker = MqttProcessHandler::new(55602, 3602);
let mut messages = broker
.messages_published_on("tedge/commands/req/software/update")
.await;
@@ -108,7 +108,7 @@ async fn mapper_publishes_software_update_request() {
async fn mapper_publishes_software_update_status_onto_c8y_topic() {
// The test assures SM Mapper correctly receives software update response message on `tedge/commands/res/software/update`
// and publishes status of the operation `501` on `c8y/s/us`
- let broker = MqttProcessHandler::new(55603);
+ let broker = MqttProcessHandler::new(55603, 3603);
let mut messages = broker.messages_published_on("c8y/s/us").await;
@@ -164,7 +164,7 @@ async fn mapper_publishes_software_update_status_onto_c8y_topic() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
async fn mapper_publishes_software_update_failed_status_onto_c8y_topic() {
- let broker = MqttProcessHandler::new(55604);
+ let broker = MqttProcessHandler::new(55604, 3604);
let mut messages = broker.messages_published_on("c8y/s/us").await;
// Start SM Mapper
@@ -213,7 +213,7 @@ async fn mapper_publishes_software_update_failed_status_onto_c8y_topic() {
async fn mapper_fails_during_sw_update_recovers_and_process_response() -> Result<(), anyhow::Error>
{
// The test assures recovery and processing of messages by the SM-Mapper when it fails in the middle of the operation.
- let broker = MqttProcessHandler::new(55605);
+ let broker = MqttProcessHandler::new(55605, 3605);
// When a software update request message is received on `c8y/s/ds` by the sm mapper,
// converts it to thin-edge json message, publishes a request message on `tedge/commands/req/software/update`.
@@ -311,7 +311,7 @@ async fn mapper_publishes_software_update_request_with_wrong_action() {
// Then SM Mapper publishes an operation status message as failed `502,c8y_SoftwareUpdate,Action remove is not recognized. It must be install or delete.` on `c8/s/us`.
// Then the subscriber that subscribed for messages on `c8/s/us` receives these messages and verifies them.
- let broker = MqttProcessHandler::new(55606);
+ let broker = MqttProcessHandler::new(55606, 3606);
// Create a subscriber to receive messages on `c8y/s/us` topic.
let mut messages = broker.messages_published_on("c8y/s/us").await;
@@ -334,7 +334,7 @@ async fn mapper_publishes_software_update_request_with_wrong_action() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
async fn c8y_mapper_alarm_mapping_to_smartrest() {
- let broker = MqttProcessHandler::new(55607);
+ let broker = MqttProcessHandler::new(55607, 3607);
let mut messages = broker.messages_published_on("c8y/s/us").await;
@@ -376,7 +376,7 @@ async fn c8y_mapper_alarm_mapping_to_smartrest() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
async fn c8y_mapper_child_alarm_mapping_to_smartrest() {
- let broker = MqttProcessHandler::new(55608);
+ let broker = MqttProcessHandler::new(55608, 3608);
let mut messages = broker
.messages_published_on("c8y/s/us/external_sensor")
@@ -430,7 +430,7 @@ async fn c8y_mapper_child_alarm_mapping_to_smartrest() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
async fn c8y_mapper_syncs_pending_alarms_on_startup() {
- let broker = MqttProcessHandler::new(55609);
+ let broker = MqttProcessHandler::new(55609, 3609);
let mut messages = broker.messages_published_on("c8y/s/us").await;
@@ -520,7 +520,7 @@ async fn c8y_mapper_syncs_pending_alarms_on_startup() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
async fn c8y_mapper_syncs_pending_child_alarms_on_startup() {
- let broker = MqttProcessHandler::new(55610);
+ let broker = MqttProcessHandler::new(55610, 3610);
let mut messages = broker
.messages_published_on("c8y/s/us/external_sensor")
diff --git a/crates/core/tedge_mapper/src/core/mapper.rs b/crates/core/tedge_mapper/src/core/mapper.rs
index 049c1e4b..e9cf3a4e 100644
--- a/crates/core/tedge_mapper/src/core/mapper.rs
+++ b/crates/core/tedge_mapper/src/core/mapper.rs
@@ -187,7 +187,7 @@ mod tests {
#[tokio::test]
async fn a_valid_input_leads_to_a_translated_output() -> Result<(), anyhow::Error> {
// Given an MQTT broker
- let broker = mqtt_tests::test_mqtt_server::MqttProcessHandler::new(55800);
+ let broker = mqtt_tests::test_mqtt_server::MqttProcessHandler::new(55800, 3800);
// Given a mapper
let name = "mapper_under_test";
@@ -232,7 +232,7 @@ mod tests {
#[tokio::test]
async fn health_check() -> Result<(), anyhow::Error> {
// Given an MQTT broker
- let broker = mqtt_tests::test_mqtt_server::MqttProcessHandler::new(55801);
+ let broker = mqtt_tests::test_mqtt_server::MqttProcessHandler::new(55801, 3801);
// Given a mapper
let name = "mapper_under_test";
diff --git a/crates/tests/mqtt_tests/src/test_mqtt_server.rs b/crates/tests/mqtt_tests/src/test_mqtt_server.rs
index 81e30508..7db0505d 100644
--- a/crates/tests/mqtt_tests/src/test_mqtt_server.rs
+++ b/crates/tests/mqtt_tests/src/test_mqtt_server.rs
@@ -13,8 +13,8 @@ pub struct MqttProcessHandler {
}
impl MqttProcessHandler {
- pub fn new(port: u16) -> MqttProcessHandler {
- spawn_broker(port);
+ pub fn new(port: u16, console_port: u16) -> MqttProcessHandler {
+ spawn_broker(port, console_port);
MqttProcessHandler { port }
}
@@ -61,8 +61,8 @@ impl MqttProcessHandler {
}
}
-fn spawn_broker(port: u16) {
- let config = get_rumqttd_config(port);
+fn spawn_broker(port: u16, console_port: u16) {
+ let config = get_rumqttd_config(port, console_port);
let mut broker = Broker::new(config);
let mut tx = broker.link("localclient").unwrap();
@@ -101,7 +101,7 @@ fn spawn_broker(port: u16) {
});
}
-fn get_rumqttd_config(port: u16) -> Config {
+fn get_rumqttd_config(port: u16, console_port: u16) -> Config {
let router_config = librumqttd::rumqttlog::Config {
id: 0,
dir: "/tmp/rumqttd".into(),
@@ -131,7 +131,7 @@ fn get_rumqttd_config(port: u16) -> Config {
servers.insert("1".to_string(), server_config);
let console_settings = ConsoleSettings {
- listen: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 3030)),
+ listen: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), console_port)),
};
librumqttd::Config {
diff --git a/plugins/c8y_configuration_plugin/src/main.rs b/plugins/c8y_configuration_plugin/src/main.rs
index c6b35bc4..60e76d38 100644
--- a/plugins/c8y_configuration_plugin/src/main.rs
+++ b/plugins/c8y_configuration_plugin/src/main.rs
@@ -323,7 +323,7 @@ mod tests {
let test_config_path = "/some/test/config";
let test_config_type = "c8y-configuration-plugin";
- let broker = mqtt_tests::test_mqtt_server::MqttProcessHandler::new(55570);
+ let broker = mqtt_tests::test_mqtt_server::MqttProcessHandler::new(55570, 3570);
let mut messages = broker.messages_published_on("c8y/s/us").await;
diff --git a/plugins/c8y_configuration_plugin/src/upload.rs b/plugins/c8y_configuration_plugin/src/upload.rs
index 57de32e6..8f2424a3 100644
--- a/plugins/c8y_configuration_plugin/src/upload.rs
+++ b/plugins/c8y_configuration_plugin/src/upload.rs
@@ -169,7 +169,7 @@ mod tests {
async fn test_handle_config_upload_request() -> anyhow::Result<()> {
let config_path = Path::new("/some/test/config");
- let broker = mqtt_tests::test_mqtt_server::MqttProcessHandler::new(55700);
+ let broker = mqtt_tests::test_mqtt_server::MqttProcessHandler::new(55700, 3700);
let mqtt_config = mqtt_channel::Config::default()
.with_port(broker.port)
.with_subscriptions(mqtt_channel::TopicFilter::new_unchecked(