diff options
Diffstat (limited to 'crates/common/mqtt_channel/src/tests.rs')
-rw-r--r-- | crates/common/mqtt_channel/src/tests.rs | 203 |
1 files changed, 199 insertions, 4 deletions
diff --git a/crates/common/mqtt_channel/src/tests.rs b/crates/common/mqtt_channel/src/tests.rs index 6657e73e..7a8201e5 100644 --- a/crates/common/mqtt_channel/src/tests.rs +++ b/crates/common/mqtt_channel/src/tests.rs @@ -16,7 +16,7 @@ mod tests { let mqtt_config = Config::default().with_port(broker.port); // A client subscribes to a topic on connect - let topic = "test/topic"; + let topic = "a/test/topic"; let mqtt_config = mqtt_config .with_session_name("test_client") .with_subscriptions(topic.try_into()?); @@ -99,7 +99,7 @@ mod tests { ] .into_iter() { - let () = broker.publish(topic, payload).await?; + broker.publish(topic, payload).await?; assert_eq!( MaybeMessage::Next(message(topic, payload)), next_message(&mut messages).await @@ -113,7 +113,7 @@ mod tests { ] .into_iter() { - let () = broker.publish(topic, payload).await?; + broker.publish(topic, payload).await?; assert_eq!(MaybeMessage::Timeout, next_message(&mut messages).await); } @@ -211,7 +211,7 @@ mod tests { // A client that connects with a well-known session name, subscribing to some topic. let session_name = "remember_me"; - let topic = "test/topic"; + let topic = "my/nice/topic"; let mqtt_config = mqtt_config .with_session_name(session_name) .with_subscriptions(topic.try_into()?); @@ -305,4 +305,199 @@ mod tests { Ok(()) } + + #[tokio::test] + #[serial] + async fn creating_a_session() -> Result<(), anyhow::Error> { + // Given an MQTT broker + let broker = mqtt_tests::test_mqtt_broker(); + let mqtt_config = Config::default().with_port(broker.port); + + // Given an MQTT config with a well-known session name + let session_name = "my-session-name"; + let topic = "my/topic"; + let mqtt_config = mqtt_config + .with_session_name(session_name) + .with_subscriptions(topic.try_into()?); + + // This config can be created to initialize an MQTT session + init_session(&mqtt_config).await?; + + // Any messages published on that topic + broker + .publish(topic, "1st msg sent before a first connection") + .await?; + broker + .publish(topic, "2nd msg sent before a first connection") + .await?; + broker + .publish(topic, "3rd msg sent before a first connection") + .await?; + + // Will be received by the client with the same session name even for its first connection + let mut con = Connection::new(&mqtt_config).await?; + + assert_eq!( + MaybeMessage::Next(message(topic, "1st msg sent before a first connection")), + next_message(&mut con.received).await + ); + assert_eq!( + MaybeMessage::Next(message(topic, "2nd msg sent before a first connection")), + next_message(&mut con.received).await + ); + assert_eq!( + MaybeMessage::Next(message(topic, "3rd msg sent before a first connection")), + next_message(&mut con.received).await + ); + + Ok(()) + } + + #[tokio::test] + #[serial] + async fn a_session_must_have_a_name() { + let broker = mqtt_tests::test_mqtt_broker(); + let mqtt_config = Config::default().with_port(broker.port); + + let result = init_session(&mqtt_config).await; + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("Invalid session")); + } + + #[tokio::test] + #[serial] + async fn a_named_session_must_not_set_clean_session() { + let broker = mqtt_tests::test_mqtt_broker(); + let mqtt_config = Config::default() + .with_port(broker.port) + .with_session_name("useless name") + .with_clean_session(true); + + let result = init_session(&mqtt_config).await; + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("Invalid session")); + } + + #[tokio::test] + #[serial] + async fn cleaning_a_session() -> Result<(), anyhow::Error> { + // Given an MQTT broker + let broker = mqtt_tests::test_mqtt_broker(); + let mqtt_config = Config::default().with_port(broker.port); + + // Given an MQTT config with a well-known session name + let session_name = "a-session-name"; + let topic = "a/topic"; + let mqtt_config = mqtt_config + .with_session_name(session_name) + .with_subscriptions(topic.try_into()?); + + // The session being initialized + init_session(&mqtt_config).await?; + + // And some messages published + broker + .publish(topic, "A fst msg published before clean") + .await?; + broker + .publish(topic, "A 2nd msg published before clean") + .await?; + + // Then we clean the session + { + // One just needs a config with the same session name. + // Subscriptions can be given - but this not required: any previous subscriptions will be cleared. + let mqtt_config = Config::default() + .with_port(broker.port) + .with_session_name(session_name); + clear_session(&mqtt_config).await?; + } + + // And publish more messages + broker + .publish(topic, "A 3nd msg published after clean") + .await?; + + // Then no messages will be received by the client with the same session name + let mut con = Connection::new(&mqtt_config).await?; + + assert_eq!(MaybeMessage::Timeout, next_message(&mut con.received).await); + + Ok(()) + } + + #[tokio::test] + #[serial] + async fn to_be_cleared_a_session_must_have_a_name() { + let broker = mqtt_tests::test_mqtt_broker(); + let mqtt_config = Config::default().with_port(broker.port); + + let result = clear_session(&mqtt_config).await; + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("Invalid session")); + } + + #[tokio::test] + #[serial] + async fn subscription_failures() { + let broker = mqtt_tests::test_mqtt_broker(); + let mqtt_config = Config::default().with_port(broker.port); + + let topic = TopicFilter::new_unchecked("test/topic"); + let mqtt_config = mqtt_config.with_subscriptions(topic); + + // For some unknown reason, the test MQTT server rejects any subscription on `test/#` topics + assert!(matches!( + Connection::new(&mqtt_config).await, + Err(MqttError::SubscriptionFailure) + )); + } + + #[tokio::test] + #[serial] + async fn ensure_that_all_messages_are_sent_before_disconnect() -> Result<(), anyhow::Error> { + let broker = mqtt_tests::test_mqtt_broker(); + let topic = "data/topic"; + let mut messages = broker.messages_published_on(topic).await; + + // An mqtt process publishing messages + // must ensure the messages have been sent before process exit. + std::thread::spawn(move || { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + .block_on(async { + let mqtt_config = Config::default().with_port(broker.port); + let topic = Topic::new_unchecked(topic); + let mut con = Connection::new(&mqtt_config).await.expect("a connection"); + + con.published + .send(Message::new(&topic, "datum 1")) + .await + .expect("message sent"); + con.published + .send(Message::new(&topic, "datum 2")) + .await + .expect("message sent"); + con.published + .send(Message::new(&topic, "datum 3")) + .await + .expect("message sent"); + + // Wait for all the messages to be actually sent + // before the runtime is shutdown dropping the mqtt sender loop. + con.close().await; + }); + }); + + mqtt_tests::assert_received( + &mut messages, + TIMEOUT, + vec!["datum 1", "datum 2", "datum 3"], + ) + .await; + + Ok(()) + } } |