diff options
author | Marcel Müller <neikos@neikos.email> | 2022-08-22 09:46:42 +0200 |
---|---|---|
committer | Marcel Müller <neikos@neikos.email> | 2022-12-12 12:41:08 +0100 |
commit | 6c0c78d107edd4f52cf45727579227a2f971efe2 (patch) | |
tree | eca282dc6a19a46e81549bc75086108bfb00d46c | |
parent | 666bb1d8bacb94c58ad04163d7df5af03dc5dec3 (diff) |
Add keep alive timeout to connections
Co-authored-by: Matthias Beyer <mail@beyermatthias.de>
Signed-off-by: Marcel Müller <neikos@neikos.email>
-rw-r--r-- | src/server.rs | 23 |
1 files changed, 17 insertions, 6 deletions
diff --git a/src/server.rs b/src/server.rs index 4702da4..f0dbc7d 100644 --- a/src/server.rs +++ b/src/server.rs @@ -4,7 +4,7 @@ // file, You can obtain one at http://mozilla.org/MPL/2.0/. // -use std::{collections::VecDeque, sync::Arc}; +use std::{collections::VecDeque, sync::Arc, time::Duration}; use dashmap::DashMap; use mqtt_format::v3::{ @@ -215,23 +215,34 @@ impl MqttServer { } }); + let keep_alive = *keep_alive; + tokio::spawn(async move { let client_id = client_id; let client_connection = client_connection; let mut reader = client_connection.reader.lock().await; + let keep_alive_duration = Duration::from_secs((keep_alive as u64 * 150) / 100); loop { - let packet = match crate::read_one_packet(&mut *reader).await { - Ok(packet) => packet, - Err(e) => { - debug!("Could not read the next client packet: {e}"); + let packet = tokio::select! { + packet = crate::read_one_packet(&mut *reader) => { + match packet { + Ok(packet) => packet, + Err(e) => { + debug!("Could not read the next client packet: {e}"); + break; + } + } + }, + _timeout = tokio::time::sleep(keep_alive_duration) => { + debug!("Client timed out"); break; } }; match packet.get_packet() { MPacket::Publish { - dup, + dup: _, qos, retain, topic_name, |