summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2024-04-03 16:44:57 +0200
committerMatthias Beyer <mail@beyermatthias.de>2024-04-03 16:44:57 +0200
commite46acd35e2d2d4ad58223d8b280ac85f33dd85c9 (patch)
tree8f2b415e54fb515419390ef69ea563cc9fff7513 /src
parentd2727bb05ff52539fe019f3d427e2463f4bc616f (diff)
Add publishing abstraction
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
Diffstat (limited to 'src')
-rw-r--r--src/client.rs140
-rw-r--r--src/lib.rs2
2 files changed, 137 insertions, 5 deletions
diff --git a/src/client.rs b/src/client.rs
index c6cd911..4991a7f 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -226,6 +226,8 @@ impl OutstandingPackets {
struct InnerClient {
connection_state: Option<ConnectState>,
session_state: Option<SessionState>,
+ default_handlers: ClientHandlers,
+ outstanding_completions: std::collections::HashMap<Id, CallbackState>,
}
pub struct MqttClient {
@@ -239,6 +241,11 @@ impl MqttClient {
inner: Arc::new(Mutex::new(InnerClient {
connection_state: None,
session_state: None,
+ default_handlers: ClientHandlers {
+ on_packet_recv: Box::new(|_| ()),
+ handle_acknowledge: Box::new(|_| Acknowledge::Yes),
+ },
+ outstanding_completions: std::collections::HashMap::new(),
})),
}
}
@@ -463,13 +470,16 @@ impl MqttClient {
todo!()
}
- #[tracing::instrument(skip(self, payload), fields(payload_length = payload.as_ref().len()))]
+ #[tracing::instrument(skip_all, fields(payload_length = payload.as_ref().len()))]
pub async fn publish(
&self,
- topic: crate::topic::MqttTopic,
- qos: QualityOfService,
- retain: bool,
- payload: MqttPayload,
+ Publish {
+ topic,
+ qos,
+ retain,
+ payload,
+ on_packet_recv: _,
+ }: Publish,
) -> Result<(), ()> {
let mut inner = self.inner.lock().await;
let inner = &mut *inner;
@@ -552,6 +562,50 @@ impl MqttClient {
Ok(())
}
+
+ pub async fn publish_qos1(
+ &self,
+ PublishQos1 {
+ topic,
+ retain,
+ payload,
+ on_packet_recv,
+ }: PublishQos1,
+ ) -> Result<(), ()> {
+ let _res = self
+ .publish(Publish {
+ topic,
+ qos: QualityOfService::AtMostOnce,
+ retain,
+ payload,
+ on_packet_recv,
+ })
+ .await?;
+
+ Ok(()) // TODO
+ }
+
+ pub async fn publish_qos2(
+ &self,
+ PublishQos2 {
+ topic,
+ retain,
+ payload,
+ on_packet_recv,
+ }: PublishQos2,
+ ) -> Result<(), ()> {
+ let _res = self
+ .publish(Publish {
+ topic,
+ qos: QualityOfService::ExactlyOnce,
+ retain,
+ payload,
+ on_packet_recv,
+ })
+ .await?;
+
+ Ok(()) // TODO
+ }
}
fn get_next_packet_ident(
@@ -588,9 +642,85 @@ pub struct Connected {
#[error("No free packet identifiers available")]
pub struct PacketIdentifierExhausted;
+struct ClientHandlers {
+ on_packet_recv: Box<dyn Fn(&crate::packets::MqttPacket) -> () + Send>,
+ handle_acknowledge: Box<dyn Fn(&crate::packets::MqttPacket) -> Acknowledge + Send>,
+ // on_receive: Box<dyn Fn(&crate::packets::MqttPacket) -> () + Send>,
+ // on_complete: Box<dyn Fn(&crate::packets::MqttPacket) -> () + Send>,
+}
+
+#[derive(Debug)]
+enum Acknowledge {
+ No,
+ Yes,
+ YesWithProps {},
+}
+
+#[derive(Hash, PartialEq)]
+struct Id;
+enum CallbackState {
+ Qos1 {
+ on_acknowledge: futures::channel::oneshot::Sender<crate::packets::MqttPacket>,
+ },
+ Qos2 {
+ on_receive: futures::channel::oneshot::Sender<crate::packets::MqttPacket>,
+ on_complete: futures::channel::oneshot::Sender<crate::packets::MqttPacket>,
+ },
+}
+
+pub struct Publish {
+ pub topic: crate::topic::MqttTopic,
+ pub qos: QualityOfService,
+ pub retain: bool,
+ pub payload: MqttPayload,
+ on_packet_recv: Option<Box<dyn Fn(&crate::packets::MqttPacket) -> () + Send>>,
+}
+
+enum PublishReceiver {
+ None,
+ Once(PublishQos1),
+ Twice(PublishQos2),
+}
+
+pub struct PublishQos1 {
+ pub topic: crate::topic::MqttTopic,
+ pub retain: bool,
+ pub payload: MqttPayload,
+ on_packet_recv: Option<Box<dyn Fn(&crate::packets::MqttPacket) -> () + Send>>,
+}
+
+impl PublishQos1 {
+ pub fn with_on_packet_recv(
+ mut self,
+ on_packet_recv: Box<dyn Fn(&crate::packets::MqttPacket) -> () + Send>,
+ ) -> Self {
+ self.on_packet_recv = Some(on_packet_recv);
+ self
+ }
+}
+
+pub struct PublishQos2 {
+ pub topic: crate::topic::MqttTopic,
+ pub retain: bool,
+ pub payload: MqttPayload,
+ on_packet_recv: Option<Box<dyn Fn(&crate::packets::MqttPacket) -> () + Send>>,
+}
+
+impl PublishQos2 {
+ pub fn with_on_packet_recv(
+ mut self,
+ on_packet_recv: Box<dyn Fn(&crate::packets::MqttPacket) -> () + Send>,
+ ) -> Self {
+ self.on_packet_recv = Some(on_packet_recv);
+ self
+ }
+}
+
#[cfg(test)]
mod tests {
+ use crate::client::ClientHandlers;
use crate::client::MqttClient;
static_assertions::assert_impl_all!(MqttClient: Send, Sync);
+ static_assertions::assert_impl_all!(ClientHandlers: Send);
}
diff --git a/src/lib.rs b/src/lib.rs
index abfe823..929827c 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -4,6 +4,8 @@
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//
+#![deny(clippy::future_not_send)]
+
pub mod bytes;
pub mod client;
pub mod client_identifier;