diff options
-rw-r--r-- | crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs | 13 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/tests.rs | 15 | ||||
-rw-r--r-- | crates/tests/mqtt_tests/Cargo.toml | 2 | ||||
-rw-r--r-- | crates/tests/mqtt_tests/src/lib.rs | 1 | ||||
-rw-r--r-- | crates/tests/mqtt_tests/src/test_mqtt_client.rs | 13 | ||||
-rw-r--r-- | crates/tests/mqtt_tests/src/test_mqtt_server.rs | 2 |
6 files changed, 25 insertions, 21 deletions
diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs index 8c3d225b..4ae00e6b 100644 --- a/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs +++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs @@ -8,6 +8,7 @@ use c8y_smartrest::smartrest_deserializer::SmartRestJwtResponse; use mqtt_channel::{Connection, TopicFilter}; use mqtt_tests::test_mqtt_server::MqttProcessHandler; use mqtt_tests::with_timeout::{Maybe, WithTimeout}; +use mqtt_tests::StreamExt; use serial_test::serial; use std::time::Duration; use tokio::task::JoinHandle; @@ -29,7 +30,7 @@ async fn mapper_publishes_a_software_list_request() { // Expect on `tedge/commands/req/software/list` a software list request. let msg = messages - .recv() + .next() .with_timeout(TEST_TIMEOUT_MS) .await .expect_or("No message received after a second."); @@ -91,7 +92,7 @@ async fn mapper_publishes_software_update_request() { // Expect thin-edge json message on `tedge/commands/req/software/update` with expected payload. let msg = messages - .recv() + .next() .with_timeout(TEST_TIMEOUT_MS) .await .expect_or("No message received after a second."); @@ -134,7 +135,7 @@ async fn mapper_publishes_software_update_status_onto_c8y_topic() { // Expect `501` smartrest message on `c8y/s/us`. let msg = messages - .recv() + .next() .with_timeout(TEST_TIMEOUT_MS) .await .expect_or("No message received after a second."); @@ -157,7 +158,7 @@ async fn mapper_publishes_software_update_status_onto_c8y_topic() { // Expect `503` messages with correct payload have been received on `c8y/s/us`, if no msg received for the timeout the test fails. let msg = messages - .recv() + .next() .with_timeout(TEST_TIMEOUT_MS) .await .expect_or("No message received after a second."); @@ -209,7 +210,7 @@ async fn mapper_publishes_software_update_failed_status_onto_c8y_topic() { // `502` messages with correct payload have been received on `c8y/s/us`, if no msg received for the timeout the test fails. let msg = messages - .recv() + .next() .with_timeout(TEST_TIMEOUT_MS) .await .expect_or("No message received after a second."); @@ -272,7 +273,7 @@ async fn mapper_fails_during_sw_update_recovers_and_process_response() -> Result // Wait for the request being published by the mapper on `tedge/commands/req/software/update`. let msg = requests - .recv() + .next() .with_timeout(TEST_TIMEOUT_MS) .await .expect_or("No message received after a second."); diff --git a/crates/core/tedge_mapper/src/tests.rs b/crates/core/tedge_mapper/src/tests.rs index 35da6fe5..937cfa38 100644 --- a/crates/core/tedge_mapper/src/tests.rs +++ b/crates/core/tedge_mapper/src/tests.rs @@ -1,6 +1,7 @@ use std::time::Duration; use mqtt_tests::with_timeout::{Maybe, WithTimeout}; +use mqtt_tests::StreamExt; use serial_test::serial; use tokio::task::JoinHandle; @@ -31,7 +32,7 @@ async fn c8y_mapper_alarm_mapping_to_smartrest() { .unwrap(); let mut msg = messages - .recv() + .next() .with_timeout(ALARM_SYNC_TIMEOUT_MS) .await .expect_or("No message received before timeout"); @@ -41,7 +42,7 @@ async fn c8y_mapper_alarm_mapping_to_smartrest() { if msg.contains("114") { // Fetch the next message which should be the alarm msg = messages - .recv() + .next() .with_timeout(ALARM_SYNC_TIMEOUT_MS) .await .expect_or("No message received before timeout"); @@ -86,7 +87,7 @@ async fn c8y_mapper_syncs_pending_alarms_on_startup() { .unwrap(); let mut msg = messages - .recv() + .next() .with_timeout(ALARM_SYNC_TIMEOUT_MS) .await .expect_or("No message received before timeout"); @@ -96,7 +97,7 @@ async fn c8y_mapper_syncs_pending_alarms_on_startup() { if msg.contains("114") { // Fetch the next message which should be the alarm msg = messages - .recv() + .next() .with_timeout(ALARM_SYNC_TIMEOUT_MS) .await .expect_or("No message received before timeout"); @@ -135,7 +136,7 @@ async fn c8y_mapper_syncs_pending_alarms_on_startup() { let _ = start_c8y_mapper(broker.port).await.unwrap(); let mut msg = messages - .recv() + .next() .with_timeout(ALARM_SYNC_TIMEOUT_MS) .await .expect_or("No message received before timeout"); @@ -145,7 +146,7 @@ async fn c8y_mapper_syncs_pending_alarms_on_startup() { if msg.contains("114") { // Fetch the next message which should be the alarm msg = messages - .recv() + .next() .with_timeout(ALARM_SYNC_TIMEOUT_MS) .await .expect_or("No message received before timeout"); @@ -155,7 +156,7 @@ async fn c8y_mapper_syncs_pending_alarms_on_startup() { // Ignored until the rumqttd broker bug that doesn't handle empty retained messages // Expect the previously missed clear temperature alarm message // let msg = messages - // .recv() + // .next() // .with_timeout(ALARM_SYNC_TIMEOUT_MS) // .await // .expect_or("No message received after a second."); diff --git a/crates/tests/mqtt_tests/Cargo.toml b/crates/tests/mqtt_tests/Cargo.toml index da7a9de8..bb8f116b 100644 --- a/crates/tests/mqtt_tests/Cargo.toml +++ b/crates/tests/mqtt_tests/Cargo.toml @@ -14,4 +14,4 @@ once_cell = "1.8" rumqttc = "0.10" rumqttd = "0.9" rumqttlog = "0.9" -tokio = { version = "1.9", default_features = false, features = [ "fs", "io-util", "macros", "rt-multi-thread","signal"] } +tokio = { version = "1.9", default_features = false, features = [] } diff --git a/crates/tests/mqtt_tests/src/lib.rs b/crates/tests/mqtt_tests/src/lib.rs index cd7980c0..9144dd4e 100644 --- a/crates/tests/mqtt_tests/src/lib.rs +++ b/crates/tests/mqtt_tests/src/lib.rs @@ -3,6 +3,7 @@ mod test_mqtt_client; pub mod test_mqtt_server; pub mod with_timeout; +pub use futures::{SinkExt, StreamExt}; pub use message_streams::*; pub use test_mqtt_client::assert_received; pub use test_mqtt_client::publish; diff --git a/crates/tests/mqtt_tests/src/test_mqtt_client.rs b/crates/tests/mqtt_tests/src/test_mqtt_client.rs index b4fe0fa4..9d0909cb 100644 --- a/crates/tests/mqtt_tests/src/test_mqtt_client.rs +++ b/crates/tests/mqtt_tests/src/test_mqtt_client.rs @@ -1,13 +1,14 @@ use crate::with_timeout::WithTimeout; +use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender}; +use futures::{SinkExt, StreamExt}; use rumqttc::{AsyncClient, Event, EventLoop, MqttOptions, Packet, QoS}; use std::time::Duration; -use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; /// Returns the stream of messages received on a specific topic. /// /// To ease testing, the errors are returned as messages. pub async fn messages_published_on(mqtt_port: u16, topic: &str) -> UnboundedReceiver<String> { - let (sender, recv) = tokio::sync::mpsc::unbounded_channel(); + let (mut sender, recv) = futures::channel::mpsc::unbounded(); // One can have a connection error if this is called just after the broker starts // So try to subscribe again after a first error @@ -46,7 +47,7 @@ pub async fn assert_received<T>( T::Item: ToString, { for expected_msg in expected.into_iter() { - let actual_msg = messages.recv().with_timeout(timeout).await; + let actual_msg = messages.next().with_timeout(timeout).await; assert_eq!(actual_msg, Ok(Some(expected_msg.to_string()))); } } @@ -167,20 +168,20 @@ impl TestCon { } } - pub async fn forward_received_messages(&mut self, sender: UnboundedSender<String>) { + pub async fn forward_received_messages(&mut self, mut sender: UnboundedSender<String>) { loop { match self.eventloop.poll().await { Ok(Event::Incoming(Packet::Publish(response))) => { let msg = std::str::from_utf8(&response.payload) .unwrap_or("Error: non-utf8-payload") .to_string(); - if let Err(_) = sender.send(msg) { + if let Err(_) = sender.send(msg).await { break; } } Err(err) => { let msg = format!("Error: {:?}", err).to_string(); - let _ = sender.send(msg); + let _ = sender.send(msg).await; break; } _ => {} diff --git a/crates/tests/mqtt_tests/src/test_mqtt_server.rs b/crates/tests/mqtt_tests/src/test_mqtt_server.rs index 5ad35f90..e469f8d2 100644 --- a/crates/tests/mqtt_tests/src/test_mqtt_server.rs +++ b/crates/tests/mqtt_tests/src/test_mqtt_server.rs @@ -4,10 +4,10 @@ use std::{ time::Duration, }; +use futures::channel::mpsc::UnboundedReceiver; use librumqttd::{Broker, Config, ConnectionSettings, ConsoleSettings, ServerSettings}; use once_cell::sync::Lazy; use rumqttc::QoS; -use tokio::sync::mpsc::UnboundedReceiver; const MQTT_TEST_PORT: u16 = 55555; |