diff options
Diffstat (limited to 'crates/common/mqtt_channel/src/tests.rs')
-rw-r--r-- | crates/common/mqtt_channel/src/tests.rs | 41 |
1 files changed, 14 insertions, 27 deletions
diff --git a/crates/common/mqtt_channel/src/tests.rs b/crates/common/mqtt_channel/src/tests.rs index fb30eac7..fe873652 100644 --- a/crates/common/mqtt_channel/src/tests.rs +++ b/crates/common/mqtt_channel/src/tests.rs @@ -2,17 +2,16 @@ mod tests { use crate::*; use futures::{SinkExt, StreamExt}; - use serial_test::serial; + use mqtt_tests::test_mqtt_server::MqttProcessHandler; use std::convert::TryInto; use std::time::Duration; const TIMEOUT: Duration = Duration::from_millis(1000); #[tokio::test] - #[serial] async fn subscribing_to_messages() -> Result<(), anyhow::Error> { // Given an MQTT broker - let broker = mqtt_tests::test_mqtt_broker(); + let broker = MqttProcessHandler::new(55551); let mqtt_config = Config::default().with_port(broker.port); // A client subscribes to a topic on connect @@ -66,10 +65,9 @@ mod tests { } #[tokio::test] - #[serial] async fn subscribing_to_many_topics() -> Result<(), anyhow::Error> { // Given an MQTT broker - let broker = mqtt_tests::test_mqtt_broker(); + let broker = MqttProcessHandler::new(55552); let mqtt_config = Config::default().with_port(broker.port); // A client can subscribe to many topics @@ -121,10 +119,9 @@ mod tests { } #[tokio::test] - #[serial] async fn publishing_messages() -> Result<(), anyhow::Error> { // Given an MQTT broker - let broker = mqtt_tests::test_mqtt_broker(); + let broker = MqttProcessHandler::new(55553); let mqtt_config = Config::default().with_port(broker.port); let mut all_messages = broker.messages_published_on("#").await; @@ -156,10 +153,9 @@ mod tests { } #[tokio::test] - #[serial] async fn implementing_a_message_mapper() -> Result<(), anyhow::Error> { // Given an MQTT broker - let broker = mqtt_tests::test_mqtt_broker(); + let broker = MqttProcessHandler::new(55554); let mqtt_config = Config::default().with_port(broker.port); // and an MQTT connection with input and output topics @@ -203,10 +199,9 @@ mod tests { } #[tokio::test] - #[serial] async fn receiving_messages_while_not_connected() -> Result<(), anyhow::Error> { // Given an MQTT broker - let broker = mqtt_tests::test_mqtt_broker(); + let broker = MqttProcessHandler::new(55555); let mqtt_config = Config::default().with_port(broker.port); // A client that connects with a well-known session name, subscribing to some topic. @@ -246,7 +241,6 @@ mod tests { } #[tokio::test] - #[serial] async fn testing_an_mqtt_client_without_mqtt() -> Result<(), anyhow::Error> { // Given an mqtt client async fn run(mut input: impl SubChannel, mut output: impl PubChannel) { @@ -281,7 +275,7 @@ mod tests { assert_eq!(expected, output.collect().await); // This very same client can be tested with an MQTT broker - let broker = mqtt_tests::test_mqtt_broker(); + let broker = MqttProcessHandler::new(55556); let mqtt_config = Config::default().with_port(broker.port); let mut out_messages = broker.messages_published_on("out/topic").await; @@ -307,10 +301,9 @@ mod tests { } #[tokio::test] - #[serial] async fn creating_a_session() -> Result<(), anyhow::Error> { // Given an MQTT broker - let broker = mqtt_tests::test_mqtt_broker(); + let broker = MqttProcessHandler::new(55557); let mqtt_config = Config::default().with_port(broker.port); // Given an MQTT config with a well-known session name @@ -354,9 +347,8 @@ mod tests { } #[tokio::test] - #[serial] async fn a_session_must_have_a_name() { - let broker = mqtt_tests::test_mqtt_broker(); + let broker = MqttProcessHandler::new(55558); let mqtt_config = Config::default().with_port(broker.port); let result = init_session(&mqtt_config).await; @@ -365,9 +357,8 @@ mod tests { } #[tokio::test] - #[serial] async fn a_named_session_must_not_set_clean_session() { - let broker = mqtt_tests::test_mqtt_broker(); + let broker = MqttProcessHandler::new(55559); let mqtt_config = Config::default() .with_port(broker.port) .with_session_name("useless name") @@ -379,10 +370,9 @@ mod tests { } #[tokio::test] - #[serial] async fn cleaning_a_session() -> Result<(), anyhow::Error> { // Given an MQTT broker - let broker = mqtt_tests::test_mqtt_broker(); + let broker = MqttProcessHandler::new(55560); let mqtt_config = Config::default().with_port(broker.port); // Given an MQTT config with a well-known session name @@ -427,9 +417,8 @@ mod tests { } #[tokio::test] - #[serial] async fn to_be_cleared_a_session_must_have_a_name() { - let broker = mqtt_tests::test_mqtt_broker(); + let broker = MqttProcessHandler::new(55561); let mqtt_config = Config::default().with_port(broker.port); let result = clear_session(&mqtt_config).await; @@ -438,9 +427,8 @@ mod tests { } #[tokio::test] - #[serial] async fn subscription_failures() { - let broker = mqtt_tests::test_mqtt_broker(); + let broker = MqttProcessHandler::new(55562); let mqtt_config = Config::default().with_port(broker.port); let topic = TopicFilter::new_unchecked("test/topic"); @@ -454,9 +442,8 @@ mod tests { } #[tokio::test] - #[serial] async fn ensure_that_all_messages_are_sent_before_disconnect() -> Result<(), anyhow::Error> { - let broker = mqtt_tests::test_mqtt_broker(); + let broker = MqttProcessHandler::new(55563); let topic = "data/topic"; let mut messages = broker.messages_published_on(topic).await; |