diff options
author | Marcel Müller <neikos@neikos.email> | 2024-04-04 11:50:17 +0200 |
---|---|---|
committer | Marcel Müller <neikos@neikos.email> | 2024-04-04 11:52:17 +0200 |
commit | fee2babb03001570b51233eb0b2d3ec08ff6390e (patch) | |
tree | d7e11d26100d36ee18756d6440622b646bf10bc9 | |
parent | 052e97babee91494b455f1678aaf3abb07e55109 (diff) |
Add MqttClient::ping
Signed-off-by: Marcel Müller <neikos@neikos.email>
-rw-r--r-- | src/client/send.rs | 44 |
1 files changed, 44 insertions, 0 deletions
diff --git a/src/client/send.rs b/src/client/send.rs index b70796a..0223551 100644 --- a/src/client/send.rs +++ b/src/client/send.rs @@ -348,3 +348,47 @@ impl PublishQos2 { self } } + +impl MqttClient { + pub async fn ping(&self) -> Result<Ping, ()> { + let mut inner = self.inner.lock().await; + let inner = &mut *inner; + + let Some(conn_state) = &mut inner.connection_state else { + tracing::error!("No connection state found"); + return Err(()); + }; + + let packet = mqtt_format::v5::packets::MqttPacket::Pingreq( + mqtt_format::v5::packets::pingreq::MPingreq, + ); + + let (sender, recv) = futures::channel::oneshot::channel(); + + let cbs = inner + .outstanding_completions + .entry(Id::PingReq) + .or_insert_with(|| CallbackState::PingReq { + on_pingresp: Default::default(), + }); + + match cbs { + CallbackState::PingReq { on_pingresp } => on_pingresp.push_back(sender), + _ => unreachable!("Had a non-pingreq in a pingreq response"), + } + + conn_state.conn_write.send(packet).await.map_err(drop)?; + + Ok(Ping { recv }) + } +} + +pub struct Ping { + recv: futures::channel::oneshot::Receiver<()>, +} + +impl Ping { + pub async fn response(self) { + self.recv.await.unwrap() + } +} |