summaryrefslogtreecommitdiffstats
path: root/crates/common/mqtt_channel/src/tests.rs
diff options
context:
space:
mode:
authorDidier Wenzek <didier.wenzek@acidalie.com>2022-02-04 12:18:21 +0000
committerGitHub <noreply@github.com>2022-02-04 12:18:21 +0000
commit89f554fbb724cb4b9fe09336d58558cfa76638a1 (patch)
tree0180a5afb1ad1a2de2d99f24061764f62646dde1 /crates/common/mqtt_channel/src/tests.rs
parent9efe65893b5a32695156b9679fb43770b5e15c2a (diff)
Add functions to init/clear mqtt session (#819)
* Move the `mqtt_options` builder into the `Config` * Add `mqtt_channel::init_session()` function * Subscribe to all the topics use `subscribe_many` * Add `mqtt_channel::clear_session()` function * Make clearer the intent of `delay` on connection errors * Add explicit `mqtt_client.disconnect()` to drop the connection Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Diffstat (limited to 'crates/common/mqtt_channel/src/tests.rs')
-rw-r--r--crates/common/mqtt_channel/src/tests.rs124
1 files changed, 124 insertions, 0 deletions
diff --git a/crates/common/mqtt_channel/src/tests.rs b/crates/common/mqtt_channel/src/tests.rs
index 6657e73e..187a62c2 100644
--- a/crates/common/mqtt_channel/src/tests.rs
+++ b/crates/common/mqtt_channel/src/tests.rs
@@ -305,4 +305,128 @@ mod tests {
Ok(())
}
+
+ #[tokio::test]
+ #[serial]
+ async fn creating_a_session() -> Result<(), anyhow::Error> {
+ // Given an MQTT broker
+ let broker = mqtt_tests::test_mqtt_broker();
+ let mqtt_config = Config::default().with_port(broker.port);
+
+ // Given an MQTT config with a well-known session name
+ let session_name = "my-session-name";
+ let topic = "my/topic";
+ let mqtt_config = mqtt_config
+ .with_session_name(session_name)
+ .with_subscriptions(topic.try_into()?);
+
+ // This config can be created to initialize an MQTT session
+ init_session(&mqtt_config).await?;
+
+ // Any messages published on that topic
+ broker
+ .publish(topic, "1st msg sent before a first connection")
+ .await?;
+ broker
+ .publish(topic, "2nd msg sent before a first connection")
+ .await?;
+ broker
+ .publish(topic, "3rd msg sent before a first connection")
+ .await?;
+
+ // Will be received by the client with the same session name even for its first connection
+ let mut con = Connection::new(&mqtt_config).await?;
+
+ assert_eq!(
+ MaybeMessage::Next(message(topic, "1st msg sent before a first connection")),
+ next_message(&mut con.received).await
+ );
+ assert_eq!(
+ MaybeMessage::Next(message(topic, "2nd msg sent before a first connection")),
+ next_message(&mut con.received).await
+ );
+ assert_eq!(
+ MaybeMessage::Next(message(topic, "3rd msg sent before a first connection")),
+ next_message(&mut con.received).await
+ );
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ #[serial]
+ async fn a_session_must_have_a_name() {
+ let broker = mqtt_tests::test_mqtt_broker();
+ let mqtt_config = Config::default().with_port(broker.port);
+
+ let result = init_session(&mqtt_config).await;
+ assert!(result.is_err());
+ assert!(result.unwrap_err().to_string().contains("Invalid session"));
+ }
+
+ #[tokio::test]
+ #[serial]
+ async fn a_named_session_must_not_set_clean_session() {
+ let broker = mqtt_tests::test_mqtt_broker();
+ let mqtt_config = Config::default()
+ .with_port(broker.port)
+ .with_session_name("useless name")
+ .with_clean_session(true);
+
+ let result = init_session(&mqtt_config).await;
+ assert!(result.is_err());
+ assert!(result.unwrap_err().to_string().contains("Invalid session"));
+ }
+
+ #[tokio::test]
+ #[serial]
+ async fn cleaning_a_session() -> Result<(), anyhow::Error> {
+ // Given an MQTT broker
+ let broker = mqtt_tests::test_mqtt_broker();
+ let mqtt_config = Config::default().with_port(broker.port);
+
+ // Given an MQTT config with a well-known session name
+ let session_name = "a-session-name";
+ let topic = "a/topic";
+ let mqtt_config = mqtt_config
+ .with_session_name(session_name)
+ .with_subscriptions(topic.try_into()?);
+
+ // The session being initialized
+ init_session(&mqtt_config).await?;
+
+ // And some messages published
+ broker
+ .publish(topic, "A fst msg published before clean")
+ .await?;
+ broker
+ .publish(topic, "A 2nd msg published before clean")
+ .await?;
+
+ // If we clean the session
+ clear_session(&mqtt_config).await?;
+
+ // And publish more messages
+ broker
+ .publish(topic, "A 3nd msg published after clean")
+ .await?;
+
+ // Then no messages will be received by the client with the same session name
+ let mut con = Connection::new(&mqtt_config).await?;
+
+ assert_eq!(MaybeMessage::Timeout, next_message(&mut con.received).await);
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ #[serial]
+ async fn to_be_cleared_a_session_must_have_a_name() {
+ let broker = mqtt_tests::test_mqtt_broker();
+ let mqtt_config = Config::default().with_port(broker.port);
+
+ let result = clear_session(&mqtt_config).await;
+ assert!(result.is_err());
+ assert!(result.unwrap_err().to_string().contains("Invalid session"));
+ }
}