summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMarcel Müller <neikos@neikos.email>2024-04-04 16:15:23 +0200
committerMarcel Müller <neikos@neikos.email>2024-04-04 16:16:32 +0200
commit2fa4e53a58d71320363c856cd2e29dfd558c349f (patch)
tree6cee8f17f4796d15118d60475ceeebeb4a9bdd0e
parent2d31fe958190bf7f9cf97a3bd2b9484c5408ee58 (diff)
Create and start heartbeat background task
Signed-off-by: Marcel Müller <neikos@neikos.email>
-rw-r--r--src/client/connect.rs70
1 files changed, 65 insertions, 5 deletions
diff --git a/src/client/connect.rs b/src/client/connect.rs
index 74864c4..8bb6f6a 100644
--- a/src/client/connect.rs
+++ b/src/client/connect.rs
@@ -4,6 +4,9 @@
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//
+use std::time::Duration;
+
+use futures::select;
use futures::FutureExt;
use futures::SinkExt;
use futures::StreamExt;
@@ -214,6 +217,7 @@ impl MqttClient {
});
}
+ let (sender, heartbeat_receiver) = futures::channel::mpsc::channel(1);
let conn_write = TransportWriter::new(conn_write, sender);
let (conn_read_sender, conn_read_recv) = futures::channel::oneshot::channel();
@@ -269,6 +273,8 @@ impl MqttClient {
};
}
+ let keep_alive = connect_client_state.keep_alive;
+
inner.connection_state = Some(connect_client_state);
inner.session_state = Some(SessionState {
client_identifier,
@@ -279,11 +285,34 @@ impl MqttClient {
crate::packets::connack::ConnackPropertiesView::try_from(maybe_connack)
.expect("An already matched value suddenly changed?");
- let background_task = crate::client::receive::handle_background_receiving(
- inner_clone,
- conn_read,
- conn_read_sender,
- )
+ let background_task = async move {
+ let receiving_inner = inner_clone.clone();
+ let receiving = crate::client::receive::handle_background_receiving(
+ receiving_inner,
+ conn_read,
+ conn_read_sender,
+ );
+
+ let heartbeat_inner = inner_clone;
+
+ let heartbeat = if let KeepAlive::Seconds(time) = keep_alive {
+ handle_heartbeats(
+ heartbeat_receiver,
+ Duration::from_secs(time.get().into()),
+ heartbeat_inner,
+ )
+ .left_future()
+ } else {
+ tracing::info!(
+ "Keep Alive is disabled, will not send PingReq packets automatically"
+ );
+ futures::future::ok(()).right_future()
+ };
+
+ tokio::try_join!(receiving, heartbeat)
+ .map(drop)
+ .map_err(drop)
+ }
.boxed();
return Ok(Connected {
@@ -297,3 +326,34 @@ impl MqttClient {
todo!()
}
}
+
+async fn handle_heartbeats(
+ mut heartbeat_receiver: futures::channel::mpsc::Receiver<()>,
+ duration: Duration,
+ heartbeat_inner: std::sync::Arc<futures::lock::Mutex<super::InnerClient>>,
+) -> Result<(), ()> {
+ let mut timeout = futures_timer::Delay::new(duration).fuse();
+ loop {
+ select! {
+ heartbeat = heartbeat_receiver.next() => match heartbeat {
+ None => break,
+ Some(_) => {
+ timeout = futures_timer::Delay::new(duration).fuse();
+ },
+ },
+ _ = timeout => {
+ let mut inner = heartbeat_inner.lock().await;
+ let inner = &mut *inner;
+ let Some(conn_state) = inner.connection_state.as_mut() else {
+ todo!();
+ };
+
+ // We make sure that this won't deadlock in the send method
+ conn_state.conn_write.send(
+ mqtt_format::v5::packets::MqttPacket::Pingreq(mqtt_format::v5::packets::pingreq::MPingreq)
+ ).await.unwrap();
+ }
+ }
+ }
+ Ok(())
+}