summaryrefslogtreecommitdiffstats
path: root/crates/tests/mqtt_tests/src/test_mqtt_client.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/tests/mqtt_tests/src/test_mqtt_client.rs')
-rw-r--r--crates/tests/mqtt_tests/src/test_mqtt_client.rs36
1 files changed, 36 insertions, 0 deletions
diff --git a/crates/tests/mqtt_tests/src/test_mqtt_client.rs b/crates/tests/mqtt_tests/src/test_mqtt_client.rs
index 2bddf691..2d22598b 100644
--- a/crates/tests/mqtt_tests/src/test_mqtt_client.rs
+++ b/crates/tests/mqtt_tests/src/test_mqtt_client.rs
@@ -75,6 +75,24 @@ pub async fn wait_for_response_on_publish(
}
}
+pub async fn map_messages_loop<F>(mqtt_port: u16, func: F)
+where
+ F: Send + Sync + Fn((String, String)) -> Vec<(String, String)>,
+{
+ let mut con = TestCon::new(mqtt_port);
+ con.subscribe("#", QoS::AtLeastOnce)
+ .await
+ .expect("Fail to subscribe on #");
+
+ loop {
+ if let Ok(message) = con.next_topic_payload().await {
+ for (topic, response) in func(message).iter() {
+ let _ = con.publish(topic, QoS::AtLeastOnce, response).await;
+ }
+ }
+ }
+}
+
pub struct TestCon {
client: AsyncClient,
eventloop: EventLoop,
@@ -167,4 +185,22 @@ impl TestCon {
}
}
}
+
+ pub async fn next_topic_payload(&mut self) -> Result<(String, String), anyhow::Error> {
+ loop {
+ match self.eventloop.poll().await {
+ Ok(Event::Incoming(Packet::Publish(packet))) => {
+ let topic = packet.topic.clone();
+ let msg = std::str::from_utf8(&packet.payload)
+ .unwrap_or("Error: non-utf8-payload")
+ .to_string();
+ return Ok((topic, msg));
+ }
+ Err(err) => {
+ return Err(err)?;
+ }
+ _ => {}
+ }
+ }
+ }
}