summaryrefslogtreecommitdiffstats
path: root/crates/tests
diff options
context:
space:
mode:
authorDidier Wenzek <didier.wenzek@acidalie.com>2022-01-19 15:21:04 +0000
committerGitHub <noreply@github.com>2022-01-19 15:21:04 +0000
commit7b559af54b5413f987b1a9bd9747ad6393c095da (patch)
treeb390280494af330af7f41ca10af52781b395ac28 /crates/tests
parent0f38421fb78058552e8b477f431abfc24c61e48b (diff)
Refactoring the MQTT client API (#575)
* Basic data types for MQTT channels * Mqtt connection * Mqtt-channel: subscribe to topics * Mqtt-channel: publish messages Signed-off-by: Wenzek <diw@softwareag.com> * Mqtt-channel: an MQTT client as no direct dependency to MQTT Signed-off-by: Wenzek <diw@softwareag.com> * Mqtt-chanels: no message lost on reconnect Signed-off-by: Wenzek <diw@softwareag.com> * Cargo fmt Signed-off-by: Wenzek <diw@softwareag.com> * Merge with child device management * Remove the dependency to the `async-broadcast` and `async-channel` crates * Moving the helper functions to the `mqtt_tests` crate * Ensure an MQTT client can be tested without an MQTT broker * Removing dependency on the mqtt_client crate * Use the new `mqtt_channel` crate in the `sm-c8y` mapper This actually fixes the bug #570 * [570] Cargo fmt * Remove the dependency of the collectd mapper on the `mqtt_client` crate Signed-off-by: Wenzek <diw@softwareag.com> * Make configurable the maximum size of a message Signed-off-by: Didier Wenzek <didier.wenzek@free.fr> * Fix typo in comment Signed-off-by: Didier Wenzek <didier.wenzek@free.fr> * Use mqtt_channel instead of mqtt_client in tedge_agent Signed-off-by: Didier Wenzek <didier.wenzek@free.fr> * Improve doc comments Signed-off-by: Didier Wenzek <didier.wenzek@free.fr> * Add error case on closed channel Signed-off-by: Didier Wenzek <didier.wenzek@free.fr> * Add PubChannel & SubChannel traits Signed-off-by: Didier Wenzek <didier.wenzek@free.fr> * Cargo fmt Signed-off-by: Didier Wenzek <didier.wenzek@free.fr> * Revert erroneously added changes Signed-off-by: Didier Wenzek <didier.wenzek@free.fr> * Add a channel for errors Signed-off-by: Didier Wenzek <didier.wenzek@free.fr> * Log MQTT connection errors Signed-off-by: Didier Wenzek <didier.wenzek@free.fr> * Fix HTTP_proxy port Signed-off-by: Didier Wenzek <didier.wenzek@free.fr> * Remove unused code Signed-off-by: Didier Wenzek <didier.wenzek@free.fr> * Improve tests Signed-off-by: Didier Wenzek <didier.wenzek@free.fr> * Move session configuration from Connection to Config * Set max_packet_size default to 1M Signed-off-by: Didier Wenzek <didier.wenzek@free.fr> * Cargo fmt Signed-off-by: Didier Wenzek <didier.wenzek@free.fr> * Fix broken tests (on missing operation directory) Signed-off-by: Didier Wenzek <didier.wenzek@free.fr> Co-authored-by: Wenzek <diw@softwareag.com> Co-authored-by: Didier Wenzek <didier.wenzek@free.fr>
Diffstat (limited to 'crates/tests')
-rw-r--r--crates/tests/mqtt_tests/Cargo.toml1
-rw-r--r--crates/tests/mqtt_tests/src/lib.rs5
-rw-r--r--crates/tests/mqtt_tests/src/message_streams.rs66
-rw-r--r--crates/tests/mqtt_tests/src/test_mqtt_client.rs23
4 files changed, 89 insertions, 6 deletions
diff --git a/crates/tests/mqtt_tests/Cargo.toml b/crates/tests/mqtt_tests/Cargo.toml
index 4c27514e..936cf91c 100644
--- a/crates/tests/mqtt_tests/Cargo.toml
+++ b/crates/tests/mqtt_tests/Cargo.toml
@@ -8,6 +8,7 @@ edition = "2018"
[dependencies]
anyhow = "1.0"
fastrand = "1.5"
+futures = "0.3"
once_cell = "1.8"
rumqttc = "0.10"
rumqttd = "0.9"
diff --git a/crates/tests/mqtt_tests/src/lib.rs b/crates/tests/mqtt_tests/src/lib.rs
index e618111f..cd7980c0 100644
--- a/crates/tests/mqtt_tests/src/lib.rs
+++ b/crates/tests/mqtt_tests/src/lib.rs
@@ -1,6 +1,9 @@
+mod message_streams;
mod test_mqtt_client;
pub mod test_mqtt_server;
pub mod with_timeout;
-pub use test_mqtt_client::{assert_received, publish};
+pub use message_streams::*;
+pub use test_mqtt_client::assert_received;
+pub use test_mqtt_client::publish;
pub use test_mqtt_server::test_mqtt_broker;
diff --git a/crates/tests/mqtt_tests/src/message_streams.rs b/crates/tests/mqtt_tests/src/message_streams.rs
new file mode 100644
index 00000000..b71760c7
--- /dev/null
+++ b/crates/tests/mqtt_tests/src/message_streams.rs
@@ -0,0 +1,66 @@
+use futures::channel::mpsc;
+use futures::SinkExt;
+use futures::StreamExt;
+
+/// A `Sink` of `T` that populates a vector of `T`s on closed.
+///
+/// ```
+/// # #[tokio::main]
+/// # async fn main() {
+/// use futures::SinkExt;
+///
+/// let (output, mut output_sink) = mqtt_tests::output_stream();
+/// tokio::spawn(async move {
+/// output_sink.send(1).await;
+/// output_sink.send(2).await;
+/// output_sink.send(3).await;
+/// });
+/// assert_eq!(vec![1,2,3], output.collect().await);
+/// # }
+/// ```
+pub fn output_stream<T>() -> (MessageOutputStream<T>, mpsc::UnboundedSender<T>) {
+ let (sender, receiver) = mpsc::unbounded();
+ let recorder = MessageOutputStream { receiver };
+ (recorder, sender)
+}
+
+pub struct MessageOutputStream<T> {
+ receiver: mpsc::UnboundedReceiver<T>,
+}
+
+impl<T> MessageOutputStream<T> {
+ pub async fn collect(mut self) -> Vec<T> {
+ let mut result = vec![];
+ while let Some(item) = self.receiver.next().await {
+ result.push(item);
+ }
+ result
+ }
+}
+
+/// A `Stream` of `T` that is populated using a vector of `T` samples.
+///
+/// ```
+/// # #[tokio::main]
+/// # async fn main() {
+/// use futures::StreamExt;
+///
+/// let mut input_stream = mqtt_tests::input_stream(vec![
+/// 1,
+/// 2,
+/// 3,
+/// ]).await;
+///
+/// assert_eq!(Some(1), input_stream.next().await);
+/// assert_eq!(Some(2), input_stream.next().await);
+/// assert_eq!(Some(3), input_stream.next().await);
+/// assert_eq!(None, input_stream.next().await);
+/// # }
+/// ```
+pub async fn input_stream<T>(items: Vec<T>) -> mpsc::UnboundedReceiver<T> {
+ let (mut sender, receiver) = mpsc::unbounded();
+ for item in items {
+ let _ = sender.send(item).await;
+ }
+ receiver
+}
diff --git a/crates/tests/mqtt_tests/src/test_mqtt_client.rs b/crates/tests/mqtt_tests/src/test_mqtt_client.rs
index 2d22598b..7efa73a5 100644
--- a/crates/tests/mqtt_tests/src/test_mqtt_client.rs
+++ b/crates/tests/mqtt_tests/src/test_mqtt_client.rs
@@ -9,12 +9,24 @@ use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
pub async fn messages_published_on(mqtt_port: u16, topic: &str) -> UnboundedReceiver<String> {
let (sender, recv) = tokio::sync::mpsc::unbounded_channel();
+ // One can have a connection error if this is called just after the broker starts
+ // So try to subscribe again after a first error
let mut con = TestCon::new(mqtt_port);
-
- if let Err(err) = con.subscribe(topic, QoS::AtLeastOnce).await {
- let msg = format!("Error: {:?}", err).to_string();
- let _ = sender.send(msg);
- return recv;
+ let mut retry = 1;
+ loop {
+ match con.subscribe(topic, QoS::AtLeastOnce).await {
+ Ok(()) => break,
+ Err(_) if retry > 0 => {
+ tokio::time::sleep(Duration::from_secs(1)).await;
+ retry -= 1;
+ continue;
+ }
+ Err(err) => {
+ let msg = format!("Error: {:?}", err).to_string();
+ let _ = sender.send(msg);
+ return recv;
+ }
+ }
}
tokio::spawn(async move {
@@ -86,6 +98,7 @@ where
loop {
if let Ok(message) = con.next_topic_payload().await {
+ dbg!(&message);
for (topic, response) in func(message).iter() {
let _ = con.publish(topic, QoS::AtLeastOnce, response).await;
}