summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs13
-rw-r--r--crates/core/tedge_mapper/src/tests.rs15
-rw-r--r--crates/tests/mqtt_tests/Cargo.toml2
-rw-r--r--crates/tests/mqtt_tests/src/lib.rs1
-rw-r--r--crates/tests/mqtt_tests/src/test_mqtt_client.rs13
-rw-r--r--crates/tests/mqtt_tests/src/test_mqtt_server.rs2
6 files changed, 25 insertions, 21 deletions
diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs
index 8c3d225b..4ae00e6b 100644
--- a/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs
+++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs
@@ -8,6 +8,7 @@ use c8y_smartrest::smartrest_deserializer::SmartRestJwtResponse;
use mqtt_channel::{Connection, TopicFilter};
use mqtt_tests::test_mqtt_server::MqttProcessHandler;
use mqtt_tests::with_timeout::{Maybe, WithTimeout};
+use mqtt_tests::StreamExt;
use serial_test::serial;
use std::time::Duration;
use tokio::task::JoinHandle;
@@ -29,7 +30,7 @@ async fn mapper_publishes_a_software_list_request() {
// Expect on `tedge/commands/req/software/list` a software list request.
let msg = messages
- .recv()
+ .next()
.with_timeout(TEST_TIMEOUT_MS)
.await
.expect_or("No message received after a second.");
@@ -91,7 +92,7 @@ async fn mapper_publishes_software_update_request() {
// Expect thin-edge json message on `tedge/commands/req/software/update` with expected payload.
let msg = messages
- .recv()
+ .next()
.with_timeout(TEST_TIMEOUT_MS)
.await
.expect_or("No message received after a second.");
@@ -134,7 +135,7 @@ async fn mapper_publishes_software_update_status_onto_c8y_topic() {
// Expect `501` smartrest message on `c8y/s/us`.
let msg = messages
- .recv()
+ .next()
.with_timeout(TEST_TIMEOUT_MS)
.await
.expect_or("No message received after a second.");
@@ -157,7 +158,7 @@ async fn mapper_publishes_software_update_status_onto_c8y_topic() {
// Expect `503` messages with correct payload have been received on `c8y/s/us`, if no msg received for the timeout the test fails.
let msg = messages
- .recv()
+ .next()
.with_timeout(TEST_TIMEOUT_MS)
.await
.expect_or("No message received after a second.");
@@ -209,7 +210,7 @@ async fn mapper_publishes_software_update_failed_status_onto_c8y_topic() {
// `502` messages with correct payload have been received on `c8y/s/us`, if no msg received for the timeout the test fails.
let msg = messages
- .recv()
+ .next()
.with_timeout(TEST_TIMEOUT_MS)
.await
.expect_or("No message received after a second.");
@@ -272,7 +273,7 @@ async fn mapper_fails_during_sw_update_recovers_and_process_response() -> Result
// Wait for the request being published by the mapper on `tedge/commands/req/software/update`.
let msg = requests
- .recv()
+ .next()
.with_timeout(TEST_TIMEOUT_MS)
.await
.expect_or("No message received after a second.");
diff --git a/crates/core/tedge_mapper/src/tests.rs b/crates/core/tedge_mapper/src/tests.rs
index 35da6fe5..937cfa38 100644
--- a/crates/core/tedge_mapper/src/tests.rs
+++ b/crates/core/tedge_mapper/src/tests.rs
@@ -1,6 +1,7 @@
use std::time::Duration;
use mqtt_tests::with_timeout::{Maybe, WithTimeout};
+use mqtt_tests::StreamExt;
use serial_test::serial;
use tokio::task::JoinHandle;
@@ -31,7 +32,7 @@ async fn c8y_mapper_alarm_mapping_to_smartrest() {
.unwrap();
let mut msg = messages
- .recv()
+ .next()
.with_timeout(ALARM_SYNC_TIMEOUT_MS)
.await
.expect_or("No message received before timeout");
@@ -41,7 +42,7 @@ async fn c8y_mapper_alarm_mapping_to_smartrest() {
if msg.contains("114") {
// Fetch the next message which should be the alarm
msg = messages
- .recv()
+ .next()
.with_timeout(ALARM_SYNC_TIMEOUT_MS)
.await
.expect_or("No message received before timeout");
@@ -86,7 +87,7 @@ async fn c8y_mapper_syncs_pending_alarms_on_startup() {
.unwrap();
let mut msg = messages
- .recv()
+ .next()
.with_timeout(ALARM_SYNC_TIMEOUT_MS)
.await
.expect_or("No message received before timeout");
@@ -96,7 +97,7 @@ async fn c8y_mapper_syncs_pending_alarms_on_startup() {
if msg.contains("114") {
// Fetch the next message which should be the alarm
msg = messages
- .recv()
+ .next()
.with_timeout(ALARM_SYNC_TIMEOUT_MS)
.await
.expect_or("No message received before timeout");
@@ -135,7 +136,7 @@ async fn c8y_mapper_syncs_pending_alarms_on_startup() {
let _ = start_c8y_mapper(broker.port).await.unwrap();
let mut msg = messages
- .recv()
+ .next()
.with_timeout(ALARM_SYNC_TIMEOUT_MS)
.await
.expect_or("No message received before timeout");
@@ -145,7 +146,7 @@ async fn c8y_mapper_syncs_pending_alarms_on_startup() {
if msg.contains("114") {
// Fetch the next message which should be the alarm
msg = messages
- .recv()
+ .next()
.with_timeout(ALARM_SYNC_TIMEOUT_MS)
.await
.expect_or("No message received before timeout");
@@ -155,7 +156,7 @@ async fn c8y_mapper_syncs_pending_alarms_on_startup() {
// Ignored until the rumqttd broker bug that doesn't handle empty retained messages
// Expect the previously missed clear temperature alarm message
// let msg = messages
- // .recv()
+ // .next()
// .with_timeout(ALARM_SYNC_TIMEOUT_MS)
// .await
// .expect_or("No message received after a second.");
diff --git a/crates/tests/mqtt_tests/Cargo.toml b/crates/tests/mqtt_tests/Cargo.toml
index da7a9de8..bb8f116b 100644
--- a/crates/tests/mqtt_tests/Cargo.toml
+++ b/crates/tests/mqtt_tests/Cargo.toml
@@ -14,4 +14,4 @@ once_cell = "1.8"
rumqttc = "0.10"
rumqttd = "0.9"
rumqttlog = "0.9"
-tokio = { version = "1.9", default_features = false, features = [ "fs", "io-util", "macros", "rt-multi-thread","signal"] }
+tokio = { version = "1.9", default_features = false, features = [] }
diff --git a/crates/tests/mqtt_tests/src/lib.rs b/crates/tests/mqtt_tests/src/lib.rs
index cd7980c0..9144dd4e 100644
--- a/crates/tests/mqtt_tests/src/lib.rs
+++ b/crates/tests/mqtt_tests/src/lib.rs
@@ -3,6 +3,7 @@ mod test_mqtt_client;
pub mod test_mqtt_server;
pub mod with_timeout;
+pub use futures::{SinkExt, StreamExt};
pub use message_streams::*;
pub use test_mqtt_client::assert_received;
pub use test_mqtt_client::publish;
diff --git a/crates/tests/mqtt_tests/src/test_mqtt_client.rs b/crates/tests/mqtt_tests/src/test_mqtt_client.rs
index b4fe0fa4..9d0909cb 100644
--- a/crates/tests/mqtt_tests/src/test_mqtt_client.rs
+++ b/crates/tests/mqtt_tests/src/test_mqtt_client.rs
@@ -1,13 +1,14 @@
use crate::with_timeout::WithTimeout;
+use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
+use futures::{SinkExt, StreamExt};
use rumqttc::{AsyncClient, Event, EventLoop, MqttOptions, Packet, QoS};
use std::time::Duration;
-use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
/// Returns the stream of messages received on a specific topic.
///
/// To ease testing, the errors are returned as messages.
pub async fn messages_published_on(mqtt_port: u16, topic: &str) -> UnboundedReceiver<String> {
- let (sender, recv) = tokio::sync::mpsc::unbounded_channel();
+ let (mut sender, recv) = futures::channel::mpsc::unbounded();
// One can have a connection error if this is called just after the broker starts
// So try to subscribe again after a first error
@@ -46,7 +47,7 @@ pub async fn assert_received<T>(
T::Item: ToString,
{
for expected_msg in expected.into_iter() {
- let actual_msg = messages.recv().with_timeout(timeout).await;
+ let actual_msg = messages.next().with_timeout(timeout).await;
assert_eq!(actual_msg, Ok(Some(expected_msg.to_string())));
}
}
@@ -167,20 +168,20 @@ impl TestCon {
}
}
- pub async fn forward_received_messages(&mut self, sender: UnboundedSender<String>) {
+ pub async fn forward_received_messages(&mut self, mut sender: UnboundedSender<String>) {
loop {
match self.eventloop.poll().await {
Ok(Event::Incoming(Packet::Publish(response))) => {
let msg = std::str::from_utf8(&response.payload)
.unwrap_or("Error: non-utf8-payload")
.to_string();
- if let Err(_) = sender.send(msg) {
+ if let Err(_) = sender.send(msg).await {
break;
}
}
Err(err) => {
let msg = format!("Error: {:?}", err).to_string();
- let _ = sender.send(msg);
+ let _ = sender.send(msg).await;
break;
}
_ => {}
diff --git a/crates/tests/mqtt_tests/src/test_mqtt_server.rs b/crates/tests/mqtt_tests/src/test_mqtt_server.rs
index 5ad35f90..e469f8d2 100644
--- a/crates/tests/mqtt_tests/src/test_mqtt_server.rs
+++ b/crates/tests/mqtt_tests/src/test_mqtt_server.rs
@@ -4,10 +4,10 @@ use std::{
time::Duration,
};
+use futures::channel::mpsc::UnboundedReceiver;
use librumqttd::{Broker, Config, ConnectionSettings, ConsoleSettings, ServerSettings};
use once_cell::sync::Lazy;
use rumqttc::QoS;
-use tokio::sync::mpsc::UnboundedReceiver;
const MQTT_TEST_PORT: u16 = 55555;