summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2024-04-05 12:26:16 +0200
committerMatthias Beyer <mail@beyermatthias.de>2024-04-05 15:09:49 +0200
commitfb6d3f74c776e6f2af3d5e380bdc1a0fd69f5ff4 (patch)
treeeb83e12ca3c73df7f70068594aaec618f83a326f
parent11e1f610e73158184a527ea02c89581b994f9f2f (diff)
Move packet objects into handlers
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
-rw-r--r--src/client/receive.rs13
-rw-r--r--src/client/send.rs12
2 files changed, 14 insertions, 11 deletions
diff --git a/src/client/receive.rs b/src/client/receive.rs
index 7f8db18..312c211 100644
--- a/src/client/receive.rs
+++ b/src/client/receive.rs
@@ -57,8 +57,8 @@ pub(super) async fn handle_background_receiving(
.instrument(process_span)
.await?
}
- mqtt_format::v5::packets::MqttPacket::Puback(mpuback) => {
- handle_puback(mpuback, &inner, &packet)
+ mqtt_format::v5::packets::MqttPacket::Puback(_mpuback) => {
+ handle_puback(&packet.try_into().unwrap(), &inner)
.instrument(process_span)
.await?
}
@@ -159,10 +159,13 @@ async fn handle_pubcomp(
}
async fn handle_puback(
- mpuback: &mqtt_format::v5::packets::puback::MPuback<'_>,
+ puback: &crate::packets::Puback,
inner: &Arc<Mutex<InnerClient>>,
- packet: &MqttPacket,
) -> Result<(), ()> {
+ tracing::trace!("Calling on_qos1_acknowledge handler");
+ (inner.lock().await.default_handlers.on_qos1_acknowledge)(puback.clone());
+ let mpuback = puback.get();
+
match mpuback.reason {
mqtt_format::v5::packets::puback::PubackReasonCode::Success
| mqtt_format::v5::packets::puback::PubackReasonCode::NoMatchingSubscribers => {
@@ -185,7 +188,7 @@ async fn handle_puback(
tracing::trace!("Removed packet id from outstanding packets");
if let Some(callback) = inner.outstanding_callbacks.take_qos1(pident) {
- if let Err(_) = callback.on_acknowledge.send(packet.clone()) {
+ if let Err(_) = callback.on_acknowledge.send(puback.clone()) {
tracing::trace!("Could not send ack, receiver was dropped.")
}
}
diff --git a/src/client/send.rs b/src/client/send.rs
index b81970b..204b77e 100644
--- a/src/client/send.rs
+++ b/src/client/send.rs
@@ -216,12 +216,12 @@ pub struct PacketIdentifierExhausted;
pub(crate) struct ClientHandlers {
pub(crate) on_packet_recv: OnPacketRecvFn,
pub(crate) on_qos1_acknowledge: OnQos1AcknowledgeFn,
- // on_qos2_receive: Box<dyn Fn(&crate::packets::MqttPacket) + Send>,
- // on_qos2_complete: Box<dyn Fn(&crate::packets::MqttPacket) + Send>,
+ // on_qos2_receive: Box<dyn Fn(crate::packets::MqttPacket) + Send>,
+ // on_qos2_complete: Box<dyn Fn(crate::packets::MqttPacket) + Send>,
}
-pub type OnPacketRecvFn = Box<dyn Fn(&crate::packets::MqttPacket) + Send>;
-pub type OnQos1AcknowledgeFn = Box<dyn Fn(&crate::packets::MqttPacket) + Send>;
+pub type OnPacketRecvFn = Box<dyn Fn(crate::packets::MqttPacket) + Send>;
+pub type OnQos1AcknowledgeFn = Box<dyn Fn(crate::packets::Puback) + Send>;
impl Default for ClientHandlers {
fn default() -> Self {
@@ -298,7 +298,7 @@ impl Callbacks {
}
pub(crate) struct Qos1Callbacks {
- pub(crate) on_acknowledge: futures::channel::oneshot::Sender<crate::packets::MqttPacket>,
+ pub(crate) on_acknowledge: futures::channel::oneshot::Sender<crate::packets::Puback>,
}
pub(crate) struct Qos2ReceiveCallback {
@@ -341,7 +341,7 @@ enum PublishedReceiver {
}
pub struct PublishedQos1 {
- recv: futures::channel::oneshot::Receiver<MqttPacket>,
+ recv: futures::channel::oneshot::Receiver<crate::packets::Puback>,
}
impl PublishedQos1 {