summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_mapper
diff options
context:
space:
mode:
authorDidier Wenzek <didier.wenzek@free.fr>2022-02-08 16:22:34 +0000
committerDidier Wenzek <didier.wenzek@free.fr>2022-02-08 16:22:34 +0000
commit61fa89f98a1291bea7a7a0482493c00d7124fd0a (patch)
treed44f65aecfd20f4890410ba28c0691e9af306261 /crates/core/tedge_mapper
parent88062f5008da779c76d1f747922ddb0ab651c3bb (diff)
Use `futures` channels instead of `tokio` channels
The motivation is to ease testing with the `StreamExt` extension Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Diffstat (limited to 'crates/core/tedge_mapper')
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs13
-rw-r--r--crates/core/tedge_mapper/src/tests.rs15
2 files changed, 15 insertions, 13 deletions
diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs
index 8c3d225b..4ae00e6b 100644
--- a/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs
+++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs
@@ -8,6 +8,7 @@ use c8y_smartrest::smartrest_deserializer::SmartRestJwtResponse;
use mqtt_channel::{Connection, TopicFilter};
use mqtt_tests::test_mqtt_server::MqttProcessHandler;
use mqtt_tests::with_timeout::{Maybe, WithTimeout};
+use mqtt_tests::StreamExt;
use serial_test::serial;
use std::time::Duration;
use tokio::task::JoinHandle;
@@ -29,7 +30,7 @@ async fn mapper_publishes_a_software_list_request() {
// Expect on `tedge/commands/req/software/list` a software list request.
let msg = messages
- .recv()
+ .next()
.with_timeout(TEST_TIMEOUT_MS)
.await
.expect_or("No message received after a second.");
@@ -91,7 +92,7 @@ async fn mapper_publishes_software_update_request() {
// Expect thin-edge json message on `tedge/commands/req/software/update` with expected payload.
let msg = messages
- .recv()
+ .next()
.with_timeout(TEST_TIMEOUT_MS)
.await
.expect_or("No message received after a second.");
@@ -134,7 +135,7 @@ async fn mapper_publishes_software_update_status_onto_c8y_topic() {
// Expect `501` smartrest message on `c8y/s/us`.
let msg = messages
- .recv()
+ .next()
.with_timeout(TEST_TIMEOUT_MS)
.await
.expect_or("No message received after a second.");
@@ -157,7 +158,7 @@ async fn mapper_publishes_software_update_status_onto_c8y_topic() {
// Expect `503` messages with correct payload have been received on `c8y/s/us`, if no msg received for the timeout the test fails.
let msg = messages
- .recv()
+ .next()
.with_timeout(TEST_TIMEOUT_MS)
.await
.expect_or("No message received after a second.");
@@ -209,7 +210,7 @@ async fn mapper_publishes_software_update_failed_status_onto_c8y_topic() {
// `502` messages with correct payload have been received on `c8y/s/us`, if no msg received for the timeout the test fails.
let msg = messages
- .recv()
+ .next()
.with_timeout(TEST_TIMEOUT_MS)
.await
.expect_or("No message received after a second.");
@@ -272,7 +273,7 @@ async fn mapper_fails_during_sw_update_recovers_and_process_response() -> Result
// Wait for the request being published by the mapper on `tedge/commands/req/software/update`.
let msg = requests
- .recv()
+ .next()
.with_timeout(TEST_TIMEOUT_MS)
.await
.expect_or("No message received after a second.");
diff --git a/crates/core/tedge_mapper/src/tests.rs b/crates/core/tedge_mapper/src/tests.rs
index 35da6fe5..937cfa38 100644
--- a/crates/core/tedge_mapper/src/tests.rs
+++ b/crates/core/tedge_mapper/src/tests.rs
@@ -1,6 +1,7 @@
use std::time::Duration;
use mqtt_tests::with_timeout::{Maybe, WithTimeout};
+use mqtt_tests::StreamExt;
use serial_test::serial;
use tokio::task::JoinHandle;
@@ -31,7 +32,7 @@ async fn c8y_mapper_alarm_mapping_to_smartrest() {
.unwrap();
let mut msg = messages
- .recv()
+ .next()
.with_timeout(ALARM_SYNC_TIMEOUT_MS)
.await
.expect_or("No message received before timeout");
@@ -41,7 +42,7 @@ async fn c8y_mapper_alarm_mapping_to_smartrest() {
if msg.contains("114") {
// Fetch the next message which should be the alarm
msg = messages
- .recv()
+ .next()
.with_timeout(ALARM_SYNC_TIMEOUT_MS)
.await
.expect_or("No message received before timeout");
@@ -86,7 +87,7 @@ async fn c8y_mapper_syncs_pending_alarms_on_startup() {
.unwrap();
let mut msg = messages
- .recv()
+ .next()
.with_timeout(ALARM_SYNC_TIMEOUT_MS)
.await
.expect_or("No message received before timeout");
@@ -96,7 +97,7 @@ async fn c8y_mapper_syncs_pending_alarms_on_startup() {
if msg.contains("114") {
// Fetch the next message which should be the alarm
msg = messages
- .recv()
+ .next()
.with_timeout(ALARM_SYNC_TIMEOUT_MS)
.await
.expect_or("No message received before timeout");
@@ -135,7 +136,7 @@ async fn c8y_mapper_syncs_pending_alarms_on_startup() {
let _ = start_c8y_mapper(broker.port).await.unwrap();
let mut msg = messages
- .recv()
+ .next()
.with_timeout(ALARM_SYNC_TIMEOUT_MS)
.await
.expect_or("No message received before timeout");
@@ -145,7 +146,7 @@ async fn c8y_mapper_syncs_pending_alarms_on_startup() {
if msg.contains("114") {
// Fetch the next message which should be the alarm
msg = messages
- .recv()
+ .next()
.with_timeout(ALARM_SYNC_TIMEOUT_MS)
.await
.expect_or("No message received before timeout");
@@ -155,7 +156,7 @@ async fn c8y_mapper_syncs_pending_alarms_on_startup() {
// Ignored until the rumqttd broker bug that doesn't handle empty retained messages
// Expect the previously missed clear temperature alarm message
// let msg = messages
- // .recv()
+ // .next()
// .with_timeout(ALARM_SYNC_TIMEOUT_MS)
// .await
// .expect_or("No message received after a second.");