summaryrefslogtreecommitdiffstats
path: root/crates/common
diff options
context:
space:
mode:
authorDidier Wenzek <didier.wenzek@acidalie.com>2022-01-24 16:35:29 +0000
committerGitHub <noreply@github.com>2022-01-24 16:35:29 +0000
commitc38d88d0a8359d0370435dd454d3312733583a08 (patch)
tree7d9db847f145c94b7bdd6f45543b12b1b8739a59 /crates/common
parent486c62271421e2aedf21ef6aab56f8d00a8fc14d (diff)
Update the sawtooth publisher and remove deprecated mqtt_client crate (#778)
* Move `sawtooth_publisher` from mqtt_client/examples to crates/tests/sawtooth_publisher Signed-off-by: Didier Wenzek <didier.wenzek@free.fr> * Update tests and CI README. Signed-off-by: Didier Wenzek <didier.wenzek@free.fr> * Refactor the sawtooth publisher to use mqtt_channel Signed-off-by: Didier Wenzek <didier.wenzek@free.fr> * Removing mqtt_client crate Signed-off-by: Didier Wenzek <didier.wenzek@free.fr> * Cargo fmt Signed-off-by: Didier Wenzek <didier.wenzek@free.fr> * Improve local variable naming Signed-off-by: Didier Wenzek <didier.wenzek@free.fr> * Fix issue with tokio rt-multi-thread flavor Signed-off-by: Didier Wenzek <didier.wenzek@free.fr> * Fix github workflow Signed-off-by: Didier Wenzek <didier.wenzek@free.fr> Co-authored-by: Didier Wenzek <didier.wenzek@free.fr>
Diffstat (limited to 'crates/common')
-rw-r--r--crates/common/mqtt_channel/Cargo.toml5
-rw-r--r--crates/common/mqtt_client/Cargo.toml29
-rw-r--r--crates/common/mqtt_client/examples/publish_test.rs29
-rw-r--r--crates/common/mqtt_client/examples/sawtooth_publisher.rs222
-rw-r--r--crates/common/mqtt_client/examples/simple_mapper.rs69
-rw-r--r--crates/common/mqtt_client/examples/temperature_publisher.rs96
-rw-r--r--crates/common/mqtt_client/src/lib.rs862
-rw-r--r--crates/common/mqtt_client/tests/mqtt_pub_sub_test.rs112
-rw-r--r--crates/common/mqtt_client/tests/packet_size_tests.rs177
-rw-r--r--crates/common/tedge_utils/Cargo.toml2
10 files changed, 4 insertions, 1599 deletions
diff --git a/crates/common/mqtt_channel/Cargo.toml b/crates/common/mqtt_channel/Cargo.toml
index 5061d0ce..9013d986 100644
--- a/crates/common/mqtt_channel/Cargo.toml
+++ b/crates/common/mqtt_channel/Cargo.toml
@@ -1,8 +1,9 @@
[package]
name = "mqtt_channel"
-version = "0.5.1"
-edition = "2018"
authors = ["thin-edge.io team <info@thin-edge.io>"]
+edition = "2021"
+rust-version = "1.58"
+version = "0.5.2"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
diff --git a/crates/common/mqtt_client/Cargo.toml b/crates/common/mqtt_client/Cargo.toml
deleted file mode 100644
index 65bb05fc..00000000
--- a/crates/common/mqtt_client/Cargo.toml
+++ /dev/null
@@ -1,29 +0,0 @@
-[package]
-name = "mqtt_client"
-version = "0.5.2"
-authors = ["thin-edge.io team <info@thin-edge.io>"]
-edition = "2021"
-rust-version = "1.58"
-
-[dependencies]
-async-trait = "0.1"
-mockall = "0.10"
-rumqttc = "0.10"
-thiserror = "1.0"
-tokio = { version = "1.12", features = ["sync", "macros"] }
-
-[dev-dependencies]
-anyhow = "1.0"
-async-log = "2.0"
-env_logger = "0.9"
-futures = "0.3"
-futures-timer = "3.0"
-json = "0.12"
-log = "0.4"
-rand = "0.8"
-serde = "1.0"
-mqtt_tests = { path = "../../tests/mqtt_tests" }
-tokio-test = "0.4"
-
-[features]
-integration-test = []
diff --git a/crates/common/mqtt_client/examples/publish_test.rs b/crates/common/mqtt_client/examples/publish_test.rs
deleted file mode 100644
index f23b4cd8..00000000
--- a/crates/common/mqtt_client/examples/publish_test.rs
+++ /dev/null
@@ -1,29 +0,0 @@
-use mqtt_client::{Config, Message, MqttClient, MqttClientError, QoS, Topic};
-
-async fn publish(
- config: &Config,
- qos: QoS,
- payload: impl Into<String>,
-) -> Result<(), MqttClientError> {
- let topic = Topic::new("test/uubpb9wyi9asi46l624f")?;
- let client = config.connect("publisher").await?;
- let message = Message::new(&topic, payload.into()).qos(qos);
-
- let () = client.publish(message).await?;
- client.disconnect().await?;
- Ok(())
-}
-
-#[tokio::main]
-pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
- let config = Config::new("test.mosquitto.org", 1883);
- let payload = "Hello there!";
-
- env_logger::init();
-
- publish(&config, QoS::AtMostOnce, payload).await?;
- publish(&config, QoS::AtLeastOnce, payload).await?;
- publish(&config, QoS::ExactlyOnce, payload).await?;
-
- Ok(())
-}
diff --git a/crates/common/mqtt_client/examples/sawtooth_publisher.rs b/crates/common/mqtt_client/examples/sawtooth_publisher.rs
deleted file mode 100644
index 704be6c7..00000000
--- a/crates/common/mqtt_client/examples/sawtooth_publisher.rs
+++ /dev/null
@@ -1,222 +0,0 @@
-use futures::future::FutureExt;
-use futures::select;
-use futures_timer::Delay;
-use log::debug;
-use log::error;
-use log::info;
-use mqtt_client::{
- Client, Config, Message, MqttClient, MqttClientError, MqttErrorStream, MqttMessageStream, Topic,
-};
-use std::convert::TryFrom;
-use std::env;
-use std::io::Write;
-use std::process;
-use std::time::{Duration, Instant};
-//use rumqttc::QoS;
-
-/*
-
-This is a small and flexible publisher for deterministic test data.
-Its based on the temperature publisher.
-
-- TODO: Improve code quality
-- TODO: Add different data types for JSON publishing
-- TODO: Command line switch to swith betwen REST and JSON
-- TODO: Currently REST sending is disabled and JSON publishing is enabled
-- TODO: Add QoS selection
-*/
-
-const C8Y_TEMPLATE_RESTART: &str = "510";
-
-// Templates:
-// https://cumulocity.com/guides/10.4.6/device-sdk/mqtt/
-//
-// Create custom measurement (200)
-// Create signal strength measurement (210)
-// Create temperature measurement (211)
-// Create battery measurement (212)
-
-// sawtooth_publisher <wait_time_ms> <height> <iterations> <template>
-//
-// cargo run --example sawtooth_publisher 100 100 100 flux
-// cargo run --example sawtooth_publisher 1000 10 10 sawmill
-
-#[tokio::main]
-pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
- let args: Vec<String> = env::args().collect();
- // wait time, template, tooth-height,
- if args.len() != 5 {
- println!("Usage: sawtooth_publisher <wait_time_ms> <height> <iterations> <template: sawmill|flux>");
- panic!("Errof: Not enough Command line Arguments");
- }
- let wait: i32 = args[1].parse().expect("Cannot parse wait time");
- let height: i32 = args[2].parse().expect("Cannot parse height");
- let iterations: i32 = args[3].parse().expect("Cannot parse iterations");
- let template: String = String::from(&args[4]);
-
- println!(
- "Publishing sawtooth with delay {}ms height {} iterations {} template {} will cause {} publishs.",
- wait,
- height,
- iterations,
- template,
- height * iterations
- );
- let c8y_msg = Topic::new("tedge/measurements")?;
- let c8y_cmd = Topic::new("c8y/s/ds")?;
- let c8y_err = Topic::new("c8y/s/e")?;
-
- init_logger();
-
- let name = "sawtooth_".to_string() + &process::id().to_string();
- let mqtt = Client::connect(&name, &Config::default()).await?;
-
- let commands = mqtt.subscribe(c8y_cmd.filter()).await?;
- let c8y_errors = mqtt.subscribe(c8y_err.filter()).await?;
- let errors = mqtt.subscribe_errors();
-
- let start = Instant::now();
-
- if template == "flux" {
- tokio::spawn(publish_topic(mqtt, c8y_msg, wait, height, iterations));
- } else if template == "sawmill" {
- tokio::spawn(publish_multi_topic(mqtt, c8y_msg, wait, height, iterations));
- } else {
- println!("Wrong template");
- panic!("Exiting");
- };
-
- select! {
- _ = listen_command(commands).fuse() => (),
- _ = listen_c8y_error(c8y_errors).fuse() => (),
- _ = listen_error(errors).fuse() => (),
- }
-
- let elapsed = start.elapsed();
- println!(
- "Execution took {} s {} ms",
- elapsed.as_secs(),
- elapsed.as_millis()
- );
-
- let elapsedm: u32 = u32::try_from(elapsed.as_millis()).unwrap();
- let elapsedmsf: f64 = f64::try_from(elapsedm).unwrap();
- let rate: f64 =
- elapsedmsf / (f64::try_from(height).unwrap() * f64::try_from(iterations).unwrap());
-
- let pubpersec = 1.0 / rate * 1000.0;
- println!("Publish rate: {:.3} ms/pub", rate);
- println!("Publish per second: {:.3} pub/s", pubpersec);
-
- Ok(())
-}
-
-async fn publish_topic(
- mqtt: Client,
- c8y_msg: Topic,
- wait: i32,
- height: i32,
- iterations: i32,
-) -> Result<(), MqttClientError> {
- info!("Publishing temperature measurements");
- println!();
- for iteration in 0..iterations {
- for value in 0..height {
- let payload = format!("{{ {}: {} }}", "\"Flux [F]\"", value);
- debug!("{} ", value);
- debug!("{}", payload);
-
- mqtt.publish(Message::new(&c8y_msg, payload)).await?;
- Delay::new(Duration::from_millis(u64::try_from(wait).unwrap())).await;
- std::io::stdout().flush().expect("Flush failed");
- }
- println!("Iteraton: {}", iteration);
- }
- println!();
-
- mqtt.disconnect().await?;
- Ok(())
-}
-
-async fn publish_multi_topic(
- mqtt: Client,
- c8y_msg: Topic,
- wait: i32,
- height: i32,
- iterations: i32,
-) -> Result<(), MqttClientError> {
- info!("Publishing temperature measurements");
- println!();
- let series_name = "\"Sawmill [S]\"";
- let series_count = 10;
- for iteration in 0..iterations {
- for value in 0..height {
- let mut series: String = String::new();
- for s in 0..series_count {
- series += &format!(
- "\"saw_{}\": {} ,",
- s,
- (value + s * height / series_count) % height
- );
- }
- let seriesx = &series.trim_end_matches(',');
-
- let payload = format!("{{ {}: {{ {} }} }}", series_name, seriesx);
- debug!("{} ", value);
- debug!("{}", payload);
-
- mqtt.publish(Message::new(&c8y_msg, payload)).await?;
-
- Delay::new(Duration::from_millis(u64::try_from(wait).unwrap())).await;
- std::io::stdout().flush().expect("Flush failed");
- }
- println!("Iteraton: {}", iteration);
- }
- println!();
-
- mqtt.disconnect().await?;
- Ok(())
-}
-
-async fn listen_command(mut messages: Box<dyn MqttMessageStream>) {
- while let Some(message) = messages.next().await {
- debug!("C8Y command: {:?}", message.payload_str());
- if let Ok(cmd) = message.payload_str() {
- if cmd.contains(C8Y_TEMPLATE_RESTART) {
- info!("Stopping on remote request ... should be restarted by the daemon monitor.");
- break;
- }
- }
- }
-}
-
-async fn listen_c8y_error(mut messages: Box<dyn MqttMessageStream>) {
- let mut count: u32 = 0;
- while let Some(message) = messages.next().await {
- error!("C8Y error: {:?}", message.payload_str());
- if count >= 3 {
- panic!("Panic!");
- }
- count += 1;
- }
-}
-
-async fn listen_error(mut errors: Box<dyn MqttErrorStream>) {
- let mut count: u32 = 0;
- while let Some(error) = errors.next().await {
- error!("System error: {}", error);
- if count >= 3 {
- panic!("Panic!");
- }
- count += 1;
- }
-}
-
-fn init_logger() {
- let logger = env_logger::Logger::from_default_env();
- let task_id = 1;
-
- async_log::Logger::wrap(logger, move || task_id)
- .start(log::LevelFilter::Trace)
- .unwrap();
-}
diff --git a/crates/common/mqtt_client/examples/simple_mapper.rs b/crates/common/mqtt_client/examples/simple_mapper.rs
deleted file mode 100644
index c3345e30..00000000
--- a/crates/common/mqtt_client/examples/simple_mapper.rs
+++ /dev/null
@@ -1,69 +0,0 @@
-use json::JsonValue;
-use log::{debug, error, info};
-use mqtt_client::{Client, Config, Message, MqttClient, Topic};
-
-#[tokio::main]
-pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
- let name = "c8y_mapper";
- let in_topic = Topic::new("tedge/measurements")?;
- let out_topic = Topic::new("c8y/s/us")?;
- let err_topic = Topic::new("tedge/errors")?;
-
- env_logger::init();
-
- info!("Mapping ThinEdge messages");
- let mqtt = Client::connect(name, &Config::default()).await?;
- let mut errors = mqtt.subscribe_errors();
- tokio::spawn(async move {
- while let Some(error) = errors.next().await {
- error!("{}", error);
- }
- });
-
- let mut messages = mqtt.subscribe(in_topic.filter()).await?;
- while let Some(message) = messages.next().await {
- debug!("Mapping {:?}", message);
- match translate(message.payload_str()?) {
- Ok(translation) => {
- let _ = mqtt.publish(Message::new(&out_topic, translation)).await?;
- }
- Err(error) => {
- debug!("Translation error: {}", error);
- let _ = mqtt.publish(Message::new(&err_topic, error)).await?;
- }
- }
- }
-
- Ok(())
-}
-
-const C8Y_TEMPLATE_TEMPERATURE: &str = "211";
-
-/// Naive mapper which extracts the temperature field from a ThinEdge Json value.
-///
-/// `{ "temperature": 12.4 }` is translated into `"211,12.4"`
-fn translate(input: &str) -> Result<String, String> {
- let json = json::parse(input).map_err(|err| format!("ERROR: {}", err))?;
- match json {
- JsonValue::Object(obj) => {
- for (k, v) in obj.iter() {
- if k != "temperature" {
- return Err(format!("ERROR: unknown measurement type '{}'", k));
- }
- match v {
- JsonValue::Number(num) => {
- let value: f64 = (*num).into();
- if value == 0.0 || value.is_normal() {
- return Ok(format!("{},{}", C8Y_TEMPLATE_TEMPERATURE, value));
- } else {
- return Err(format!("ERROR: value out of range '{}'", v));
- }
- }
- _ => return Err(format!("ERROR: expected a number, not '{}'", v)),
- }
- }
- Err(String::from("ERROR: empty measurement"))
- }
- _ => return Err(format!("ERROR: expected a JSON object, not {}", json)),
- }
-}
diff --git a/crates/common/mqtt_client/examples/temperature_publisher.rs b/crates/common/mqtt_client/examples/temperature_publisher.rs
deleted file mode 100644
index 0a045def..00000000
--- a/crates/common/mqtt_client/examples/temperature_publisher.rs
+++ /dev/null
@@ -1,96 +0,0 @@
-use futures::future::FutureExt;
-use futures::select;
-use futures_timer::Delay;
-use log::debug;
-use log::error;
-use log::info;
-use mqtt_client::{
- Client, Config, Message, MqttClient, MqttClientError, MqttErrorStream, MqttMessageStream, Topic,
-};
-use rand::prelude::*;
-use std::time::Duration;
-
-const C8Y_TEMPLATE_RESTART: &str = "510";
-const C8Y_TEMPLATE_TEMPERATURE: &str = "211";
-
-#[tokio::main]
-pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
- let c8y_msg = Topic::new("c8y/s/us")?;
- let c8y_cmd = Topic::new("c8y/s/ds")?;
- let c8y_err = Topic::new("c8y/s/e")?;
-
- init_logger();
-
- let mqtt = Client::connect("temperature", &Config::default()).await?;
-
- let commands = mqtt.subscribe(c8y_cmd.filter()).await?;
- let c8y_errors = mqtt.subscribe(c8y_err.filter()).await?;
- let errors = mqtt.subscribe_errors();
-
- tokio::spawn(publish_temperature(mqtt, c8y_msg));
-
- select! {
- _ = listen_command(commands).fuse() => (),
- _ = listen_c8y_error(c8y_errors).fuse() => (),
- _ = listen_error(errors).fuse() => (),
- }
-
- Ok(())
-}
-
-async fn publish_temperature(mqtt: Client, c8y_msg: Topic) -> Result<(), MqttClientError> {
- let mut temperature: i32 = random_in_range(-10, 20);
-
- info!("Publishing temperature measurements");
- for _ in 1..10 {
- let delta = random_in_range(-1, 2);
- temperature += delta;
-
- let payload = format!("{},{}", C8Y_TEMPLATE_TEMPERATURE, temperature);
- debug!("{}", payload);
- mqtt.publish(Message::new(&c8y_msg, payload)).await?;
-
- Delay::new(Duration::from_millis(1000)).await;
- }
-
- mqtt.disconnect().await?;
- Ok(())
-}
-
-fn random_in_range(low: i32, high: i32) -> i32 {
- let mut rng = thread_rng();
- rng.gen_range(low..high)
-}
-
-async fn listen_command(mut messages: Box<dyn MqttMessageStream>) {
- while let Some(message) = messages.next().await {
- debug!("C8Y command: {:?}", message.payload_str());
- if let Ok(cmd) = message.payload_str() {
- if cmd.contains(C8Y_TEMPLATE_RESTART) {
- info!("Stopping on remote request ... should be restarted by the daemon monitor.");
- break;
- }
- }
- }
-}
-
-async fn listen_c8y_error(mut messages: Box<dyn MqttMessageStream>) {
- while let Some(message) = messages.next().await {
- error!("C8Y error: {:?}", message.payload_str());
- }
-}
-
-async fn listen_error(mut errors: Box<dyn MqttErrorStream>) {
- while let Some(error) = errors.next().await {
- error!("System error: {}", error);
- }
-}
-
-fn init_logger() {
- let logger = env_logger::Logger::from_default_env();
- let task_id = 1;
-
- async_log::Logger::wrap(logger, move || task_id)
- .start(log::LevelFilter::Trace)
- .unwrap();
-}
diff --git a/crates/common/mqtt_client/src/lib.rs b/crates/common/mqtt_client/src/lib.rs
deleted file mode 100644
index 6602167e..00000000
--- a/crates/common/mqtt_client/src/lib.rs
+++ /dev/null
@@ -1,862 +0,0 @@
-//! A library to connect the local MQTT bus, publish messages and subscribe topics.
-//!
-//! ```no_run
-//! use mqtt_client::{MqttClient,Config,Message,Topic};
-//!
-//! #[tokio::main]
-//! async fn main (){
-//! let mqtt = Config::default().connect("temperature").await.unwrap();
-//! let c8y_msg = Topic::new("c8y/s/us").unwrap();
-//! mqtt.publish(Message::new(&c8y_msg, "211,23")).await.unwrap();
-//! mqtt.disconnect().await.unwrap();
-//! }
-//! ```
-#![forbid(unsafe_code)]
-#![deny(clippy::mem_forget)]
-
-use async_trait::async_trait;
-use mockall::automock;
-pub use rumqttc::QoS;
-use rumqttc::{Event, Incoming, Outgoing, Packet, Publish, Request, StateError};
-use std::collections::HashMap;
-use std::sync::{
- atomic::{AtomicIsize, Ordering},
- Arc,
-};
-use tokio::sync::{broadcast, Notify};
-
-#[automock]
-#[async_trait]
-pub trait MqttClient: Send + Sync {
- fn subscribe_errors(&self) -> Box<dyn MqttErrorStream>;
-
- async fn subscribe(
- &self,
- filter: TopicFilter,
- ) -> Result<Box<dyn MqttMessageStream>, MqttClientError>;
-
- async fn publish(&self, message: Message) -> Result<(), MqttClientError>;
-}
-
-#[async_trait]
-#[automock]
-pub trait MqttMessageStream: Send + Sync {
- async fn next(&mut self) -> Option<Message>;
-}
-
-#[async_trait]
-#[automock]
-pub trait MqttErrorStream: Send + Sync {
- async fn next(&mut self) -> Option<Arc<MqttClientError>>;
-}
-
-/// A connection to the local MQTT bus.
-///
-/// The host and port are implied: a connection can only be open on the localhost, port 1883.
-///
-/// ```no_run
-/// use mqtt_client::{Config,Message,MqttClient,Topic};
-///
-/// #[tokio::main]
-/// async fn main () {
-/// let mut mqtt = Config::default().connect("temperature").await.unwrap();
-/// let c8y_msg = Topic::new("c8y/s/us").unwrap();
-/// mqtt.publish(Message::new(&c8y_msg, "211,23")).await.unwrap();
-/// mqtt.disconnect().await.unwrap();
-/// }
-/// ```
-#[derive(Debug)]
-pub struct Client {
- name: String,
- mqtt_client: rumqttc::AsyncClient,
- message_sender: broadcast::Sender<Message>,
- error_sender: broadcast::Sender<Arc<MqttClientError>>,
- join_handle: tokio::task::JoinHandle<()>,
- requests_tx: rumqttc::Sender<Request>,
- inflight: Arc<InflightTracking>,
-}
-
-/// Tracks the number of inflight / pending publish requests.
-#[derive(Debug)]
-struct InflightTracking {
- /// Tracks number of pending publish message until they are
- /// known to be sent out by the event loop.
- pending_publish_count: AtomicIsize,
- /// Tracks number of pending puback's (not completed messages of QoS=1).
- pending_puback_count: AtomicIsize,
- /// Tracks number of pending pubcomp's (not completed messages of QoS=2).
- pending_pubcomp_count: AtomicIsize,
-
- /// Notify on the condition when all requests have completed.
- notify_completed: Notify,
-}
-
-impl InflightTracking {
- fn new() -> Self {
- Self {
- pending_publish_count: AtomicIsize::new(0),
- pending_puback_count: AtomicIsize::new(0),
- pending_pubcomp_count: AtomicIsize::new(0),
- notify_completed: Notify::new(),
- }
- }
-
- fn has_pending(&self) -> bool {
- self.pending_publish_count.load(Ordering::Relaxed) > 0
- || self.pending_puback_count.load(Ordering::Relaxed) > 0
- || self.pending_pubcomp_count.load(Ordering::Relaxed) > 0
- }
-
- /// Resolves when all pending requests have been completed.
- async fn all_completed(&self) {
- while self.has_pending() {
- // Calling `notify_one()` before `notified().await` is safe, no signal is lost.
- //
- // Nevertheless, we still use a timeout (but a larger one) to be on the safe side and
- // not risk any race condition.
- let _ = tokio::time::timeout(
- std::time::Duration::from_millis(500),
- self.notify_completed.notified(),
- )
- .await;
- }
- }
-
- fn track_publish_request(&self, qos: QoS) {
- self.pending_publish_count.fetch_add(1, Ordering::Relaxed);
- match qos {
- QoS::AtMostOnce => {}
- QoS::AtLeastOnce => {
- self.pending_puback_count.fetch_add(1, Ordering::Relaxed);
- }
- QoS::ExactlyOnce => {
- self.pending_pubcomp_count.fetch_add(1, Ordering::Relaxed);
- }
- }
- }
-
- fn track_publish_request_sentout(&self) {
- self.pending_publish_count.fetch_sub(1, Ordering::Relaxed);
- self.check_completed();
- }
-
- fn track_publish_qos1_completed(&self) {
- self.pending_puback_count.fetch_sub(1, Ordering::Relaxed);
- self.check_completed();
- }
-
- fn track_publish_qos2_completed(&self) {
- self.pending_pubcomp_count.fetch_sub(1, Ordering::Relaxed);
- self.check_completed();
- }
-
- fn check_completed(&self) {
- if !self.has_pending() {
- self.notify_completed.notify_one();
- }
- }
-}
-
-// Send a message on a broadcast channel discarding any error.
-// The send can only fail if there is no listener.
-macro_rules! send_discarding_error {
- ($sender:expr, $msg:expr) => {
- let _ = $sender.send($msg);
- };
-}
-
-/// MQTT message id
-type MessageId = u16;
-
-impl Client {
- /// Open a connection to the local MQTT bus, using the given name to register an MQTT session.
- ///
- /// Reusing the same session name on each connection allows a client
- /// to have its subscriptions persisted by the broker
- /// so messages sent while the client is disconnected
- /// will be resent on its re-connection.
- ///
- /// ```no_run
- /// use mqtt_client::{Config,Client,MqttClient,Topic};
- ///
- /// #[tokio::main]
- /// async fn main () {
- /// let c8y_cmd = Topic::new("c8y/s/ds").unwrap();
- /// let config = Config::default();
- ///
- /// let mqtt = Client::connect("temperature", &config).await.unwrap();
- /// let mut commands = mqtt.subscribe(c8y_cmd.filter()).await.unwrap();
- /// // process some commands and disconnect
- /// mqtt.disconnect().await.unwrap();
- ///
- /// // wait a while and reconnect
- /// let mqtt = Client::connect("temperature", &config).await.unwrap();
- /// let mut commands = mqtt.subscribe(c8y_cmd.filter()).await.unwrap();
- /// // process the messages even those sent during the pause
- /// }
- /// ```
- pub async fn connect(name: &str, config: &Config) -> Result<Client, MqttClientError> {
- let name = String::from(name);
- let mut mqtt_options = rumqttc::MqttOptions::new(&name, &config.host, config.port);
- mqtt_options.set_clean_session(config.clean_session);
- if let Some(inflight) = config.inflight {
- mqtt_options.set_inflight(inflight);
- }
-
- if let Some(packet_size) = config.packet_size {
- mqtt_options.set_max_packet_size(packet_size, packet_size);
- }
-
- let (mqtt_client, eventloop) =
- rumqttc::AsyncClient::new(mqtt_options, config.queue_capacity);
- let requests_tx = eventloop.requests_tx.clone();
- let (message_sender, _) = broadcast::channel(config.queue_capacity);
- let (error_sender, _) = broadcast::channel(config.queue_capacity);
-
- let inflight = Arc::new(InflightTracking::new());
-
- let join_handle = tokio::spawn(Client::bg_process(
- eventloop,
- message_sender.clone(),
- error_sender.clone(),
- inflight.clone(),
- ));
-
- Ok(Client {
- name,
- mqtt_client,
- message_sender,
- error_sender,
- join_handle,
- requests_tx,
- inflight,
- })
- }
-
- /// Returns the name used by the MQTT client.
- pub fn name(&self) -> &str {
- &self.name
- }
-
- /// Disconnect the client and drop it.
- pub async fn disconnect(self) -> Result<(), MqttClientError> {
- let () = self.mqtt_client.disconnect().await?;
- self.join_handle
- .await
- .map_err(|_| MqttClientError::JoinError)
- }
-
- /// Returns `true` if there are pending inflight messages.
- pub fn has_pending(&self) -> bool {
- self.inflight.has_pending()
- }
-
- /// Resolves when all pending requests have been completed.
- pub async fn all_completed(&self) {
- self.inflight.all_completed().await
- }
-
- /// Process all the MQTT events
- /// - broadcasting the incoming messages to the message sender,
- /// - broadcasting the errors to the error sender.
- async fn bg_process(
- mut event_loop: rumqttc::EventLoop,
- message_sender: broadcast::Sender<Message>,
- error_sender: broadcast::Sender<Arc<MqttClientError>>,
- inflight: Arc<InflightTracking>,
- ) {
- // Delay announcing a QoS=2 message to the client until we have seen a PUBREL.
- let mut pending_received_messages: HashMap<MessageId, Message> = HashMap::new();
-
- loop {
- match event_loop.poll().await {
- Ok(Event::Incoming(Packet::Publish(msg))) => {
- match msg.qos {
- QoS::AtLeastOnce | QoS::AtMostOnce => {
- send_discarding_error!(message_sender, msg.into());
- }
- QoS::ExactlyOnce => {
- // Do not announce the incoming publish message immediately in case
- // of QoS=2. Wait for the PUBREL.
-
- let _ = pending_received_messages.insert(msg.pkid, msg.into());
- }
- }
- }
-
- Ok(Event::Incoming(Packet::PubRel(pubrel))) => {
- if let Some(msg) = pending_received_messages.remove(&pubrel.pkid) {
- assert!(msg.qos == QoS::ExactlyOnce);
- send_discarding_error!(message_sender, msg);
- }
- }
-
- Ok(Event::Outgoing(Outgoing::Publish(_id))) => {
- inflight.track_publish_request_sentout();
- }
-
- Ok(Event::Incoming(Packet::PubAck(_))) => {
- // Reception of PUBACK means that a QoS=1 request completed.
- inflight.track_publish_qos1_completed();
- }
-
- Ok(Event::Incoming(Packet::PubComp(_))) => {
- // Reception of PUBCOMP means that a QoS=2 request completed.
- inflight.track_publish_qos2_completed();
- }
-
- Ok(Event::Incoming(Incoming::Disconnect))
- | Ok(Event::Outgoing(Outgoing::Disconnect)) => {
- break;
- }
-
- Err(err) => {
- let delay = match &err {
- rumqttc::ConnectionError::Io(_) => true,
- rumqttc::ConnectionError::MqttState(state_error)
- if matches!(state_error, StateError::Io(_)) =>
- {
- true
- }
- rumqttc::ConnectionError::MqttState(_) => true,
- rumqttc::ConnectionError::Mqtt4Bytes(_) => true,
- _ => false,
- };
-
- send_discarding_error!(error_sender, Arc::new(err.into()));
-
- if delay {
- tokio::time::sleep(std::time::Duration::from_secs(1)).await;
- }
- }
- _ => (),
- }
- }
- }
-}
-
-#[async_trait]
-im