summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--cloudmqtt-bin/src/bin/client.rs6
-rw-r--r--src/client/builder.rs6
-rw-r--r--src/client/receive.rs16
-rw-r--r--src/client/send.rs19
-rw-r--r--src/packets/mod.rs2
-rw-r--r--src/packets/puback.rs29
6 files changed, 54 insertions, 24 deletions
diff --git a/cloudmqtt-bin/src/bin/client.rs b/cloudmqtt-bin/src/bin/client.rs
index 8b044fe..e5f89f2 100644
--- a/cloudmqtt-bin/src/bin/client.rs
+++ b/cloudmqtt-bin/src/bin/client.rs
@@ -58,11 +58,7 @@ async fn main() {
tracing::trace!(?packet, "Received packet")
}))
.with_handle_qos1_acknowledge(Box::new(|packet| {
- async move {
- tracing::trace!(?packet, "Acknowledging packet");
- cloudmqtt::client::send::Acknowledge::Yes
- }
- .boxed()
+ tracing::trace!(?packet, "Acknowledging packet");
}))
.build()
.await
diff --git a/src/client/builder.rs b/src/client/builder.rs
index e8629eb..038cdbd 100644
--- a/src/client/builder.rs
+++ b/src/client/builder.rs
@@ -10,8 +10,8 @@ use futures::lock::Mutex;
use super::send::Callbacks;
use super::send::ClientHandlers;
-use super::send::HandleQos1AcknowledgeFn;
use super::send::OnPacketRecvFn;
+use super::send::OnQos1AcknowledgeFn;
use super::InnerClient;
use super::MqttClient;
@@ -31,8 +31,8 @@ impl MqttClientBuilder {
self
}
- pub fn with_handle_qos1_acknowledge(mut self, f: HandleQos1AcknowledgeFn) -> Self {
- self.handlers.handle_qos1_acknowledge = f;
+ pub fn with_handle_qos1_acknowledge(mut self, f: OnQos1AcknowledgeFn) -> Self {
+ self.handlers.on_qos1_acknowledge = f;
self
}
diff --git a/src/client/receive.rs b/src/client/receive.rs
index 3312b1a..e08a3e9 100644
--- a/src/client/receive.rs
+++ b/src/client/receive.rs
@@ -46,6 +46,9 @@ pub(super) async fn handle_background_receiving(
tracing::field::debug(packet.get().get_kind()),
);
+ tracing::trace!("Calling on_packet_recv() handler");
+ (inner.lock().await.default_handlers.on_packet_recv)(packet.clone());
+
match packet.get() {
mqtt_format::v5::packets::MqttPacket::Auth(_) => todo!(),
mqtt_format::v5::packets::MqttPacket::Disconnect(_) => todo!(),
@@ -57,8 +60,8 @@ pub(super) async fn handle_background_receiving(
.instrument(process_span)
.await?
}
- mqtt_format::v5::packets::MqttPacket::Puback(mpuback) => {
- handle_puback(mpuback, &inner, &packet)
+ mqtt_format::v5::packets::MqttPacket::Puback(_mpuback) => {
+ handle_puback(&packet.try_into().unwrap(), &inner)
.instrument(process_span)
.await?
}
@@ -158,10 +161,13 @@ async fn handle_pubcomp(
}
async fn handle_puback(
- mpuback: &mqtt_format::v5::packets::puback::MPuback<'_>,
+ puback: &crate::packets::Puback,
inner: &Arc<Mutex<InnerClient>>,
- packet: &MqttPacket,
) -> Result<(), ()> {
+ tracing::trace!("Calling on_qos1_acknowledge handler");
+ (inner.lock().await.default_handlers.on_qos1_acknowledge)(puback.clone());
+ let mpuback = puback.get();
+
match mpuback.reason {
mqtt_format::v5::packets::puback::PubackReasonCode::Success
| mqtt_format::v5::packets::puback::PubackReasonCode::NoMatchingSubscribers => {
@@ -184,7 +190,7 @@ async fn handle_puback(
tracing::trace!("Removed packet id from outstanding packets");
if let Some(callback) = inner.outstanding_callbacks.take_qos1(pident) {
- if let Err(_) = callback.on_acknowledge.send(packet.clone()) {
+ if let Err(_) = callback.on_acknowledge.send(puback.clone()) {
tracing::trace!("Could not send ack, receiver was dropped.")
}
}
diff --git a/src/client/send.rs b/src/client/send.rs
index 161e130..204b77e 100644
--- a/src/client/send.rs
+++ b/src/client/send.rs
@@ -215,22 +215,19 @@ pub struct PacketIdentifierExhausted;
pub(crate) struct ClientHandlers {
pub(crate) on_packet_recv: OnPacketRecvFn,
- pub(crate) handle_qos1_acknowledge: HandleQos1AcknowledgeFn,
- // handle_qos2_receive: Box<dyn Fn(&crate::packets::MqttPacket) + Send>,
- // handle_qos2_complete: Box<dyn Fn(&crate::packets::MqttPacket) + Send>,
+ pub(crate) on_qos1_acknowledge: OnQos1AcknowledgeFn,
+ // on_qos2_receive: Box<dyn Fn(crate::packets::MqttPacket) + Send>,
+ // on_qos2_complete: Box<dyn Fn(crate::packets::MqttPacket) + Send>,
}
-pub type OnPacketRecvFn = Box<dyn Fn(&crate::packets::MqttPacket) + Send>;
-pub type HandleQos1AcknowledgeFn = Box<
- dyn for<'p> Fn(&'p crate::packets::MqttPacket) -> futures::future::BoxFuture<'p, Acknowledge>
- + Send,
->;
+pub type OnPacketRecvFn = Box<dyn Fn(crate::packets::MqttPacket) + Send>;
+pub type OnQos1AcknowledgeFn = Box<dyn Fn(crate::packets::Puback) + Send>;
impl Default for ClientHandlers {
fn default() -> Self {
Self {
on_packet_recv: Box::new(|_| ()),
- handle_qos1_acknowledge: Box::new(|_| async move { Acknowledge::Yes }.boxed()),
+ on_qos1_acknowledge: Box::new(|_| ()),
}
}
}
@@ -301,7 +298,7 @@ impl Callbacks {
}
pub(crate) struct Qos1Callbacks {
- pub(crate) on_acknowledge: futures::channel::oneshot::Sender<crate::packets::MqttPacket>,
+ pub(crate) on_acknowledge: futures::channel::oneshot::Sender<crate::packets::Puback>,
}
pub(crate) struct Qos2ReceiveCallback {
@@ -344,7 +341,7 @@ enum PublishedReceiver {
}
pub struct PublishedQos1 {
- recv: futures::channel::oneshot::Receiver<MqttPacket>,
+ recv: futures::channel::oneshot::Receiver<crate::packets::Puback>,
}
impl PublishedQos1 {
diff --git a/src/packets/mod.rs b/src/packets/mod.rs
index 3b25e66..8e9e417 100644
--- a/src/packets/mod.rs
+++ b/src/packets/mod.rs
@@ -32,6 +32,8 @@ pub mod subscribe;
pub mod unsuback;
pub mod unsubscribe;
+pub use self::puback::Puback;
+
#[derive(Debug, thiserror::Error)]
#[error("Could not convert into the required packet type")]
pub struct InvalidPacketType;
diff --git a/src/packets/puback.rs b/src/packets/puback.rs
index cc001cf..54fc20b 100644
--- a/src/packets/puback.rs
+++ b/src/packets/puback.rs
@@ -4,6 +4,11 @@
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//
+use yoke::Yoke;
+
+use super::MqttPacket;
+use super::StableBytes;
+
crate::properties::define_properties! {
properties_type: mqtt_format::v5::packets::puback::PubackProperties,
anker: "_Toc3901125",
@@ -15,3 +20,27 @@ crate::properties::define_properties! {
user_properties: UserProperties<'i> with setter = crate::properties::UserProperty,
}
}
+
+#[derive(Clone, Debug)]
+pub struct Puback {
+ packet: Yoke<mqtt_format::v5::packets::puback::MPuback<'static>, StableBytes>,
+}
+
+impl Puback {
+ pub(crate) fn get(&self) -> &mqtt_format::v5::packets::puback::MPuback<'_> {
+ self.packet.get()
+ }
+}
+
+impl TryFrom<MqttPacket> for Puback {
+ type Error = ();
+
+ fn try_from(value: MqttPacket) -> Result<Self, Self::Error> {
+ let packet = value.packet.try_map_project(|p, _| match p {
+ mqtt_format::v5::packets::MqttPacket::Puback(puback) => Ok(puback),
+ _ => Err(()),
+ })?;
+
+ Ok(Puback { packet })
+ }
+}