summaryrefslogtreecommitdiffstats
path: root/crates/common/mqtt_channel/src/tests.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/common/mqtt_channel/src/tests.rs')
-rw-r--r--crates/common/mqtt_channel/src/tests.rs203
1 files changed, 199 insertions, 4 deletions
diff --git a/crates/common/mqtt_channel/src/tests.rs b/crates/common/mqtt_channel/src/tests.rs
index 6657e73e..7a8201e5 100644
--- a/crates/common/mqtt_channel/src/tests.rs
+++ b/crates/common/mqtt_channel/src/tests.rs
@@ -16,7 +16,7 @@ mod tests {
let mqtt_config = Config::default().with_port(broker.port);
// A client subscribes to a topic on connect
- let topic = "test/topic";
+ let topic = "a/test/topic";
let mqtt_config = mqtt_config
.with_session_name("test_client")
.with_subscriptions(topic.try_into()?);
@@ -99,7 +99,7 @@ mod tests {
]
.into_iter()
{
- let () = broker.publish(topic, payload).await?;
+ broker.publish(topic, payload).await?;
assert_eq!(
MaybeMessage::Next(message(topic, payload)),
next_message(&mut messages).await
@@ -113,7 +113,7 @@ mod tests {
]
.into_iter()
{
- let () = broker.publish(topic, payload).await?;
+ broker.publish(topic, payload).await?;
assert_eq!(MaybeMessage::Timeout, next_message(&mut messages).await);
}
@@ -211,7 +211,7 @@ mod tests {
// A client that connects with a well-known session name, subscribing to some topic.
let session_name = "remember_me";
- let topic = "test/topic";
+ let topic = "my/nice/topic";
let mqtt_config = mqtt_config
.with_session_name(session_name)
.with_subscriptions(topic.try_into()?);
@@ -305,4 +305,199 @@ mod tests {
Ok(())
}
+
+ #[tokio::test]
+ #[serial]
+ async fn creating_a_session() -> Result<(), anyhow::Error> {
+ // Given an MQTT broker
+ let broker = mqtt_tests::test_mqtt_broker();
+ let mqtt_config = Config::default().with_port(broker.port);
+
+ // Given an MQTT config with a well-known session name
+ let session_name = "my-session-name";
+ let topic = "my/topic";
+ let mqtt_config = mqtt_config
+ .with_session_name(session_name)
+ .with_subscriptions(topic.try_into()?);
+
+ // This config can be created to initialize an MQTT session
+ init_session(&mqtt_config).await?;
+
+ // Any messages published on that topic
+ broker
+ .publish(topic, "1st msg sent before a first connection")
+ .await?;
+ broker
+ .publish(topic, "2nd msg sent before a first connection")
+ .await?;
+ broker
+ .publish(topic, "3rd msg sent before a first connection")
+ .await?;
+
+ // Will be received by the client with the same session name even for its first connection
+ let mut con = Connection::new(&mqtt_config).await?;
+
+ assert_eq!(
+ MaybeMessage::Next(message(topic, "1st msg sent before a first connection")),
+ next_message(&mut con.received).await
+ );
+ assert_eq!(
+ MaybeMessage::Next(message(topic, "2nd msg sent before a first connection")),
+ next_message(&mut con.received).await
+ );
+ assert_eq!(
+ MaybeMessage::Next(message(topic, "3rd msg sent before a first connection")),
+ next_message(&mut con.received).await
+ );
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ #[serial]
+ async fn a_session_must_have_a_name() {
+ let broker = mqtt_tests::test_mqtt_broker();
+ let mqtt_config = Config::default().with_port(broker.port);
+
+ let result = init_session(&mqtt_config).await;
+ assert!(result.is_err());
+ assert!(result.unwrap_err().to_string().contains("Invalid session"));
+ }
+
+ #[tokio::test]
+ #[serial]
+ async fn a_named_session_must_not_set_clean_session() {
+ let broker = mqtt_tests::test_mqtt_broker();
+ let mqtt_config = Config::default()
+ .with_port(broker.port)
+ .with_session_name("useless name")
+ .with_clean_session(true);
+
+ let result = init_session(&mqtt_config).await;
+ assert!(result.is_err());
+ assert!(result.unwrap_err().to_string().contains("Invalid session"));
+ }
+
+ #[tokio::test]
+ #[serial]
+ async fn cleaning_a_session() -> Result<(), anyhow::Error> {
+ // Given an MQTT broker
+ let broker = mqtt_tests::test_mqtt_broker();
+ let mqtt_config = Config::default().with_port(broker.port);
+
+ // Given an MQTT config with a well-known session name
+ let session_name = "a-session-name";
+ let topic = "a/topic";
+ let mqtt_config = mqtt_config
+ .with_session_name(session_name)
+ .with_subscriptions(topic.try_into()?);
+
+ // The session being initialized
+ init_session(&mqtt_config).await?;
+
+ // And some messages published
+ broker
+ .publish(topic, "A fst msg published before clean")
+ .await?;
+ broker
+ .publish(topic, "A 2nd msg published before clean")
+ .await?;
+
+ // Then we clean the session
+ {
+ // One just needs a config with the same session name.
+ // Subscriptions can be given - but this not required: any previous subscriptions will be cleared.
+ let mqtt_config = Config::default()
+ .with_port(broker.port)
+ .with_session_name(session_name);
+ clear_session(&mqtt_config).await?;
+ }
+
+ // And publish more messages
+ broker
+ .publish(topic, "A 3nd msg published after clean")
+ .await?;
+
+ // Then no messages will be received by the client with the same session name
+ let mut con = Connection::new(&mqtt_config).await?;
+
+ assert_eq!(MaybeMessage::Timeout, next_message(&mut con.received).await);
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ #[serial]
+ async fn to_be_cleared_a_session_must_have_a_name() {
+ let broker = mqtt_tests::test_mqtt_broker();
+ let mqtt_config = Config::default().with_port(broker.port);
+
+ let result = clear_session(&mqtt_config).await;
+ assert!(result.is_err());
+ assert!(result.unwrap_err().to_string().contains("Invalid session"));
+ }
+
+ #[tokio::test]
+ #[serial]
+ async fn subscription_failures() {
+ let broker = mqtt_tests::test_mqtt_broker();
+ let mqtt_config = Config::default().with_port(broker.port);
+
+ let topic = TopicFilter::new_unchecked("test/topic");
+ let mqtt_config = mqtt_config.with_subscriptions(topic);
+
+ // For some unknown reason, the test MQTT server rejects any subscription on `test/#` topics
+ assert!(matches!(
+ Connection::new(&mqtt_config).await,
+ Err(MqttError::SubscriptionFailure)
+ ));
+ }
+
+ #[tokio::test]
+ #[serial]
+ async fn ensure_that_all_messages_are_sent_before_disconnect() -> Result<(), anyhow::Error> {
+ let broker = mqtt_tests::test_mqtt_broker();
+ let topic = "data/topic";
+ let mut messages = broker.messages_published_on(topic).await;
+
+ // An mqtt process publishing messages
+ // must ensure the messages have been sent before process exit.
+ std::thread::spawn(move || {
+ tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .unwrap()
+ .block_on(async {
+ let mqtt_config = Config::default().with_port(broker.port);
+ let topic = Topic::new_unchecked(topic);
+ let mut con = Connection::new(&mqtt_config).await.expect("a connection");
+
+ con.published
+ .send(Message::new(&topic, "datum 1"))
+ .await
+ .expect("message sent");
+ con.published
+ .send(Message::new(&topic, "datum 2"))
+ .await
+ .expect("message sent");
+ con.published
+ .send(Message::new(&topic, "datum 3"))
+ .await
+ .expect("message sent");
+
+ // Wait for all the messages to be actually sent
+ // before the runtime is shutdown dropping the mqtt sender loop.
+ con.close().await;
+ });
+ });
+
+ mqtt_tests::assert_received(
+ &mut messages,
+ TIMEOUT,
+ vec!["datum 1", "datum 2", "datum 3"],
+ )
+ .await;
+
+ Ok(())
+ }
}