summaryrefslogtreecommitdiffstats
path: root/crates/common/mqtt_channel/src/tests.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/common/mqtt_channel/src/tests.rs')
-rw-r--r--crates/common/mqtt_channel/src/tests.rs41
1 files changed, 14 insertions, 27 deletions
diff --git a/crates/common/mqtt_channel/src/tests.rs b/crates/common/mqtt_channel/src/tests.rs
index fb30eac7..fe873652 100644
--- a/crates/common/mqtt_channel/src/tests.rs
+++ b/crates/common/mqtt_channel/src/tests.rs
@@ -2,17 +2,16 @@
mod tests {
use crate::*;
use futures::{SinkExt, StreamExt};
- use serial_test::serial;
+ use mqtt_tests::test_mqtt_server::MqttProcessHandler;
use std::convert::TryInto;
use std::time::Duration;
const TIMEOUT: Duration = Duration::from_millis(1000);
#[tokio::test]
- #[serial]
async fn subscribing_to_messages() -> Result<(), anyhow::Error> {
// Given an MQTT broker
- let broker = mqtt_tests::test_mqtt_broker();
+ let broker = MqttProcessHandler::new(55551);
let mqtt_config = Config::default().with_port(broker.port);
// A client subscribes to a topic on connect
@@ -66,10 +65,9 @@ mod tests {
}
#[tokio::test]
- #[serial]
async fn subscribing_to_many_topics() -> Result<(), anyhow::Error> {
// Given an MQTT broker
- let broker = mqtt_tests::test_mqtt_broker();
+ let broker = MqttProcessHandler::new(55552);
let mqtt_config = Config::default().with_port(broker.port);
// A client can subscribe to many topics
@@ -121,10 +119,9 @@ mod tests {
}
#[tokio::test]
- #[serial]
async fn publishing_messages() -> Result<(), anyhow::Error> {
// Given an MQTT broker
- let broker = mqtt_tests::test_mqtt_broker();
+ let broker = MqttProcessHandler::new(55553);
let mqtt_config = Config::default().with_port(broker.port);
let mut all_messages = broker.messages_published_on("#").await;
@@ -156,10 +153,9 @@ mod tests {
}
#[tokio::test]
- #[serial]
async fn implementing_a_message_mapper() -> Result<(), anyhow::Error> {
// Given an MQTT broker
- let broker = mqtt_tests::test_mqtt_broker();
+ let broker = MqttProcessHandler::new(55554);
let mqtt_config = Config::default().with_port(broker.port);
// and an MQTT connection with input and output topics
@@ -203,10 +199,9 @@ mod tests {
}
#[tokio::test]
- #[serial]
async fn receiving_messages_while_not_connected() -> Result<(), anyhow::Error> {
// Given an MQTT broker
- let broker = mqtt_tests::test_mqtt_broker();
+ let broker = MqttProcessHandler::new(55555);
let mqtt_config = Config::default().with_port(broker.port);
// A client that connects with a well-known session name, subscribing to some topic.
@@ -246,7 +241,6 @@ mod tests {
}
#[tokio::test]
- #[serial]
async fn testing_an_mqtt_client_without_mqtt() -> Result<(), anyhow::Error> {
// Given an mqtt client
async fn run(mut input: impl SubChannel, mut output: impl PubChannel) {
@@ -281,7 +275,7 @@ mod tests {
assert_eq!(expected, output.collect().await);
// This very same client can be tested with an MQTT broker
- let broker = mqtt_tests::test_mqtt_broker();
+ let broker = MqttProcessHandler::new(55556);
let mqtt_config = Config::default().with_port(broker.port);
let mut out_messages = broker.messages_published_on("out/topic").await;
@@ -307,10 +301,9 @@ mod tests {
}
#[tokio::test]
- #[serial]
async fn creating_a_session() -> Result<(), anyhow::Error> {
// Given an MQTT broker
- let broker = mqtt_tests::test_mqtt_broker();
+ let broker = MqttProcessHandler::new(55557);
let mqtt_config = Config::default().with_port(broker.port);
// Given an MQTT config with a well-known session name
@@ -354,9 +347,8 @@ mod tests {
}
#[tokio::test]
- #[serial]
async fn a_session_must_have_a_name() {
- let broker = mqtt_tests::test_mqtt_broker();
+ let broker = MqttProcessHandler::new(55558);
let mqtt_config = Config::default().with_port(broker.port);
let result = init_session(&mqtt_config).await;
@@ -365,9 +357,8 @@ mod tests {
}
#[tokio::test]
- #[serial]
async fn a_named_session_must_not_set_clean_session() {
- let broker = mqtt_tests::test_mqtt_broker();
+ let broker = MqttProcessHandler::new(55559);
let mqtt_config = Config::default()
.with_port(broker.port)
.with_session_name("useless name")
@@ -379,10 +370,9 @@ mod tests {
}
#[tokio::test]
- #[serial]
async fn cleaning_a_session() -> Result<(), anyhow::Error> {
// Given an MQTT broker
- let broker = mqtt_tests::test_mqtt_broker();
+ let broker = MqttProcessHandler::new(55560);
let mqtt_config = Config::default().with_port(broker.port);
// Given an MQTT config with a well-known session name
@@ -427,9 +417,8 @@ mod tests {
}
#[tokio::test]
- #[serial]
async fn to_be_cleared_a_session_must_have_a_name() {
- let broker = mqtt_tests::test_mqtt_broker();
+ let broker = MqttProcessHandler::new(55561);
let mqtt_config = Config::default().with_port(broker.port);
let result = clear_session(&mqtt_config).await;
@@ -438,9 +427,8 @@ mod tests {
}
#[tokio::test]
- #[serial]
async fn subscription_failures() {
- let broker = mqtt_tests::test_mqtt_broker();
+ let broker = MqttProcessHandler::new(55562);
let mqtt_config = Config::default().with_port(broker.port);
let topic = TopicFilter::new_unchecked("test/topic");
@@ -454,9 +442,8 @@ mod tests {
}
#[tokio::test]
- #[serial]
async fn ensure_that_all_messages_are_sent_before_disconnect() -> Result<(), anyhow::Error> {
- let broker = mqtt_tests::test_mqtt_broker();
+ let broker = MqttProcessHandler::new(55563);
let topic = "data/topic";
let mut messages = broker.messages_published_on(topic).await;