diff options
author | Didier Wenzek <didier.wenzek@acidalie.com> | 2022-02-04 12:18:21 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-02-04 12:18:21 +0000 |
commit | 89f554fbb724cb4b9fe09336d58558cfa76638a1 (patch) | |
tree | 0180a5afb1ad1a2de2d99f24061764f62646dde1 /crates/common/mqtt_channel/src/tests.rs | |
parent | 9efe65893b5a32695156b9679fb43770b5e15c2a (diff) |
Add functions to init/clear mqtt session (#819)
* Move the `mqtt_options` builder into the `Config`
* Add `mqtt_channel::init_session()` function
* Subscribe to all the topics use `subscribe_many`
* Add `mqtt_channel::clear_session()` function
* Make clearer the intent of `delay` on connection errors
* Add explicit `mqtt_client.disconnect()` to drop the connection
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Diffstat (limited to 'crates/common/mqtt_channel/src/tests.rs')
-rw-r--r-- | crates/common/mqtt_channel/src/tests.rs | 124 |
1 files changed, 124 insertions, 0 deletions
diff --git a/crates/common/mqtt_channel/src/tests.rs b/crates/common/mqtt_channel/src/tests.rs index 6657e73e..187a62c2 100644 --- a/crates/common/mqtt_channel/src/tests.rs +++ b/crates/common/mqtt_channel/src/tests.rs @@ -305,4 +305,128 @@ mod tests { Ok(()) } + + #[tokio::test] + #[serial] + async fn creating_a_session() -> Result<(), anyhow::Error> { + // Given an MQTT broker + let broker = mqtt_tests::test_mqtt_broker(); + let mqtt_config = Config::default().with_port(broker.port); + + // Given an MQTT config with a well-known session name + let session_name = "my-session-name"; + let topic = "my/topic"; + let mqtt_config = mqtt_config + .with_session_name(session_name) + .with_subscriptions(topic.try_into()?); + + // This config can be created to initialize an MQTT session + init_session(&mqtt_config).await?; + + // Any messages published on that topic + broker + .publish(topic, "1st msg sent before a first connection") + .await?; + broker + .publish(topic, "2nd msg sent before a first connection") + .await?; + broker + .publish(topic, "3rd msg sent before a first connection") + .await?; + + // Will be received by the client with the same session name even for its first connection + let mut con = Connection::new(&mqtt_config).await?; + + assert_eq!( + MaybeMessage::Next(message(topic, "1st msg sent before a first connection")), + next_message(&mut con.received).await + ); + assert_eq!( + MaybeMessage::Next(message(topic, "2nd msg sent before a first connection")), + next_message(&mut con.received).await + ); + assert_eq!( + MaybeMessage::Next(message(topic, "3rd msg sent before a first connection")), + next_message(&mut con.received).await + ); + + Ok(()) + } + + #[tokio::test] + #[serial] + async fn a_session_must_have_a_name() { + let broker = mqtt_tests::test_mqtt_broker(); + let mqtt_config = Config::default().with_port(broker.port); + + let result = init_session(&mqtt_config).await; + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("Invalid session")); + } + + #[tokio::test] + #[serial] + async fn a_named_session_must_not_set_clean_session() { + let broker = mqtt_tests::test_mqtt_broker(); + let mqtt_config = Config::default() + .with_port(broker.port) + .with_session_name("useless name") + .with_clean_session(true); + + let result = init_session(&mqtt_config).await; + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("Invalid session")); + } + + #[tokio::test] + #[serial] + async fn cleaning_a_session() -> Result<(), anyhow::Error> { + // Given an MQTT broker + let broker = mqtt_tests::test_mqtt_broker(); + let mqtt_config = Config::default().with_port(broker.port); + + // Given an MQTT config with a well-known session name + let session_name = "a-session-name"; + let topic = "a/topic"; + let mqtt_config = mqtt_config + .with_session_name(session_name) + .with_subscriptions(topic.try_into()?); + + // The session being initialized + init_session(&mqtt_config).await?; + + // And some messages published + broker + .publish(topic, "A fst msg published before clean") + .await?; + broker + .publish(topic, "A 2nd msg published before clean") + .await?; + + // If we clean the session + clear_session(&mqtt_config).await?; + + // And publish more messages + broker + .publish(topic, "A 3nd msg published after clean") + .await?; + + // Then no messages will be received by the client with the same session name + let mut con = Connection::new(&mqtt_config).await?; + + assert_eq!(MaybeMessage::Timeout, next_message(&mut con.received).await); + + Ok(()) + } + + #[tokio::test] + #[serial] + async fn to_be_cleared_a_session_must_have_a_name() { + let broker = mqtt_tests::test_mqtt_broker(); + let mqtt_config = Config::default().with_port(broker.port); + + let result = clear_session(&mqtt_config).await; + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("Invalid session")); + } } |