diff options
-rw-r--r-- | Cargo.lock | 5 | ||||
-rw-r--r-- | common/mqtt_client/Cargo.toml | 6 | ||||
-rw-r--r-- | common/mqtt_client/tests/mqtt_pub_sub_test.rs (renamed from common/mqtt_client/tests/integration_test.rs) | 26 | ||||
-rw-r--r-- | common/mqtt_client/tests/packet_size_tests.rs | 43 | ||||
-rw-r--r-- | common/mqtt_client/tests/rumqttd_broker.rs | 60 | ||||
-rw-r--r-- | configuration/rumqttd/rumqttd_5883.conf | 27 | ||||
-rw-r--r-- | configuration/rumqttd/rumqttd_5884.conf | 27 |
7 files changed, 100 insertions, 94 deletions
@@ -1383,7 +1383,6 @@ dependencies = [ "anyhow", "async-log", "async-trait", - "confy", "env_logger 0.9.0", "futures", "futures-timer", @@ -1534,9 +1533,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.7.2" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af8b08b04175473088b46763e51ee54da5f9a164bc162f615b91bc179dbf15a3" +checksum = "692fcb63b64b1758029e0a96ee63e049ce8c5948587f2f7208df04625e5f6b56" [[package]] name = "oorandom" diff --git a/common/mqtt_client/Cargo.toml b/common/mqtt_client/Cargo.toml index ed6fa63e..93955691 100644 --- a/common/mqtt_client/Cargo.toml +++ b/common/mqtt_client/Cargo.toml @@ -14,6 +14,7 @@ mockall = "0.10" async-trait = "0.1" [dev-dependencies] +anyhow="1.0" async-log = "2.0" env_logger = "0.9" futures = "0.3" @@ -21,13 +22,10 @@ futures-timer = "3.0" json = "0.12" log = "0.4" rand = "0.8" -tokio-test = "0.4" -anyhow="1.0" rumqttd = "0.7" serde = "1.0.126" -confy = "0.4" rumqttlog = "0.7" - +tokio-test = "0.4" [features] integration-test = [] diff --git a/common/mqtt_client/tests/integration_test.rs b/common/mqtt_client/tests/mqtt_pub_sub_test.rs index cde27123..29d43e25 100644 --- a/common/mqtt_client/tests/integration_test.rs +++ b/common/mqtt_client/tests/mqtt_pub_sub_test.rs @@ -1,22 +1,30 @@ +mod rumqttd_broker; + +const MQTTTESTPORT: u16 = 58586; + #[test] -#[cfg(feature = "integration-test")] -// Requires fix for access to service on Internet which is not available in gh actions. -// Proposed to use mock server instead of using live service on the Internet. -// Run this test by calling 'cargo test --features integration-test' from the base path of the crate fn sending_and_receiving_a_message() { - use mqtt_client::{Config, Message, MqttClient, Topic}; + use mqtt_client::{Client, Message, MqttClient, Topic}; use std::time::Duration; use tokio::time::sleep; async fn scenario(payload: String) -> Result<Option<Message>, mqtt_client::MqttClientError> { - let test_broker = Config::new("test.mosquitto.org", 1883); - + let _mqtt_server_handle = + tokio::spawn(async { rumqttd_broker::start_broker_local(MQTTTESTPORT).await }); let topic = Topic::new("test/uubpb9wyi9asi46l624f")?; - let subscriber = test_broker.connect("subscriber").await?; + let subscriber = Client::connect( + "subscribe", + &mqtt_client::Config::default().with_port(MQTTTESTPORT), + ) + .await?; let mut received = subscriber.subscribe(topic.filter()).await?; let message = Message::new(&topic, payload); - let publisher = test_broker.connect("publisher").await?; + let publisher = Client::connect( + "publisher", + &mqtt_client::Config::default().with_port(MQTTTESTPORT), + ) + .await?; let _pkid = publisher.publish(message).await?; tokio::select! { diff --git a/common/mqtt_client/tests/packet_size_tests.rs b/common/mqtt_client/tests/packet_size_tests.rs index 7834a386..113bb523 100644 --- a/common/mqtt_client/tests/packet_size_tests.rs +++ b/common/mqtt_client/tests/packet_size_tests.rs @@ -1,9 +1,13 @@ -use futures::future::TryFutureExt; -use librumqttd::{async_locallink, Config}; use mqtt_client::{Client, Message, MqttClient, MqttClientError, QoS, Topic, TopicFilter}; use rumqttc::StateError; +mod rumqttd_broker; +use futures::future::TryFutureExt; use tokio::time::Duration; + +const MQTTTESTPORT1: u16 = 58584; +const MQTTTESTPORT2: u16 = 58585; + #[derive(Debug)] enum TestJoinError { TestMqttClientError(MqttClientError), @@ -14,9 +18,8 @@ enum TestJoinError { // This checks the mqtt packets are within the limit or not async fn packet_size_within_limit() -> Result<(), anyhow::Error> { // Start the local broker - let mqtt_server_handle = tokio::spawn(async { - start_broker_local("../../configuration/rumqttd/rumqttd_5883.conf").await - }); + let _mqtt_server_handle = + tokio::spawn(async { rumqttd_broker::start_broker_local(MQTTTESTPORT1).await }); // Start the subscriber let subscriber = tokio::spawn(async move { subscribe_until_3_messages_received().await }); @@ -25,7 +28,7 @@ async fn packet_size_within_limit() -> Result<(), anyhow::Error> { let _ = publisher.await?; let res = subscriber.await?; - mqtt_server_handle.abort(); + match res { Err(e) => { return Err(e); @@ -40,16 +43,14 @@ async fn packet_size_within_limit() -> Result<(), anyhow::Error> { // This checks the mqtt packet size that exceeds the limit async fn packet_size_exceeds_limit() -> Result<(), anyhow::Error> { // Start the broker - let mqtt_server_handle = tokio::spawn(async { - start_broker_local("../../configuration/rumqttd/rumqttd_5884.conf").await - }); + let _mqtt_server_handle = + tokio::spawn(async { rumqttd_broker::start_broker_local(MQTTTESTPORT2).await }); // Start the publisher and publish a message let publish = tokio::spawn(async { publish_big_message_wait_for_error().await }); // if error is received then test is ok, else test should fail let res = publish.await?; - mqtt_server_handle.abort(); match res { Err(e) => { return Err(e); @@ -81,19 +82,13 @@ async fn subscribe_errors(pub_client: &Client) -> Result<(), MqttClientError> { Ok(()) } -async fn start_broker_local(cfile: &str) -> anyhow::Result<()> { - let config: Config = confy::load_path(cfile)?; - let (mut router, _console, servers, _builder) = async_locallink::construct_broker(config); - let router = tokio::task::spawn_blocking(move || -> anyhow::Result<()> { Ok(router.start()?) }); - servers.await; - let _ = router.await; - Ok(()) -} - async fn subscribe_until_3_messages_received() -> Result<(), anyhow::Error> { let sub_filter = TopicFilter::new("test/hello")?; - let client = - Client::connect("subscribe", &mqtt_client::Config::default().with_port(5883)).await?; + let client = Client::connect( + "subscribe", + &mqtt_client::Config::default().with_port(MQTTTESTPORT1), + ) + .await?; let mut messages = client.subscribe(sub_filter).await?; let mut cnt: i32 = 0; while let Some(_message) = messages.next().await { @@ -113,8 +108,8 @@ async fn publish_3_messages() -> Result<(), anyhow::Error> { let buffer = create_packet(134217728); let topic = Topic::new("test/hello")?; let client = Client::connect( - "publish_big_data", - &mqtt_client::Config::default().with_port(5883), + "publish_data", + &mqtt_client::Config::default().with_port(MQTTTESTPORT1), ) .await?; let message = Message::new(&topic, buffer.clone()).qos(QoS::AtMostOnce); @@ -139,7 +134,7 @@ async fn publish_big_message_wait_for_error() -> Result<(), anyhow::Error> { let topic = Topic::new("test/hello")?; let publish_client = Client::connect( "publish_big_data", - &mqtt_client::Config::default().with_port(5884), + &mqtt_client::Config::default().with_port(MQTTTESTPORT2), ) .await?; diff --git a/common/mqtt_client/tests/rumqttd_broker.rs b/common/mqtt_client/tests/rumqttd_broker.rs new file mode 100644 index 00000000..ec377741 --- /dev/null +++ b/common/mqtt_client/tests/rumqttd_broker.rs @@ -0,0 +1,60 @@ +use std::{ + collections::HashMap, + net::{Ipv4Addr, SocketAddr, SocketAddrV4}, +}; + +use librumqttd::{async_locallink, Config, ConnectionSettings, ConsoleSettings, ServerSettings}; + +pub async fn start_broker_local(port: u16) -> anyhow::Result<()> { + let config: Config = get_rumqttd_config(port); + let (mut router, _console, servers, _builder) = async_locallink::construct_broker(config); + let router = tokio::task::spawn_blocking(move || -> anyhow::Result<()> { Ok(router.start()?) }); + servers.await; + let _ = router.await; + + Ok(()) +} + +fn get_rumqttd_config(port: u16) -> librumqttd::Config { + let router_config = rumqttlog::Config { + id: 0, + dir: "/tmp/rumqttd".into(), + max_segment_size: 10240, + max_segment_count: 10, + max_connections: 10, + }; + + let connections_settings = ConnectionSettings { + connection_timeout_ms: 1, + max_client_id_len: 256, + throttle_delay_ms: 0, + max_payload_size: 268435455, + max_inflight_count: 200, + max_inflight_size: 1024, + username: None, + password: None, + }; + + let server_config = ServerSettings { + listen: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), port)), + cert: None, + next_connection_delay_ms: 1, + connections: connections_settings, + }; + + let mut servers = HashMap::new(); + servers.insert("1".to_string(), server_config); + + let console_settings = ConsoleSettings { + listen: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 3030)), + }; + + librumqttd::Config { + id: 0, + router: router_config, + servers, + cluster: None, + replicator: None, + console: console_settings, + } +} diff --git a/configuration/rumqttd/rumqttd_5883.conf b/configuration/rumqttd/rumqttd_5883.conf deleted file mode 100644 index be14abf9..00000000 --- a/configuration/rumqttd/rumqttd_5883.conf +++ /dev/null @@ -1,27 +0,0 @@ -# Broker id. Used to identify local node of the replication mesh
-id = 0
-
-# A commitlog read will pull full segment. Make sure that a segment isn't
-# too big as async tcp writes readiness of one connection might affect tail
-# latencies of other connection. Not a problem with preempting runtimes
-[router]
-id = 0
-dir = "/tmp/rumqttd"
-max_segment_size = 10240
-max_segment_count = 10
-max_connections = 10
-
-# Configuration of server and connections that it accepts
-[servers.1]
-listen = "0.0.0.0:5883"
-next_connection_delay_ms = 1
- [servers.1.connections]
- connection_timeout_ms = 100
- max_client_id_len = 256
- throttle_delay_ms = 0
- max_payload_size = 268435455
- max_inflight_count = 200
- max_inflight_size = 1024
-
-[console]
-listen = "0.0.0.0:3030"
\ No newline at end of file diff --git a/configuration/rumqttd/rumqttd_5884.conf b/configuration/rumqttd/rumqttd_5884.conf deleted file mode 100644 index 95f8e6ca..00000000 --- a/configuration/rumqttd/rumqttd_5884.conf +++ /dev/null @@ -1,27 +0,0 @@ -# Broker id. Used to identify local node of the replication mesh
-id = 0
-
-# A commitlog read will pull full segment. Make sure that a segment isn't
-# too big as async tcp writes readiness of one connection might affect tail
-# latencies of other connection. Not a problem with preempting runtimes
-[router]
-id = 0
-dir = "/tmp/rumqttd"
-max_segment_size = 10240
-max_segment_count = 10
-max_connections = 10
-
-# Configuration of server and connections that it accepts
-[servers.1]
-listen = "0.0.0.0:5884"
-next_connection_delay_ms = 1
- [servers.1.connections]
- connection_timeout_ms = 100
- max_client_id_len = 256
- throttle_delay_ms = 0
- max_payload_size = 268435455
- max_inflight_count = 200
- max_inflight_size = 1024
-
-[console]
-listen = "0.0.0.0:3030"
\ No newline at end of file |