From be96d195b0f715893fe239a21c5d06f00e71a90d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=BCller?= Date: Mon, 4 Jul 2022 17:11:08 +0200 Subject: Add heartbeats as a separate task --- Cargo.lock | 114 +++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 15 +++++- mqtt-format/src/v3/packet.rs | 23 ++++++++- src/bin/cloudmqtt-client.rs | 8 +-- src/lib.rs | 36 ++++++++++++-- src/packet_stream.rs | 6 +-- 6 files changed, 188 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8b90b32..f5b2c91 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -55,6 +55,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" +[[package]] +name = "cassowary" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df8670b8c7b9dae1793364eafadf7239c40d669904660c5960d74cfd80b46a53" + [[package]] name = "cfg-if" version = "1.0.0" @@ -106,6 +112,7 @@ version = "0.1.0" dependencies = [ "bytes", "clap", + "crossterm 0.24.0", "dashmap", "futures", "mqtt-format", @@ -116,6 +123,48 @@ dependencies = [ "tokio-util", "tracing", "tracing-subscriber", + "tui", +] + +[[package]] +name = "crossterm" +version = "0.23.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2102ea4f781910f8a5b98dd061f4c2023f479ce7bb1236330099ceb5a93cf17" +dependencies = [ + "bitflags", + "crossterm_winapi", + "libc", + "mio", + "parking_lot", + "signal-hook", + "signal-hook-mio", + "winapi", +] + +[[package]] +name = "crossterm" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab9f7409c70a38a56216480fba371ee460207dd8926ccf5b4160591759559170" +dependencies = [ + "bitflags", + "crossterm_winapi", + "libc", + "mio", + "parking_lot", + "signal-hook", + "signal-hook-mio", + "winapi", +] + +[[package]] +name = "crossterm_winapi" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2ae1b35a484aa10e07fe0638d02301c5ad24de82d310ccbd2f3693da5f09bf1c" +dependencies = [ + "winapi", ] [[package]] @@ -408,6 +457,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "parking_lot" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" +dependencies = [ + "lock_api", + "parking_lot_core", +] + [[package]] name = "parking_lot_core" version = "0.9.3" @@ -535,6 +594,36 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "signal-hook" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a253b5e89e2698464fc26b545c9edceb338e18a89effeeecfea192c3025be29d" +dependencies = [ + "libc", + "signal-hook-registry", +] + +[[package]] +name = "signal-hook-mio" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29ad2e15f37ec9a6cc544097b78a1ec90001e9f71b81338ca39f430adaca99af" +dependencies = [ + "libc", + "mio", + "signal-hook", +] + +[[package]] +name = "signal-hook-registry" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0" +dependencies = [ + "libc", +] + [[package]] name = "slab" version = "0.4.6" @@ -729,12 +818,37 @@ dependencies = [ "tracing-log", ] +[[package]] +name = "tui" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96fe69244ec2af261bced1d9046a6fee6c8c2a6b0228e59e5ba39bc8ba4ed729" +dependencies = [ + "bitflags", + "cassowary", + "crossterm 0.23.2", + "unicode-segmentation", + "unicode-width", +] + [[package]] name = "unicode-ident" version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5bd2fe26506023ed7b5e1e315add59d6f584c621d037f9368fea9cfb988f368c" +[[package]] +name = "unicode-segmentation" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e8820f5d777f6224dc4be3632222971ac30164d4a258d595640799554ebfd99" + +[[package]] +name = "unicode-width" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ed742d4ea2bd1176e236172c8429aaf54486e7ac098db29ffe6529e0ce50973" + [[package]] name = "valuable" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 47d218c..ef490cd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,11 +14,19 @@ name = "cloudmqtt-client" required-features = ["bin"] [features] -bin = ["clap", "tokio/rt-multi-thread", "tokio/macros", "tracing-subscriber"] +bin = [ + "clap", + "tokio/rt-multi-thread", + "tokio/macros", + "tracing-subscriber", + "tui", + "crossterm", +] [dependencies] bytes = "1.1.0" clap = { version = "3.2.6", optional = true, features = ["derive"] } +crossterm = { version = "0.24.0", optional = true } dashmap = "5.3.4" futures = "0.3.21" mqtt-format = { version = "0.1.0", path = "mqtt-format" } @@ -37,7 +45,10 @@ tokio-util = { version = "0.7.3", features = [ "compat", ] } tracing = "0.1.35" -tracing-subscriber = { version = "0.3.14", optional = true, features = ["env-filter"] } +tracing-subscriber = { version = "0.3.14", optional = true, features = [ + "env-filter", +] } +tui = { version = "0.18.0", optional = true } [dev-dependencies] static_assertions = "1.1.0" diff --git a/mqtt-format/src/v3/packet.rs b/mqtt-format/src/v3/packet.rs index 9352367..23e3c8d 100644 --- a/mqtt-format/src/v3/packet.rs +++ b/mqtt-format/src/v3/packet.rs @@ -200,7 +200,20 @@ impl<'message> MPacket<'message> { id, payload, } => todo!(), - MPacket::Puback { id } => todo!(), + MPacket::Puback { id } => { + let packet_type = 0b0100_0000; + + // Header 1 + writer.write_all(&[packet_type]).await?; + + let remaining_length = 2; + + // Header 2-5 + write_remaining_length!(writer, remaining_length); + + // Variable 1-6 + id.write_to(&mut writer).await?; + } MPacket::Pubrec { id } => todo!(), MPacket::Pubrel { id } => todo!(), MPacket::Pubcomp { id } => todo!(), @@ -230,7 +243,13 @@ impl<'message> MPacket<'message> { unsubscriptions, } => todo!(), MPacket::Unsuback { id } => todo!(), - MPacket::Pingreq => todo!(), + MPacket::Pingreq => { + let packet_type = 0b1100_0000; + let variable_length = 0b0; + + // Header + writer.write_all(&[packet_type, variable_length]).await?; + } MPacket::Pingresp => todo!(), MPacket::Disconnect => todo!(), } diff --git a/src/bin/cloudmqtt-client.rs b/src/bin/cloudmqtt-client.rs index 9de6011..49d237f 100644 --- a/src/bin/cloudmqtt-client.rs +++ b/src/bin/cloudmqtt-client.rs @@ -40,7 +40,7 @@ async fn main() { let mut client = MqttClient::connect_v3_unsecured( &args.addr, MqttConnectionParams { - clean_session: true, + clean_session: false, will: Some(MLastWill { topic: mqtt_format::v3::strings::MString { value: "hello/world", @@ -51,7 +51,7 @@ async fn main() { }), username: None, password: None, - keep_alive: 100, + keep_alive: 5, client_id: mqtt_format::v3::strings::MString { value: &args.client_id, }, @@ -60,6 +60,8 @@ async fn main() { .await .unwrap(); + tokio::spawn(client.hearbeat(None)); + client .subscribe( &args @@ -67,7 +69,7 @@ async fn main() { .iter() .map(|sub| MSubscriptionRequest { topic: MString { value: &sub }, - qos: mqtt_format::v3::qos::MQualityOfService::AtMostOnce, + qos: mqtt_format::v3::qos::MQualityOfService::AtLeastOnce, }) .collect::>(), ) diff --git a/src/lib.rs b/src/lib.rs index 056e609..522c51a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,4 @@ -use std::pin::Pin; +use std::{pin::Pin, sync::Arc, time::Duration}; use bytes::{BufMut, Bytes, BytesMut}; use client_stream::MqttClientStream; @@ -44,9 +44,10 @@ fn parse_packet(input: &[u8]) -> Result, MqttError> { pub struct MqttClient { session_present: bool, client_receiver: Mutex>>, - client_sender: Mutex>>, + client_sender: Arc>>>, received_packet_storage: PacketStorage, sent_packet_storage: PacketStorage, + keep_alive_duration: u16, } macro_rules! write_packet { @@ -104,12 +105,39 @@ impl MqttClient { Ok(MqttClient { session_present, client_receiver: Mutex::new(Some(read_half)), - client_sender: Mutex::new(Some(write_half)), + client_sender: Arc::new(Mutex::new(Some(write_half))), sent_packet_storage: PacketStorage::new(), received_packet_storage: PacketStorage::new(), + keep_alive_duration: connection_params.keep_alive, }) } + pub fn hearbeat( + &self, + cancel_token: Option, + ) -> impl std::future::Future> { + let keep_alive_duration = self.keep_alive_duration; + let sender = self.client_sender.clone(); + async move { + loop { + tokio::time::sleep(Duration::from_secs((keep_alive_duration as u64 * 100) / 80)) + .await; + + let mut mutex = sender.lock().await; + + let mut client_stream = match mutex.as_mut() { + Some(cs) => cs, + None => return Err(MqttError::ConnectionClosed), + }; + trace!("Sending hearbeat"); + + let packet = MPacket::Pingreq; + + write_packet!(&mut client_stream, packet).await?; + } + } + } + async fn read_one_packet( mut reader: W, ) -> Result { @@ -178,7 +206,7 @@ impl MqttClient { } pub async fn subscribe( - &mut self, + &self, subscription_requests: &[MSubscriptionRequest<'_>], ) -> Result<(), MqttError> { let mut mutex = match self.client_sender.try_lock() { diff --git a/src/packet_stream.rs b/src/packet_stream.rs index 2b3d39d..3894be5 100644 --- a/src/packet_stream.rs +++ b/src/packet_stream.rs @@ -110,9 +110,9 @@ impl<'client, ACK: AckHandler> PacketStream<'client, ACK> { MPacket::Publish { qos, .. } => { if qos != MQualityOfService::AtMostOnce { self.ack_fn.handle(next_message.clone()); - client - .received_packet_storage - .push_to_storage(next_message.clone()); + // client + // .received_packet_storage + // .push_to_storage(next_message.clone()); let mut mutex = client.client_sender.lock().await; -- cgit v1.2.3