summaryrefslogtreecommitdiffstats
path: root/crates/common/mqtt_channel/src
diff options
context:
space:
mode:
authorDidier Wenzek <didier.wenzek@acidalie.com>2022-02-04 12:18:21 +0000
committerGitHub <noreply@github.com>2022-02-04 12:18:21 +0000
commit89f554fbb724cb4b9fe09336d58558cfa76638a1 (patch)
tree0180a5afb1ad1a2de2d99f24061764f62646dde1 /crates/common/mqtt_channel/src
parent9efe65893b5a32695156b9679fb43770b5e15c2a (diff)
Add functions to init/clear mqtt session (#819)
* Move the `mqtt_options` builder into the `Config` * Add `mqtt_channel::init_session()` function * Subscribe to all the topics use `subscribe_many` * Add `mqtt_channel::clear_session()` function * Make clearer the intent of `delay` on connection errors * Add explicit `mqtt_client.disconnect()` to drop the connection Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Diffstat (limited to 'crates/common/mqtt_channel/src')
-rw-r--r--crates/common/mqtt_channel/src/config.rs16
-rw-r--r--crates/common/mqtt_channel/src/connection.rs36
-rw-r--r--crates/common/mqtt_channel/src/errors.rs3
-rw-r--r--crates/common/mqtt_channel/src/lib.rs2
-rw-r--r--crates/common/mqtt_channel/src/session.rs94
-rw-r--r--crates/common/mqtt_channel/src/tests.rs124
-rw-r--r--crates/common/mqtt_channel/src/topics.rs14
7 files changed, 260 insertions, 29 deletions
diff --git a/crates/common/mqtt_channel/src/config.rs b/crates/common/mqtt_channel/src/config.rs
index 17875753..3715d56d 100644
--- a/crates/common/mqtt_channel/src/config.rs
+++ b/crates/common/mqtt_channel/src/config.rs
@@ -118,4 +118,20 @@ impl Config {
..self
}
}
+
+ /// Wrap this config into an internal set of options for `rumqttc`.
+ pub(crate) fn mqtt_options(&self) -> rumqttc::MqttOptions {
+ let id = match &self.session_name {
+ None => std::iter::repeat_with(fastrand::lowercase)
+ .take(10)
+ .collect(),
+ Some(name) => name.clone(),
+ };
+
+ let mut mqtt_options = rumqttc::MqttOptions::new(id, &self.host, self.port);
+ mqtt_options.set_clean_session(self.clean_session);
+ mqtt_options.set_max_packet_size(self.max_packet_size, self.max_packet_size);
+
+ mqtt_options
+ }
}
diff --git a/crates/common/mqtt_channel/src/connection.rs b/crates/common/mqtt_channel/src/connection.rs
index f6de0cd5..666021ef 100644
--- a/crates/common/mqtt_channel/src/connection.rs
+++ b/crates/common/mqtt_channel/src/connection.rs
@@ -90,42 +90,22 @@ impl Connection {
})
}
- fn mqtt_options(config: &Config) -> rumqttc::MqttOptions {
- let id = match &config.session_name {
- None => std::iter::repeat_with(fastrand::lowercase)
- .take(10)
- .collect(),
- Some(name) => name.clone(),
- };
-
- let mut mqtt_options = rumqttc::MqttOptions::new(id, &config.host, config.port);
- mqtt_options.set_clean_session(config.clean_session);
- mqtt_options.set_max_packet_size(config.max_packet_size, config.max_packet_size);
-
- mqtt_options
- }
-
async fn open(
config: &Config,
mut message_sender: mpsc::UnboundedSender<Message>,
mut error_sender: mpsc::UnboundedSender<MqttError>,
) -> Result<(AsyncClient, EventLoop), MqttError> {
- let mqtt_options = Connection::mqtt_options(config);
+ let mqtt_options = config.mqtt_options();
let (mqtt_client, mut event_loop) = AsyncClient::new(mqtt_options, config.queue_capacity);
- let topic = &config.subscriptions;
- let qos = topic.qos;
-
loop {
match event_loop.poll().await {
Ok(Event::Incoming(Packet::ConnAck(_))) => {
- if topic.patterns.is_empty() {
+ let subscriptions = config.subscriptions.filters();
+ if subscriptions.is_empty() {
break;
}
-
- for pattern in topic.patterns.iter() {
- let () = mqtt_client.subscribe(pattern, qos).await?;
- }
+ mqtt_client.subscribe_many(subscriptions).await?;
}
Ok(Event::Incoming(Packet::SubAck(_))) => {
@@ -139,12 +119,12 @@ impl Connection {
}
Err(err) => {
- let delay = Connection::pause_on_error(&err);
+ let should_delay = Connection::pause_on_error(&err);
// Errors on send are ignored: it just means the client has closed the receiving channel.
let _ = error_sender.send(err.into()).await;
- if delay {
+ if should_delay {
Connection::do_pause().await;
}
}
@@ -217,7 +197,7 @@ impl Connection {
let _ = mqtt_client.disconnect().await;
}
- fn pause_on_error(err: &ConnectionError) -> bool {
+ pub(crate) fn pause_on_error(err: &ConnectionError) -> bool {
match &err {
rumqttc::ConnectionError::Io(_) => true,
rumqttc::ConnectionError::MqttState(state_error)
@@ -231,7 +211,7 @@ impl Connection {
}
}
- async fn do_pause() {
+ pub(crate) async fn do_pause() {
sleep(Duration::from_secs(1)).await;
}
}
diff --git a/crates/common/mqtt_channel/src/errors.rs b/crates/common/mqtt_channel/src/errors.rs
index 6804dd94..b072136b 100644
--- a/crates/common/mqtt_channel/src/errors.rs
+++ b/crates/common/mqtt_channel/src/errors.rs
@@ -7,6 +7,9 @@ pub enum MqttError {
#[error("Invalid topic filter: {pattern:?}")]
InvalidFilter { pattern: String },
+ #[error("Invalid session: a session name must be provided")]
+ InvalidSessionConfig,
+
#[error("MQTT client error: {0}")]
ClientError(#[from] rumqttc::ClientError),
diff --git a/crates/common/mqtt_channel/src/lib.rs b/crates/common/mqtt_channel/src/lib.rs
index 7fbee70d..96a4fa56 100644
--- a/crates/common/mqtt_channel/src/lib.rs
+++ b/crates/common/mqtt_channel/src/lib.rs
@@ -34,6 +34,7 @@ mod config;
mod connection;
mod errors;
mod messages;
+mod session;
mod topics;
mod tests;
@@ -43,6 +44,7 @@ pub use config::*;
pub use connection::*;
pub use errors::*;
pub use messages::*;
+pub use session::*;
pub use topics::*;
pub use futures::{
diff --git a/crates/common/mqtt_channel/src/session.rs b/crates/common/mqtt_channel/src/session.rs
new file mode 100644
index 00000000..c41acb52
--- /dev/null
+++ b/crates/common/mqtt_channel/src/session.rs
@@ -0,0 +1,94 @@
+use crate::{Config, Connection, MqttError};
+use rumqttc::{AsyncClient, Event, Packet};
+
+/// Create a persistent session on the MQTT server `config.host`.
+///
+/// The session is named after the `config.session_name`
+/// subscribing to all the topics given by the `config.subscriptions`.
+///
+/// A new `Connection` created with a config with the same session name,
+/// will receive all the messages published meantime on the subscribed topics.
+///
+/// This function can be called multiple times with the same session name,
+/// since it consumes no messages.
+pub async fn init_session(config: &Config) -> Result<(), MqttError> {
+ if config.clean_session || config.session_name.is_none() {
+ return Err(MqttError::InvalidSessionConfig);
+ }
+
+ let mqtt_options = config.mqtt_options();
+ let (mqtt_client, mut event_loop) = AsyncClient::new(mqtt_options, config.queue_capacity);
+
+ loop {
+ match event_loop.poll().await {
+ Ok(Event::Incoming(Packet::ConnAck(_))) => {
+ let subscriptions = config.subscriptions.filters();
+ if subscriptions.is_empty() {
+ break;
+ }
+ mqtt_client.subscribe_many(subscriptions).await?;
+ }
+
+ Ok(Event::Incoming(Packet::SubAck(_))) => {
+ break;
+ }
+
+ Err(err) => {
+ if Connection::pause_on_error(&err) {
+ Connection::do_pause().await;
+ }
+ }
+ _ => (),
+ }
+ }
+
+ let _ = mqtt_client.disconnect().await;
+ Ok(())
+}
+
+/// Clear a persistent session on the MQTT server `config.host`.
+///
+/// The session named after the `config.session_name` is cleared
+/// unsubscribing to all the topics given by the `config.subscriptions`.
+///
+/// All the messages persisted for that session all cleared.
+/// and no more messages will be stored till the session is re-created.
+///
+/// A new `Connection` created with a config with the same session name,
+/// will receive no messages that have been published meantime.
+pub async fn clear_session(config: &Config) -> Result<(), MqttError> {
+ if config.session_name.is_none() {
+ return Err(MqttError::InvalidSessionConfig);
+ }
+ let mut mqtt_options = config.mqtt_options();
+ mqtt_options.set_clean_session(true);
+ let (mqtt_client, mut event_loop) = AsyncClient::new(mqtt_options, config.queue_capacity);
+
+ loop {
+ match event_loop.poll().await {
+ Ok(Event::Incoming(Packet::ConnAck(_))) => {
+ let subscriptions = config.subscriptions.filters();
+ if subscriptions.is_empty() {
+ break;
+ }
+ for s in subscriptions.iter() {
+ mqtt_client.unsubscribe(&s.path).await?;
+ }
+ }
+
+ Ok(Event::Incoming(Packet::UnsubAck(_))) => {
+ break;
+ }
+
+ Err(err) => {
+ if Connection::pause_on_error(&err) {
+ Connection::do_pause().await;
+ }
+ }
+ _ => (),
+ }
+ }
+
+ let _ = mqtt_client.disconnect().await;
+ Ok(())
+}
diff --git a/crates/common/mqtt_channel/src/tests.rs b/crates/common/mqtt_channel/src/tests.rs
index 6657e73e..187a62c2 100644
--- a/crates/common/mqtt_channel/src/tests.rs
+++ b/crates/common/mqtt_channel/src/tests.rs
@@ -305,4 +305,128 @@ mod tests {
Ok(())
}
+
+ #[tokio::test]
+ #[serial]
+ async fn creating_a_session() -> Result<(), anyhow::Error> {
+ // Given an MQTT broker
+ let broker = mqtt_tests::test_mqtt_broker();
+ let mqtt_config = Config::default().with_port(broker.port);
+
+ // Given an MQTT config with a well-known session name
+ let session_name = "my-session-name";
+ let topic = "my/topic";
+ let mqtt_config = mqtt_config
+ .with_session_name(session_name)
+ .with_subscriptions(topic.try_into()?);
+
+ // This config can be created to initialize an MQTT session
+ init_session(&mqtt_config).await?;
+
+ // Any messages published on that topic
+ broker
+ .publish(topic, "1st msg sent before a first connection")
+ .await?;
+ broker
+ .publish(topic, "2nd msg sent before a first connection")
+ .await?;
+ broker
+ .publish(topic, "3rd msg sent before a first connection")
+ .await?;
+
+ // Will be received by the client with the same session name even for its first connection
+ let mut con = Connection::new(&mqtt_config).await?;
+
+ assert_eq!(
+ MaybeMessage::Next(message(topic, "1st msg sent before a first connection")),
+ next_message(&mut con.received).await
+ );
+ assert_eq!(
+ MaybeMessage::Next(message(topic, "2nd msg sent before a first connection")),
+ next_message(&mut con.received).await
+ );
+ assert_eq!(
+ MaybeMessage::Next(message(topic, "3rd msg sent before a first connection")),
+ next_message(&mut con.received).await
+ );
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ #[serial]
+ async fn a_session_must_have_a_name() {
+ let broker = mqtt_tests::test_mqtt_broker();
+ let mqtt_config = Config::default().with_port(broker.port);
+
+ let result = init_session(&mqtt_config).await;
+ assert!(result.is_err());
+ assert!(result.unwrap_err().to_string().contains("Invalid session"));
+ }
+
+ #[tokio::test]
+ #[serial]
+ async fn a_named_session_must_not_set_clean_session() {
+ let broker = mqtt_tests::test_mqtt_broker();
+ let mqtt_config = Config::default()
+ .with_port(broker.port)
+ .with_session_name("useless name")
+ .with_clean_session(true);
+
+ let result = init_session(&mqtt_config).await;
+ assert!(result.is_err());
+ assert!(result.unwrap_err().to_string().contains("Invalid session"));
+ }
+
+ #[tokio::test]
+ #[serial]
+ async fn cleaning_a_session() -> Result<(), anyhow::Error> {
+ // Given an MQTT broker
+ let broker = mqtt_tests::test_mqtt_broker();
+ let mqtt_config = Config::default().with_port(broker.port);
+
+ // Given an MQTT config with a well-known session name
+ let session_name = "a-session-name";
+ let topic = "a/topic";
+ let mqtt_config = mqtt_config
+ .with_session_name(session_name)
+ .with_subscriptions(topic.try_into()?);
+
+ // The session being initialized
+ init_session(&mqtt_config).await?;
+
+ // And some messages published
+ broker
+ .publish(topic, "A fst msg published before clean")
+ .await?;
+ broker
+ .publish(topic, "A 2nd msg published before clean")
+ .await?;
+
+ // If we clean the session
+ clear_session(&mqtt_config).await?;
+
+ // And publish more messages
+ broker
+ .publish(topic, "A 3nd msg published after clean")
+ .await?;
+
+ // Then no messages will be received by the client with the same session name
+ let mut con = Connection::new(&mqtt_config).await?;
+
+ assert_eq!(MaybeMessage::Timeout, next_message(&mut con.received).await);
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ #[serial]
+ async fn to_be_cleared_a_session_must_have_a_name() {
+ let broker = mqtt_tests::test_mqtt_broker();
+ let mqtt_config = Config::default().with_port(broker.port);
+
+ let result = clear_session(&mqtt_config).await;
+ assert!(result.is_err());
+ assert!(result.unwrap_err().to_string().contains("Invalid session"));
+ }
}
diff --git a/crates/common/mqtt_channel/src/topics.rs b/crates/common/mqtt_channel/src/topics.rs
index 20d7a124..ea2a4d0b 100644
--- a/crates/common/mqtt_channel/src/topics.rs
+++ b/crates/common/mqtt_channel/src/topics.rs
@@ -1,6 +1,6 @@
use crate::errors::MqttError;
use crate::Message;
-use rumqttc::QoS;
+use rumqttc::{QoS, SubscribeFilter};
use std::convert::TryInto;
/// An MQTT topic
@@ -108,6 +108,18 @@ impl TopicFilter {
pub fn with_qos(self, qos: QoS) -> Self {
Self { qos, ..self }
}
+
+ /// The list of `SubscribeFilter` expected by `mqttc`
+ pub(crate) fn filters(&self) -> Vec<SubscribeFilter> {
+ let qos = self.qos;
+ self.patterns
+ .iter()
+ .map(|path| SubscribeFilter {
+ path: path.clone(),
+ qos,
+ })
+ .collect()
+ }
}
impl TryInto<Topic> for &str {