summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock5
-rw-r--r--common/mqtt_client/Cargo.toml6
-rw-r--r--common/mqtt_client/tests/mqtt_pub_sub_test.rs (renamed from common/mqtt_client/tests/integration_test.rs)26
-rw-r--r--common/mqtt_client/tests/packet_size_tests.rs43
-rw-r--r--common/mqtt_client/tests/rumqttd_broker.rs60
-rw-r--r--configuration/rumqttd/rumqttd_5883.conf27
-rw-r--r--configuration/rumqttd/rumqttd_5884.conf27
7 files changed, 100 insertions, 94 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 54b4fdbc..df17cbaf 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1383,7 +1383,6 @@ dependencies = [
"anyhow",
"async-log",
"async-trait",
- "confy",
"env_logger 0.9.0",
"futures",
"futures-timer",
@@ -1534,9 +1533,9 @@ dependencies = [
[[package]]
name = "once_cell"
-version = "1.7.2"
+version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "af8b08b04175473088b46763e51ee54da5f9a164bc162f615b91bc179dbf15a3"
+checksum = "692fcb63b64b1758029e0a96ee63e049ce8c5948587f2f7208df04625e5f6b56"
[[package]]
name = "oorandom"
diff --git a/common/mqtt_client/Cargo.toml b/common/mqtt_client/Cargo.toml
index ed6fa63e..93955691 100644
--- a/common/mqtt_client/Cargo.toml
+++ b/common/mqtt_client/Cargo.toml
@@ -14,6 +14,7 @@ mockall = "0.10"
async-trait = "0.1"
[dev-dependencies]
+anyhow="1.0"
async-log = "2.0"
env_logger = "0.9"
futures = "0.3"
@@ -21,13 +22,10 @@ futures-timer = "3.0"
json = "0.12"
log = "0.4"
rand = "0.8"
-tokio-test = "0.4"
-anyhow="1.0"
rumqttd = "0.7"
serde = "1.0.126"
-confy = "0.4"
rumqttlog = "0.7"
-
+tokio-test = "0.4"
[features]
integration-test = []
diff --git a/common/mqtt_client/tests/integration_test.rs b/common/mqtt_client/tests/mqtt_pub_sub_test.rs
index cde27123..29d43e25 100644
--- a/common/mqtt_client/tests/integration_test.rs
+++ b/common/mqtt_client/tests/mqtt_pub_sub_test.rs
@@ -1,22 +1,30 @@
+mod rumqttd_broker;
+
+const MQTTTESTPORT: u16 = 58586;
+
#[test]
-#[cfg(feature = "integration-test")]
-// Requires fix for access to service on Internet which is not available in gh actions.
-// Proposed to use mock server instead of using live service on the Internet.
-// Run this test by calling 'cargo test --features integration-test' from the base path of the crate
fn sending_and_receiving_a_message() {
- use mqtt_client::{Config, Message, MqttClient, Topic};
+ use mqtt_client::{Client, Message, MqttClient, Topic};
use std::time::Duration;
use tokio::time::sleep;
async fn scenario(payload: String) -> Result<Option<Message>, mqtt_client::MqttClientError> {
- let test_broker = Config::new("test.mosquitto.org", 1883);
-
+ let _mqtt_server_handle =
+ tokio::spawn(async { rumqttd_broker::start_broker_local(MQTTTESTPORT).await });
let topic = Topic::new("test/uubpb9wyi9asi46l624f")?;
- let subscriber = test_broker.connect("subscriber").await?;
+ let subscriber = Client::connect(
+ "subscribe",
+ &mqtt_client::Config::default().with_port(MQTTTESTPORT),
+ )
+ .await?;
let mut received = subscriber.subscribe(topic.filter()).await?;
let message = Message::new(&topic, payload);
- let publisher = test_broker.connect("publisher").await?;
+ let publisher = Client::connect(
+ "publisher",
+ &mqtt_client::Config::default().with_port(MQTTTESTPORT),
+ )
+ .await?;
let _pkid = publisher.publish(message).await?;
tokio::select! {
diff --git a/common/mqtt_client/tests/packet_size_tests.rs b/common/mqtt_client/tests/packet_size_tests.rs
index 7834a386..113bb523 100644
--- a/common/mqtt_client/tests/packet_size_tests.rs
+++ b/common/mqtt_client/tests/packet_size_tests.rs
@@ -1,9 +1,13 @@
-use futures::future::TryFutureExt;
-use librumqttd::{async_locallink, Config};
use mqtt_client::{Client, Message, MqttClient, MqttClientError, QoS, Topic, TopicFilter};
use rumqttc::StateError;
+mod rumqttd_broker;
+use futures::future::TryFutureExt;
use tokio::time::Duration;
+
+const MQTTTESTPORT1: u16 = 58584;
+const MQTTTESTPORT2: u16 = 58585;
+
#[derive(Debug)]
enum TestJoinError {
TestMqttClientError(MqttClientError),
@@ -14,9 +18,8 @@ enum TestJoinError {
// This checks the mqtt packets are within the limit or not
async fn packet_size_within_limit() -> Result<(), anyhow::Error> {
// Start the local broker
- let mqtt_server_handle = tokio::spawn(async {
- start_broker_local("../../configuration/rumqttd/rumqttd_5883.conf").await
- });
+ let _mqtt_server_handle =
+ tokio::spawn(async { rumqttd_broker::start_broker_local(MQTTTESTPORT1).await });
// Start the subscriber
let subscriber = tokio::spawn(async move { subscribe_until_3_messages_received().await });
@@ -25,7 +28,7 @@ async fn packet_size_within_limit() -> Result<(), anyhow::Error> {
let _ = publisher.await?;
let res = subscriber.await?;
- mqtt_server_handle.abort();
+
match res {
Err(e) => {
return Err(e);
@@ -40,16 +43,14 @@ async fn packet_size_within_limit() -> Result<(), anyhow::Error> {
// This checks the mqtt packet size that exceeds the limit
async fn packet_size_exceeds_limit() -> Result<(), anyhow::Error> {
// Start the broker
- let mqtt_server_handle = tokio::spawn(async {
- start_broker_local("../../configuration/rumqttd/rumqttd_5884.conf").await
- });
+ let _mqtt_server_handle =
+ tokio::spawn(async { rumqttd_broker::start_broker_local(MQTTTESTPORT2).await });
// Start the publisher and publish a message
let publish = tokio::spawn(async { publish_big_message_wait_for_error().await });
// if error is received then test is ok, else test should fail
let res = publish.await?;
- mqtt_server_handle.abort();
match res {
Err(e) => {
return Err(e);
@@ -81,19 +82,13 @@ async fn subscribe_errors(pub_client: &Client) -> Result<(), MqttClientError> {
Ok(())
}
-async fn start_broker_local(cfile: &str) -> anyhow::Result<()> {
- let config: Config = confy::load_path(cfile)?;
- let (mut router, _console, servers, _builder) = async_locallink::construct_broker(config);
- let router = tokio::task::spawn_blocking(move || -> anyhow::Result<()> { Ok(router.start()?) });
- servers.await;
- let _ = router.await;
- Ok(())
-}
-
async fn subscribe_until_3_messages_received() -> Result<(), anyhow::Error> {
let sub_filter = TopicFilter::new("test/hello")?;
- let client =
- Client::connect("subscribe", &mqtt_client::Config::default().with_port(5883)).await?;
+ let client = Client::connect(
+ "subscribe",
+ &mqtt_client::Config::default().with_port(MQTTTESTPORT1),
+ )
+ .await?;
let mut messages = client.subscribe(sub_filter).await?;
let mut cnt: i32 = 0;
while let Some(_message) = messages.next().await {
@@ -113,8 +108,8 @@ async fn publish_3_messages() -> Result<(), anyhow::Error> {
let buffer = create_packet(134217728);
let topic = Topic::new("test/hello")?;
let client = Client::connect(
- "publish_big_data",
- &mqtt_client::Config::default().with_port(5883),
+ "publish_data",
+ &mqtt_client::Config::default().with_port(MQTTTESTPORT1),
)
.await?;
let message = Message::new(&topic, buffer.clone()).qos(QoS::AtMostOnce);
@@ -139,7 +134,7 @@ async fn publish_big_message_wait_for_error() -> Result<(), anyhow::Error> {
let topic = Topic::new("test/hello")?;
let publish_client = Client::connect(
"publish_big_data",
- &mqtt_client::Config::default().with_port(5884),
+ &mqtt_client::Config::default().with_port(MQTTTESTPORT2),
)
.await?;
diff --git a/common/mqtt_client/tests/rumqttd_broker.rs b/common/mqtt_client/tests/rumqttd_broker.rs
new file mode 100644
index 00000000..ec377741
--- /dev/null
+++ b/common/mqtt_client/tests/rumqttd_broker.rs
@@ -0,0 +1,60 @@
+use std::{
+ collections::HashMap,
+ net::{Ipv4Addr, SocketAddr, SocketAddrV4},
+};
+
+use librumqttd::{async_locallink, Config, ConnectionSettings, ConsoleSettings, ServerSettings};
+
+pub async fn start_broker_local(port: u16) -> anyhow::Result<()> {
+ let config: Config = get_rumqttd_config(port);
+ let (mut router, _console, servers, _builder) = async_locallink::construct_broker(config);
+ let router = tokio::task::spawn_blocking(move || -> anyhow::Result<()> { Ok(router.start()?) });
+ servers.await;
+ let _ = router.await;
+
+ Ok(())
+}
+
+fn get_rumqttd_config(port: u16) -> librumqttd::Config {
+ let router_config = rumqttlog::Config {
+ id: 0,
+ dir: "/tmp/rumqttd".into(),
+ max_segment_size: 10240,
+ max_segment_count: 10,
+ max_connections: 10,
+ };
+
+ let connections_settings = ConnectionSettings {
+ connection_timeout_ms: 1,
+ max_client_id_len: 256,
+ throttle_delay_ms: 0,
+ max_payload_size: 268435455,
+ max_inflight_count: 200,
+ max_inflight_size: 1024,
+ username: None,
+ password: None,
+ };
+
+ let server_config = ServerSettings {
+ listen: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), port)),
+ cert: None,
+ next_connection_delay_ms: 1,
+ connections: connections_settings,
+ };
+
+ let mut servers = HashMap::new();
+ servers.insert("1".to_string(), server_config);
+
+ let console_settings = ConsoleSettings {
+ listen: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 3030)),
+ };
+
+ librumqttd::Config {
+ id: 0,
+ router: router_config,
+ servers,
+ cluster: None,
+ replicator: None,
+ console: console_settings,
+ }
+}
diff --git a/configuration/rumqttd/rumqttd_5883.conf b/configuration/rumqttd/rumqttd_5883.conf
deleted file mode 100644
index be14abf9..00000000
--- a/configuration/rumqttd/rumqttd_5883.conf
+++ /dev/null
@@ -1,27 +0,0 @@
-# Broker id. Used to identify local node of the replication mesh
-id = 0
-
-# A commitlog read will pull full segment. Make sure that a segment isn't
-# too big as async tcp writes readiness of one connection might affect tail
-# latencies of other connection. Not a problem with preempting runtimes
-[router]
-id = 0
-dir = "/tmp/rumqttd"
-max_segment_size = 10240
-max_segment_count = 10
-max_connections = 10
-
-# Configuration of server and connections that it accepts
-[servers.1]
-listen = "0.0.0.0:5883"
-next_connection_delay_ms = 1
- [servers.1.connections]
- connection_timeout_ms = 100
- max_client_id_len = 256
- throttle_delay_ms = 0
- max_payload_size = 268435455
- max_inflight_count = 200
- max_inflight_size = 1024
-
-[console]
-listen = "0.0.0.0:3030" \ No newline at end of file
diff --git a/configuration/rumqttd/rumqttd_5884.conf b/configuration/rumqttd/rumqttd_5884.conf
deleted file mode 100644
index 95f8e6ca..00000000
--- a/configuration/rumqttd/rumqttd_5884.conf
+++ /dev/null
@@ -1,27 +0,0 @@
-# Broker id. Used to identify local node of the replication mesh
-id = 0
-
-# A commitlog read will pull full segment. Make sure that a segment isn't
-# too big as async tcp writes readiness of one connection might affect tail
-# latencies of other connection. Not a problem with preempting runtimes
-[router]
-id = 0
-dir = "/tmp/rumqttd"
-max_segment_size = 10240
-max_segment_count = 10
-max_connections = 10
-
-# Configuration of server and connections that it accepts
-[servers.1]
-listen = "0.0.0.0:5884"
-next_connection_delay_ms = 1
- [servers.1.connections]
- connection_timeout_ms = 100
- max_client_id_len = 256
- throttle_delay_ms = 0
- max_payload_size = 268435455
- max_inflight_count = 200
- max_inflight_size = 1024
-
-[console]
-listen = "0.0.0.0:3030" \ No newline at end of file