summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.github/workflows/commit-workflow.yml2
-rw-r--r--Cargo.lock75
-rw-r--r--Cargo.toml1
-rw-r--r--common/mqtt_client/Cargo.toml4
-rw-r--r--common/mqtt_client/tests/mqtt_pub_sub_test.rs22
-rw-r--r--common/mqtt_client/tests/packet_size_tests.rs31
-rw-r--r--common/mqtt_tests/Cargo.toml15
-rw-r--r--common/mqtt_tests/src/lib.rs6
-rw-r--r--common/mqtt_tests/src/test_mqtt_client.rs170
-rw-r--r--common/mqtt_tests/src/test_mqtt_server.rs136
-rw-r--r--common/mqtt_tests/src/with_timeout.rs32
-rw-r--r--common/tedge_utils/Cargo.toml2
-rw-r--r--common/tedge_utils/src/lib.rs1
-rw-r--r--common/tedge_utils/src/test_mqtt_server.rs60
-rw-r--r--mapper/tedge_mapper/Cargo.toml2
-rw-r--r--mapper/tedge_mapper/src/mapper.rs83
-rw-r--r--mapper/tedge_mapper/src/sm_c8y_mapper/mapper.rs78
-rw-r--r--mapper/tedge_mapper/src/sm_c8y_mapper/tests.rs521
-rw-r--r--mapper/thin_edge_json/src/parser.rs4
-rw-r--r--sm/tedge_agent/Cargo.toml3
-rw-r--r--sm/tedge_agent/tests/main.rs14
-rw-r--r--tedge/Cargo.toml2
-rw-r--r--tedge/src/cli/connect/command.rs4
23 files changed, 752 insertions, 516 deletions
diff --git a/.github/workflows/commit-workflow.yml b/.github/workflows/commit-workflow.yml
index 8638f20e..6cd05994 100644
--- a/.github/workflows/commit-workflow.yml
+++ b/.github/workflows/commit-workflow.yml
@@ -70,7 +70,7 @@ jobs:
# https://github.com/marketplace/actions/rust-cargo
with:
command: test
- args: --verbose
+ args: --no-fail-fast
cargo-build:
diff --git a/Cargo.lock b/Cargo.lock
index ba3e2f12..c52c6da1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -730,6 +730,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7531096570974c3a9dcf9e4b8e1cede1ec26cf5046219fb3b9d897503b9be59"
[[package]]
+name = "fastrand"
+version = "1.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b394ed3d285a429378d3b384b9eb1285267e7df4b166df24b7a6939a04dc392e"
+dependencies = [
+ "instant",
+]
+
+[[package]]
name = "float-cmp"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1407,29 +1416,33 @@ dependencies = [
"json",
"log",
"mockall",
+ "mqtt_tests",
"rand 0.8.4",
- "rumqttc 0.9.0",
+ "rumqttc",
"serde",
- "tedge_utils",
"thiserror",
"tokio",
"tokio-test",
]
[[package]]
-name = "mqttbytes"
-version = "0.4.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d710573f2144656d97b82b7ad85c464a1c35ae065278fbee545d40034ec1de5a"
+name = "mqtt_tests"
+version = "0.4.1"
dependencies = [
- "bytes",
+ "anyhow",
+ "fastrand",
+ "once_cell",
+ "rumqttc",
+ "rumqttd",
+ "rumqttlog",
+ "tokio",
]
[[package]]
name = "mqttbytes"
-version = "0.5.0"
+version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "12793a86f38eb258c5dbddf801dfd521c1d7a9def6e3a3de1ee248441c9dcc28"
+checksum = "a7bd39d24e28e1544d74ff5746e322a477e52353c8ba7adcaa83d2e760752853"
dependencies = [
"bytes",
]
@@ -2203,33 +2216,15 @@ dependencies = [
[[package]]
name = "rumqttc"
-version = "0.8.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bd4cf48aa8588d907fc835c47fbc8ef01dc30552467fbf0123cd794f77fd2072"
-dependencies = [
- "async-channel",
- "bytes",
- "http",
- "log",
- "mqttbytes 0.4.0",
- "pollster",
- "thiserror",
- "tokio",
- "tokio-rustls",
- "webpki",
-]
-
-[[package]]
-name = "rumqttc"
-version = "0.9.0"
+version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "613456fdab2f811204edb36f25e397cc62956c85326c274d153bc0b17c44a777"
+checksum = "e63ee9fd315db8880bf3fd3c20684dee03ca42cdd59b7d5cfdd4378f100a2aa0"
dependencies = [
"async-channel",
"bytes",
"http",
"log",
- "mqttbytes 0.5.0",
+ "mqttbytes",
"pollster",
"thiserror",
"tokio",
@@ -2239,9 +2234,9 @@ dependencies = [
[[package]]
name = "rumqttd"
-version = "0.7.0"
+version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b794b3ee690eb1029e75e54dd75440613936685748e4d97e1888080477de515d"
+checksum = "3a1b0c1bb4e345c681c0a13174ce78df48885b0855c06f6eadbad53efb8b5e3b"
dependencies = [
"argh",
"bytes",
@@ -2249,7 +2244,7 @@ dependencies = [
"futures-util",
"jemallocator",
"log",
- "mqttbytes 0.4.0",
+ "mqttbytes",
"pretty_env_logger",
"rumqttlog",
"serde",
@@ -2261,9 +2256,9 @@ dependencies = [
[[package]]
name = "rumqttlog"
-version = "0.7.0"
+version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8209adbf8b592c49b1e02d92ea5b1dd4cb3e47007a9cdb8786100ab82068c628"
+checksum = "031d0d6fc69b8f36267870724b7a28e4a9396d0b3f722f84b5631c56c0840244"
dependencies = [
"byteorder",
"bytes",
@@ -2271,7 +2266,7 @@ dependencies = [
"jackiechan",
"log",
"memmap",
- "mqttbytes 0.4.0",
+ "mqttbytes",
"segments",
"serde",
"thiserror",
@@ -2595,7 +2590,7 @@ dependencies = [
"predicates 2.0.3",
"reqwest",
"rpassword",
- "rumqttc 0.8.0",
+ "rumqttc",
"rustls",
"serde",
"structopt",
@@ -2626,10 +2621,9 @@ dependencies = [
"once_cell",
"plugin_sm",
"predicates 2.0.3",
- "rumqttd",
- "rumqttlog",
"serde",
"serde_json",
+ "serial_test",
"structopt",
"tedge_config",
"tedge_utils",
@@ -2697,6 +2691,7 @@ dependencies = [
"json_sm",
"mockall",
"mqtt_client",
+ "mqtt_tests",
"reqwest",
"serde",
"serde_json",
@@ -2729,8 +2724,6 @@ dependencies = [
"anyhow",
"assert_matches",
"futures",
- "rumqttd",
- "rumqttlog",
"tempfile",
"thiserror",
"tokio",
diff --git a/Cargo.toml b/Cargo.toml
index 264c9117..fca1517c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -7,6 +7,7 @@ members = [
"common/flockfile",
"common/json_writer",
"common/mqtt_client",
+ "common/mqtt_tests",
"common/tedge_users",
"common/tedge_utils",
"mapper/cumulocity/c8y_translator_lib",
diff --git a/common/mqtt_client/Cargo.toml b/common/mqtt_client/Cargo.toml
index 71ac46d7..2583c9c1 100644
--- a/common/mqtt_client/Cargo.toml
+++ b/common/mqtt_client/Cargo.toml
@@ -7,7 +7,7 @@ edition = "2018"
[dependencies]
async-trait = "0.1"
mockall = "0.10"
-rumqttc = "0.9"
+rumqttc = "0.10"
thiserror = "1.0"
tokio = { version = "1.12", features = ["sync", "macros"] }
@@ -21,7 +21,7 @@ json = "0.12"
log = "0.4"
rand = "0.8"
serde = "1.0"
-tedge_utils = { path = "../../common/tedge_utils" }
+mqtt_tests = { path = "../../common/mqtt_tests" }
tokio-test = "0.4"
[features]
diff --git a/common/mqtt_client/tests/mqtt_pub_sub_test.rs b/common/mqtt_client/tests/mqtt_pub_sub_test.rs
index b6926b85..392be20d 100644
--- a/common/mqtt_client/tests/mqtt_pub_sub_test.rs
+++ b/common/mqtt_client/tests/mqtt_pub_sub_test.rs
@@ -1,30 +1,27 @@
use mqtt_client::{Client, Message, MqttClient, Topic, TopicFilter};
use std::time::Duration;
-use tedge_utils::test_mqtt_server::start_broker_local;
use tokio::time::sleep;
-const MQTTTESTPORT: u16 = 58586;
-
-#[ignore = "CIT-515"]
#[test]
fn sending_and_receiving_a_message() {
async fn scenario(payload: String) -> Result<Option<Message>, mqtt_client::MqttClientError> {
- let _mqtt_server_handle = tokio::spawn(async { start_broker_local(MQTTTESTPORT).await });
+ let broker = mqtt_tests::test_mqtt_broker();
let topic = Topic::new("test/uubpb9wyi9asi46l624f")?;
let subscriber = Client::connect(
"subscribe",
- &mqtt_client::Config::default().with_port(MQTTTESTPORT),
+ &mqtt_client::Config::default().with_port(broker.port),
)
.await?;
let mut received = subscriber.subscribe(topic.filter()).await?;
+ sleep(Duration::from_millis(1000)).await;
let message = Message::new(&topic, payload);
let publisher = Client::connect(
"publisher",
- &mqtt_client::Config::default().with_port(MQTTTESTPORT),
+ &mqtt_client::Config::default().with_port(broker.port),
)
.await?;
- let _pkid = publisher.publish(message).await?;
+ let () = publisher.publish(message).await?;
tokio::select! {
msg = received.next() => Ok(msg),
@@ -40,17 +37,15 @@ fn sending_and_receiving_a_message() {
}
}
-#[ignore = "CIT-515"]
#[tokio::test]
async fn subscribing_to_many_topics() -> Result<(), anyhow::Error> {
// Given an MQTT broker
- let mqtt_port: u16 = 55555;
- let _mqtt_server_handle = tokio::spawn(async move { start_broker_local(mqtt_port).await });
+ let broker = mqtt_tests::test_mqtt_broker();
// And an MQTT client connected to that server
let subscriber = Client::connect(
"client_subscribing_to_many_topics",
- &mqtt_client::Config::default().with_port(mqtt_port),
+ &mqtt_client::Config::default().with_port(broker.port),
)
.await?;
@@ -62,11 +57,12 @@ async fn subscribing_to_many_topics() -> Result<(), anyhow::Error> {
// The messages for these topics will all be received on the same message stream
let mut messages = subscriber.subscribe(topic_filter).await?;
+ sleep(Duration::from_millis(1000)).await;
// So let us create another MQTT client publishing messages.
let publisher = Client::connect(
"client_publishing_to_many_topics",
- &mqtt_client::Config::default().with_port(mqtt_port),
+ &mqtt_client::Config::default().with_port(broker.port),
)
.await?;
diff --git a/common/mqtt_client/tests/packet_size_tests.rs b/common/mqtt_client/tests/packet_size_tests.rs
index caad8004..84865b99 100644
--- a/common/mqtt_client/tests/packet_size_tests.rs
+++ b/common/mqtt_client/tests/packet_size_tests.rs
@@ -1,29 +1,26 @@
use futures::future::TryFutureExt;
use mqtt_client::{Client, Message, MqttClient, MqttClientError, QoS, Topic, TopicFilter};
use rumqttc::StateError;
-use tedge_utils::test_mqtt_server::start_broker_local;
use tokio::time::Duration;
-const MQTTTESTPORT1: u16 = 58584;
-const MQTTTESTPORT2: u16 = 58585;
-
#[derive(Debug)]
enum TestJoinError {
TestMqttClientError(MqttClientError),
ElapseTime,
}
-#[ignore = "CIT-515"]
#[tokio::test]
// This checks the mqtt packets are within the limit or not
async fn packet_size_within_limit() -> Result<(), anyhow::Error> {
// Start the local broker
- let _mqtt_server_handle = tokio::spawn(async { start_broker_local(MQTTTESTPORT1).await });
+ let broker = mqtt_tests::test_mqtt_broker();
+
// Start the subscriber
- let subscriber = tokio::spawn(async move { subscribe_until_3_messages_received().await });
+ let subscriber =
+ tokio::spawn(async move { subscribe_until_3_messages_received(broker.port).await });
// Start the publisher and publish 3 messages
- let publisher = tokio::spawn(async move { publish_3_messages().await });
+ let publisher = tokio::spawn(async move { publish_3_messages(broker.port).await });
let _ = publisher.await?;
let res = subscriber.await?;
@@ -38,15 +35,15 @@ async fn packet_size_within_limit() -> Result<(), anyhow::Error> {
}
}
-#[ignore = "CIT-515"]
#[tokio::test]
// This checks the mqtt packet size that exceeds the limit
async fn packet_size_exceeds_limit() -> Result<(), anyhow::Error> {
// Start the broker
- let _mqtt_server_handle = tokio::spawn(async { start_broker_local(MQTTTESTPORT2).await });
+ let broker = mqtt_tests::test_mqtt_broker();
+ let mqtt_port = broker.port;
// Start the publisher and publish a message
- let publish = tokio::spawn(async { publish_big_message_wait_for_error().await });
+ let publish = tokio::spawn(async move { publish_big_message_wait_for_error(mqtt_port).await });
// if error is received then test is ok, else test should fail
let res = publish.await?;
@@ -81,11 +78,11 @@ async fn subscribe_errors(pub_client: &Client) -> Result<(), MqttClientError> {
Ok(())
}
-async fn subscribe_until_3_messages_received() -> Result<(), anyhow::Error> {
+async fn subscribe_until_3_messages_received(mqtt_port: u16) -> Result<(), anyhow::Error> {
let sub_filter = TopicFilter::new("test/hello")?;
let client = Client::connect(
"subscribe",
- &mqtt_client::Config::default().with_port(MQTTTESTPORT1),
+ &mqtt_client::Config::default().with_port(mqtt_port),
)
.await?;
let mut messages = client.subscribe(sub_filter).await?;
@@ -102,13 +99,13 @@ async fn subscribe_until_3_messages_received() -> Result<(), anyhow::Error> {
Ok(())
}
-async fn publish_3_messages() -> Result<(), anyhow::Error> {
+async fn publish_3_messages(mqtt_port: u16) -> Result<(), anyhow::Error> {
// create a 128MB message
let buffer = create_packet(134217728);
let topic = Topic::new("test/hello")?;
let client = Client::connect(
"publish_data",
- &mqtt_client::Config::default().with_port(MQTTTESTPORT1),
+ &mqtt_client::Config::default().with_port(mqtt_port),
)
.await?;
let message = Message::new(&topic, buffer.clone()).qos(QoS::AtMostOnce);
@@ -126,14 +123,14 @@ async fn publish_3_messages() -> Result<(), anyhow::Error> {
Ok(())
}
-async fn publish_big_message_wait_for_error() -> Result<(), anyhow::Error> {
+async fn publish_big_message_wait_for_error(mqtt_port: u16) -> Result<(), anyhow::Error> {
// create a 260MB message
let buffer = create_packet(272629760);
let topic = Topic::new("test/hello")?;
let publish_client = Client::connect(
"publish_big_data",
- &mqtt_client::Config::default().with_port(MQTTTESTPORT2),
+ &mqtt_client::Config::default().with_port(mqtt_port),
)
.await?;
diff --git a/common/mqtt_tests/Cargo.toml b/common/mqtt_tests/Cargo.toml
new file mode 100644
index 00000000..d60b1610
--- /dev/null
+++ b/common/mqtt_tests/Cargo.toml
@@ -0,0 +1,15 @@
+[package]
+name = "mqtt_tests"
+version = "0.4.1"
+edition = "2018"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+anyhow = "1.0"
+fastrand = "1.5"
+once_cell = "1.8"
+rumqttc = "0.10"
+rumqttd = "0.9"
+rumqttlog = "0.9"
+tokio = { version = "1.9", default_features = false, features = [ "fs", "io-util", "macros", "rt-multi-thread","signal"] }
diff --git a/common/mqtt_tests/src/lib.rs b/common/mqtt_tests/src/lib.rs
new file mode 100644
index 00000000..7aa1a51d
--- /dev/null
+++ b/common/mqtt_tests/src/lib.rs
@@ -0,0 +1,6 @@
+mod test_mqtt_client;
+pub mod test_mqtt_server;
+pub mod with_timeout;
+
+pub use test_mqtt_client::assert_received;
+pub use test_mqtt_server::test_mqtt_broker;
diff --git a/common/mqtt_tests/src/test_mqtt_client.rs b/common/mqtt_tests/src/test_mqtt_client.rs
new file mode 100644
index 00000000..2bddf691
--- /dev/null
+++ b/common/mqtt_tests/src/test_mqtt_client.rs
@@ -0,0 +1,170 @@
+use crate::with_timeout::WithTimeout;
+use rumqttc::{AsyncClient, Event, EventLoop, MqttOptions, Packet, QoS};
+use std::time::Duration;
+use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
+
+/// Returns the stream of messages received on a specific topic.
+///
+/// To ease testing, the errors are returned as messages.
+pub async fn messages_published_on(mqtt_port: u16, topic: &str) -> UnboundedReceiver<String> {
+ let (sender, recv) = tokio::sync::mpsc::unbounded_channel();
+
+ let mut con = TestCon::new(mqtt_port);
+
+ if let Err(err) = con.subscribe(topic, QoS::AtLeastOnce).await {
+ let msg = format!("Error: {:?}", err).to_string();
+ let _ = sender.send(msg);
+ return recv;
+ }
+
+ tokio::spawn(async move {
+ con.forward_received_messages(sender).await;
+ });
+
+ recv
+}
+
+/// Check that a list of messages has been received in the given order
+pub async fn assert_received<T>(
+ messages: &mut UnboundedReceiver<String>,
+ timeout: Duration,
+ expected: T,
+) where
+ T: IntoIterator,
+ T::Item: ToString,
+{
+ for expected_msg in expected.into_iter() {
+ let actual_msg = messages.recv().with_timeout(timeout).await;
+ assert_eq!(actual_msg, Ok(Some(expected_msg.to_string())));
+ }
+}
+
+/// Publish a message
+///
+/// Return only when the message has been acknowledged.
+pub async fn publish(mqtt_port: u16, topic: &str, payload: &str) -> Result<(), anyhow::Error> {
+ let mut con = TestCon::new(mqtt_port);
+
+ con.publish(topic, QoS::AtLeastOnce, payload).await
+}
+
+/// Publish the `pub_message` on the `pub_topic` only when ready to receive a message on `sub_topic`.
+///
+/// 1. Subscribe to the `sub_topic`,
+/// 2. Wait for the acknowledgment of the subscription
+/// 3 Publish the `pub_message` on the `pub_topic`,
+/// 4. Return the first received message
+/// 5. or give up after `timeout_sec` secondes.
+pub async fn wait_for_response_on_publish(
+ mqtt_port: u16,
+ pub_topic: &str,
+ pub_message: &str,
+ sub_topic: &str,
+ timeout: Duration,
+) -> Option<String> {
+ let mut con = TestCon::new(mqtt_port);
+
+ con.subscribe(sub_topic, QoS::AtLeastOnce).await.ok()?;
+ con.publish(pub_topic, QoS::AtLeastOnce, pub_message)
+ .await
+ .ok()?;
+ match tokio::time::timeout(timeout, con.next_message()).await {
+ // One collapse both timeout and error to None
+ Err(_) | Ok(Err(_)) => None,
+ Ok(Ok(x)) => Some(x),
+ }
+}
+
+pub struct TestCon {
+ client: AsyncClient,
+ eventloop: EventLoop,
+}
+
+impl TestCon {
+ pub fn new(mqtt_port: u16) -> TestCon {
+ let id: String = std::iter::repeat_with(fastrand::alphanumeric)
+ .take(10)
+ .collect();
+ let mut options = MqttOptions::new(id, "localhost", mqtt_port);
+ options.set_clean_session(true);
+
+ let (client, eventloop) = AsyncClient::new(options, 10);
+ TestCon { client, eventloop }
+ }
+
+ pub async fn subscribe(&mut self, topic: &str, qos: QoS) -> Result<(), anyhow::Error> {
+ self.client.subscribe(topic, qos).await?;
+
+ loop {
+ match self.eventloop.poll().await {
+ Ok(Event::Incoming(Packet::SubAck(_))) => {
+ return Ok(());
+ }
+ Err(err) => {
+ return Err(err)?;
+ }
+ _ => {}
+ }
+ }
+ }
+
+ pub async fn publish(
+ &mut self,
+ topic: &str,
+ qos: QoS,
+ payload: &str,
+ ) -> Result<(), anyhow::Error> {
+ self.client.publish(topic, qos, false, payload).await?;
+
+ loop {
+ match self.eventloop.poll().await {
+ Ok(Event::Incoming(Packet::PubAck(_))) => {
+ return Ok(());
+ }
+ Err(err) => {
+ return Err(err)?;
+ }
+ _ => {}
+ }
+ }
+ }
+
+ pub async fn forward_received_messages(&mut self, sender: UnboundedSender<String>) {
+ loop {
+ match self.eventloop.poll().await {
+ Ok(Event::Incoming(Packet::Publish(response))) => {
+ let msg = std::str::from_utf8(&response.payload)
+ .unwrap_or("Error: non-utf8-payload")
+ .to_string();
+ if let Err(_) = sender.send(msg) {
+ break;
+ }
+ }
+ Err(err) => {
+ let msg = format!("Error: {:?}", err).to_string();
+ let _ = sender.send(msg);
+ break;
+ }
+ _ => {}
+ }
+ }
+ let _ = self.client.disconnect().await;
+ }
+
+ pub async fn next_message(&mut self) -> Result<String, anyhow::Error> {
+ loop {
+ match self.eventloop.poll().await {
+ Ok(Event::Incoming(Packet::Publish(packet))) => {
+ let msg = std::str::from_utf8(&packet.payload)
+ .unwrap_or("Error: non-utf8-payload")
+ .to_string();
+ return Ok(msg);
+ }
+ Err(err) => {
+ return Err(err)?;
+ }
+ _ => {}
+ }
+ }
+ }
+}
diff --git a/common/mqtt_tests/src/test_mqtt_server.rs b/common/mqtt_tests/src/test_mqtt_server.rs
new file mode 100644
index 00000000..188f53ea
--- /dev/null
+++ b/common/mqtt_tests/src/test_mqtt_server.rs
@@ -0,0 +1,136 @@
+use std::{
+ collections::HashMap,
+ net::{Ipv4Addr, SocketAddr, SocketAddrV4},
+ time::Duration,
+};
+
+use librumqttd::{Broker, Config, ConnectionSettings, ConsoleSettings, ServerSettings};
+use once_cell::sync::Lazy;
+use tokio::sync::mpsc::UnboundedReceiver;
+
+const MQTT_TEST_PORT: u16 = 55555;
+
+static SERVER: Lazy<MqttProcessHandler> = Lazy::new(|| MqttProcessHandler::new(MQTT_TEST_PORT));
+
+pub fn test_mqtt_broker() -> &'static MqttProcessHandler {
+ Lazy::force(&SERVER)
+}
+
+pub struct MqttProcessHandler {
+ pub port: u16,
+}
+
+impl MqttProcessHandler {
+ pub fn new(port: u16) -> MqttProcessHandler {
+ spawn_broker(port);
+ MqttProcessHandler { port }
+ }
+
+ pub async fn publish(&self, topic: &str, payload: &str) -> Result<(), anyhow::Error> {
+ crate::test_mqtt_client::publish(self.port, topic, payload).await
+ }
+
+ pub async fn messages_published_on(&self, topic: &str) -> UnboundedReceiver<String> {
+ crate::test_mqtt_client::messages_published_on(self.port, topic).await
+ }
+
+ pub async fn wait_for_response_on_publish(
+ &self,
+ pub_topic: &str,
+ pub_message: &str,
+ sub_topic: &str,
+ timeout: Duration,
+ ) -> Option<String> {
+ crate::test_mqtt_client::wait_for_response_on_publish(
+ self.port,
+ pub_topic,
+ pub_message,
+ sub_topic,
+ timeout,
+ )
+ .await
+ }
+}
+
+fn spawn_broker(port: u16) {
+ let config = get_rumqttd_config(port);
+ let mut broker = Broker::new(config);
+ let mut tx = broker.link("localclient").unwrap();
+
+ std::thread::spawn(move || {
+ eprintln!("MQTT-TEST INFO: start test MQTT broker (port = {})", port);
+ if let Err(err) = broker.start() {
+ eprintln!(
+ "MQTT-TEST ERROR: fail to start the test MQTT broker: {:?}",
+ err
+ );
+ }
+ });
+
+ std::thread::spawn(move || {
+ let mut rx = tx.connect(200).unwrap();
+ tx.subscribe("#").unwrap();
+
+ loop {
+ if let Some(message) = rx.recv().unwrap() {
+ for chunk in message.payload.into_iter() {
+ let mut bytes: Vec<u8> = vec![];
+ for byte in chunk.into_iter() {
+ bytes.push(byte);
+ }
+ let payload = match std::str::from_utf8(bytes.as_ref()) {
+ Ok(payload) => format!("{:.60}", payload),
+ Err(_) => format!("Non uft8 ({} bytes)", bytes.len()),
+ };
+ eprintln!(
+ "MQTT-TEST MSG: topic = {}, payload = {:?}",
+ message.topic, payload
+ );
+ }
+ }
+ }
+ });
+}
+
+fn get_rumqttd_config(port: u16) -> Config {
+ let router_config = rumqttlog::Config {
+ id: 0,
+ dir: "/tmp/rumqttd".into(),
+ max_segment_size: 10240,
+ max_segment_count: 10,
+ max_connections: 10,
+ };
+
+ let connections_settings = ConnectionSettings {
+ connection_timeout_ms: 1,
+ max_client_id_len: 256,
+ throttle_delay_ms: 0,
+ max_payload_size: 268435455,
+ max_inflight_count: 200,
+ max_inflight_size: 1024,
+ login_credentials: None,
+ };
+
+ let server_config = ServerSettings {
+ listen: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), port)),
+ cert: None,
+ next_connection_delay_ms: 1,
+ connections: connections_settings,
+ };
+
+ let mut servers = HashMap::new();
+ servers.insert("1".to_string(), server_config);
+
+ let console_settings = ConsoleSettings {
+ listen: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 3030)),
+ };
+
+ librumqttd::Config {
+ id: 0,
+ router: router_config,
+ servers,
+ cluster: None,
+ replicator: None,
+ console: console_settings,
+ }
+}
diff --git a/common/mqtt_tests/src/with_timeout.rs b/common/mqtt_tests/src/with_timeout.rs
new file mode 100644
index 00000000..db109807
--- /dev/null
+++ b/common/mqtt_tests/src/with_timeout.rs
@@ -0,0 +1,32 @@
+use core::future::Future;
+use tokio::time::timeout;
+use tokio::time::{Duration, Timeout};
+
+pub trait WithTimeout<T>
+where
+ T: Future,
+{
+ fn with_timeout(self, duration: Duration) -> Timeout<T>;
+}
+
+impl<F> WithTimeout<F> for F
+where
+ F: Future,
+{
+ fn with_timeout(self, duration: Duration) -> Timeout<F> {
+ timeout(duration, self)
+ }
+}
+
+pub trait Maybe<T> {
+ fn expect_or(self, msg: &str) -> T;
+}
+
+impl<T, E> Maybe<T> for Result<Option<T>, E> {
+ fn expect_or(self, msg: &str) -> T {
+ match self {
+ Ok(Some(x)) => x,
+ Err(_) | Ok(None) => panic!("{}", msg),
+ }
+ }
+}
diff --git a/common/tedge_utils/Cargo.toml b/common/tedge_utils/Cargo.toml
index c87cff4d..dc41e024 100644
--- a/common/tedge_utils/Cargo.toml
+++ b/common/tedge_utils/Cargo.toml
@@ -14,8 +14,6 @@ logging = ["tracing", "tracing-subscriber"]
[dependencies]
anyhow = "1.0"
futures = "0.3"
-rumqttd = "0.7"
-rumqttlog = "0.7"
tempfile = "3.2"
thiserror = "1.0"
tokio = { version = "1.12", default_features = false, features = [ "fs", "io-util", "macros", "rt-multi-thread","signal"] }
diff --git a/common/tedge_utils/src/lib.rs b/common/tedge_utils/src/lib.rs
index 79e4bb8d..ea37e166 100644
--- a/common/tedge_utils/src/lib.rs
+++ b/common/tedge