summaryrefslogtreecommitdiffstats
path: root/src/client/receive.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/client/receive.rs')
-rw-r--r--src/client/receive.rs16
1 files changed, 11 insertions, 5 deletions
diff --git a/src/client/receive.rs b/src/client/receive.rs
index 3312b1a..e08a3e9 100644
--- a/src/client/receive.rs
+++ b/src/client/receive.rs
@@ -46,6 +46,9 @@ pub(super) async fn handle_background_receiving(
tracing::field::debug(packet.get().get_kind()),
);
+ tracing::trace!("Calling on_packet_recv() handler");
+ (inner.lock().await.default_handlers.on_packet_recv)(packet.clone());
+
match packet.get() {
mqtt_format::v5::packets::MqttPacket::Auth(_) => todo!(),
mqtt_format::v5::packets::MqttPacket::Disconnect(_) => todo!(),
@@ -57,8 +60,8 @@ pub(super) async fn handle_background_receiving(
.instrument(process_span)
.await?
}
- mqtt_format::v5::packets::MqttPacket::Puback(mpuback) => {
- handle_puback(mpuback, &inner, &packet)
+ mqtt_format::v5::packets::MqttPacket::Puback(_mpuback) => {
+ handle_puback(&packet.try_into().unwrap(), &inner)
.instrument(process_span)
.await?
}
@@ -158,10 +161,13 @@ async fn handle_pubcomp(
}
async fn handle_puback(
- mpuback: &mqtt_format::v5::packets::puback::MPuback<'_>,
+ puback: &crate::packets::Puback,
inner: &Arc<Mutex<InnerClient>>,
- packet: &MqttPacket,
) -> Result<(), ()> {
+ tracing::trace!("Calling on_qos1_acknowledge handler");
+ (inner.lock().await.default_handlers.on_qos1_acknowledge)(puback.clone());
+ let mpuback = puback.get();
+
match mpuback.reason {
mqtt_format::v5::packets::puback::PubackReasonCode::Success
| mqtt_format::v5::packets::puback::PubackReasonCode::NoMatchingSubscribers => {
@@ -184,7 +190,7 @@ async fn handle_puback(
tracing::trace!("Removed packet id from outstanding packets");
if let Some(callback) = inner.outstanding_callbacks.take_qos1(pident) {
- if let Err(_) = callback.on_acknowledge.send(packet.clone()) {
+ if let Err(_) = callback.on_acknowledge.send(puback.clone()) {
tracing::trace!("Could not send ack, receiver was dropped.")
}
}