summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMarcel Müller <neikos@neikos.email>2024-04-05 11:45:58 +0200
committerGitHub <noreply@github.com>2024-04-05 11:45:58 +0200
commit16dd1fe3465fbb90e640975b5aa6de0541d42090 (patch)
treec316ab04d1a28c236524d87371002018338ff592
parentc1dd330208fdc4899e9f655cc9d8a308bfc20a32 (diff)
parentcb21213d2167f9dab759ea0d6ecddd8d4ef5262d (diff)
Merge pull request #280 from matthiasbeyer/client-builder
Client builder
-rw-r--r--Cargo.lock2
-rw-r--r--cloudmqtt-bin/Cargo.toml2
-rw-r--r--cloudmqtt-bin/src/bin/client.rs16
-rw-r--r--src/client/builder.rs54
-rw-r--r--src/client/mod.rs10
-rw-r--r--src/client/send.rs26
6 files changed, 100 insertions, 10 deletions
diff --git a/Cargo.lock b/Cargo.lock
index e08663d..d063591 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -224,7 +224,9 @@ version = "0.1.0"
dependencies = [
"clap",
"cloudmqtt",
+ "futures",
"tokio",
+ "tracing",
"tracing-subscriber",
]
diff --git a/cloudmqtt-bin/Cargo.toml b/cloudmqtt-bin/Cargo.toml
index 97ec8a4..efc755b 100644
--- a/cloudmqtt-bin/Cargo.toml
+++ b/cloudmqtt-bin/Cargo.toml
@@ -8,5 +8,7 @@ edition = "2021"
[dependencies]
clap = { version = "4.5.4", features = ["derive"] }
cloudmqtt = { version = "0.5.0", path = ".." }
+futures = "0.3.30"
tokio = { version = "1.37.0", features = ["full"] }
+tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
diff --git a/cloudmqtt-bin/src/bin/client.rs b/cloudmqtt-bin/src/bin/client.rs
index 43e9743..8b044fe 100644
--- a/cloudmqtt-bin/src/bin/client.rs
+++ b/cloudmqtt-bin/src/bin/client.rs
@@ -11,6 +11,7 @@ use cloudmqtt::client::connect::MqttClientConnector;
use cloudmqtt::client::send::Publish;
use cloudmqtt::client::MqttClient;
use cloudmqtt::transport::MqttConnectTransport;
+use futures::FutureExt;
use tokio::net::TcpStream;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
@@ -52,7 +53,20 @@ async fn main() {
cloudmqtt::keep_alive::KeepAlive::Seconds(5.try_into().unwrap()),
);
- let client = MqttClient::new_with_default_handlers();
+ let client = MqttClient::builder()
+ .with_on_packet_recv(Box::new(|packet| {
+ tracing::trace!(?packet, "Received packet")
+ }))
+ .with_handle_qos1_acknowledge(Box::new(|packet| {
+ async move {
+ tracing::trace!(?packet, "Acknowledging packet");
+ cloudmqtt::client::send::Acknowledge::Yes
+ }
+ .boxed()
+ }))
+ .build()
+ .await
+ .unwrap();
let connected = client.connect(connector).await.unwrap();
let background = tokio::task::spawn(connected.background_task);
diff --git a/src/client/builder.rs b/src/client/builder.rs
new file mode 100644
index 0000000..e8629eb
--- /dev/null
+++ b/src/client/builder.rs
@@ -0,0 +1,54 @@
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+//
+
+use std::sync::Arc;
+
+use futures::lock::Mutex;
+
+use super::send::Callbacks;
+use super::send::ClientHandlers;
+use super::send::HandleQos1AcknowledgeFn;
+use super::send::OnPacketRecvFn;
+use super::InnerClient;
+use super::MqttClient;
+
+pub struct MqttClientBuilder {
+ handlers: ClientHandlers,
+}
+
+impl MqttClientBuilder {
+ pub(super) fn new() -> Self {
+ Self {
+ handlers: ClientHandlers::default(),
+ }
+ }
+
+ pub fn with_on_packet_recv(mut self, f: OnPacketRecvFn) -> Self {
+ self.handlers.on_packet_recv = f;
+ self
+ }
+
+ pub fn with_handle_qos1_acknowledge(mut self, f: HandleQos1AcknowledgeFn) -> Self {
+ self.handlers.handle_qos1_acknowledge = f;
+ self
+ }
+
+ pub async fn build(self) -> Result<super::MqttClient, MqttClientBuilderError> {
+ Ok({
+ MqttClient {
+ inner: Arc::new(Mutex::new(InnerClient {
+ connection_state: None,
+ session_state: None,
+ default_handlers: self.handlers,
+ outstanding_callbacks: Callbacks::new(),
+ })),
+ }
+ })
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum MqttClientBuilderError {}
diff --git a/src/client/mod.rs b/src/client/mod.rs
index 72780b3..8ff460f 100644
--- a/src/client/mod.rs
+++ b/src/client/mod.rs
@@ -4,6 +4,7 @@
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//
+pub mod builder;
pub mod connect;
mod receive;
pub mod send;
@@ -36,14 +37,15 @@ impl MqttClient {
inner: Arc::new(Mutex::new(InnerClient {
connection_state: None,
session_state: None,
- default_handlers: ClientHandlers {
- on_packet_recv: Box::new(|_| ()),
- handle_acknowledge: Box::new(|_| Acknowledge::Yes),
- },
+ default_handlers: ClientHandlers::default(),
outstanding_callbacks: Callbacks::new(),
})),
}
}
+
+ pub fn builder() -> builder::MqttClientBuilder {
+ builder::MqttClientBuilder::new()
+ }
}
#[cfg(test)]
diff --git a/src/client/send.rs b/src/client/send.rs
index 1883a64..161e130 100644
--- a/src/client/send.rs
+++ b/src/client/send.rs
@@ -7,6 +7,7 @@
use std::collections::HashMap;
use std::collections::VecDeque;
+use futures::FutureExt;
use mqtt_format::v5::integers::VARIABLE_INTEGER_MAX;
use mqtt_format::v5::packets::publish::MPublish;
use tracing::Instrument;
@@ -213,14 +214,29 @@ fn get_next_packet_ident(
pub struct PacketIdentifierExhausted;
pub(crate) struct ClientHandlers {
- pub(crate) on_packet_recv: Box<dyn Fn(&crate::packets::MqttPacket) + Send>,
- pub(crate) handle_acknowledge: Box<dyn Fn(&crate::packets::MqttPacket) -> Acknowledge + Send>,
- // on_receive: Box<dyn Fn(&crate::packets::MqttPacket) + Send>,
- // on_complete: Box<dyn Fn(&crate::packets::MqttPacket)+ Send>,
+ pub(crate) on_packet_recv: OnPacketRecvFn,
+ pub(crate) handle_qos1_acknowledge: HandleQos1AcknowledgeFn,
+ // handle_qos2_receive: Box<dyn Fn(&crate::packets::MqttPacket) + Send>,
+ // handle_qos2_complete: Box<dyn Fn(&crate::packets::MqttPacket) + Send>,
+}
+
+pub type OnPacketRecvFn = Box<dyn Fn(&crate::packets::MqttPacket) + Send>;
+pub type HandleQos1AcknowledgeFn = Box<
+ dyn for<'p> Fn(&'p crate::packets::MqttPacket) -> futures::future::BoxFuture<'p, Acknowledge>
+ + Send,
+>;
+
+impl Default for ClientHandlers {
+ fn default() -> Self {
+ Self {
+ on_packet_recv: Box::new(|_| ()),
+ handle_qos1_acknowledge: Box::new(|_| async move { Acknowledge::Yes }.boxed()),
+ }
+ }
}
#[derive(Debug)]
-pub(crate) enum Acknowledge {
+pub enum Acknowledge {
No,
Yes,
YesWithProps {},