summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMarcel Müller <neikos@neikos.email>2022-07-18 11:30:34 +0200
committerMatthias Beyer <mail@beyermatthias.de>2023-01-02 11:50:33 +0100
commit16e62ebc7c9e1fd29c99e25bad2e9c08338295f1 (patch)
tree49522a1bf5a62e96807c1d6b45aea86cb995aab1
parent69208d0166862a13330bc705e512e89e72fd8bf0 (diff)
Add cloudmqtt test client binary
Signed-off-by: Marcel Müller <neikos@neikos.email>
-rw-r--r--Cargo.toml6
-rw-r--r--src/bin/cloudmqtt-test-client.rs70
2 files changed, 75 insertions, 1 deletions
diff --git a/Cargo.toml b/Cargo.toml
index 98c5e69..bb1cc22 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -23,8 +23,12 @@ required-features = ["bin"]
name = "cloudmqtt-server"
required-features = ["bin"]
+[[bin]]
+name = "cloudmqtt-test-client"
+required-features = ["bin"]
+
[features]
-bin = ["clap", "tokio/rt-multi-thread", "tokio/macros", "tracing-subscriber"]
+bin = ["clap", "tokio/rt-multi-thread", "tokio/macros", "tokio/io-std", "tracing-subscriber"]
[dependencies]
bytes = "1.3.0"
diff --git a/src/bin/cloudmqtt-test-client.rs b/src/bin/cloudmqtt-test-client.rs
new file mode 100644
index 0000000..6d714f4
--- /dev/null
+++ b/src/bin/cloudmqtt-test-client.rs
@@ -0,0 +1,70 @@
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+//
+
+use std::process::exit;
+
+use cloudmqtt::{packet_stream::Acknowledge, MqttClient, MqttConnectionParams};
+use futures::StreamExt;
+use mqtt_format::v3::{
+ strings::MString, subscription_request::MSubscriptionRequest, will::MLastWill,
+};
+
+fn print_error_and_quit(e: &str) -> ! {
+ eprintln!("{}", e);
+ exit(1);
+}
+
+#[tokio::main]
+async fn main() {
+ let (client_duplex, server_duplex) = tokio::io::duplex(512);
+
+ let (mut read_dup, mut write_dup) = tokio::io::split(server_duplex);
+
+ tokio::spawn(async move { tokio::io::copy(&mut tokio::io::stdin(), &mut write_dup).await });
+ tokio::spawn(async move { tokio::io::copy(&mut read_dup, &mut tokio::io::stdout()).await });
+
+ let client = match MqttClient::connect_v3_duplex(
+ client_duplex,
+ MqttConnectionParams {
+ clean_session: false,
+ will: Some(MLastWill {
+ topic: mqtt_format::v3::strings::MString {
+ value: "hello/world",
+ },
+ payload: b"I died!",
+ qos: mqtt_format::v3::qos::MQualityOfService::AtMostOnce,
+ retain: false,
+ }),
+ username: None,
+ password: None,
+ keep_alive: 5,
+ client_id: mqtt_format::v3::strings::MString {
+ value: "mqtt-client-test",
+ },
+ },
+ )
+ .await
+ {
+ Ok(client) => client,
+ Err(e) => print_error_and_quit(&format!("Could not connect: {e}")),
+ };
+
+ tokio::spawn(client.hearbeat(None));
+
+ let packet_stream = client.build_packet_stream().build();
+ let mut packet_stream = Box::pin(packet_stream.stream());
+
+ loop {
+ let packet = match packet_stream.next().await {
+ Some(Ok(packet)) => packet,
+ None => {
+ eprintln!("Stream ended, stopping");
+ break;
+ }
+ Some(Err(error)) => print_error_and_quit(&format!("Stream errored: {error}")),
+ };
+ }
+}