diff options
-rw-r--r-- | Cargo.toml | 6 | ||||
-rw-r--r-- | src/bin/cloudmqtt-test-client.rs | 70 |
2 files changed, 75 insertions, 1 deletions
@@ -23,8 +23,12 @@ required-features = ["bin"] name = "cloudmqtt-server" required-features = ["bin"] +[[bin]] +name = "cloudmqtt-test-client" +required-features = ["bin"] + [features] -bin = ["clap", "tokio/rt-multi-thread", "tokio/macros", "tracing-subscriber"] +bin = ["clap", "tokio/rt-multi-thread", "tokio/macros", "tokio/io-std", "tracing-subscriber"] [dependencies] bytes = "1.3.0" diff --git a/src/bin/cloudmqtt-test-client.rs b/src/bin/cloudmqtt-test-client.rs new file mode 100644 index 0000000..6d714f4 --- /dev/null +++ b/src/bin/cloudmqtt-test-client.rs @@ -0,0 +1,70 @@ +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +use std::process::exit; + +use cloudmqtt::{packet_stream::Acknowledge, MqttClient, MqttConnectionParams}; +use futures::StreamExt; +use mqtt_format::v3::{ + strings::MString, subscription_request::MSubscriptionRequest, will::MLastWill, +}; + +fn print_error_and_quit(e: &str) -> ! { + eprintln!("{}", e); + exit(1); +} + +#[tokio::main] +async fn main() { + let (client_duplex, server_duplex) = tokio::io::duplex(512); + + let (mut read_dup, mut write_dup) = tokio::io::split(server_duplex); + + tokio::spawn(async move { tokio::io::copy(&mut tokio::io::stdin(), &mut write_dup).await }); + tokio::spawn(async move { tokio::io::copy(&mut read_dup, &mut tokio::io::stdout()).await }); + + let client = match MqttClient::connect_v3_duplex( + client_duplex, + MqttConnectionParams { + clean_session: false, + will: Some(MLastWill { + topic: mqtt_format::v3::strings::MString { + value: "hello/world", + }, + payload: b"I died!", + qos: mqtt_format::v3::qos::MQualityOfService::AtMostOnce, + retain: false, + }), + username: None, + password: None, + keep_alive: 5, + client_id: mqtt_format::v3::strings::MString { + value: "mqtt-client-test", + }, + }, + ) + .await + { + Ok(client) => client, + Err(e) => print_error_and_quit(&format!("Could not connect: {e}")), + }; + + tokio::spawn(client.hearbeat(None)); + + let packet_stream = client.build_packet_stream().build(); + let mut packet_stream = Box::pin(packet_stream.stream()); + + loop { + let packet = match packet_stream.next().await { + Some(Ok(packet)) => packet, + None => { + eprintln!("Stream ended, stopping"); + break; + } + Some(Err(error)) => print_error_and_quit(&format!("Stream errored: {error}")), + }; + } +} |