summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMarcel Müller <neikos@neikos.email>2024-04-04 11:50:17 +0200
committerMarcel Müller <neikos@neikos.email>2024-04-04 11:52:17 +0200
commitfee2babb03001570b51233eb0b2d3ec08ff6390e (patch)
treed7e11d26100d36ee18756d6440622b646bf10bc9
parent052e97babee91494b455f1678aaf3abb07e55109 (diff)
Add MqttClient::ping
Signed-off-by: Marcel Müller <neikos@neikos.email>
-rw-r--r--src/client/send.rs44
1 files changed, 44 insertions, 0 deletions
diff --git a/src/client/send.rs b/src/client/send.rs
index b70796a..0223551 100644
--- a/src/client/send.rs
+++ b/src/client/send.rs
@@ -348,3 +348,47 @@ impl PublishQos2 {
self
}
}
+
+impl MqttClient {
+ pub async fn ping(&self) -> Result<Ping, ()> {
+ let mut inner = self.inner.lock().await;
+ let inner = &mut *inner;
+
+ let Some(conn_state) = &mut inner.connection_state else {
+ tracing::error!("No connection state found");
+ return Err(());
+ };
+
+ let packet = mqtt_format::v5::packets::MqttPacket::Pingreq(
+ mqtt_format::v5::packets::pingreq::MPingreq,
+ );
+
+ let (sender, recv) = futures::channel::oneshot::channel();
+
+ let cbs = inner
+ .outstanding_completions
+ .entry(Id::PingReq)
+ .or_insert_with(|| CallbackState::PingReq {
+ on_pingresp: Default::default(),
+ });
+
+ match cbs {
+ CallbackState::PingReq { on_pingresp } => on_pingresp.push_back(sender),
+ _ => unreachable!("Had a non-pingreq in a pingreq response"),
+ }
+
+ conn_state.conn_write.send(packet).await.map_err(drop)?;
+
+ Ok(Ping { recv })
+ }
+}
+
+pub struct Ping {
+ recv: futures::channel::oneshot::Receiver<()>,
+}
+
+impl Ping {
+ pub async fn response(self) {
+ self.recv.await.unwrap()
+ }
+}