diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2024-04-05 12:26:16 +0200 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2024-04-05 15:09:49 +0200 |
commit | fb6d3f74c776e6f2af3d5e380bdc1a0fd69f5ff4 (patch) | |
tree | eb83e12ca3c73df7f70068594aaec618f83a326f | |
parent | 11e1f610e73158184a527ea02c89581b994f9f2f (diff) |
Move packet objects into handlers
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
-rw-r--r-- | src/client/receive.rs | 13 | ||||
-rw-r--r-- | src/client/send.rs | 12 |
2 files changed, 14 insertions, 11 deletions
diff --git a/src/client/receive.rs b/src/client/receive.rs index 7f8db18..312c211 100644 --- a/src/client/receive.rs +++ b/src/client/receive.rs @@ -57,8 +57,8 @@ pub(super) async fn handle_background_receiving( .instrument(process_span) .await? } - mqtt_format::v5::packets::MqttPacket::Puback(mpuback) => { - handle_puback(mpuback, &inner, &packet) + mqtt_format::v5::packets::MqttPacket::Puback(_mpuback) => { + handle_puback(&packet.try_into().unwrap(), &inner) .instrument(process_span) .await? } @@ -159,10 +159,13 @@ async fn handle_pubcomp( } async fn handle_puback( - mpuback: &mqtt_format::v5::packets::puback::MPuback<'_>, + puback: &crate::packets::Puback, inner: &Arc<Mutex<InnerClient>>, - packet: &MqttPacket, ) -> Result<(), ()> { + tracing::trace!("Calling on_qos1_acknowledge handler"); + (inner.lock().await.default_handlers.on_qos1_acknowledge)(puback.clone()); + let mpuback = puback.get(); + match mpuback.reason { mqtt_format::v5::packets::puback::PubackReasonCode::Success | mqtt_format::v5::packets::puback::PubackReasonCode::NoMatchingSubscribers => { @@ -185,7 +188,7 @@ async fn handle_puback( tracing::trace!("Removed packet id from outstanding packets"); if let Some(callback) = inner.outstanding_callbacks.take_qos1(pident) { - if let Err(_) = callback.on_acknowledge.send(packet.clone()) { + if let Err(_) = callback.on_acknowledge.send(puback.clone()) { tracing::trace!("Could not send ack, receiver was dropped.") } } diff --git a/src/client/send.rs b/src/client/send.rs index b81970b..204b77e 100644 --- a/src/client/send.rs +++ b/src/client/send.rs @@ -216,12 +216,12 @@ pub struct PacketIdentifierExhausted; pub(crate) struct ClientHandlers { pub(crate) on_packet_recv: OnPacketRecvFn, pub(crate) on_qos1_acknowledge: OnQos1AcknowledgeFn, - // on_qos2_receive: Box<dyn Fn(&crate::packets::MqttPacket) + Send>, - // on_qos2_complete: Box<dyn Fn(&crate::packets::MqttPacket) + Send>, + // on_qos2_receive: Box<dyn Fn(crate::packets::MqttPacket) + Send>, + // on_qos2_complete: Box<dyn Fn(crate::packets::MqttPacket) + Send>, } -pub type OnPacketRecvFn = Box<dyn Fn(&crate::packets::MqttPacket) + Send>; -pub type OnQos1AcknowledgeFn = Box<dyn Fn(&crate::packets::MqttPacket) + Send>; +pub type OnPacketRecvFn = Box<dyn Fn(crate::packets::MqttPacket) + Send>; +pub type OnQos1AcknowledgeFn = Box<dyn Fn(crate::packets::Puback) + Send>; impl Default for ClientHandlers { fn default() -> Self { @@ -298,7 +298,7 @@ impl Callbacks { } pub(crate) struct Qos1Callbacks { - pub(crate) on_acknowledge: futures::channel::oneshot::Sender<crate::packets::MqttPacket>, + pub(crate) on_acknowledge: futures::channel::oneshot::Sender<crate::packets::Puback>, } pub(crate) struct Qos2ReceiveCallback { @@ -341,7 +341,7 @@ enum PublishedReceiver { } pub struct PublishedQos1 { - recv: futures::channel::oneshot::Receiver<MqttPacket>, + recv: futures::channel::oneshot::Receiver<crate::packets::Puback>, } impl PublishedQos1 { |