diff options
author | Didier Wenzek <didier.wenzek@acidalie.com> | 2021-11-10 13:12:38 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-11-10 13:12:38 +0000 |
commit | a5659a86d004b43de5961e0917595f257bf36574 (patch) | |
tree | 821e0a5af44af2a71779b830715084dce8439b88 | |
parent | 1779ee14c41ac21ad0754153cae6a69923eebcf6 (diff) |
[CIT 515] Fix the tests of the MQTT component. (#561)
* Add a test helper to except a message on MQTT
* Fix doc string
Signed-off-by: Wenzek <diw@softwareag.com>
* Move the test_mqtt_server in a dev-dependency
* The tests of the agent are not dependent of MQTT
* Working around sub ack
* Refactor the test message logger
* Add test helper functions
* Make test clearer
* Use test helper functions to simplify the tests
* Rename module
* Launch the test MQTT broker just once
* Encapsulate the port of the MQTT test broker
* Display the first 30 bytes of the messages recieved by the test MQTT broker
* Make serial the agent tests
* Upgrade to rumqtt latest
* Fix mqtt test broker log
* Reproduce the issue with lost messages when the sm_mapper is down
* Cargo fmt
* Addressing comments
* Update test
* Remove test_serial attribute
* Ignore test `tedge_agent_check_no_multiple_instances_running`
* Run commit-workflow tests with no-fail-fast
* ignore test requesting sudo
* Make helper functions method of the test broker handler
* Cargo fmt
* Wait for the mapper under test to start
Co-authored-by: Wenzek <diw@softwareag.com>
23 files changed, 752 insertions, 516 deletions
diff --git a/.github/workflows/commit-workflow.yml b/.github/workflows/commit-workflow.yml index 8638f20e..6cd05994 100644 --- a/.github/workflows/commit-workflow.yml +++ b/.github/workflows/commit-workflow.yml @@ -70,7 +70,7 @@ jobs: # https://github.com/marketplace/actions/rust-cargo with: command: test - args: --verbose + args: --no-fail-fast cargo-build: @@ -730,6 +730,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f7531096570974c3a9dcf9e4b8e1cede1ec26cf5046219fb3b9d897503b9be59" [[package]] +name = "fastrand" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b394ed3d285a429378d3b384b9eb1285267e7df4b166df24b7a6939a04dc392e" +dependencies = [ + "instant", +] + +[[package]] name = "float-cmp" version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1407,29 +1416,33 @@ dependencies = [ "json", "log", "mockall", + "mqtt_tests", "rand 0.8.4", - "rumqttc 0.9.0", + "rumqttc", "serde", - "tedge_utils", "thiserror", "tokio", "tokio-test", ] [[package]] -name = "mqttbytes" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d710573f2144656d97b82b7ad85c464a1c35ae065278fbee545d40034ec1de5a" +name = "mqtt_tests" +version = "0.4.1" dependencies = [ - "bytes", + "anyhow", + "fastrand", + "once_cell", + "rumqttc", + "rumqttd", + "rumqttlog", + "tokio", ] [[package]] name = "mqttbytes" -version = "0.5.0" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12793a86f38eb258c5dbddf801dfd521c1d7a9def6e3a3de1ee248441c9dcc28" +checksum = "a7bd39d24e28e1544d74ff5746e322a477e52353c8ba7adcaa83d2e760752853" dependencies = [ "bytes", ] @@ -2203,33 +2216,15 @@ dependencies = [ [[package]] name = "rumqttc" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd4cf48aa8588d907fc835c47fbc8ef01dc30552467fbf0123cd794f77fd2072" -dependencies = [ - "async-channel", - "bytes", - "http", - "log", - "mqttbytes 0.4.0", - "pollster", - "thiserror", - "tokio", - "tokio-rustls", - "webpki", -] - -[[package]] -name = "rumqttc" -version = "0.9.0" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "613456fdab2f811204edb36f25e397cc62956c85326c274d153bc0b17c44a777" +checksum = "e63ee9fd315db8880bf3fd3c20684dee03ca42cdd59b7d5cfdd4378f100a2aa0" dependencies = [ "async-channel", "bytes", "http", "log", - "mqttbytes 0.5.0", + "mqttbytes", "pollster", "thiserror", "tokio", @@ -2239,9 +2234,9 @@ dependencies = [ [[package]] name = "rumqttd" -version = "0.7.0" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b794b3ee690eb1029e75e54dd75440613936685748e4d97e1888080477de515d" +checksum = "3a1b0c1bb4e345c681c0a13174ce78df48885b0855c06f6eadbad53efb8b5e3b" dependencies = [ "argh", "bytes", @@ -2249,7 +2244,7 @@ dependencies = [ "futures-util", "jemallocator", "log", - "mqttbytes 0.4.0", + "mqttbytes", "pretty_env_logger", "rumqttlog", "serde", @@ -2261,9 +2256,9 @@ dependencies = [ [[package]] name = "rumqttlog" -version = "0.7.0" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8209adbf8b592c49b1e02d92ea5b1dd4cb3e47007a9cdb8786100ab82068c628" +checksum = "031d0d6fc69b8f36267870724b7a28e4a9396d0b3f722f84b5631c56c0840244" dependencies = [ "byteorder", "bytes", @@ -2271,7 +2266,7 @@ dependencies = [ "jackiechan", "log", "memmap", - "mqttbytes 0.4.0", + "mqttbytes", "segments", "serde", "thiserror", @@ -2595,7 +2590,7 @@ dependencies = [ "predicates 2.0.3", "reqwest", "rpassword", - "rumqttc 0.8.0", + "rumqttc", "rustls", "serde", "structopt", @@ -2626,10 +2621,9 @@ dependencies = [ "once_cell", "plugin_sm", "predicates 2.0.3", - "rumqttd", - "rumqttlog", "serde", "serde_json", + "serial_test", "structopt", "tedge_config", "tedge_utils", @@ -2697,6 +2691,7 @@ dependencies = [ "json_sm", "mockall", "mqtt_client", + "mqtt_tests", "reqwest", "serde", "serde_json", @@ -2729,8 +2724,6 @@ dependencies = [ "anyhow", "assert_matches", "futures", - "rumqttd", - "rumqttlog", "tempfile", "thiserror", "tokio", @@ -7,6 +7,7 @@ members = [ "common/flockfile", "common/json_writer", "common/mqtt_client", + "common/mqtt_tests", "common/tedge_users", "common/tedge_utils", "mapper/cumulocity/c8y_translator_lib", diff --git a/common/mqtt_client/Cargo.toml b/common/mqtt_client/Cargo.toml index 71ac46d7..2583c9c1 100644 --- a/common/mqtt_client/Cargo.toml +++ b/common/mqtt_client/Cargo.toml @@ -7,7 +7,7 @@ edition = "2018" [dependencies] async-trait = "0.1" mockall = "0.10" -rumqttc = "0.9" +rumqttc = "0.10" thiserror = "1.0" tokio = { version = "1.12", features = ["sync", "macros"] } @@ -21,7 +21,7 @@ json = "0.12" log = "0.4" rand = "0.8" serde = "1.0" -tedge_utils = { path = "../../common/tedge_utils" } +mqtt_tests = { path = "../../common/mqtt_tests" } tokio-test = "0.4" [features] diff --git a/common/mqtt_client/tests/mqtt_pub_sub_test.rs b/common/mqtt_client/tests/mqtt_pub_sub_test.rs index b6926b85..392be20d 100644 --- a/common/mqtt_client/tests/mqtt_pub_sub_test.rs +++ b/common/mqtt_client/tests/mqtt_pub_sub_test.rs @@ -1,30 +1,27 @@ use mqtt_client::{Client, Message, MqttClient, Topic, TopicFilter}; use std::time::Duration; -use tedge_utils::test_mqtt_server::start_broker_local; use tokio::time::sleep; -const MQTTTESTPORT: u16 = 58586; - -#[ignore = "CIT-515"] #[test] fn sending_and_receiving_a_message() { async fn scenario(payload: String) -> Result<Option<Message>, mqtt_client::MqttClientError> { - let _mqtt_server_handle = tokio::spawn(async { start_broker_local(MQTTTESTPORT).await }); + let broker = mqtt_tests::test_mqtt_broker(); let topic = Topic::new("test/uubpb9wyi9asi46l624f")?; let subscriber = Client::connect( "subscribe", - &mqtt_client::Config::default().with_port(MQTTTESTPORT), + &mqtt_client::Config::default().with_port(broker.port), ) .await?; let mut received = subscriber.subscribe(topic.filter()).await?; + sleep(Duration::from_millis(1000)).await; let message = Message::new(&topic, payload); let publisher = Client::connect( "publisher", - &mqtt_client::Config::default().with_port(MQTTTESTPORT), + &mqtt_client::Config::default().with_port(broker.port), ) .await?; - let _pkid = publisher.publish(message).await?; + let () = publisher.publish(message).await?; tokio::select! { msg = received.next() => Ok(msg), @@ -40,17 +37,15 @@ fn sending_and_receiving_a_message() { } } -#[ignore = "CIT-515"] #[tokio::test] async fn subscribing_to_many_topics() -> Result<(), anyhow::Error> { // Given an MQTT broker - let mqtt_port: u16 = 55555; - let _mqtt_server_handle = tokio::spawn(async move { start_broker_local(mqtt_port).await }); + let broker = mqtt_tests::test_mqtt_broker(); // And an MQTT client connected to that server let subscriber = Client::connect( "client_subscribing_to_many_topics", - &mqtt_client::Config::default().with_port(mqtt_port), + &mqtt_client::Config::default().with_port(broker.port), ) .await?; @@ -62,11 +57,12 @@ async fn subscribing_to_many_topics() -> Result<(), anyhow::Error> { // The messages for these topics will all be received on the same message stream let mut messages = subscriber.subscribe(topic_filter).await?; + sleep(Duration::from_millis(1000)).await; // So let us create another MQTT client publishing messages. let publisher = Client::connect( "client_publishing_to_many_topics", - &mqtt_client::Config::default().with_port(mqtt_port), + &mqtt_client::Config::default().with_port(broker.port), ) .await?; diff --git a/common/mqtt_client/tests/packet_size_tests.rs b/common/mqtt_client/tests/packet_size_tests.rs index caad8004..84865b99 100644 --- a/common/mqtt_client/tests/packet_size_tests.rs +++ b/common/mqtt_client/tests/packet_size_tests.rs @@ -1,29 +1,26 @@ use futures::future::TryFutureExt; use mqtt_client::{Client, Message, MqttClient, MqttClientError, QoS, Topic, TopicFilter}; use rumqttc::StateError; -use tedge_utils::test_mqtt_server::start_broker_local; use tokio::time::Duration; -const MQTTTESTPORT1: u16 = 58584; -const MQTTTESTPORT2: u16 = 58585; - #[derive(Debug)] enum TestJoinError { TestMqttClientError(MqttClientError), ElapseTime, } -#[ignore = "CIT-515"] #[tokio::test] // This checks the mqtt packets are within the limit or not async fn packet_size_within_limit() -> Result<(), anyhow::Error> { // Start the local broker - let _mqtt_server_handle = tokio::spawn(async { start_broker_local(MQTTTESTPORT1).await }); + let broker = mqtt_tests::test_mqtt_broker(); + // Start the subscriber - let subscriber = tokio::spawn(async move { subscribe_until_3_messages_received().await }); + let subscriber = + tokio::spawn(async move { subscribe_until_3_messages_received(broker.port).await }); // Start the publisher and publish 3 messages - let publisher = tokio::spawn(async move { publish_3_messages().await }); + let publisher = tokio::spawn(async move { publish_3_messages(broker.port).await }); let _ = publisher.await?; let res = subscriber.await?; @@ -38,15 +35,15 @@ async fn packet_size_within_limit() -> Result<(), anyhow::Error> { } } -#[ignore = "CIT-515"] #[tokio::test] // This checks the mqtt packet size that exceeds the limit async fn packet_size_exceeds_limit() -> Result<(), anyhow::Error> { // Start the broker - let _mqtt_server_handle = tokio::spawn(async { start_broker_local(MQTTTESTPORT2).await }); + let broker = mqtt_tests::test_mqtt_broker(); + let mqtt_port = broker.port; // Start the publisher and publish a message - let publish = tokio::spawn(async { publish_big_message_wait_for_error().await }); + let publish = tokio::spawn(async move { publish_big_message_wait_for_error(mqtt_port).await }); // if error is received then test is ok, else test should fail let res = publish.await?; @@ -81,11 +78,11 @@ async fn subscribe_errors(pub_client: &Client) -> Result<(), MqttClientError> { Ok(()) } -async fn subscribe_until_3_messages_received() -> Result<(), anyhow::Error> { +async fn subscribe_until_3_messages_received(mqtt_port: u16) -> Result<(), anyhow::Error> { let sub_filter = TopicFilter::new("test/hello")?; let client = Client::connect( "subscribe", - &mqtt_client::Config::default().with_port(MQTTTESTPORT1), + &mqtt_client::Config::default().with_port(mqtt_port), ) .await?; let mut messages = client.subscribe(sub_filter).await?; @@ -102,13 +99,13 @@ async fn subscribe_until_3_messages_received() -> Result<(), anyhow::Error> { Ok(()) } -async fn publish_3_messages() -> Result<(), anyhow::Error> { +async fn publish_3_messages(mqtt_port: u16) -> Result<(), anyhow::Error> { // create a 128MB message let buffer = create_packet(134217728); let topic = Topic::new("test/hello")?; let client = Client::connect( "publish_data", - &mqtt_client::Config::default().with_port(MQTTTESTPORT1), + &mqtt_client::Config::default().with_port(mqtt_port), ) .await?; let message = Message::new(&topic, buffer.clone()).qos(QoS::AtMostOnce); @@ -126,14 +123,14 @@ async fn publish_3_messages() -> Result<(), anyhow::Error> { Ok(()) } -async fn publish_big_message_wait_for_error() -> Result<(), anyhow::Error> { +async fn publish_big_message_wait_for_error(mqtt_port: u16) -> Result<(), anyhow::Error> { // create a 260MB message let buffer = create_packet(272629760); let topic = Topic::new("test/hello")?; let publish_client = Client::connect( "publish_big_data", - &mqtt_client::Config::default().with_port(MQTTTESTPORT2), + &mqtt_client::Config::default().with_port(mqtt_port), ) .await?; diff --git a/common/mqtt_tests/Cargo.toml b/common/mqtt_tests/Cargo.toml new file mode 100644 index 00000000..d60b1610 --- /dev/null +++ b/common/mqtt_tests/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "mqtt_tests" +version = "0.4.1" +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = "1.0" +fastrand = "1.5" +once_cell = "1.8" +rumqttc = "0.10" +rumqttd = "0.9" +rumqttlog = "0.9" +tokio = { version = "1.9", default_features = false, features = [ "fs", "io-util", "macros", "rt-multi-thread","signal"] } diff --git a/common/mqtt_tests/src/lib.rs b/common/mqtt_tests/src/lib.rs new file mode 100644 index 00000000..7aa1a51d --- /dev/null +++ b/common/mqtt_tests/src/lib.rs @@ -0,0 +1,6 @@ +mod test_mqtt_client; +pub mod test_mqtt_server; +pub mod with_timeout; + +pub use test_mqtt_client::assert_received; +pub use test_mqtt_server::test_mqtt_broker; diff --git a/common/mqtt_tests/src/test_mqtt_client.rs b/common/mqtt_tests/src/test_mqtt_client.rs new file mode 100644 index 00000000..2bddf691 --- /dev/null +++ b/common/mqtt_tests/src/test_mqtt_client.rs @@ -0,0 +1,170 @@ +use crate::with_timeout::WithTimeout; +use rumqttc::{AsyncClient, Event, EventLoop, MqttOptions, Packet, QoS}; +use std::time::Duration; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; + +/// Returns the stream of messages received on a specific topic. +/// +/// To ease testing, the errors are returned as messages. +pub async fn messages_published_on(mqtt_port: u16, topic: &str) -> UnboundedReceiver<String> { + let (sender, recv) = tokio::sync::mpsc::unbounded_channel(); + + let mut con = TestCon::new(mqtt_port); + + if let Err(err) = con.subscribe(topic, QoS::AtLeastOnce).await { + let msg = format!("Error: {:?}", err).to_string(); + let _ = sender.send(msg); + return recv; + } + + tokio::spawn(async move { + con.forward_received_messages(sender).await; + }); + + recv +} + +/// Check that a list of messages has been received in the given order +pub async fn assert_received<T>( + messages: &mut UnboundedReceiver<String>, + timeout: Duration, + expected: T, +) where + T: IntoIterator, + T::Item: ToString, +{ + for expected_msg in expected.into_iter() { + let actual_msg = messages.recv().with_timeout(timeout).await; + assert_eq!(actual_msg, Ok(Some(expected_msg.to_string()))); + } +} + +/// Publish a message +/// +/// Return only when the message has been acknowledged. +pub async fn publish(mqtt_port: u16, topic: &str, payload: &str) -> Result<(), anyhow::Error> { + let mut con = TestCon::new(mqtt_port); + + con.publish(topic, QoS::AtLeastOnce, payload).await +} + +/// Publish the `pub_message` on the `pub_topic` only when ready to receive a message on `sub_topic`. +/// +/// 1. Subscribe to the `sub_topic`, +/// 2. Wait for the acknowledgment of the subscription +/// 3 Publish the `pub_message` on the `pub_topic`, +/// 4. Return the first received message +/// 5. or give up after `timeout_sec` secondes. +pub async fn wait_for_response_on_publish( + mqtt_port: u16, + pub_topic: &str, + pub_message: &str, + sub_topic: &str, + timeout: Duration, +) -> Option<String> { + let mut con = TestCon::new(mqtt_port); + + con.subscribe(sub_topic, QoS::AtLeastOnce).await.ok()?; + con.publish(pub_topic, QoS::AtLeastOnce, pub_message) + .await + .ok()?; + match tokio::time::timeout(timeout, con.next_message()).await { + // One collapse both timeout and error to None + Err(_) | Ok(Err(_)) => None, + Ok(Ok(x)) => Some(x), + } +} + +pub struct TestCon { + client: AsyncClient, + eventloop: EventLoop, +} + +impl TestCon { + pub fn new(mqtt_port: u16) -> TestCon { + let id: String = std::iter::repeat_with(fastrand::alphanumeric) + .take(10) + .collect(); + let mut options = MqttOptions::new(id, "localhost", mqtt_port); + options.set_clean_session(true); + + let (client, eventloop) = AsyncClient::new(options, 10); + TestCon { client, eventloop } + } + + pub async fn subscribe(&mut self, topic: &str, qos: QoS) -> Result<(), anyhow::Error> { + self.client.subscribe(topic, qos).await?; + + loop { + match self.eventloop.poll().await { + Ok(Event::Incoming(Packet::SubAck(_))) => { + return Ok(()); + } + Err(err) => { + return Err(err)?; + } + _ => {} + } + } + } + + pub async fn publish( + &mut self, + topic: &str, + qos: QoS, + payload: &str, + ) -> Result<(), anyhow::Error> { + self.client.publish(topic, qos, false, payload).await?; + + loop { + match self.eventloop.poll().await { + Ok(Event::Incoming(Packet::PubAck(_))) => { + return Ok(()); + } + Err(err) => { + return Err(err)?; + } + _ => {} + } + } + } + + pub async fn forward_received_messages(&mut self, sender: UnboundedSender<String>) { + loop { + match self.eventloop.poll().await { + Ok(Event::Incoming(Packet::Publish(response))) => { + let msg = std::str::from_utf8(&response.payload) + .unwrap_or("Error: non-utf8-payload") + .to_string(); + if let Err(_) = sender.send(msg) { + break; + } + } + Err(err) => { + let msg = format!("Error: {:?}", err).to_string(); + let _ = sender.send(msg); + break; + } + _ => {} + } + } + let _ = self.client.disconnect().await; + } + + pub async fn next_message(&mut self) -> Result<String, anyhow::Error> { + loop { + match self.eventloop.poll().await { + Ok(Event::Incoming(Packet::Publish(packet))) => { + let msg = std::str::from_utf8(&packet.payload) + .unwrap_or("Error: non-utf8-payload") + .to_string(); + return Ok(msg); + } + Err(err) => { + return Err(err)?; + } + _ => {} + } + } + } +} diff --git a/common/mqtt_tests/src/test_mqtt_server.rs b/common/mqtt_tests/src/test_mqtt_server.rs new file mode 100644 index 00000000..188f53ea --- /dev/null +++ b/common/mqtt_tests/src/test_mqtt_server.rs @@ -0,0 +1,136 @@ +use std::{ + collections::HashMap, + net::{Ipv4Addr, SocketAddr, SocketAddrV4}, + time::Duration, +}; + +use librumqttd::{Broker, Config, ConnectionSettings, ConsoleSettings, ServerSettings}; +use once_cell::sync::Lazy; +use tokio::sync::mpsc::UnboundedReceiver; + +const MQTT_TEST_PORT: u16 = 55555; + +static SERVER: Lazy<MqttProcessHandler> = Lazy::new(|| MqttProcessHandler::new(MQTT_TEST_PORT)); + +pub fn test_mqtt_broker() -> &'static MqttProcessHandler { + Lazy::force(&SERVER) +} + +pub struct MqttProcessHandler { + pub port: u16, +} + +impl MqttProcessHandler { + pub fn new(port: u16) -> MqttProcessHandler { + spawn_broker(port); + MqttProcessHandler { port } + } + + pub async fn publish(&self, topic: &str, payload: &str) -> Result<(), anyhow::Error> { + crate::test_mqtt_client::publish(self.port, topic, payload).await + } + + pub async fn messages_published_on(&self, topic: &str) -> UnboundedReceiver<String> { + crate::test_mqtt_client::messages_published_on(self.port, topic).await + } + + pub async fn wait_for_response_on_publish( + &self, + pub_topic: &str, + pub_message: &str, + sub_topic: &str, + timeout: Duration, + ) -> Option<String> { + crate::test_mqtt_client::wait_for_response_on_publish( + self.port, + pub_topic, + pub_message, + sub_topic, + timeout, + ) + .await + } +} + +fn spawn_broker(port: u16) { + let config = get_rumqttd_config(port); + let mut broker = Broker::new(config); + let mut tx = broker.link("localclient").unwrap(); + + std::thread::spawn(move || { + eprintln!("MQTT-TEST INFO: start test MQTT broker (port = {})", port); + if let Err(err) = broker.start() { + eprintln!( + "MQTT-TEST ERROR: fail to start the test MQTT broker: {:?}", + err + ); + } + }); + + std::thread::spawn(move || { + let mut rx = tx.connect(200).unwrap(); + tx.subscribe("#").unwrap(); + + loop { + if let Some(message) = rx.recv().unwrap() { + for chunk in message.payload.into_iter() { + let mut bytes: Vec<u8> = vec![]; + for byte in chunk.into_iter() { + bytes.push(byte); + } + let payload = match std::str::from_utf8(bytes.as_ref()) { + Ok(payload) => format!("{:.60}", payload), + Err(_) => format!("Non uft8 ({} bytes)", bytes.len()), + }; + eprintln!( + "MQTT-TEST MSG: topic = {}, payload = {:?}", + message.topic, payload + ); + } + } + } + }); +} + +fn get_rumqttd_config(port: u16) -> Config { + let router_config = rumqttlog::Config { + id: 0, + dir: "/tmp/rumqttd".into(), + max_segment_size: 10240, + max_segment_count: 10, + max_connections: 10, + }; + + let connections_settings = ConnectionSettings { + connection_timeout_ms: 1, + max_client_id_len: 256, + throttle_delay_ms: 0, + max_payload_size: 268435455, + max_inflight_count: 200, + max_inflight_size: 1024, + login_credentials: None, + }; + + let server_config = ServerSettings { + listen: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), port)), + cert: None, + next_connection_delay_ms: 1, + connections: connections_settings, + }; + + let mut servers = HashMap::new(); + servers.insert("1".to_string(), server_config); + + let console_settings = ConsoleSettings { + listen: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 3030)), + }; + + librumqttd::Config { + id: 0, + router: router_config, + servers, + cluster: None, + replicator: None, + console: console_settings, + } +} diff --git a/common/mqtt_tests/src/with_timeout.rs b/common/mqtt_tests/src/with_timeout.rs new file mode 100644 index 00000000..db109807 --- /dev/null +++ b/common/mqtt_tests/src/with_timeout.rs @@ -0,0 +1,32 @@ +use core::future::Future; +use tokio::time::timeout; +use tokio::time::{Duration, Timeout}; + +pub trait WithTimeout<T> +where + T: Future, +{ + fn with_timeout(self, duration: Duration) -> Timeout<T>; +} + +impl<F> WithTimeout<F> for F +wh |