summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMarcel Müller <neikos@neikos.email>2022-07-04 17:11:08 +0200
committerMarcel Müller <neikos@neikos.email>2022-07-04 17:11:08 +0200
commitbe96d195b0f715893fe239a21c5d06f00e71a90d (patch)
tree2c9360b969f869de7ce7de3edeb51a53a8053571
parent335d76687e4d7c62688b998d883fd52bff97ef27 (diff)
Add heartbeats as a separate task
-rw-r--r--Cargo.lock114
-rw-r--r--Cargo.toml15
-rw-r--r--mqtt-format/src/v3/packet.rs23
-rw-r--r--src/bin/cloudmqtt-client.rs8
-rw-r--r--src/lib.rs36
-rw-r--r--src/packet_stream.rs6
6 files changed, 188 insertions, 14 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 8b90b32..f5b2c91 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -56,6 +56,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8"
[[package]]
+name = "cassowary"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "df8670b8c7b9dae1793364eafadf7239c40d669904660c5960d74cfd80b46a53"
+
+[[package]]
name = "cfg-if"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -106,6 +112,7 @@ version = "0.1.0"
dependencies = [
"bytes",
"clap",
+ "crossterm 0.24.0",
"dashmap",
"futures",
"mqtt-format",
@@ -116,6 +123,48 @@ dependencies = [
"tokio-util",
"tracing",
"tracing-subscriber",
+ "tui",
+]
+
+[[package]]
+name = "crossterm"
+version = "0.23.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a2102ea4f781910f8a5b98dd061f4c2023f479ce7bb1236330099ceb5a93cf17"
+dependencies = [
+ "bitflags",
+ "crossterm_winapi",
+ "libc",
+ "mio",
+ "parking_lot",
+ "signal-hook",
+ "signal-hook-mio",
+ "winapi",
+]
+
+[[package]]
+name = "crossterm"
+version = "0.24.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ab9f7409c70a38a56216480fba371ee460207dd8926ccf5b4160591759559170"
+dependencies = [
+ "bitflags",
+ "crossterm_winapi",
+ "libc",
+ "mio",
+ "parking_lot",
+ "signal-hook",
+ "signal-hook-mio",
+ "winapi",
+]
+
+[[package]]
+name = "crossterm_winapi"
+version = "0.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2ae1b35a484aa10e07fe0638d02301c5ad24de82d310ccbd2f3693da5f09bf1c"
+dependencies = [
+ "winapi",
]
[[package]]
@@ -409,6 +458,16 @@ dependencies = [
]
[[package]]
+name = "parking_lot"
+version = "0.12.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
+dependencies = [
+ "lock_api",
+ "parking_lot_core",
+]
+
+[[package]]
name = "parking_lot_core"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -536,6 +595,36 @@ dependencies = [
]
[[package]]
+name = "signal-hook"
+version = "0.3.14"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a253b5e89e2698464fc26b545c9edceb338e18a89effeeecfea192c3025be29d"
+dependencies = [
+ "libc",
+ "signal-hook-registry",
+]
+
+[[package]]
+name = "signal-hook-mio"
+version = "0.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "29ad2e15f37ec9a6cc544097b78a1ec90001e9f71b81338ca39f430adaca99af"
+dependencies = [
+ "libc",
+ "mio",
+ "signal-hook",
+]
+
+[[package]]
+name = "signal-hook-registry"
+version = "1.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0"
+dependencies = [
+ "libc",
+]
+
+[[package]]
name = "slab"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -730,12 +819,37 @@ dependencies = [
]
[[package]]
+name = "tui"
+version = "0.18.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "96fe69244ec2af261bced1d9046a6fee6c8c2a6b0228e59e5ba39bc8ba4ed729"
+dependencies = [
+ "bitflags",
+ "cassowary",
+ "crossterm 0.23.2",
+ "unicode-segmentation",
+ "unicode-width",
+]
+
+[[package]]
name = "unicode-ident"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5bd2fe26506023ed7b5e1e315add59d6f584c621d037f9368fea9cfb988f368c"
[[package]]
+name = "unicode-segmentation"
+version = "1.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7e8820f5d777f6224dc4be3632222971ac30164d4a258d595640799554ebfd99"
+
+[[package]]
+name = "unicode-width"
+version = "0.1.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3ed742d4ea2bd1176e236172c8429aaf54486e7ac098db29ffe6529e0ce50973"
+
+[[package]]
name = "valuable"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/Cargo.toml b/Cargo.toml
index 47d218c..ef490cd 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -14,11 +14,19 @@ name = "cloudmqtt-client"
required-features = ["bin"]
[features]
-bin = ["clap", "tokio/rt-multi-thread", "tokio/macros", "tracing-subscriber"]
+bin = [
+ "clap",
+ "tokio/rt-multi-thread",
+ "tokio/macros",
+ "tracing-subscriber",
+ "tui",
+ "crossterm",
+]
[dependencies]
bytes = "1.1.0"
clap = { version = "3.2.6", optional = true, features = ["derive"] }
+crossterm = { version = "0.24.0", optional = true }
dashmap = "5.3.4"
futures = "0.3.21"
mqtt-format = { version = "0.1.0", path = "mqtt-format" }
@@ -37,7 +45,10 @@ tokio-util = { version = "0.7.3", features = [
"compat",
] }
tracing = "0.1.35"
-tracing-subscriber = { version = "0.3.14", optional = true, features = ["env-filter"] }
+tracing-subscriber = { version = "0.3.14", optional = true, features = [
+ "env-filter",
+] }
+tui = { version = "0.18.0", optional = true }
[dev-dependencies]
static_assertions = "1.1.0"
diff --git a/mqtt-format/src/v3/packet.rs b/mqtt-format/src/v3/packet.rs
index 9352367..23e3c8d 100644
--- a/mqtt-format/src/v3/packet.rs
+++ b/mqtt-format/src/v3/packet.rs
@@ -200,7 +200,20 @@ impl<'message> MPacket<'message> {
id,
payload,
} => todo!(),
- MPacket::Puback { id } => todo!(),
+ MPacket::Puback { id } => {
+ let packet_type = 0b0100_0000;
+
+ // Header 1
+ writer.write_all(&[packet_type]).await?;
+
+ let remaining_length = 2;
+
+ // Header 2-5
+ write_remaining_length!(writer, remaining_length);
+
+ // Variable 1-6
+ id.write_to(&mut writer).await?;
+ }
MPacket::Pubrec { id } => todo!(),
MPacket::Pubrel { id } => todo!(),
MPacket::Pubcomp { id } => todo!(),
@@ -230,7 +243,13 @@ impl<'message> MPacket<'message> {
unsubscriptions,
} => todo!(),
MPacket::Unsuback { id } => todo!(),
- MPacket::Pingreq => todo!(),
+ MPacket::Pingreq => {
+ let packet_type = 0b1100_0000;
+ let variable_length = 0b0;
+
+ // Header
+ writer.write_all(&[packet_type, variable_length]).await?;
+ }
MPacket::Pingresp => todo!(),
MPacket::Disconnect => todo!(),
}
diff --git a/src/bin/cloudmqtt-client.rs b/src/bin/cloudmqtt-client.rs
index 9de6011..49d237f 100644
--- a/src/bin/cloudmqtt-client.rs
+++ b/src/bin/cloudmqtt-client.rs
@@ -40,7 +40,7 @@ async fn main() {
let mut client = MqttClient::connect_v3_unsecured(
&args.addr,
MqttConnectionParams {
- clean_session: true,
+ clean_session: false,
will: Some(MLastWill {
topic: mqtt_format::v3::strings::MString {
value: "hello/world",
@@ -51,7 +51,7 @@ async fn main() {
}),
username: None,
password: None,
- keep_alive: 100,
+ keep_alive: 5,
client_id: mqtt_format::v3::strings::MString {
value: &args.client_id,
},
@@ -60,6 +60,8 @@ async fn main() {
.await
.unwrap();
+ tokio::spawn(client.hearbeat(None));
+
client
.subscribe(
&args
@@ -67,7 +69,7 @@ async fn main() {
.iter()
.map(|sub| MSubscriptionRequest {
topic: MString { value: &sub },
- qos: mqtt_format::v3::qos::MQualityOfService::AtMostOnce,
+ qos: mqtt_format::v3::qos::MQualityOfService::AtLeastOnce,
})
.collect::<Vec<_>>(),
)
diff --git a/src/lib.rs b/src/lib.rs
index 056e609..522c51a 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,4 +1,4 @@
-use std::pin::Pin;
+use std::{pin::Pin, sync::Arc, time::Duration};
use bytes::{BufMut, Bytes, BytesMut};
use client_stream::MqttClientStream;
@@ -44,9 +44,10 @@ fn parse_packet(input: &[u8]) -> Result<MPacket<'_>, MqttError> {
pub struct MqttClient {
session_present: bool,
client_receiver: Mutex<Option<ReadHalf<client_stream::MqttClientStream>>>,
- client_sender: Mutex<Option<WriteHalf<client_stream::MqttClientStream>>>,
+ client_sender: Arc<Mutex<Option<WriteHalf<client_stream::MqttClientStream>>>>,
received_packet_storage: PacketStorage,
sent_packet_storage: PacketStorage,
+ keep_alive_duration: u16,
}
macro_rules! write_packet {
@@ -104,12 +105,39 @@ impl MqttClient {
Ok(MqttClient {
session_present,
client_receiver: Mutex::new(Some(read_half)),
- client_sender: Mutex::new(Some(write_half)),
+ client_sender: Arc::new(Mutex::new(Some(write_half))),
sent_packet_storage: PacketStorage::new(),
received_packet_storage: PacketStorage::new(),
+ keep_alive_duration: connection_params.keep_alive,
})
}
+ pub fn hearbeat(
+ &self,
+ cancel_token: Option<CancellationToken>,
+ ) -> impl std::future::Future<Output = Result<(), MqttError>> {
+ let keep_alive_duration = self.keep_alive_duration;
+ let sender = self.client_sender.clone();
+ async move {
+ loop {
+ tokio::time::sleep(Duration::from_secs((keep_alive_duration as u64 * 100) / 80))
+ .await;
+
+ let mut mutex = sender.lock().await;
+
+ let mut client_stream = match mutex.as_mut() {
+ Some(cs) => cs,
+ None => return Err(MqttError::ConnectionClosed),
+ };
+ trace!("Sending hearbeat");
+
+ let packet = MPacket::Pingreq;
+
+ write_packet!(&mut client_stream, packet).await?;
+ }
+ }
+ }
+
async fn read_one_packet<W: tokio::io::AsyncRead + Unpin>(
mut reader: W,
) -> Result<MqttPacket, MqttError> {
@@ -178,7 +206,7 @@ impl MqttClient {
}
pub async fn subscribe(
- &mut self,
+ &self,
subscription_requests: &[MSubscriptionRequest<'_>],
) -> Result<(), MqttError> {
let mut mutex = match self.client_sender.try_lock() {
diff --git a/src/packet_stream.rs b/src/packet_stream.rs
index 2b3d39d..3894be5 100644
--- a/src/packet_stream.rs
+++ b/src/packet_stream.rs
@@ -110,9 +110,9 @@ impl<'client, ACK: AckHandler> PacketStream<'client, ACK> {
MPacket::Publish { qos, .. } => {
if qos != MQualityOfService::AtMostOnce {
self.ack_fn.handle(next_message.clone());
- client
- .received_packet_storage
- .push_to_storage(next_message.clone());
+ // client
+ // .received_packet_storage
+ // .push_to_storage(next_message.clone());
let mut mutex = client.client_sender.lock().await;