diff options
author | Didier Wenzek <didier.wenzek@acidalie.com> | 2022-02-04 12:18:21 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-02-04 12:18:21 +0000 |
commit | 89f554fbb724cb4b9fe09336d58558cfa76638a1 (patch) | |
tree | 0180a5afb1ad1a2de2d99f24061764f62646dde1 /crates/common/mqtt_channel/src | |
parent | 9efe65893b5a32695156b9679fb43770b5e15c2a (diff) |
Add functions to init/clear mqtt session (#819)
* Move the `mqtt_options` builder into the `Config`
* Add `mqtt_channel::init_session()` function
* Subscribe to all the topics use `subscribe_many`
* Add `mqtt_channel::clear_session()` function
* Make clearer the intent of `delay` on connection errors
* Add explicit `mqtt_client.disconnect()` to drop the connection
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Diffstat (limited to 'crates/common/mqtt_channel/src')
-rw-r--r-- | crates/common/mqtt_channel/src/config.rs | 16 | ||||
-rw-r--r-- | crates/common/mqtt_channel/src/connection.rs | 36 | ||||
-rw-r--r-- | crates/common/mqtt_channel/src/errors.rs | 3 | ||||
-rw-r--r-- | crates/common/mqtt_channel/src/lib.rs | 2 | ||||
-rw-r--r-- | crates/common/mqtt_channel/src/session.rs | 94 | ||||
-rw-r--r-- | crates/common/mqtt_channel/src/tests.rs | 124 | ||||
-rw-r--r-- | crates/common/mqtt_channel/src/topics.rs | 14 |
7 files changed, 260 insertions, 29 deletions
diff --git a/crates/common/mqtt_channel/src/config.rs b/crates/common/mqtt_channel/src/config.rs index 17875753..3715d56d 100644 --- a/crates/common/mqtt_channel/src/config.rs +++ b/crates/common/mqtt_channel/src/config.rs @@ -118,4 +118,20 @@ impl Config { ..self } } + + /// Wrap this config into an internal set of options for `rumqttc`. + pub(crate) fn mqtt_options(&self) -> rumqttc::MqttOptions { + let id = match &self.session_name { + None => std::iter::repeat_with(fastrand::lowercase) + .take(10) + .collect(), + Some(name) => name.clone(), + }; + + let mut mqtt_options = rumqttc::MqttOptions::new(id, &self.host, self.port); + mqtt_options.set_clean_session(self.clean_session); + mqtt_options.set_max_packet_size(self.max_packet_size, self.max_packet_size); + + mqtt_options + } } diff --git a/crates/common/mqtt_channel/src/connection.rs b/crates/common/mqtt_channel/src/connection.rs index f6de0cd5..666021ef 100644 --- a/crates/common/mqtt_channel/src/connection.rs +++ b/crates/common/mqtt_channel/src/connection.rs @@ -90,42 +90,22 @@ impl Connection { }) } - fn mqtt_options(config: &Config) -> rumqttc::MqttOptions { - let id = match &config.session_name { - None => std::iter::repeat_with(fastrand::lowercase) - .take(10) - .collect(), - Some(name) => name.clone(), - }; - - let mut mqtt_options = rumqttc::MqttOptions::new(id, &config.host, config.port); - mqtt_options.set_clean_session(config.clean_session); - mqtt_options.set_max_packet_size(config.max_packet_size, config.max_packet_size); - - mqtt_options - } - async fn open( config: &Config, mut message_sender: mpsc::UnboundedSender<Message>, mut error_sender: mpsc::UnboundedSender<MqttError>, ) -> Result<(AsyncClient, EventLoop), MqttError> { - let mqtt_options = Connection::mqtt_options(config); + let mqtt_options = config.mqtt_options(); let (mqtt_client, mut event_loop) = AsyncClient::new(mqtt_options, config.queue_capacity); - let topic = &config.subscriptions; - let qos = topic.qos; - loop { match event_loop.poll().await { Ok(Event::Incoming(Packet::ConnAck(_))) => { - if topic.patterns.is_empty() { + let subscriptions = config.subscriptions.filters(); + if subscriptions.is_empty() { break; } - - for pattern in topic.patterns.iter() { - let () = mqtt_client.subscribe(pattern, qos).await?; - } + mqtt_client.subscribe_many(subscriptions).await?; } Ok(Event::Incoming(Packet::SubAck(_))) => { @@ -139,12 +119,12 @@ impl Connection { } Err(err) => { - let delay = Connection::pause_on_error(&err); + let should_delay = Connection::pause_on_error(&err); // Errors on send are ignored: it just means the client has closed the receiving channel. let _ = error_sender.send(err.into()).await; - if delay { + if should_delay { Connection::do_pause().await; } } @@ -217,7 +197,7 @@ impl Connection { let _ = mqtt_client.disconnect().await; } - fn pause_on_error(err: &ConnectionError) -> bool { + pub(crate) fn pause_on_error(err: &ConnectionError) -> bool { match &err { rumqttc::ConnectionError::Io(_) => true, rumqttc::ConnectionError::MqttState(state_error) @@ -231,7 +211,7 @@ impl Connection { } } - async fn do_pause() { + pub(crate) async fn do_pause() { sleep(Duration::from_secs(1)).await; } } diff --git a/crates/common/mqtt_channel/src/errors.rs b/crates/common/mqtt_channel/src/errors.rs index 6804dd94..b072136b 100644 --- a/crates/common/mqtt_channel/src/errors.rs +++ b/crates/common/mqtt_channel/src/errors.rs @@ -7,6 +7,9 @@ pub enum MqttError { #[error("Invalid topic filter: {pattern:?}")] InvalidFilter { pattern: String }, + #[error("Invalid session: a session name must be provided")] + InvalidSessionConfig, + #[error("MQTT client error: {0}")] ClientError(#[from] rumqttc::ClientError), diff --git a/crates/common/mqtt_channel/src/lib.rs b/crates/common/mqtt_channel/src/lib.rs index 7fbee70d..96a4fa56 100644 --- a/crates/common/mqtt_channel/src/lib.rs +++ b/crates/common/mqtt_channel/src/lib.rs @@ -34,6 +34,7 @@ mod config; mod connection; mod errors; mod messages; +mod session; mod topics; mod tests; @@ -43,6 +44,7 @@ pub use config::*; pub use connection::*; pub use errors::*; pub use messages::*; +pub use session::*; pub use topics::*; pub use futures::{ diff --git a/crates/common/mqtt_channel/src/session.rs b/crates/common/mqtt_channel/src/session.rs new file mode 100644 index 00000000..c41acb52 --- /dev/null +++ b/crates/common/mqtt_channel/src/session.rs @@ -0,0 +1,94 @@ +use crate::{Config, Connection, MqttError}; +use rumqttc::{AsyncClient, Event, Packet}; + +/// Create a persistent session on the MQTT server `config.host`. +/// +/// The session is named after the `config.session_name` +/// subscribing to all the topics given by the `config.subscriptions`. +/// +/// A new `Connection` created with a config with the same session name, +/// will receive all the messages published meantime on the subscribed topics. +/// +/// This function can be called multiple times with the same session name, +/// since it consumes no messages. +pub async fn init_session(config: &Config) -> Result<(), MqttError> { + if config.clean_session || config.session_name.is_none() { + return Err(MqttError::InvalidSessionConfig); + } + + let mqtt_options = config.mqtt_options(); + let (mqtt_client, mut event_loop) = AsyncClient::new(mqtt_options, config.queue_capacity); + + loop { + match event_loop.poll().await { + Ok(Event::Incoming(Packet::ConnAck(_))) => { + let subscriptions = config.subscriptions.filters(); + if subscriptions.is_empty() { + break; + } + mqtt_client.subscribe_many(subscriptions).await?; + } + + Ok(Event::Incoming(Packet::SubAck(_))) => { + break; + } + + Err(err) => { + if Connection::pause_on_error(&err) { + Connection::do_pause().await; + } + } + _ => (), + } + } + + let _ = mqtt_client.disconnect().await; + Ok(()) +} + +/// Clear a persistent session on the MQTT server `config.host`. +/// +/// The session named after the `config.session_name` is cleared +/// unsubscribing to all the topics given by the `config.subscriptions`. +/// +/// All the messages persisted for that session all cleared. +/// and no more messages will be stored till the session is re-created. +/// +/// A new `Connection` created with a config with the same session name, +/// will receive no messages that have been published meantime. +pub async fn clear_session(config: &Config) -> Result<(), MqttError> { + if config.session_name.is_none() { + return Err(MqttError::InvalidSessionConfig); + } + let mut mqtt_options = config.mqtt_options(); + mqtt_options.set_clean_session(true); + let (mqtt_client, mut event_loop) = AsyncClient::new(mqtt_options, config.queue_capacity); + + loop { + match event_loop.poll().await { + Ok(Event::Incoming(Packet::ConnAck(_))) => { + let subscriptions = config.subscriptions.filters(); + if subscriptions.is_empty() { + break; + } + for s in subscriptions.iter() { + mqtt_client.unsubscribe(&s.path).await?; + } + } + + Ok(Event::Incoming(Packet::UnsubAck(_))) => { + break; + } + + Err(err) => { + if Connection::pause_on_error(&err) { + Connection::do_pause().await; + } + } + _ => (), + } + } + + let _ = mqtt_client.disconnect().await; + Ok(()) +} diff --git a/crates/common/mqtt_channel/src/tests.rs b/crates/common/mqtt_channel/src/tests.rs index 6657e73e..187a62c2 100644 --- a/crates/common/mqtt_channel/src/tests.rs +++ b/crates/common/mqtt_channel/src/tests.rs @@ -305,4 +305,128 @@ mod tests { Ok(()) } + + #[tokio::test] + #[serial] + async fn creating_a_session() -> Result<(), anyhow::Error> { + // Given an MQTT broker + let broker = mqtt_tests::test_mqtt_broker(); + let mqtt_config = Config::default().with_port(broker.port); + + // Given an MQTT config with a well-known session name + let session_name = "my-session-name"; + let topic = "my/topic"; + let mqtt_config = mqtt_config + .with_session_name(session_name) + .with_subscriptions(topic.try_into()?); + + // This config can be created to initialize an MQTT session + init_session(&mqtt_config).await?; + + // Any messages published on that topic + broker + .publish(topic, "1st msg sent before a first connection") + .await?; + broker + .publish(topic, "2nd msg sent before a first connection") + .await?; + broker + .publish(topic, "3rd msg sent before a first connection") + .await?; + + // Will be received by the client with the same session name even for its first connection + let mut con = Connection::new(&mqtt_config).await?; + + assert_eq!( + MaybeMessage::Next(message(topic, "1st msg sent before a first connection")), + next_message(&mut con.received).await + ); + assert_eq!( + MaybeMessage::Next(message(topic, "2nd msg sent before a first connection")), + next_message(&mut con.received).await + ); + assert_eq!( + MaybeMessage::Next(message(topic, "3rd msg sent before a first connection")), + next_message(&mut con.received).await + ); + + Ok(()) + } + + #[tokio::test] + #[serial] + async fn a_session_must_have_a_name() { + let broker = mqtt_tests::test_mqtt_broker(); + let mqtt_config = Config::default().with_port(broker.port); + + let result = init_session(&mqtt_config).await; + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("Invalid session")); + } + + #[tokio::test] + #[serial] + async fn a_named_session_must_not_set_clean_session() { + let broker = mqtt_tests::test_mqtt_broker(); + let mqtt_config = Config::default() + .with_port(broker.port) + .with_session_name("useless name") + .with_clean_session(true); + + let result = init_session(&mqtt_config).await; + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("Invalid session")); + } + + #[tokio::test] + #[serial] + async fn cleaning_a_session() -> Result<(), anyhow::Error> { + // Given an MQTT broker + let broker = mqtt_tests::test_mqtt_broker(); + let mqtt_config = Config::default().with_port(broker.port); + + // Given an MQTT config with a well-known session name + let session_name = "a-session-name"; + let topic = "a/topic"; + let mqtt_config = mqtt_config + .with_session_name(session_name) + .with_subscriptions(topic.try_into()?); + + // The session being initialized + init_session(&mqtt_config).await?; + + // And some messages published + broker + .publish(topic, "A fst msg published before clean") + .await?; + broker + .publish(topic, "A 2nd msg published before clean") + .await?; + + // If we clean the session + clear_session(&mqtt_config).await?; + + // And publish more messages + broker + .publish(topic, "A 3nd msg published after clean") + .await?; + + // Then no messages will be received by the client with the same session name + let mut con = Connection::new(&mqtt_config).await?; + + assert_eq!(MaybeMessage::Timeout, next_message(&mut con.received).await); + + Ok(()) + } + + #[tokio::test] + #[serial] + async fn to_be_cleared_a_session_must_have_a_name() { + let broker = mqtt_tests::test_mqtt_broker(); + let mqtt_config = Config::default().with_port(broker.port); + + let result = clear_session(&mqtt_config).await; + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("Invalid session")); + } } diff --git a/crates/common/mqtt_channel/src/topics.rs b/crates/common/mqtt_channel/src/topics.rs index 20d7a124..ea2a4d0b 100644 --- a/crates/common/mqtt_channel/src/topics.rs +++ b/crates/common/mqtt_channel/src/topics.rs @@ -1,6 +1,6 @@ use crate::errors::MqttError; use crate::Message; -use rumqttc::QoS; +use rumqttc::{QoS, SubscribeFilter}; use std::convert::TryInto; /// An MQTT topic @@ -108,6 +108,18 @@ impl TopicFilter { pub fn with_qos(self, qos: QoS) -> Self { Self { qos, ..self } } + + /// The list of `SubscribeFilter` expected by `mqttc` + pub(crate) fn filters(&self) -> Vec<SubscribeFilter> { + let qos = self.qos; + self.patterns + .iter() + .map(|path| SubscribeFilter { + path: path.clone(), + qos, + }) + .collect() + } } impl TryInto<Topic> for &str { |