diff options
author | Marcel Müller <neikos@neikos.email> | 2024-04-04 16:15:23 +0200 |
---|---|---|
committer | Marcel Müller <neikos@neikos.email> | 2024-04-04 16:16:32 +0200 |
commit | 2fa4e53a58d71320363c856cd2e29dfd558c349f (patch) | |
tree | 6cee8f17f4796d15118d60475ceeebeb4a9bdd0e | |
parent | 2d31fe958190bf7f9cf97a3bd2b9484c5408ee58 (diff) |
Create and start heartbeat background task
Signed-off-by: Marcel Müller <neikos@neikos.email>
-rw-r--r-- | src/client/connect.rs | 70 |
1 files changed, 65 insertions, 5 deletions
diff --git a/src/client/connect.rs b/src/client/connect.rs index 74864c4..8bb6f6a 100644 --- a/src/client/connect.rs +++ b/src/client/connect.rs @@ -4,6 +4,9 @@ // file, You can obtain one at http://mozilla.org/MPL/2.0/. // +use std::time::Duration; + +use futures::select; use futures::FutureExt; use futures::SinkExt; use futures::StreamExt; @@ -214,6 +217,7 @@ impl MqttClient { }); } + let (sender, heartbeat_receiver) = futures::channel::mpsc::channel(1); let conn_write = TransportWriter::new(conn_write, sender); let (conn_read_sender, conn_read_recv) = futures::channel::oneshot::channel(); @@ -269,6 +273,8 @@ impl MqttClient { }; } + let keep_alive = connect_client_state.keep_alive; + inner.connection_state = Some(connect_client_state); inner.session_state = Some(SessionState { client_identifier, @@ -279,11 +285,34 @@ impl MqttClient { crate::packets::connack::ConnackPropertiesView::try_from(maybe_connack) .expect("An already matched value suddenly changed?"); - let background_task = crate::client::receive::handle_background_receiving( - inner_clone, - conn_read, - conn_read_sender, - ) + let background_task = async move { + let receiving_inner = inner_clone.clone(); + let receiving = crate::client::receive::handle_background_receiving( + receiving_inner, + conn_read, + conn_read_sender, + ); + + let heartbeat_inner = inner_clone; + + let heartbeat = if let KeepAlive::Seconds(time) = keep_alive { + handle_heartbeats( + heartbeat_receiver, + Duration::from_secs(time.get().into()), + heartbeat_inner, + ) + .left_future() + } else { + tracing::info!( + "Keep Alive is disabled, will not send PingReq packets automatically" + ); + futures::future::ok(()).right_future() + }; + + tokio::try_join!(receiving, heartbeat) + .map(drop) + .map_err(drop) + } .boxed(); return Ok(Connected { @@ -297,3 +326,34 @@ impl MqttClient { todo!() } } + +async fn handle_heartbeats( + mut heartbeat_receiver: futures::channel::mpsc::Receiver<()>, + duration: Duration, + heartbeat_inner: std::sync::Arc<futures::lock::Mutex<super::InnerClient>>, +) -> Result<(), ()> { + let mut timeout = futures_timer::Delay::new(duration).fuse(); + loop { + select! { + heartbeat = heartbeat_receiver.next() => match heartbeat { + None => break, + Some(_) => { + timeout = futures_timer::Delay::new(duration).fuse(); + }, + }, + _ = timeout => { + let mut inner = heartbeat_inner.lock().await; + let inner = &mut *inner; + let Some(conn_state) = inner.connection_state.as_mut() else { + todo!(); + }; + + // We make sure that this won't deadlock in the send method + conn_state.conn_write.send( + mqtt_format::v5::packets::MqttPacket::Pingreq(mqtt_format::v5::packets::pingreq::MPingreq) + ).await.unwrap(); + } + } + } + Ok(()) +} |