summaryrefslogtreecommitdiffstats
path: root/crates/common/mqtt_channel/src/connection.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/common/mqtt_channel/src/connection.rs')
-rw-r--r--crates/common/mqtt_channel/src/connection.rs36
1 files changed, 8 insertions, 28 deletions
diff --git a/crates/common/mqtt_channel/src/connection.rs b/crates/common/mqtt_channel/src/connection.rs
index f6de0cd5..666021ef 100644
--- a/crates/common/mqtt_channel/src/connection.rs
+++ b/crates/common/mqtt_channel/src/connection.rs
@@ -90,42 +90,22 @@ impl Connection {
})
}
- fn mqtt_options(config: &Config) -> rumqttc::MqttOptions {
- let id = match &config.session_name {
- None => std::iter::repeat_with(fastrand::lowercase)
- .take(10)
- .collect(),
- Some(name) => name.clone(),
- };
-
- let mut mqtt_options = rumqttc::MqttOptions::new(id, &config.host, config.port);
- mqtt_options.set_clean_session(config.clean_session);
- mqtt_options.set_max_packet_size(config.max_packet_size, config.max_packet_size);
-
- mqtt_options
- }
-
async fn open(
config: &Config,
mut message_sender: mpsc::UnboundedSender<Message>,
mut error_sender: mpsc::UnboundedSender<MqttError>,
) -> Result<(AsyncClient, EventLoop), MqttError> {
- let mqtt_options = Connection::mqtt_options(config);
+ let mqtt_options = config.mqtt_options();
let (mqtt_client, mut event_loop) = AsyncClient::new(mqtt_options, config.queue_capacity);
- let topic = &config.subscriptions;
- let qos = topic.qos;
-
loop {
match event_loop.poll().await {
Ok(Event::Incoming(Packet::ConnAck(_))) => {
- if topic.patterns.is_empty() {
+ let subscriptions = config.subscriptions.filters();
+ if subscriptions.is_empty() {
break;
}
-
- for pattern in topic.patterns.iter() {
- let () = mqtt_client.subscribe(pattern, qos).await?;
- }
+ mqtt_client.subscribe_many(subscriptions).await?;
}
Ok(Event::Incoming(Packet::SubAck(_))) => {
@@ -139,12 +119,12 @@ impl Connection {
}
Err(err) => {
- let delay = Connection::pause_on_error(&err);
+ let should_delay = Connection::pause_on_error(&err);
// Errors on send are ignored: it just means the client has closed the receiving channel.
let _ = error_sender.send(err.into()).await;
- if delay {
+ if should_delay {
Connection::do_pause().await;
}
}
@@ -217,7 +197,7 @@ impl Connection {
let _ = mqtt_client.disconnect().await;
}
- fn pause_on_error(err: &ConnectionError) -> bool {
+ pub(crate) fn pause_on_error(err: &ConnectionError) -> bool {
match &err {
rumqttc::ConnectionError::Io(_) => true,
rumqttc::ConnectionError::MqttState(state_error)
@@ -231,7 +211,7 @@ impl Connection {
}
}
- async fn do_pause() {
+ pub(crate) async fn do_pause() {
sleep(Duration::from_secs(1)).await;
}
}