diff options
author | Didier Wenzek <didier.wenzek@acidalie.com> | 2022-01-24 16:35:29 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-01-24 16:35:29 +0000 |
commit | c38d88d0a8359d0370435dd454d3312733583a08 (patch) | |
tree | 7d9db847f145c94b7bdd6f45543b12b1b8739a59 /crates/common | |
parent | 486c62271421e2aedf21ef6aab56f8d00a8fc14d (diff) |
Update the sawtooth publisher and remove deprecated mqtt_client crate (#778)
* Move `sawtooth_publisher` from mqtt_client/examples to crates/tests/sawtooth_publisher
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
* Update tests and CI README.
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
* Refactor the sawtooth publisher to use mqtt_channel
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
* Removing mqtt_client crate
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
* Cargo fmt
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
* Improve local variable naming
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
* Fix issue with tokio rt-multi-thread flavor
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
* Fix github workflow
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Co-authored-by: Didier Wenzek <didier.wenzek@free.fr>
Diffstat (limited to 'crates/common')
-rw-r--r-- | crates/common/mqtt_channel/Cargo.toml | 5 | ||||
-rw-r--r-- | crates/common/mqtt_client/Cargo.toml | 29 | ||||
-rw-r--r-- | crates/common/mqtt_client/examples/publish_test.rs | 29 | ||||
-rw-r--r-- | crates/common/mqtt_client/examples/sawtooth_publisher.rs | 222 | ||||
-rw-r--r-- | crates/common/mqtt_client/examples/simple_mapper.rs | 69 | ||||
-rw-r--r-- | crates/common/mqtt_client/examples/temperature_publisher.rs | 96 | ||||
-rw-r--r-- | crates/common/mqtt_client/src/lib.rs | 862 | ||||
-rw-r--r-- | crates/common/mqtt_client/tests/mqtt_pub_sub_test.rs | 112 | ||||
-rw-r--r-- | crates/common/mqtt_client/tests/packet_size_tests.rs | 177 | ||||
-rw-r--r-- | crates/common/tedge_utils/Cargo.toml | 2 |
10 files changed, 4 insertions, 1599 deletions
diff --git a/crates/common/mqtt_channel/Cargo.toml b/crates/common/mqtt_channel/Cargo.toml index 5061d0ce..9013d986 100644 --- a/crates/common/mqtt_channel/Cargo.toml +++ b/crates/common/mqtt_channel/Cargo.toml @@ -1,8 +1,9 @@ [package] name = "mqtt_channel" -version = "0.5.1" -edition = "2018" authors = ["thin-edge.io team <info@thin-edge.io>"] +edition = "2021" +rust-version = "1.58" +version = "0.5.2" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/crates/common/mqtt_client/Cargo.toml b/crates/common/mqtt_client/Cargo.toml deleted file mode 100644 index 65bb05fc..00000000 --- a/crates/common/mqtt_client/Cargo.toml +++ /dev/null @@ -1,29 +0,0 @@ -[package] -name = "mqtt_client" -version = "0.5.2" -authors = ["thin-edge.io team <info@thin-edge.io>"] -edition = "2021" -rust-version = "1.58" - -[dependencies] -async-trait = "0.1" -mockall = "0.10" -rumqttc = "0.10" -thiserror = "1.0" -tokio = { version = "1.12", features = ["sync", "macros"] } - -[dev-dependencies] -anyhow = "1.0" -async-log = "2.0" -env_logger = "0.9" -futures = "0.3" -futures-timer = "3.0" -json = "0.12" -log = "0.4" -rand = "0.8" -serde = "1.0" -mqtt_tests = { path = "../../tests/mqtt_tests" } -tokio-test = "0.4" - -[features] -integration-test = [] diff --git a/crates/common/mqtt_client/examples/publish_test.rs b/crates/common/mqtt_client/examples/publish_test.rs deleted file mode 100644 index f23b4cd8..00000000 --- a/crates/common/mqtt_client/examples/publish_test.rs +++ /dev/null @@ -1,29 +0,0 @@ -use mqtt_client::{Config, Message, MqttClient, MqttClientError, QoS, Topic}; - -async fn publish( - config: &Config, - qos: QoS, - payload: impl Into<String>, -) -> Result<(), MqttClientError> { - let topic = Topic::new("test/uubpb9wyi9asi46l624f")?; - let client = config.connect("publisher").await?; - let message = Message::new(&topic, payload.into()).qos(qos); - - let () = client.publish(message).await?; - client.disconnect().await?; - Ok(()) -} - -#[tokio::main] -pub async fn main() -> Result<(), Box<dyn std::error::Error>> { - let config = Config::new("test.mosquitto.org", 1883); - let payload = "Hello there!"; - - env_logger::init(); - - publish(&config, QoS::AtMostOnce, payload).await?; - publish(&config, QoS::AtLeastOnce, payload).await?; - publish(&config, QoS::ExactlyOnce, payload).await?; - - Ok(()) -} diff --git a/crates/common/mqtt_client/examples/sawtooth_publisher.rs b/crates/common/mqtt_client/examples/sawtooth_publisher.rs deleted file mode 100644 index 704be6c7..00000000 --- a/crates/common/mqtt_client/examples/sawtooth_publisher.rs +++ /dev/null @@ -1,222 +0,0 @@ -use futures::future::FutureExt; -use futures::select; -use futures_timer::Delay; -use log::debug; -use log::error; -use log::info; -use mqtt_client::{ - Client, Config, Message, MqttClient, MqttClientError, MqttErrorStream, MqttMessageStream, Topic, -}; -use std::convert::TryFrom; -use std::env; -use std::io::Write; -use std::process; -use std::time::{Duration, Instant}; -//use rumqttc::QoS; - -/* - -This is a small and flexible publisher for deterministic test data. -Its based on the temperature publisher. - -- TODO: Improve code quality -- TODO: Add different data types for JSON publishing -- TODO: Command line switch to swith betwen REST and JSON -- TODO: Currently REST sending is disabled and JSON publishing is enabled -- TODO: Add QoS selection -*/ - -const C8Y_TEMPLATE_RESTART: &str = "510"; - -// Templates: -// https://cumulocity.com/guides/10.4.6/device-sdk/mqtt/ -// -// Create custom measurement (200) -// Create signal strength measurement (210) -// Create temperature measurement (211) -// Create battery measurement (212) - -// sawtooth_publisher <wait_time_ms> <height> <iterations> <template> -// -// cargo run --example sawtooth_publisher 100 100 100 flux -// cargo run --example sawtooth_publisher 1000 10 10 sawmill - -#[tokio::main] -pub async fn main() -> Result<(), Box<dyn std::error::Error>> { - let args: Vec<String> = env::args().collect(); - // wait time, template, tooth-height, - if args.len() != 5 { - println!("Usage: sawtooth_publisher <wait_time_ms> <height> <iterations> <template: sawmill|flux>"); - panic!("Errof: Not enough Command line Arguments"); - } - let wait: i32 = args[1].parse().expect("Cannot parse wait time"); - let height: i32 = args[2].parse().expect("Cannot parse height"); - let iterations: i32 = args[3].parse().expect("Cannot parse iterations"); - let template: String = String::from(&args[4]); - - println!( - "Publishing sawtooth with delay {}ms height {} iterations {} template {} will cause {} publishs.", - wait, - height, - iterations, - template, - height * iterations - ); - let c8y_msg = Topic::new("tedge/measurements")?; - let c8y_cmd = Topic::new("c8y/s/ds")?; - let c8y_err = Topic::new("c8y/s/e")?; - - init_logger(); - - let name = "sawtooth_".to_string() + &process::id().to_string(); - let mqtt = Client::connect(&name, &Config::default()).await?; - - let commands = mqtt.subscribe(c8y_cmd.filter()).await?; - let c8y_errors = mqtt.subscribe(c8y_err.filter()).await?; - let errors = mqtt.subscribe_errors(); - - let start = Instant::now(); - - if template == "flux" { - tokio::spawn(publish_topic(mqtt, c8y_msg, wait, height, iterations)); - } else if template == "sawmill" { - tokio::spawn(publish_multi_topic(mqtt, c8y_msg, wait, height, iterations)); - } else { - println!("Wrong template"); - panic!("Exiting"); - }; - - select! { - _ = listen_command(commands).fuse() => (), - _ = listen_c8y_error(c8y_errors).fuse() => (), - _ = listen_error(errors).fuse() => (), - } - - let elapsed = start.elapsed(); - println!( - "Execution took {} s {} ms", - elapsed.as_secs(), - elapsed.as_millis() - ); - - let elapsedm: u32 = u32::try_from(elapsed.as_millis()).unwrap(); - let elapsedmsf: f64 = f64::try_from(elapsedm).unwrap(); - let rate: f64 = - elapsedmsf / (f64::try_from(height).unwrap() * f64::try_from(iterations).unwrap()); - - let pubpersec = 1.0 / rate * 1000.0; - println!("Publish rate: {:.3} ms/pub", rate); - println!("Publish per second: {:.3} pub/s", pubpersec); - - Ok(()) -} - -async fn publish_topic( - mqtt: Client, - c8y_msg: Topic, - wait: i32, - height: i32, - iterations: i32, -) -> Result<(), MqttClientError> { - info!("Publishing temperature measurements"); - println!(); - for iteration in 0..iterations { - for value in 0..height { - let payload = format!("{{ {}: {} }}", "\"Flux [F]\"", value); - debug!("{} ", value); - debug!("{}", payload); - - mqtt.publish(Message::new(&c8y_msg, payload)).await?; - Delay::new(Duration::from_millis(u64::try_from(wait).unwrap())).await; - std::io::stdout().flush().expect("Flush failed"); - } - println!("Iteraton: {}", iteration); - } - println!(); - - mqtt.disconnect().await?; - Ok(()) -} - -async fn publish_multi_topic( - mqtt: Client, - c8y_msg: Topic, - wait: i32, - height: i32, - iterations: i32, -) -> Result<(), MqttClientError> { - info!("Publishing temperature measurements"); - println!(); - let series_name = "\"Sawmill [S]\""; - let series_count = 10; - for iteration in 0..iterations { - for value in 0..height { - let mut series: String = String::new(); - for s in 0..series_count { - series += &format!( - "\"saw_{}\": {} ,", - s, - (value + s * height / series_count) % height - ); - } - let seriesx = &series.trim_end_matches(','); - - let payload = format!("{{ {}: {{ {} }} }}", series_name, seriesx); - debug!("{} ", value); - debug!("{}", payload); - - mqtt.publish(Message::new(&c8y_msg, payload)).await?; - - Delay::new(Duration::from_millis(u64::try_from(wait).unwrap())).await; - std::io::stdout().flush().expect("Flush failed"); - } - println!("Iteraton: {}", iteration); - } - println!(); - - mqtt.disconnect().await?; - Ok(()) -} - -async fn listen_command(mut messages: Box<dyn MqttMessageStream>) { - while let Some(message) = messages.next().await { - debug!("C8Y command: {:?}", message.payload_str()); - if let Ok(cmd) = message.payload_str() { - if cmd.contains(C8Y_TEMPLATE_RESTART) { - info!("Stopping on remote request ... should be restarted by the daemon monitor."); - break; - } - } - } -} - -async fn listen_c8y_error(mut messages: Box<dyn MqttMessageStream>) { - let mut count: u32 = 0; - while let Some(message) = messages.next().await { - error!("C8Y error: {:?}", message.payload_str()); - if count >= 3 { - panic!("Panic!"); - } - count += 1; - } -} - -async fn listen_error(mut errors: Box<dyn MqttErrorStream>) { - let mut count: u32 = 0; - while let Some(error) = errors.next().await { - error!("System error: {}", error); - if count >= 3 { - panic!("Panic!"); - } - count += 1; - } -} - -fn init_logger() { - let logger = env_logger::Logger::from_default_env(); - let task_id = 1; - - async_log::Logger::wrap(logger, move || task_id) - .start(log::LevelFilter::Trace) - .unwrap(); -} diff --git a/crates/common/mqtt_client/examples/simple_mapper.rs b/crates/common/mqtt_client/examples/simple_mapper.rs deleted file mode 100644 index c3345e30..00000000 --- a/crates/common/mqtt_client/examples/simple_mapper.rs +++ /dev/null @@ -1,69 +0,0 @@ -use json::JsonValue; -use log::{debug, error, info}; -use mqtt_client::{Client, Config, Message, MqttClient, Topic}; - -#[tokio::main] -pub async fn main() -> Result<(), Box<dyn std::error::Error>> { - let name = "c8y_mapper"; - let in_topic = Topic::new("tedge/measurements")?; - let out_topic = Topic::new("c8y/s/us")?; - let err_topic = Topic::new("tedge/errors")?; - - env_logger::init(); - - info!("Mapping ThinEdge messages"); - let mqtt = Client::connect(name, &Config::default()).await?; - let mut errors = mqtt.subscribe_errors(); - tokio::spawn(async move { - while let Some(error) = errors.next().await { - error!("{}", error); - } - }); - - let mut messages = mqtt.subscribe(in_topic.filter()).await?; - while let Some(message) = messages.next().await { - debug!("Mapping {:?}", message); - match translate(message.payload_str()?) { - Ok(translation) => { - let _ = mqtt.publish(Message::new(&out_topic, translation)).await?; - } - Err(error) => { - debug!("Translation error: {}", error); - let _ = mqtt.publish(Message::new(&err_topic, error)).await?; - } - } - } - - Ok(()) -} - -const C8Y_TEMPLATE_TEMPERATURE: &str = "211"; - -/// Naive mapper which extracts the temperature field from a ThinEdge Json value. -/// -/// `{ "temperature": 12.4 }` is translated into `"211,12.4"` -fn translate(input: &str) -> Result<String, String> { - let json = json::parse(input).map_err(|err| format!("ERROR: {}", err))?; - match json { - JsonValue::Object(obj) => { - for (k, v) in obj.iter() { - if k != "temperature" { - return Err(format!("ERROR: unknown measurement type '{}'", k)); - } - match v { - JsonValue::Number(num) => { - let value: f64 = (*num).into(); - if value == 0.0 || value.is_normal() { - return Ok(format!("{},{}", C8Y_TEMPLATE_TEMPERATURE, value)); - } else { - return Err(format!("ERROR: value out of range '{}'", v)); - } - } - _ => return Err(format!("ERROR: expected a number, not '{}'", v)), - } - } - Err(String::from("ERROR: empty measurement")) - } - _ => return Err(format!("ERROR: expected a JSON object, not {}", json)), - } -} diff --git a/crates/common/mqtt_client/examples/temperature_publisher.rs b/crates/common/mqtt_client/examples/temperature_publisher.rs deleted file mode 100644 index 0a045def..00000000 --- a/crates/common/mqtt_client/examples/temperature_publisher.rs +++ /dev/null @@ -1,96 +0,0 @@ -use futures::future::FutureExt; -use futures::select; -use futures_timer::Delay; -use log::debug; -use log::error; -use log::info; -use mqtt_client::{ - Client, Config, Message, MqttClient, MqttClientError, MqttErrorStream, MqttMessageStream, Topic, -}; -use rand::prelude::*; -use std::time::Duration; - -const C8Y_TEMPLATE_RESTART: &str = "510"; -const C8Y_TEMPLATE_TEMPERATURE: &str = "211"; - -#[tokio::main] -pub async fn main() -> Result<(), Box<dyn std::error::Error>> { - let c8y_msg = Topic::new("c8y/s/us")?; - let c8y_cmd = Topic::new("c8y/s/ds")?; - let c8y_err = Topic::new("c8y/s/e")?; - - init_logger(); - - let mqtt = Client::connect("temperature", &Config::default()).await?; - - let commands = mqtt.subscribe(c8y_cmd.filter()).await?; - let c8y_errors = mqtt.subscribe(c8y_err.filter()).await?; - let errors = mqtt.subscribe_errors(); - - tokio::spawn(publish_temperature(mqtt, c8y_msg)); - - select! { - _ = listen_command(commands).fuse() => (), - _ = listen_c8y_error(c8y_errors).fuse() => (), - _ = listen_error(errors).fuse() => (), - } - - Ok(()) -} - -async fn publish_temperature(mqtt: Client, c8y_msg: Topic) -> Result<(), MqttClientError> { - let mut temperature: i32 = random_in_range(-10, 20); - - info!("Publishing temperature measurements"); - for _ in 1..10 { - let delta = random_in_range(-1, 2); - temperature += delta; - - let payload = format!("{},{}", C8Y_TEMPLATE_TEMPERATURE, temperature); - debug!("{}", payload); - mqtt.publish(Message::new(&c8y_msg, payload)).await?; - - Delay::new(Duration::from_millis(1000)).await; - } - - mqtt.disconnect().await?; - Ok(()) -} - -fn random_in_range(low: i32, high: i32) -> i32 { - let mut rng = thread_rng(); - rng.gen_range(low..high) -} - -async fn listen_command(mut messages: Box<dyn MqttMessageStream>) { - while let Some(message) = messages.next().await { - debug!("C8Y command: {:?}", message.payload_str()); - if let Ok(cmd) = message.payload_str() { - if cmd.contains(C8Y_TEMPLATE_RESTART) { - info!("Stopping on remote request ... should be restarted by the daemon monitor."); - break; - } - } - } -} - -async fn listen_c8y_error(mut messages: Box<dyn MqttMessageStream>) { - while let Some(message) = messages.next().await { - error!("C8Y error: {:?}", message.payload_str()); - } -} - -async fn listen_error(mut errors: Box<dyn MqttErrorStream>) { - while let Some(error) = errors.next().await { - error!("System error: {}", error); - } -} - -fn init_logger() { - let logger = env_logger::Logger::from_default_env(); - let task_id = 1; - - async_log::Logger::wrap(logger, move || task_id) - .start(log::LevelFilter::Trace) - .unwrap(); -} diff --git a/crates/common/mqtt_client/src/lib.rs b/crates/common/mqtt_client/src/lib.rs deleted file mode 100644 index 6602167e..00000000 --- a/crates/common/mqtt_client/src/lib.rs +++ /dev/null @@ -1,862 +0,0 @@ -//! A library to connect the local MQTT bus, publish messages and subscribe topics. -//! -//! ```no_run -//! use mqtt_client::{MqttClient,Config,Message,Topic}; -//! -//! #[tokio::main] -//! async fn main (){ -//! let mqtt = Config::default().connect("temperature").await.unwrap(); -//! let c8y_msg = Topic::new("c8y/s/us").unwrap(); -//! mqtt.publish(Message::new(&c8y_msg, "211,23")).await.unwrap(); -//! mqtt.disconnect().await.unwrap(); -//! } -//! ``` -#![forbid(unsafe_code)] -#![deny(clippy::mem_forget)] - -use async_trait::async_trait; -use mockall::automock; -pub use rumqttc::QoS; -use rumqttc::{Event, Incoming, Outgoing, Packet, Publish, Request, StateError}; -use std::collections::HashMap; -use std::sync::{ - atomic::{AtomicIsize, Ordering}, - Arc, -}; -use tokio::sync::{broadcast, Notify}; - -#[automock] -#[async_trait] -pub trait MqttClient: Send + Sync { - fn subscribe_errors(&self) -> Box<dyn MqttErrorStream>; - - async fn subscribe( - &self, - filter: TopicFilter, - ) -> Result<Box<dyn MqttMessageStream>, MqttClientError>; - - async fn publish(&self, message: Message) -> Result<(), MqttClientError>; -} - -#[async_trait] -#[automock] -pub trait MqttMessageStream: Send + Sync { - async fn next(&mut self) -> Option<Message>; -} - -#[async_trait] -#[automock] -pub trait MqttErrorStream: Send + Sync { - async fn next(&mut self) -> Option<Arc<MqttClientError>>; -} - -/// A connection to the local MQTT bus. -/// -/// The host and port are implied: a connection can only be open on the localhost, port 1883. -/// -/// ```no_run -/// use mqtt_client::{Config,Message,MqttClient,Topic}; -/// -/// #[tokio::main] -/// async fn main () { -/// let mut mqtt = Config::default().connect("temperature").await.unwrap(); -/// let c8y_msg = Topic::new("c8y/s/us").unwrap(); -/// mqtt.publish(Message::new(&c8y_msg, "211,23")).await.unwrap(); -/// mqtt.disconnect().await.unwrap(); -/// } -/// ``` -#[derive(Debug)] -pub struct Client { - name: String, - mqtt_client: rumqttc::AsyncClient, - message_sender: broadcast::Sender<Message>, - error_sender: broadcast::Sender<Arc<MqttClientError>>, - join_handle: tokio::task::JoinHandle<()>, - requests_tx: rumqttc::Sender<Request>, - inflight: Arc<InflightTracking>, -} - -/// Tracks the number of inflight / pending publish requests. -#[derive(Debug)] -struct InflightTracking { - /// Tracks number of pending publish message until they are - /// known to be sent out by the event loop. - pending_publish_count: AtomicIsize, - /// Tracks number of pending puback's (not completed messages of QoS=1). - pending_puback_count: AtomicIsize, - /// Tracks number of pending pubcomp's (not completed messages of QoS=2). - pending_pubcomp_count: AtomicIsize, - - /// Notify on the condition when all requests have completed. - notify_completed: Notify, -} - -impl InflightTracking { - fn new() -> Self { - Self { - pending_publish_count: AtomicIsize::new(0), - pending_puback_count: AtomicIsize::new(0), - pending_pubcomp_count: AtomicIsize::new(0), - notify_completed: Notify::new(), - } - } - - fn has_pending(&self) -> bool { - self.pending_publish_count.load(Ordering::Relaxed) > 0 - || self.pending_puback_count.load(Ordering::Relaxed) > 0 - || self.pending_pubcomp_count.load(Ordering::Relaxed) > 0 - } - - /// Resolves when all pending requests have been completed. - async fn all_completed(&self) { - while self.has_pending() { - // Calling `notify_one()` before `notified().await` is safe, no signal is lost. - // - // Nevertheless, we still use a timeout (but a larger one) to be on the safe side and - // not risk any race condition. - let _ = tokio::time::timeout( - std::time::Duration::from_millis(500), - self.notify_completed.notified(), - ) - .await; - } - } - - fn track_publish_request(&self, qos: QoS) { - self.pending_publish_count.fetch_add(1, Ordering::Relaxed); - match qos { - QoS::AtMostOnce => {} - QoS::AtLeastOnce => { - self.pending_puback_count.fetch_add(1, Ordering::Relaxed); - } - QoS::ExactlyOnce => { - self.pending_pubcomp_count.fetch_add(1, Ordering::Relaxed); - } - } - } - - fn track_publish_request_sentout(&self) { - self.pending_publish_count.fetch_sub(1, Ordering::Relaxed); - self.check_completed(); - } - - fn track_publish_qos1_completed(&self) { - self.pending_puback_count.fetch_sub(1, Ordering::Relaxed); - self.check_completed(); - } - - fn track_publish_qos2_completed(&self) { - self.pending_pubcomp_count.fetch_sub(1, Ordering::Relaxed); - self.check_completed(); - } - - fn check_completed(&self) { - if !self.has_pending() { - self.notify_completed.notify_one(); - } - } -} - -// Send a message on a broadcast channel discarding any error. -// The send can only fail if there is no listener. -macro_rules! send_discarding_error { - ($sender:expr, $msg:expr) => { - let _ = $sender.send($msg); - }; -} - -/// MQTT message id -type MessageId = u16; - -impl Client { - /// Open a connection to the local MQTT bus, using the given name to register an MQTT session. - /// - /// Reusing the same session name on each connection allows a client - /// to have its subscriptions persisted by the broker - /// so messages sent while the client is disconnected - /// will be resent on its re-connection. - /// - /// ```no_run - /// use mqtt_client::{Config,Client,MqttClient,Topic}; - /// - /// #[tokio::main] - /// async fn main () { - /// let c8y_cmd = Topic::new("c8y/s/ds").unwrap(); - /// let config = Config::default(); - /// - /// let mqtt = Client::connect("temperature", &config).await.unwrap(); - /// let mut commands = mqtt.subscribe(c8y_cmd.filter()).await.unwrap(); - /// // process some commands and disconnect - /// mqtt.disconnect().await.unwrap(); - /// - /// // wait a while and reconnect - /// let mqtt = Client::connect("temperature", &config).await.unwrap(); - /// let mut commands = mqtt.subscribe(c8y_cmd.filter()).await.unwrap(); - /// // process the messages even those sent during the pause - /// } - /// ``` - pub async fn connect(name: &str, config: &Config) -> Result<Client, MqttClientError> { - let name = String::from(name); - let mut mqtt_options = rumqttc::MqttOptions::new(&name, &config.host, config.port); - mqtt_options.set_clean_session(config.clean_session); - if let Some(inflight) = config.inflight { - mqtt_options.set_inflight(inflight); - } - - if let Some(packet_size) = config.packet_size { - mqtt_options.set_max_packet_size(packet_size, packet_size); - } - - let (mqtt_client, eventloop) = - rumqttc::AsyncClient::new(mqtt_options, config.queue_capacity); - let requests_tx = eventloop.requests_tx.clone(); - let (message_sender, _) = broadcast::channel(config.queue_capacity); - let (error_sender, _) = broadcast::channel(config.queue_capacity); - - let inflight = Arc::new(InflightTracking::new()); - - let join_handle = tokio::spawn(Client::bg_process( - eventloop, - message_sender.clone(), - error_sender.clone(), - inflight.clone(), - )); - - Ok(Client { - name, - mqtt_client, - message_sender, - error_sender, - join_handle, - requests_tx, - inflight, - }) - } - - /// Returns the name used by the MQTT client. - pub fn name(&self) -> &str { - &self.name - } - - /// Disconnect the client and drop it. - pub async fn disconnect(self) -> Result<(), MqttClientError> { - let () = self.mqtt_client.disconnect().await?; - self.join_handle - .await - .map_err(|_| MqttClientError::JoinError) - } - - /// Returns `true` if there are pending inflight messages. - pub fn has_pending(&self) -> bool { - self.inflight.has_pending() - } - - /// Resolves when all pending requests have been completed. - pub async fn all_completed(&self) { - self.inflight.all_completed().await - } - - /// Process all the MQTT events - /// - broadcasting the incoming messages to the message sender, - /// - broadcasting the errors to the error sender. - async fn bg_process( - mut event_loop: rumqttc::EventLoop, - message_sender: broadcast::Sender<Message>, - error_sender: broadcast::Sender<Arc<MqttClientError>>, - inflight: Arc<InflightTracking>, - ) { - // Delay announcing a QoS=2 message to the client until we have seen a PUBREL. - let mut pending_received_messages: HashMap<MessageId, Message> = HashMap::new(); - - loop { - match event_loop.poll().await { - Ok(Event::Incoming(Packet::Publish(msg))) => { - match msg.qos { - QoS::AtLeastOnce | QoS::AtMostOnce => { - send_discarding_error!(message_sender, msg.into()); - } - QoS::ExactlyOnce => { - // Do not announce the incoming publish message immediately in case - // of QoS=2. Wait for the PUBREL. - - let _ = pending_received_messages.insert(msg.pkid, msg.into()); - } - } - } - - Ok(Event::Incoming(Packet::PubRel(pubrel))) => { - if let Some(msg) = pending_received_messages.remove(&pubrel.pkid) { - assert!(msg.qos == QoS::ExactlyOnce); - send_discarding_error!(message_sender, msg); - } - } - - Ok(Event::Outgoing(Outgoing::Publish(_id))) => { - inflight.track_publish_request_sentout(); - } - - Ok(Event::Incoming(Packet::PubAck(_))) => { - // Reception of PUBACK means that a QoS=1 request completed. - inflight.track_publish_qos1_completed(); - } - - Ok(Event::Incoming(Packet::PubComp(_))) => { - // Reception of PUBCOMP means that a QoS=2 request completed. - inflight.track_publish_qos2_completed(); - } - - Ok(Event::Incoming(Incoming::Disconnect)) - | Ok(Event::Outgoing(Outgoing::Disconnect)) => { - break; - } - - Err(err) => { - let delay = match &err { - rumqttc::ConnectionError::Io(_) => true, - rumqttc::ConnectionError::MqttState(state_error) - if matches!(state_error, StateError::Io(_)) => - { - true - } - rumqttc::ConnectionError::MqttState(_) => true, - rumqttc::ConnectionError::Mqtt4Bytes(_) => true, - _ => false, - }; - - send_discarding_error!(error_sender, Arc::new(err.into())); - - if delay { - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - } - } - _ => (), - } - } - } -} - -#[async_trait] -im |