From 61fa89f98a1291bea7a7a0482493c00d7124fd0a Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Tue, 8 Feb 2022 16:22:34 +0000 Subject: Use `futures` channels instead of `tokio` channels The motivation is to ease testing with the `StreamExt` extension Signed-off-by: Didier Wenzek --- crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs | 13 +++++++------ crates/core/tedge_mapper/src/tests.rs | 15 ++++++++------- 2 files changed, 15 insertions(+), 13 deletions(-) (limited to 'crates/core/tedge_mapper/src') diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs index 8c3d225b..4ae00e6b 100644 --- a/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs +++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs @@ -8,6 +8,7 @@ use c8y_smartrest::smartrest_deserializer::SmartRestJwtResponse; use mqtt_channel::{Connection, TopicFilter}; use mqtt_tests::test_mqtt_server::MqttProcessHandler; use mqtt_tests::with_timeout::{Maybe, WithTimeout}; +use mqtt_tests::StreamExt; use serial_test::serial; use std::time::Duration; use tokio::task::JoinHandle; @@ -29,7 +30,7 @@ async fn mapper_publishes_a_software_list_request() { // Expect on `tedge/commands/req/software/list` a software list request. let msg = messages - .recv() + .next() .with_timeout(TEST_TIMEOUT_MS) .await .expect_or("No message received after a second."); @@ -91,7 +92,7 @@ async fn mapper_publishes_software_update_request() { // Expect thin-edge json message on `tedge/commands/req/software/update` with expected payload. let msg = messages - .recv() + .next() .with_timeout(TEST_TIMEOUT_MS) .await .expect_or("No message received after a second."); @@ -134,7 +135,7 @@ async fn mapper_publishes_software_update_status_onto_c8y_topic() { // Expect `501` smartrest message on `c8y/s/us`. let msg = messages - .recv() + .next() .with_timeout(TEST_TIMEOUT_MS) .await .expect_or("No message received after a second."); @@ -157,7 +158,7 @@ async fn mapper_publishes_software_update_status_onto_c8y_topic() { // Expect `503` messages with correct payload have been received on `c8y/s/us`, if no msg received for the timeout the test fails. let msg = messages - .recv() + .next() .with_timeout(TEST_TIMEOUT_MS) .await .expect_or("No message received after a second."); @@ -209,7 +210,7 @@ async fn mapper_publishes_software_update_failed_status_onto_c8y_topic() { // `502` messages with correct payload have been received on `c8y/s/us`, if no msg received for the timeout the test fails. let msg = messages - .recv() + .next() .with_timeout(TEST_TIMEOUT_MS) .await .expect_or("No message received after a second."); @@ -272,7 +273,7 @@ async fn mapper_fails_during_sw_update_recovers_and_process_response() -> Result // Wait for the request being published by the mapper on `tedge/commands/req/software/update`. let msg = requests - .recv() + .next() .with_timeout(TEST_TIMEOUT_MS) .await .expect_or("No message received after a second."); diff --git a/crates/core/tedge_mapper/src/tests.rs b/crates/core/tedge_mapper/src/tests.rs index 35da6fe5..937cfa38 100644 --- a/crates/core/tedge_mapper/src/tests.rs +++ b/crates/core/tedge_mapper/src/tests.rs @@ -1,6 +1,7 @@ use std::time::Duration; use mqtt_tests::with_timeout::{Maybe, WithTimeout}; +use mqtt_tests::StreamExt; use serial_test::serial; use tokio::task::JoinHandle; @@ -31,7 +32,7 @@ async fn c8y_mapper_alarm_mapping_to_smartrest() { .unwrap(); let mut msg = messages - .recv() + .next() .with_timeout(ALARM_SYNC_TIMEOUT_MS) .await .expect_or("No message received before timeout"); @@ -41,7 +42,7 @@ async fn c8y_mapper_alarm_mapping_to_smartrest() { if msg.contains("114") { // Fetch the next message which should be the alarm msg = messages - .recv() + .next() .with_timeout(ALARM_SYNC_TIMEOUT_MS) .await .expect_or("No message received before timeout"); @@ -86,7 +87,7 @@ async fn c8y_mapper_syncs_pending_alarms_on_startup() { .unwrap(); let mut msg = messages - .recv() + .next() .with_timeout(ALARM_SYNC_TIMEOUT_MS) .await .expect_or("No message received before timeout"); @@ -96,7 +97,7 @@ async fn c8y_mapper_syncs_pending_alarms_on_startup() { if msg.contains("114") { // Fetch the next message which should be the alarm msg = messages - .recv() + .next() .with_timeout(ALARM_SYNC_TIMEOUT_MS) .await .expect_or("No message received before timeout"); @@ -135,7 +136,7 @@ async fn c8y_mapper_syncs_pending_alarms_on_startup() { let _ = start_c8y_mapper(broker.port).await.unwrap(); let mut msg = messages - .recv() + .next() .with_timeout(ALARM_SYNC_TIMEOUT_MS) .await .expect_or("No message received before timeout"); @@ -145,7 +146,7 @@ async fn c8y_mapper_syncs_pending_alarms_on_startup() { if msg.contains("114") { // Fetch the next message which should be the alarm msg = messages - .recv() + .next() .with_timeout(ALARM_SYNC_TIMEOUT_MS) .await .expect_or("No message received before timeout"); @@ -155,7 +156,7 @@ async fn c8y_mapper_syncs_pending_alarms_on_startup() { // Ignored until the rumqttd broker bug that doesn't handle empty retained messages // Expect the previously missed clear temperature alarm message // let msg = messages - // .recv() + // .next() // .with_timeout(ALARM_SYNC_TIMEOUT_MS) // .await // .expect_or("No message received after a second."); -- cgit v1.2.3