diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2024-04-03 16:44:57 +0200 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2024-04-03 16:44:57 +0200 |
commit | e46acd35e2d2d4ad58223d8b280ac85f33dd85c9 (patch) | |
tree | 8f2b415e54fb515419390ef69ea563cc9fff7513 /src | |
parent | d2727bb05ff52539fe019f3d427e2463f4bc616f (diff) |
Add publishing abstraction
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
Diffstat (limited to 'src')
-rw-r--r-- | src/client.rs | 140 | ||||
-rw-r--r-- | src/lib.rs | 2 |
2 files changed, 137 insertions, 5 deletions
diff --git a/src/client.rs b/src/client.rs index c6cd911..4991a7f 100644 --- a/src/client.rs +++ b/src/client.rs @@ -226,6 +226,8 @@ impl OutstandingPackets { struct InnerClient { connection_state: Option<ConnectState>, session_state: Option<SessionState>, + default_handlers: ClientHandlers, + outstanding_completions: std::collections::HashMap<Id, CallbackState>, } pub struct MqttClient { @@ -239,6 +241,11 @@ impl MqttClient { inner: Arc::new(Mutex::new(InnerClient { connection_state: None, session_state: None, + default_handlers: ClientHandlers { + on_packet_recv: Box::new(|_| ()), + handle_acknowledge: Box::new(|_| Acknowledge::Yes), + }, + outstanding_completions: std::collections::HashMap::new(), })), } } @@ -463,13 +470,16 @@ impl MqttClient { todo!() } - #[tracing::instrument(skip(self, payload), fields(payload_length = payload.as_ref().len()))] + #[tracing::instrument(skip_all, fields(payload_length = payload.as_ref().len()))] pub async fn publish( &self, - topic: crate::topic::MqttTopic, - qos: QualityOfService, - retain: bool, - payload: MqttPayload, + Publish { + topic, + qos, + retain, + payload, + on_packet_recv: _, + }: Publish, ) -> Result<(), ()> { let mut inner = self.inner.lock().await; let inner = &mut *inner; @@ -552,6 +562,50 @@ impl MqttClient { Ok(()) } + + pub async fn publish_qos1( + &self, + PublishQos1 { + topic, + retain, + payload, + on_packet_recv, + }: PublishQos1, + ) -> Result<(), ()> { + let _res = self + .publish(Publish { + topic, + qos: QualityOfService::AtMostOnce, + retain, + payload, + on_packet_recv, + }) + .await?; + + Ok(()) // TODO + } + + pub async fn publish_qos2( + &self, + PublishQos2 { + topic, + retain, + payload, + on_packet_recv, + }: PublishQos2, + ) -> Result<(), ()> { + let _res = self + .publish(Publish { + topic, + qos: QualityOfService::ExactlyOnce, + retain, + payload, + on_packet_recv, + }) + .await?; + + Ok(()) // TODO + } } fn get_next_packet_ident( @@ -588,9 +642,85 @@ pub struct Connected { #[error("No free packet identifiers available")] pub struct PacketIdentifierExhausted; +struct ClientHandlers { + on_packet_recv: Box<dyn Fn(&crate::packets::MqttPacket) -> () + Send>, + handle_acknowledge: Box<dyn Fn(&crate::packets::MqttPacket) -> Acknowledge + Send>, + // on_receive: Box<dyn Fn(&crate::packets::MqttPacket) -> () + Send>, + // on_complete: Box<dyn Fn(&crate::packets::MqttPacket) -> () + Send>, +} + +#[derive(Debug)] +enum Acknowledge { + No, + Yes, + YesWithProps {}, +} + +#[derive(Hash, PartialEq)] +struct Id; +enum CallbackState { + Qos1 { + on_acknowledge: futures::channel::oneshot::Sender<crate::packets::MqttPacket>, + }, + Qos2 { + on_receive: futures::channel::oneshot::Sender<crate::packets::MqttPacket>, + on_complete: futures::channel::oneshot::Sender<crate::packets::MqttPacket>, + }, +} + +pub struct Publish { + pub topic: crate::topic::MqttTopic, + pub qos: QualityOfService, + pub retain: bool, + pub payload: MqttPayload, + on_packet_recv: Option<Box<dyn Fn(&crate::packets::MqttPacket) -> () + Send>>, +} + +enum PublishReceiver { + None, + Once(PublishQos1), + Twice(PublishQos2), +} + +pub struct PublishQos1 { + pub topic: crate::topic::MqttTopic, + pub retain: bool, + pub payload: MqttPayload, + on_packet_recv: Option<Box<dyn Fn(&crate::packets::MqttPacket) -> () + Send>>, +} + +impl PublishQos1 { + pub fn with_on_packet_recv( + mut self, + on_packet_recv: Box<dyn Fn(&crate::packets::MqttPacket) -> () + Send>, + ) -> Self { + self.on_packet_recv = Some(on_packet_recv); + self + } +} + +pub struct PublishQos2 { + pub topic: crate::topic::MqttTopic, + pub retain: bool, + pub payload: MqttPayload, + on_packet_recv: Option<Box<dyn Fn(&crate::packets::MqttPacket) -> () + Send>>, +} + +impl PublishQos2 { + pub fn with_on_packet_recv( + mut self, + on_packet_recv: Box<dyn Fn(&crate::packets::MqttPacket) -> () + Send>, + ) -> Self { + self.on_packet_recv = Some(on_packet_recv); + self + } +} + #[cfg(test)] mod tests { + use crate::client::ClientHandlers; use crate::client::MqttClient; static_assertions::assert_impl_all!(MqttClient: Send, Sync); + static_assertions::assert_impl_all!(ClientHandlers: Send); } @@ -4,6 +4,8 @@ // file, You can obtain one at http://mozilla.org/MPL/2.0/. // +#![deny(clippy::future_not_send)] + pub mod bytes; pub mod client; pub mod client_identifier; |