diff options
author | Marcel Müller <neikos@neikos.email> | 2024-04-05 18:19:46 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-04-05 18:19:46 +0200 |
commit | a2acf067d3cddf53559f476421fa8cc61cce460f (patch) | |
tree | 56ac42c6c0ee0b30164e36912c2e5404b29b0c9c | |
parent | 066dcd4856fb6738dbd29f51406fda3e0f2a76fd (diff) | |
parent | 0e48621239c86b51c5936125a42b8a2406c242a9 (diff) |
Handlers calls
-rw-r--r-- | cloudmqtt-bin/src/bin/client.rs | 6 | ||||
-rw-r--r-- | src/client/builder.rs | 6 | ||||
-rw-r--r-- | src/client/receive.rs | 16 | ||||
-rw-r--r-- | src/client/send.rs | 19 | ||||
-rw-r--r-- | src/packets/mod.rs | 2 | ||||
-rw-r--r-- | src/packets/puback.rs | 29 |
6 files changed, 54 insertions, 24 deletions
diff --git a/cloudmqtt-bin/src/bin/client.rs b/cloudmqtt-bin/src/bin/client.rs index 8b044fe..e5f89f2 100644 --- a/cloudmqtt-bin/src/bin/client.rs +++ b/cloudmqtt-bin/src/bin/client.rs @@ -58,11 +58,7 @@ async fn main() { tracing::trace!(?packet, "Received packet") })) .with_handle_qos1_acknowledge(Box::new(|packet| { - async move { - tracing::trace!(?packet, "Acknowledging packet"); - cloudmqtt::client::send::Acknowledge::Yes - } - .boxed() + tracing::trace!(?packet, "Acknowledging packet"); })) .build() .await diff --git a/src/client/builder.rs b/src/client/builder.rs index e8629eb..038cdbd 100644 --- a/src/client/builder.rs +++ b/src/client/builder.rs @@ -10,8 +10,8 @@ use futures::lock::Mutex; use super::send::Callbacks; use super::send::ClientHandlers; -use super::send::HandleQos1AcknowledgeFn; use super::send::OnPacketRecvFn; +use super::send::OnQos1AcknowledgeFn; use super::InnerClient; use super::MqttClient; @@ -31,8 +31,8 @@ impl MqttClientBuilder { self } - pub fn with_handle_qos1_acknowledge(mut self, f: HandleQos1AcknowledgeFn) -> Self { - self.handlers.handle_qos1_acknowledge = f; + pub fn with_handle_qos1_acknowledge(mut self, f: OnQos1AcknowledgeFn) -> Self { + self.handlers.on_qos1_acknowledge = f; self } diff --git a/src/client/receive.rs b/src/client/receive.rs index 3312b1a..e08a3e9 100644 --- a/src/client/receive.rs +++ b/src/client/receive.rs @@ -46,6 +46,9 @@ pub(super) async fn handle_background_receiving( tracing::field::debug(packet.get().get_kind()), ); + tracing::trace!("Calling on_packet_recv() handler"); + (inner.lock().await.default_handlers.on_packet_recv)(packet.clone()); + match packet.get() { mqtt_format::v5::packets::MqttPacket::Auth(_) => todo!(), mqtt_format::v5::packets::MqttPacket::Disconnect(_) => todo!(), @@ -57,8 +60,8 @@ pub(super) async fn handle_background_receiving( .instrument(process_span) .await? } - mqtt_format::v5::packets::MqttPacket::Puback(mpuback) => { - handle_puback(mpuback, &inner, &packet) + mqtt_format::v5::packets::MqttPacket::Puback(_mpuback) => { + handle_puback(&packet.try_into().unwrap(), &inner) .instrument(process_span) .await? } @@ -158,10 +161,13 @@ async fn handle_pubcomp( } async fn handle_puback( - mpuback: &mqtt_format::v5::packets::puback::MPuback<'_>, + puback: &crate::packets::Puback, inner: &Arc<Mutex<InnerClient>>, - packet: &MqttPacket, ) -> Result<(), ()> { + tracing::trace!("Calling on_qos1_acknowledge handler"); + (inner.lock().await.default_handlers.on_qos1_acknowledge)(puback.clone()); + let mpuback = puback.get(); + match mpuback.reason { mqtt_format::v5::packets::puback::PubackReasonCode::Success | mqtt_format::v5::packets::puback::PubackReasonCode::NoMatchingSubscribers => { @@ -184,7 +190,7 @@ async fn handle_puback( tracing::trace!("Removed packet id from outstanding packets"); if let Some(callback) = inner.outstanding_callbacks.take_qos1(pident) { - if let Err(_) = callback.on_acknowledge.send(packet.clone()) { + if let Err(_) = callback.on_acknowledge.send(puback.clone()) { tracing::trace!("Could not send ack, receiver was dropped.") } } diff --git a/src/client/send.rs b/src/client/send.rs index 161e130..204b77e 100644 --- a/src/client/send.rs +++ b/src/client/send.rs @@ -215,22 +215,19 @@ pub struct PacketIdentifierExhausted; pub(crate) struct ClientHandlers { pub(crate) on_packet_recv: OnPacketRecvFn, - pub(crate) handle_qos1_acknowledge: HandleQos1AcknowledgeFn, - // handle_qos2_receive: Box<dyn Fn(&crate::packets::MqttPacket) + Send>, - // handle_qos2_complete: Box<dyn Fn(&crate::packets::MqttPacket) + Send>, + pub(crate) on_qos1_acknowledge: OnQos1AcknowledgeFn, + // on_qos2_receive: Box<dyn Fn(crate::packets::MqttPacket) + Send>, + // on_qos2_complete: Box<dyn Fn(crate::packets::MqttPacket) + Send>, } -pub type OnPacketRecvFn = Box<dyn Fn(&crate::packets::MqttPacket) + Send>; -pub type HandleQos1AcknowledgeFn = Box< - dyn for<'p> Fn(&'p crate::packets::MqttPacket) -> futures::future::BoxFuture<'p, Acknowledge> - + Send, ->; +pub type OnPacketRecvFn = Box<dyn Fn(crate::packets::MqttPacket) + Send>; +pub type OnQos1AcknowledgeFn = Box<dyn Fn(crate::packets::Puback) + Send>; impl Default for ClientHandlers { fn default() -> Self { Self { on_packet_recv: Box::new(|_| ()), - handle_qos1_acknowledge: Box::new(|_| async move { Acknowledge::Yes }.boxed()), + on_qos1_acknowledge: Box::new(|_| ()), } } } @@ -301,7 +298,7 @@ impl Callbacks { } pub(crate) struct Qos1Callbacks { - pub(crate) on_acknowledge: futures::channel::oneshot::Sender<crate::packets::MqttPacket>, + pub(crate) on_acknowledge: futures::channel::oneshot::Sender<crate::packets::Puback>, } pub(crate) struct Qos2ReceiveCallback { @@ -344,7 +341,7 @@ enum PublishedReceiver { } pub struct PublishedQos1 { - recv: futures::channel::oneshot::Receiver<MqttPacket>, + recv: futures::channel::oneshot::Receiver<crate::packets::Puback>, } impl PublishedQos1 { diff --git a/src/packets/mod.rs b/src/packets/mod.rs index 3b25e66..8e9e417 100644 --- a/src/packets/mod.rs +++ b/src/packets/mod.rs @@ -32,6 +32,8 @@ pub mod subscribe; pub mod unsuback; pub mod unsubscribe; +pub use self::puback::Puback; + #[derive(Debug, thiserror::Error)] #[error("Could not convert into the required packet type")] pub struct InvalidPacketType; diff --git a/src/packets/puback.rs b/src/packets/puback.rs index cc001cf..54fc20b 100644 --- a/src/packets/puback.rs +++ b/src/packets/puback.rs @@ -4,6 +4,11 @@ // file, You can obtain one at http://mozilla.org/MPL/2.0/. // +use yoke::Yoke; + +use super::MqttPacket; +use super::StableBytes; + crate::properties::define_properties! { properties_type: mqtt_format::v5::packets::puback::PubackProperties, anker: "_Toc3901125", @@ -15,3 +20,27 @@ crate::properties::define_properties! { user_properties: UserProperties<'i> with setter = crate::properties::UserProperty, } } + +#[derive(Clone, Debug)] +pub struct Puback { + packet: Yoke<mqtt_format::v5::packets::puback::MPuback<'static>, StableBytes>, +} + +impl Puback { + pub(crate) fn get(&self) -> &mqtt_format::v5::packets::puback::MPuback<'_> { + self.packet.get() + } +} + +impl TryFrom<MqttPacket> for Puback { + type Error = (); + + fn try_from(value: MqttPacket) -> Result<Self, Self::Error> { + let packet = value.packet.try_map_project(|p, _| match p { + mqtt_format::v5::packets::MqttPacket::Puback(puback) => Ok(puback), + _ => Err(()), + })?; + + Ok(Puback { packet }) + } +} |