diff options
author | Marcel Müller <neikos@neikos.email> | 2024-04-05 11:45:58 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-04-05 11:45:58 +0200 |
commit | 16dd1fe3465fbb90e640975b5aa6de0541d42090 (patch) | |
tree | c316ab04d1a28c236524d87371002018338ff592 | |
parent | c1dd330208fdc4899e9f655cc9d8a308bfc20a32 (diff) | |
parent | cb21213d2167f9dab759ea0d6ecddd8d4ef5262d (diff) |
Merge pull request #280 from matthiasbeyer/client-builder
Client builder
-rw-r--r-- | Cargo.lock | 2 | ||||
-rw-r--r-- | cloudmqtt-bin/Cargo.toml | 2 | ||||
-rw-r--r-- | cloudmqtt-bin/src/bin/client.rs | 16 | ||||
-rw-r--r-- | src/client/builder.rs | 54 | ||||
-rw-r--r-- | src/client/mod.rs | 10 | ||||
-rw-r--r-- | src/client/send.rs | 26 |
6 files changed, 100 insertions, 10 deletions
@@ -224,7 +224,9 @@ version = "0.1.0" dependencies = [ "clap", "cloudmqtt", + "futures", "tokio", + "tracing", "tracing-subscriber", ] diff --git a/cloudmqtt-bin/Cargo.toml b/cloudmqtt-bin/Cargo.toml index 97ec8a4..efc755b 100644 --- a/cloudmqtt-bin/Cargo.toml +++ b/cloudmqtt-bin/Cargo.toml @@ -8,5 +8,7 @@ edition = "2021" [dependencies] clap = { version = "4.5.4", features = ["derive"] } cloudmqtt = { version = "0.5.0", path = ".." } +futures = "0.3.30" tokio = { version = "1.37.0", features = ["full"] } +tracing = "0.1.40" tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } diff --git a/cloudmqtt-bin/src/bin/client.rs b/cloudmqtt-bin/src/bin/client.rs index 43e9743..8b044fe 100644 --- a/cloudmqtt-bin/src/bin/client.rs +++ b/cloudmqtt-bin/src/bin/client.rs @@ -11,6 +11,7 @@ use cloudmqtt::client::connect::MqttClientConnector; use cloudmqtt::client::send::Publish; use cloudmqtt::client::MqttClient; use cloudmqtt::transport::MqttConnectTransport; +use futures::FutureExt; use tokio::net::TcpStream; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; @@ -52,7 +53,20 @@ async fn main() { cloudmqtt::keep_alive::KeepAlive::Seconds(5.try_into().unwrap()), ); - let client = MqttClient::new_with_default_handlers(); + let client = MqttClient::builder() + .with_on_packet_recv(Box::new(|packet| { + tracing::trace!(?packet, "Received packet") + })) + .with_handle_qos1_acknowledge(Box::new(|packet| { + async move { + tracing::trace!(?packet, "Acknowledging packet"); + cloudmqtt::client::send::Acknowledge::Yes + } + .boxed() + })) + .build() + .await + .unwrap(); let connected = client.connect(connector).await.unwrap(); let background = tokio::task::spawn(connected.background_task); diff --git a/src/client/builder.rs b/src/client/builder.rs new file mode 100644 index 0000000..e8629eb --- /dev/null +++ b/src/client/builder.rs @@ -0,0 +1,54 @@ +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +use std::sync::Arc; + +use futures::lock::Mutex; + +use super::send::Callbacks; +use super::send::ClientHandlers; +use super::send::HandleQos1AcknowledgeFn; +use super::send::OnPacketRecvFn; +use super::InnerClient; +use super::MqttClient; + +pub struct MqttClientBuilder { + handlers: ClientHandlers, +} + +impl MqttClientBuilder { + pub(super) fn new() -> Self { + Self { + handlers: ClientHandlers::default(), + } + } + + pub fn with_on_packet_recv(mut self, f: OnPacketRecvFn) -> Self { + self.handlers.on_packet_recv = f; + self + } + + pub fn with_handle_qos1_acknowledge(mut self, f: HandleQos1AcknowledgeFn) -> Self { + self.handlers.handle_qos1_acknowledge = f; + self + } + + pub async fn build(self) -> Result<super::MqttClient, MqttClientBuilderError> { + Ok({ + MqttClient { + inner: Arc::new(Mutex::new(InnerClient { + connection_state: None, + session_state: None, + default_handlers: self.handlers, + outstanding_callbacks: Callbacks::new(), + })), + } + }) + } +} + +#[derive(Debug, thiserror::Error)] +pub enum MqttClientBuilderError {} diff --git a/src/client/mod.rs b/src/client/mod.rs index 72780b3..8ff460f 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -4,6 +4,7 @@ // file, You can obtain one at http://mozilla.org/MPL/2.0/. // +pub mod builder; pub mod connect; mod receive; pub mod send; @@ -36,14 +37,15 @@ impl MqttClient { inner: Arc::new(Mutex::new(InnerClient { connection_state: None, session_state: None, - default_handlers: ClientHandlers { - on_packet_recv: Box::new(|_| ()), - handle_acknowledge: Box::new(|_| Acknowledge::Yes), - }, + default_handlers: ClientHandlers::default(), outstanding_callbacks: Callbacks::new(), })), } } + + pub fn builder() -> builder::MqttClientBuilder { + builder::MqttClientBuilder::new() + } } #[cfg(test)] diff --git a/src/client/send.rs b/src/client/send.rs index 1883a64..161e130 100644 --- a/src/client/send.rs +++ b/src/client/send.rs @@ -7,6 +7,7 @@ use std::collections::HashMap; use std::collections::VecDeque; +use futures::FutureExt; use mqtt_format::v5::integers::VARIABLE_INTEGER_MAX; use mqtt_format::v5::packets::publish::MPublish; use tracing::Instrument; @@ -213,14 +214,29 @@ fn get_next_packet_ident( pub struct PacketIdentifierExhausted; pub(crate) struct ClientHandlers { - pub(crate) on_packet_recv: Box<dyn Fn(&crate::packets::MqttPacket) + Send>, - pub(crate) handle_acknowledge: Box<dyn Fn(&crate::packets::MqttPacket) -> Acknowledge + Send>, - // on_receive: Box<dyn Fn(&crate::packets::MqttPacket) + Send>, - // on_complete: Box<dyn Fn(&crate::packets::MqttPacket)+ Send>, + pub(crate) on_packet_recv: OnPacketRecvFn, + pub(crate) handle_qos1_acknowledge: HandleQos1AcknowledgeFn, + // handle_qos2_receive: Box<dyn Fn(&crate::packets::MqttPacket) + Send>, + // handle_qos2_complete: Box<dyn Fn(&crate::packets::MqttPacket) + Send>, +} + +pub type OnPacketRecvFn = Box<dyn Fn(&crate::packets::MqttPacket) + Send>; +pub type HandleQos1AcknowledgeFn = Box< + dyn for<'p> Fn(&'p crate::packets::MqttPacket) -> futures::future::BoxFuture<'p, Acknowledge> + + Send, +>; + +impl Default for ClientHandlers { + fn default() -> Self { + Self { + on_packet_recv: Box::new(|_| ()), + handle_qos1_acknowledge: Box::new(|_| async move { Acknowledge::Yes }.boxed()), + } + } } #[derive(Debug)] -pub(crate) enum Acknowledge { +pub enum Acknowledge { No, Yes, YesWithProps {}, |