summaryrefslogtreecommitdiffstats
path: root/crates/tests
diff options
context:
space:
mode:
authorAlbin Suresh <albin.suresh@softwareag.com>2022-02-04 21:32:32 +0530
committerGitHub <noreply@github.com>2022-02-04 21:32:32 +0530
commitc15cfc4bee97473411ed173903f808c1132b8d8c (patch)
treeeb9c1337c25b81764ede6ad1834705c62ae72959 /crates/tests
parent43bf45a3bd868586afe83a645be67f5042d9ac5d (diff)
[#735] Detect and reconcile alarms updated while mapper was down (#791)
* [#735] Detect and reconcile cleared alarms while mapper was down on its restart * Rust integration test for C8Y mapper * Rust integration test for alarm syncing on startup * New publish apis in mqtt_test crate that supports retain flag and QoS * Refactor alarm conversion logic to dedicated AlarmConverter enum
Diffstat (limited to 'crates/tests')
-rw-r--r--crates/tests/mqtt_tests/src/test_mqtt_client.rs17
-rw-r--r--crates/tests/mqtt_tests/src/test_mqtt_server.rs13
2 files changed, 24 insertions, 6 deletions
diff --git a/crates/tests/mqtt_tests/src/test_mqtt_client.rs b/crates/tests/mqtt_tests/src/test_mqtt_client.rs
index 7efa73a5..b4fe0fa4 100644
--- a/crates/tests/mqtt_tests/src/test_mqtt_client.rs
+++ b/crates/tests/mqtt_tests/src/test_mqtt_client.rs
@@ -54,10 +54,16 @@ pub async fn assert_received<T>(
/// Publish a message
///
/// Return only when the message has been acknowledged.
-pub async fn publish(mqtt_port: u16, topic: &str, payload: &str) -> Result<(), anyhow::Error> {
+pub async fn publish(
+ mqtt_port: u16,
+ topic: &str,
+ payload: &str,
+ qos: QoS,
+ retain: bool,
+) -> Result<(), anyhow::Error> {
let mut con = TestCon::new(mqtt_port);
- con.publish(topic, QoS::AtLeastOnce, payload).await
+ con.publish(topic, qos, retain, payload).await
}
/// Publish the `pub_message` on the `pub_topic` only when ready to receive a message on `sub_topic`.
@@ -77,7 +83,7 @@ pub async fn wait_for_response_on_publish(
let mut con = TestCon::new(mqtt_port);
con.subscribe(sub_topic, QoS::AtLeastOnce).await.ok()?;
- con.publish(pub_topic, QoS::AtLeastOnce, pub_message)
+ con.publish(pub_topic, QoS::AtLeastOnce, false, pub_message)
.await
.ok()?;
match tokio::time::timeout(timeout, con.next_message()).await {
@@ -100,7 +106,7 @@ where
if let Ok(message) = con.next_topic_payload().await {
dbg!(&message);
for (topic, response) in func(message).iter() {
- let _ = con.publish(topic, QoS::AtLeastOnce, response).await;
+ let _ = con.publish(topic, QoS::AtLeastOnce, false, response).await;
}
}
}
@@ -143,9 +149,10 @@ impl TestCon {
&mut self,
topic: &str,
qos: QoS,
+ retain: bool,
payload: &str,
) -> Result<(), anyhow::Error> {
- self.client.publish(topic, qos, false, payload).await?;
+ self.client.publish(topic, qos, retain, payload).await?;
loop {
match self.eventloop.poll().await {
diff --git a/crates/tests/mqtt_tests/src/test_mqtt_server.rs b/crates/tests/mqtt_tests/src/test_mqtt_server.rs
index 0997c181..5ad35f90 100644
--- a/crates/tests/mqtt_tests/src/test_mqtt_server.rs
+++ b/crates/tests/mqtt_tests/src/test_mqtt_server.rs
@@ -6,6 +6,7 @@ use std::{
use librumqttd::{Broker, Config, ConnectionSettings, ConsoleSettings, ServerSettings};
use once_cell::sync::Lazy;
+use rumqttc::QoS;
use tokio::sync::mpsc::UnboundedReceiver;
const MQTT_TEST_PORT: u16 = 55555;
@@ -27,7 +28,17 @@ impl MqttProcessHandler {
}
pub async fn publish(&self, topic: &str, payload: &str) -> Result<(), anyhow::Error> {
- crate::test_mqtt_client::publish(self.port, topic, payload).await
+ crate::test_mqtt_client::publish(self.port, topic, payload, QoS::AtLeastOnce, false).await
+ }
+
+ pub async fn publish_with_opts(
+ &self,
+ topic: &str,
+ payload: &str,
+ qos: QoS,
+ retain: bool,
+ ) -> Result<(), anyhow::Error> {
+ crate::test_mqtt_client::publish(self.port, topic, payload, qos, retain).await
}
pub async fn messages_published_on(&self, topic: &str) -> UnboundedReceiver<String> {