summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMarcel Müller <neikos@neikos.email>2022-08-22 09:46:42 +0200
committerMarcel Müller <neikos@neikos.email>2022-12-12 12:41:08 +0100
commit6c0c78d107edd4f52cf45727579227a2f971efe2 (patch)
treeeca282dc6a19a46e81549bc75086108bfb00d46c
parent666bb1d8bacb94c58ad04163d7df5af03dc5dec3 (diff)
Add keep alive timeout to connections
Co-authored-by: Matthias Beyer <mail@beyermatthias.de> Signed-off-by: Marcel Müller <neikos@neikos.email>
-rw-r--r--src/server.rs23
1 files changed, 17 insertions, 6 deletions
diff --git a/src/server.rs b/src/server.rs
index 4702da4..f0dbc7d 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -4,7 +4,7 @@
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//
-use std::{collections::VecDeque, sync::Arc};
+use std::{collections::VecDeque, sync::Arc, time::Duration};
use dashmap::DashMap;
use mqtt_format::v3::{
@@ -215,23 +215,34 @@ impl MqttServer {
}
});
+ let keep_alive = *keep_alive;
+
tokio::spawn(async move {
let client_id = client_id;
let client_connection = client_connection;
let mut reader = client_connection.reader.lock().await;
+ let keep_alive_duration = Duration::from_secs((keep_alive as u64 * 150) / 100);
loop {
- let packet = match crate::read_one_packet(&mut *reader).await {
- Ok(packet) => packet,
- Err(e) => {
- debug!("Could not read the next client packet: {e}");
+ let packet = tokio::select! {
+ packet = crate::read_one_packet(&mut *reader) => {
+ match packet {
+ Ok(packet) => packet,
+ Err(e) => {
+ debug!("Could not read the next client packet: {e}");
+ break;
+ }
+ }
+ },
+ _timeout = tokio::time::sleep(keep_alive_duration) => {
+ debug!("Client timed out");
break;
}
};
match packet.get_packet() {
MPacket::Publish {
- dup,
+ dup: _,
qos,
retain,
topic_name,