diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/client/builder.rs | 54 | ||||
-rw-r--r-- | src/client/mod.rs | 10 | ||||
-rw-r--r-- | src/client/send.rs | 26 |
3 files changed, 81 insertions, 9 deletions
diff --git a/src/client/builder.rs b/src/client/builder.rs new file mode 100644 index 0000000..e8629eb --- /dev/null +++ b/src/client/builder.rs @@ -0,0 +1,54 @@ +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +use std::sync::Arc; + +use futures::lock::Mutex; + +use super::send::Callbacks; +use super::send::ClientHandlers; +use super::send::HandleQos1AcknowledgeFn; +use super::send::OnPacketRecvFn; +use super::InnerClient; +use super::MqttClient; + +pub struct MqttClientBuilder { + handlers: ClientHandlers, +} + +impl MqttClientBuilder { + pub(super) fn new() -> Self { + Self { + handlers: ClientHandlers::default(), + } + } + + pub fn with_on_packet_recv(mut self, f: OnPacketRecvFn) -> Self { + self.handlers.on_packet_recv = f; + self + } + + pub fn with_handle_qos1_acknowledge(mut self, f: HandleQos1AcknowledgeFn) -> Self { + self.handlers.handle_qos1_acknowledge = f; + self + } + + pub async fn build(self) -> Result<super::MqttClient, MqttClientBuilderError> { + Ok({ + MqttClient { + inner: Arc::new(Mutex::new(InnerClient { + connection_state: None, + session_state: None, + default_handlers: self.handlers, + outstanding_callbacks: Callbacks::new(), + })), + } + }) + } +} + +#[derive(Debug, thiserror::Error)] +pub enum MqttClientBuilderError {} diff --git a/src/client/mod.rs b/src/client/mod.rs index 72780b3..8ff460f 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -4,6 +4,7 @@ // file, You can obtain one at http://mozilla.org/MPL/2.0/. // +pub mod builder; pub mod connect; mod receive; pub mod send; @@ -36,14 +37,15 @@ impl MqttClient { inner: Arc::new(Mutex::new(InnerClient { connection_state: None, session_state: None, - default_handlers: ClientHandlers { - on_packet_recv: Box::new(|_| ()), - handle_acknowledge: Box::new(|_| Acknowledge::Yes), - }, + default_handlers: ClientHandlers::default(), outstanding_callbacks: Callbacks::new(), })), } } + + pub fn builder() -> builder::MqttClientBuilder { + builder::MqttClientBuilder::new() + } } #[cfg(test)] diff --git a/src/client/send.rs b/src/client/send.rs index 1883a64..161e130 100644 --- a/src/client/send.rs +++ b/src/client/send.rs @@ -7,6 +7,7 @@ use std::collections::HashMap; use std::collections::VecDeque; +use futures::FutureExt; use mqtt_format::v5::integers::VARIABLE_INTEGER_MAX; use mqtt_format::v5::packets::publish::MPublish; use tracing::Instrument; @@ -213,14 +214,29 @@ fn get_next_packet_ident( pub struct PacketIdentifierExhausted; pub(crate) struct ClientHandlers { - pub(crate) on_packet_recv: Box<dyn Fn(&crate::packets::MqttPacket) + Send>, - pub(crate) handle_acknowledge: Box<dyn Fn(&crate::packets::MqttPacket) -> Acknowledge + Send>, - // on_receive: Box<dyn Fn(&crate::packets::MqttPacket) + Send>, - // on_complete: Box<dyn Fn(&crate::packets::MqttPacket)+ Send>, + pub(crate) on_packet_recv: OnPacketRecvFn, + pub(crate) handle_qos1_acknowledge: HandleQos1AcknowledgeFn, + // handle_qos2_receive: Box<dyn Fn(&crate::packets::MqttPacket) + Send>, + // handle_qos2_complete: Box<dyn Fn(&crate::packets::MqttPacket) + Send>, +} + +pub type OnPacketRecvFn = Box<dyn Fn(&crate::packets::MqttPacket) + Send>; +pub type HandleQos1AcknowledgeFn = Box< + dyn for<'p> Fn(&'p crate::packets::MqttPacket) -> futures::future::BoxFuture<'p, Acknowledge> + + Send, +>; + +impl Default for ClientHandlers { + fn default() -> Self { + Self { + on_packet_recv: Box::new(|_| ()), + handle_qos1_acknowledge: Box::new(|_| async move { Acknowledge::Yes }.boxed()), + } + } } #[derive(Debug)] -pub(crate) enum Acknowledge { +pub enum Acknowledge { No, Yes, YesWithProps {}, |