summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMarcel Müller <neikos@neikos.email>2024-04-04 11:58:00 +0200
committerGitHub <noreply@github.com>2024-04-04 11:58:00 +0200
commitae07b4c04a27be8e21e0da36a45e80ed6b9ce498 (patch)
treea8a49b8e122b18c5422e600c9cba2fd146f4e4e4
parentd2727bb05ff52539fe019f3d427e2463f4bc616f (diff)
parent654587ad4efc2b0071826776c41064a96ea3754e (diff)
Merge pull request #271 from TheNeikos/publish-abstraction
Publish abstraction
-rw-r--r--Cargo.lock95
-rw-r--r--Cargo.toml3
-rw-r--r--cloudmqtt-bin/src/bin/client.rs27
-rw-r--r--mqtt-format/src/v5/packets/puback.rs30
-rw-r--r--mqtt-format/src/v5/packets/pubcomp.rs15
-rw-r--r--mqtt-format/src/v5/packets/pubrec.rs15
-rw-r--r--mqtt-format/src/v5/packets/pubrel.rs30
-rw-r--r--src/client.rs596
-rw-r--r--src/client/connect.rs287
-rw-r--r--src/client/mod.rs57
-rw-r--r--src/client/receive.rs322
-rw-r--r--src/client/send.rs394
-rw-r--r--src/client/state.rs100
-rw-r--r--src/lib.rs2
-rw-r--r--src/qos.rs2
15 files changed, 1333 insertions, 642 deletions
diff --git a/Cargo.lock b/Cargo.lock
index ee96ca2..696e83e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -28,6 +28,21 @@ dependencies = [
[[package]]
name = "anstream"
+version = "0.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0ca84f3628370c59db74ee214b3263d58f9aadd9b4fe7e711fd87dc452b7f163"
+dependencies = [
+ "anstyle",
+ "anstyle-parse",
+ "anstyle-query",
+ "anstyle-wincon 1.0.2",
+ "colorchoice",
+ "is-terminal",
+ "utf8parse",
+]
+
+[[package]]
+name = "anstream"
version = "0.6.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d96bd03f33fe50a863e394ee9718a706f988b9079b20c3784fb726e7678b62fb"
@@ -35,7 +50,7 @@ dependencies = [
"anstyle",
"anstyle-parse",
"anstyle-query",
- "anstyle-wincon",
+ "anstyle-wincon 3.0.2",
"colorchoice",
"utf8parse",
]
@@ -66,6 +81,16 @@ dependencies = [
[[package]]
name = "anstyle-wincon"
+version = "1.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c677ab05e09154296dd37acecd46420c17b9713e8366facafa8fc0885167cf4c"
+dependencies = [
+ "anstyle",
+ "windows-sys 0.48.0",
+]
+
+[[package]]
+name = "anstyle-wincon"
version = "3.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1cd54b81ec8d6180e24654d0b371ad22fc3dd083b6ff8ba325b72e00c87660a7"
@@ -150,7 +175,7 @@ version = "4.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae129e2e766ae0ec03484e609954119f123cc1fe650337e155d03b022f24f7b4"
dependencies = [
- "anstream",
+ "anstream 0.6.13",
"anstyle",
"clap_lex",
"strsim",
@@ -221,6 +246,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5"
[[package]]
+name = "errno"
+version = "0.3.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245"
+dependencies = [
+ "libc",
+ "windows-sys 0.52.0",
+]
+
+[[package]]
name = "futures"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -350,6 +385,28 @@ dependencies = [
]
[[package]]
+name = "io-lifetimes"
+version = "1.0.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2"
+dependencies = [
+ "hermit-abi",
+ "libc",
+ "windows-sys 0.48.0",
+]
+
+[[package]]
+name = "is-terminal"
+version = "0.4.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f23ff5ef2b80d608d61efee834934d862cd92461afc0560dedf493e4c033738b"
+dependencies = [
+ "hermit-abi",
+ "libc",
+ "windows-sys 0.52.0",
+]
+
+[[package]]
name = "joinery"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -368,6 +425,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd"
[[package]]
+name = "linux-raw-sys"
+version = "0.3.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519"
+
+[[package]]
name = "lock_api"
version = "0.4.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -663,6 +726,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
[[package]]
+name = "rustix"
+version = "0.37.27"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fea8ca367a3a01fe35e6943c400addf443c0f57670e6ec51196f71a4b8762dd2"
+dependencies = [
+ "bitflags",
+ "errno",
+ "io-lifetimes",
+ "libc",
+ "linux-raw-sys",
+ "windows-sys 0.48.0",
+]
+
+[[package]]
name = "scopeguard"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -772,6 +849,16 @@ dependencies = [
]
[[package]]
+name = "terminal_size"
+version = "0.2.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8e6bf6f19e9f8ed8d4048dc22981458ebcf406d67e94cd422e5ecd73d63b3237"
+dependencies = [
+ "rustix",
+ "windows-sys 0.48.0",
+]
+
+[[package]]
name = "thiserror"
version = "1.0.58"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1137,7 +1224,11 @@ version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dffa400e67ed5a4dd237983829e66475f0a4a26938c4b04c21baede6262215b8"
dependencies = [
+ "anstream 0.3.2",
+ "anstyle",
+ "is-terminal",
"memchr",
+ "terminal_size",
]
[[package]]
diff --git a/Cargo.toml b/Cargo.toml
index ca4a54a..19041a3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -15,6 +15,9 @@ categories = ["embedded"]
[workspace]
members = ["cloudmqtt-bin", "mqtt-format"]
+[features]
+debug = ["winnow/debug"]
+
[dependencies]
futures = "0.3.30"
mqtt-format = { version = "0.5.0", path = "mqtt-format", features = [
diff --git a/cloudmqtt-bin/src/bin/client.rs b/cloudmqtt-bin/src/bin/client.rs
index 13ee843..7537cc3 100644
--- a/cloudmqtt-bin/src/bin/client.rs
+++ b/cloudmqtt-bin/src/bin/client.rs
@@ -5,8 +5,9 @@
//
use clap::Parser;
+use cloudmqtt::client::connect::MqttClientConnector;
+use cloudmqtt::client::send::Publish;
use cloudmqtt::client::MqttClient;
-use cloudmqtt::client::MqttClientConnector;
use cloudmqtt::transport::MqttConnectTransport;
use tokio::net::TcpStream;
use tracing_subscriber::layer::SubscriberExt;
@@ -45,24 +46,28 @@ async fn main() {
let connector = MqttClientConnector::new(
connection,
client_id,
- cloudmqtt::client::CleanStart::Yes,
+ cloudmqtt::client::connect::CleanStart::Yes,
cloudmqtt::keep_alive::KeepAlive::Disabled,
);
- let client = MqttClient::new();
+ let client = MqttClient::new_with_default_handlers();
let connected = client.connect(connector).await.unwrap();
let background = tokio::task::spawn(connected.background_task);
client
- .publish(
- "foo/bar".try_into().unwrap(),
- cloudmqtt::qos::QualityOfService::AtLeastOnce,
- false,
- vec![123].try_into().unwrap(),
- )
+ .publish(Publish {
+ topic: "foo/bar".try_into().unwrap(),
+ qos: cloudmqtt::qos::QualityOfService::ExactlyOnce,
+ retain: false,
+ payload: vec![123].try_into().unwrap(),
+ on_packet_recv: None,
+ })
.await
- .unwrap();
+ .unwrap()
+ .acknowledged()
+ .await;
+
+ client.ping().await.unwrap().response().await;
- let _ = background.await;
println!("Sent message! Bye");
}
diff --git a/mqtt-format/src/v5/packets/puback.rs b/mqtt-format/src/v5/packets/puback.rs
index 5007147..d2f86b2 100644
--- a/mqtt-format/src/v5/packets/puback.rs
+++ b/mqtt-format/src/v5/packets/puback.rs
@@ -55,21 +55,23 @@ impl<'i> MPuback<'i> {
winnow::combinator::trace("MPuback", |input: &mut &'i Bytes| {
let packet_identifier = PacketIdentifier::parse(input)?;
- if input.is_empty() {
- Ok(Self {
- packet_identifier,
- reason: PubackReasonCode::Success,
- properties: PubackProperties::new(),
- })
+ let reason = if input.is_empty() {
+ PubackReasonCode::Success
} else {
- let reason = PubackReasonCode::parse(input)?;
- let properties = PubackProperties::parse(input)?;
- Ok(Self {
- packet_identifier,
- reason,
- properties,
- })
- }
+ PubackReasonCode::parse(input)?
+ };
+
+ let properties = if input.is_empty() {
+ PubackProperties::new()
+ } else {
+ PubackProperties::parse(input)?
+ };
+
+ Ok(Self {
+ packet_identifier,
+ reason,
+ properties,
+ })
})
.parse_next(input)
}
diff --git a/mqtt-format/src/v5/packets/pubcomp.rs b/mqtt-format/src/v5/packets/pubcomp.rs
index cac4069..c8fe2b0 100644
--- a/mqtt-format/src/v5/packets/pubcomp.rs
+++ b/mqtt-format/src/v5/packets/pubcomp.rs
@@ -46,8 +46,19 @@ impl<'i> MPubcomp<'i> {
pub fn parse(input: &mut &'i Bytes) -> MResult<Self> {
winnow::combinator::trace("MPubcomp", |input: &mut &'i Bytes| {
let packet_identifier = PacketIdentifier::parse(input)?;
- let reason = PubcompReasonCode::parse(input)?;
- let properties = PubcompProperties::parse(input)?;
+
+ let reason = if input.is_empty() {
+ PubcompReasonCode::Success
+ } else {
+ PubcompReasonCode::parse(input)?
+ };
+
+ let properties = if input.is_empty() {
+ PubcompProperties::new()
+ } else {
+ PubcompProperties::parse(input)?
+ };
+
Ok(Self {
packet_identifier,
reason,
diff --git a/mqtt-format/src/v5/packets/pubrec.rs b/mqtt-format/src/v5/packets/pubrec.rs
index 9af6a14..655219d 100644
--- a/mqtt-format/src/v5/packets/pubrec.rs
+++ b/mqtt-format/src/v5/packets/pubrec.rs
@@ -54,8 +54,19 @@ impl<'i> MPubrec<'i> {
pub fn parse(input: &mut &'i Bytes) -> MResult<Self> {
winnow::combinator::trace("MPubrec", |input: &mut &'i Bytes| {
let packet_identifier = PacketIdentifier::parse(input)?;
- let reason = PubrecReasonCode::parse(input)?;
- let properties = PubrecProperties::parse(input)?;
+
+ let reason = if input.is_empty() {
+ PubrecReasonCode::Success
+ } else {
+ PubrecReasonCode::parse(input)?
+ };
+
+ let properties = if input.is_empty() {
+ PubrecProperties::new()
+ } else {
+ PubrecProperties::parse(input)?
+ };
+
Ok(Self {
packet_identifier,
reason,
diff --git a/mqtt-format/src/v5/packets/pubrel.rs b/mqtt-format/src/v5/packets/pubrel.rs
index 67e7374..d646c71 100644
--- a/mqtt-format/src/v5/packets/pubrel.rs
+++ b/mqtt-format/src/v5/packets/pubrel.rs
@@ -48,21 +48,23 @@ impl<'i> MPubrel<'i> {
winnow::combinator::trace("MPubrel", |input: &mut &'i Bytes| {
let packet_identifier = PacketIdentifier::parse(input)?;
- if input.is_empty() {
- Ok(Self {
- packet_identifier,
- reason: PubrelReasonCode::Success,
- properties: PubrelProperties::new(),
- })
+ let reason = if input.is_empty() {
+ PubrelReasonCode::Success
} else {
- let reason = PubrelReasonCode::parse(input)?;
- let properties = PubrelProperties::parse(input)?;
- Ok(Self {
- packet_identifier,
- reason,
- properties,
- })
- }
+ PubrelReasonCode::parse(input)?
+ };
+
+ let properties = if input.is_empty() {
+ PubrelProperties::new()
+ } else {
+ PubrelProperties::parse(input)?
+ };
+
+ Ok(Self {
+ packet_identifier,
+ reason,
+ properties,
+ })
})
.parse_next(input)
}
diff --git a/src/client.rs b/src/client.rs
deleted file mode 100644
index c6cd911..0000000
--- a/src/client.rs
+++ /dev/null
@@ -1,596 +0,0 @@
-//
-// This Source Code Form is subject to the terms of the Mozilla Public
-// License, v. 2.0. If a copy of the MPL was not distributed with this
-// file, You can obtain one at http://mozilla.org/MPL/2.0/.
-//
-
-use std::num::NonZeroU16;
-use std::sync::Arc;
-
-use futures::lock::Mutex;
-use futures::FutureExt;
-use futures::SinkExt;
-use futures::StreamExt;
-use mqtt_format::v5::integers::VARIABLE_INTEGER_MAX;
-use mqtt_format::v5::packets::publish::MPublish;
-use tokio_util::codec::FramedRead;
-use tokio_util::codec::FramedWrite;
-use tracing::Instrument;
-
-use crate::bytes::MqttBytes;
-use crate::client_identifier::ProposedClientIdentifier;
-use crate::codecs::MqttPacketCodec;
-use crate::codecs::MqttPacketCodecError;
-use crate::keep_alive::KeepAlive;
-use crate::packets::connack::ConnackPropertiesView;
-use crate::payload::MqttPayload;
-use crate::qos::QualityOfService;
-use crate::string::MqttString;
-use crate::transport::MqttConnectTransport;
-use crate::transport::MqttConnection;
-
-#[derive(Debug, PartialEq, Eq)]
-pub enum CleanStart {
- No,
- Yes,
-}
-
-impl CleanStart {
- pub fn as_bool(&self) -> bool {
- match self {
- CleanStart::No => false,
- CleanStart::Yes => true,
- }
- }
-}
-
-#[derive(typed_builder::TypedBuilder)]
-pub struct MqttWill {
- #[builder(default = crate::packets::connect::ConnectWillProperties::new())]
- properties: crate::packets::connect::ConnectWillProperties,
- topic: MqttString,
- payload: MqttBytes,
- qos: mqtt_format::v5::qos::QualityOfService,
- retain: bool,
-}
-
-impl MqttWill {
- pub fn get_properties_mut(&mut self) -> &mut crate::packets::connect::ConnectWillProperties {
- &mut self.properties
- }
-}
-
-impl MqttWill {
- fn as_ref(&self) -> mqtt_format::v5::packets::connect::Will<'_> {
- mqtt_format::v5::packets::connect::Will {
- properties: self.properties.as_ref(),
- topic: self.topic.as_ref(),
- payload: self.payload.as_ref(),
- will_qos: self.qos,
- will_retain: self.retain,
- }
- }
-}
-
-#[derive(Debug, thiserror::Error)]
-pub enum MqttClientConnectError {
- #[error("An error occured while encoding or sending an MQTT Packet")]
- Send(#[source] MqttPacketCodecError),
-
- #[error("An error occured while decoding or receiving an MQTT Packet")]
- Receive(#[source] MqttPacketCodecError),
-
- #[error("The transport unexpectedly closed")]
- TransportUnexpectedlyClosed,
-
- #[error("The server sent a response with a protocol error: {reason}")]
- ServerProtocolError { reason: &'static str },
-}
-
-pub struct MqttClientConnector {
- transport: MqttConnectTransport,
- client_identifier: ProposedClientIdentifier,
- clean_start: CleanStart,
- keep_alive: KeepAlive,
- properties: crate::packets::connect::ConnectProperties,
- username: Option<MqttString>,
- password: Option<MqttBytes>,
- will: Option<MqttWill>,
-}
-
-impl MqttClientConnector {
- pub fn new(
- transport: MqttConnectTransport,
- client_identifier: ProposedClientIdentifier,
- clean_start: CleanStart,
- keep_alive: KeepAlive,
- ) -> MqttClientConnector {
- MqttClientConnector {
- transport,
- client_identifier,
- clean_start,
- keep_alive,
- properties: crate::packets::connect::ConnectProperties::new(),
- username: None,
- password: None,
- will: None,
- }
- }
-
- pub fn with_username(&mut self, username: MqttString) -> &mut Self {
- self.username = Some(username);
- self
- }
-
- pub fn with_password(&mut self, password: MqttBytes) -> &mut Self {
- self.password = Some(password);
- self
- }
-
- pub fn with_will(&mut self, will: MqttWill) -> &mut Self {
- self.will = Some(will);
- self
- }
-
- pub fn properties_mut(&mut self) -> &mut crate::packets::connect::ConnectProperties {
- &mut self.properties
- }
-}
-
-struct ConnectState {
- session_present: bool,
- receive_maximum: Option<NonZeroU16>,
- maximum_qos: Option<mqtt_format::v5::qos::MaximumQualityOfService>,
- retain_available: Option<bool>,
- topic_alias_maximum: Option<u16>,
- maximum_packet_size: Option<u32>,
- conn_write: FramedWrite<tokio::io::WriteHalf<MqttConnection>, MqttPacketCodec>,
-
- conn_read_recv: futures::channel::oneshot::Receiver<
- FramedRead<tokio::io::ReadHalf<MqttConnection>, MqttPacketCodec>,
- >,
-
- next_packet_identifier: std::num::NonZeroU16,
-}
-
-struct SessionState {
- client_identifier: MqttString,
- outstanding_packets: OutstandingPackets,
-}
-
-struct OutstandingPackets {
- packet_ident_order: Vec<std::num::NonZeroU16>,
- outstanding_packets:
- std::collections::BTreeMap<std::num::NonZeroU16, crate::packets::MqttPacket>,
-}
-
-impl OutstandingPackets {
- pub fn empty() -> Self {
- Self {
- packet_ident_order: Vec::new(),
- outstanding_packets: std::collections::BTreeMap::new(),
- }
- }
-
- pub fn insert(&mut self, ident: std::num::NonZeroU16, packet: crate::packets::MqttPacket) {
- debug_assert_eq!(
- self.packet_ident_order.len(),
- self.outstanding_packets.len()
- );
-
- self.packet_ident_order.push(ident);
- let removed = self.outstanding_packets.insert(ident, packet);
-
- debug_assert!(removed.is_none());
- }
-
- pub fn update_by_id(
- &mut self,
- ident: std::num::NonZeroU16,
- packet: crate::packets::MqttPacket,
- ) {
- debug_assert_eq!(
- self.packet_ident_order.len(),
- self.outstanding_packets.len()
- );
-
- let removed = self.outstanding_packets.insert(ident, packet);
-
- debug_assert!(removed.is_some());
- }
-
- pub fn exists_outstanding_packet(&self, ident: std::num::NonZeroU16) -> bool {
- self.outstanding_packets.contains_key(&ident)
- }
-
- pub fn iter_in_send_order(
- &self,
- ) -> impl Iterator<Item = (std::num::NonZeroU16, &crate::packets::MqttPacket)> {
- self.packet_ident_order
- .iter()
- .flat_map(|id| self.outstanding_packets.get(id).map(|p| (*id, p)))
- }
-
- pub fn remove_by_id(&mut self, id: std::num::NonZeroU16) {
- // Vec::retain() preserves order
- self.packet_ident_order.retain(|&elm| elm != id);
- self.outstanding_packets.remove(&id);
-
- debug_assert_eq!(
- self.packet_ident_order.len(),
- self.outstanding_packets.len()
- );
- }
-}
-
-struct InnerClient {
- connection_state: Option<ConnectState>,
- session_state: Option<SessionState>,
-}
-
-pub struct MqttClient {
- inner: Arc<Mutex<InnerClient>>,
-}
-
-impl MqttClient {
- #[allow(clippy::new_without_default)]
- pub fn new() -> MqttClient {
- MqttClient {
- inner: Arc::new(Mutex::new(InnerClient {
- connection_state: None,
- session_state: None,
- })),
- }
- }
-
- pub async fn connect(
- &self,
- connector: MqttClientConnector,
- ) -> Result<Connected, MqttClientConnectError> {
- type Mcce = MqttClientConnectError;
-
- let inner_clone = self.inner.clone();
- let mut inner = self.inner.lock().await;
- let (read, write) = tokio::io::split(MqttConnection::from(connector.transport));
- let mut conn_write = FramedWrite::new(write, MqttPacketCodec);
- let mut conn_read = FramedRead::new(read, MqttPacketCodec);
-
- let conn_packet = mqtt_format::v5::packets::connect::MConnect {
- client_identifier: connector.client_identifier.as_str(),
- username: connector.username.as_ref().map(AsRef::as_ref),
- password: connector.password.as_ref().map(AsRef::as_ref),
- clean_start: connector.clean_start.as_bool(),
- will: connector.will.as_ref().map(|w| w.as_ref()),
- properties: connector.properties.as_ref(),
- keep_alive: connector.keep_alive.as_u16(),
- };
-
- conn_write
- .send(mqtt_format::v5::packets::MqttPacket::Connect(conn_packet))
- .await
- .map_err(Mcce::Send)?;
-
- let Some(maybe_connack) = conn_read.next().await else {
- return Err(Mcce::TransportUnexpectedlyClosed);
- };
-
- let maybe_connack = match maybe_connack {
- Ok(maybe_connack) => maybe_connack,
- Err(e) => {
- return Err(Mcce::Receive(e));
- }
- };
-
- let connack = loop {
- let can_use_auth = connector.properties.authentication_data.is_some();
- let _auth = match maybe_connack.get() {
- mqtt_format::v5::packets::MqttPacket::Connack(connack) => break connack,
- mqtt_format::v5::packets::MqttPacket::Auth(auth) => {
- if can_use_auth {
- auth
- } else {
- // MQTT-4.12.0-6
- return Err(Mcce::ServerProtocolError {
- reason: "MQTT-4.12.0-6",
- });
- }
- }
- _ => {
- return Err(MqttClientConnectError::ServerProtocolError {
- reason: "MQTT-3.1.4-5",
- });
- }
- };
-
- // TODO: Use user-provided method to authenticate further
-
- todo!()
- };
-
- // TODO: Timeout here if the server doesn't respond
-
- if connack.reason_code == mqtt_format::v5::packets::connack::ConnackReasonCode::Success {
- // TODO: Read properties, configure client
-
- if connack.session_present && connector.clean_start == CleanStart::Yes {
- return Err(MqttClientConnectError::ServerProtocolError {
- reason: "MQTT-3.2.2-2",
- });
- }
-
- let (conn_read_sender, conn_read_recv) = futures::channel::oneshot::channel();
-
- let connect_client_state = ConnectState {
- session_present: connack.session_present,
- receive_maximum: connack.properties.receive_maximum().map(|rm| rm.0),
- maximum_qos: connack.properties.maximum_qos().map(|mq| mq.0),
- retain_available: connack.properties.retain_available().map(|ra| ra.0),
- maximum_packet_size: connack.properties.maximum_packet_size().map(|mps| mps.0),
- topic_alias_maximum: connack.properties.topic_alias_maximum().map(|tam| tam.0),
- conn_write,
- conn_read_recv,
- next_packet_identifier: std::num::NonZeroU16::MIN,
- };
-
- let assigned_client_identifier = connack.properties.assigned_client_identifier();
-
- let client_identifier: MqttString;
-
- if let Some(aci) = assigned_client_identifier {
- if connector.client_identifier
- == ProposedClientIdentifier::PotentiallyServerProvided
- {
- client_identifier = MqttString::try_from(aci.0).map_err(|_mse| {
- MqttClientConnectError::ServerProtocolError {
- reason: "MQTT-1.5.4",
- }
- })?;
- } else {
- return Err(MqttClientConnectError::ServerProtocolError {
- reason: "MQTT-3.2.2.3.7",
- });
- }
- } else {
- client_identifier = match connector.client_identifier {
- ProposedClientIdentifier::PotentiallyServerProvided => {
- return Err(MqttClientConnectError::ServerProtocolError {
- reason: "MQTT-3.2.2.3.7",
- });
- }
- ProposedClientIdentifier::MinimalRequired(mr) => mr.into_inner(),
- ProposedClientIdentifier::PotentiallyAccepted(pa) => pa.into_inner(),
- };
- }
-
- inner.connection_state = Some(connect_client_state);
- inner.session_state = Some(SessionState {
- client_identifier,
- outstanding_packets: OutstandingPackets::empty(),
- });
-
- let connack_prop_view =
- crate::packets::connack::ConnackPropertiesView::try_from(maybe_connack)
- .expect("An already matched value suddenly changed?");
-
- let background_task = async move {
- tracing::info!("Starting background task");
- let inner: Arc<Mutex<InnerClient>> = inner_clone;
-
- while let Some(next) = conn_read.next().await {
- let process_span = tracing::debug_span!("Processing packet",
- packet_kind = tracing::field::Empty,
- packet_identifier = tracing::field::Empty);
- tracing::debug!(parent: &process_span, valid = next.is_ok(), "Received packet");
- let packet = match next {
- Ok(packet) => packet,
- Err(_) => todo!(),
- };
- process_span.record("packet_kind", tracing::field::debug(packet.get().get_kind()));
-
- match packet.get() {
- mqtt_format::v5::packets::MqttPacket::Auth(_) => todo!(),
- mqtt_format::v5::packets::MqttPacket::Disconnect(_) => todo!(),
- mqtt_format::v5::packets::MqttPacket::Pingreq(_) => todo!(),
- mqtt_format::v5::packets::MqttPacket::Pingresp(_) => todo!(),
- mqtt_format::v5::packets::MqttPacket::Puback(mpuback) => {
- match mpuback.reason {
- mqtt_format::v5::packets::puback::PubackReasonCode::Success |
- mqtt_format::v5::packets::puback::PubackReasonCode::NoMatchingSubscribers => {
- // happy path
- let Some(ref mut session_state) = inner.lock().await.session_state else {
- tracing::error!(parent: &process_span, "No session state found");
- todo!()
- };
-
- let pident = std::num::NonZeroU16::try_from(mpuback.packet_identifier.0)
- .expect("Zero PacketIdentifier not valid here");
- process_span.record("packet_identifier", pident);
-
- if session_state.outstanding_packets.exists_outstanding_packet(pident) {
- session_state.outstanding_packets.remove_by_id(pident);
- tracing::trace!(parent: &process_span, "Removed packet id from outstanding packets");
- } else {
- tracing::error!(parent: &process_span, "Packet id does not exist in outstanding packets");
- todo!()
- }
-
- // TODO: Forward mpuback.properties etc to the user
- }
-
- mqtt_format::v5::packets::puback::PubackReasonCode::ImplementationSpecificError => todo!(),
- mqtt_format::v5::packets::puback::PubackReasonCode::NotAuthorized => todo!(),
- mqtt_format::v5::packets::puback::PubackReasonCode::PacketIdentifierInUse => todo!(),
- mqtt_format::v5::packets::puback::PubackReasonCode::PayloadFormatInvalid => todo!(),
- mqtt_format::v5::packets::puback::PubackReasonCode::QuotaExceeded => todo!(),
- mqtt_format::v5::packets::puback::PubackReasonCode::TopicNameInvalid => todo!(),
- mqtt_format::v5::packets::puback::PubackReasonCode::UnspecifiedError => todo!(),
- }
- },
- mqtt_format::v5::packets::MqttPacket::Pubcomp(_) => todo!(),
- mqtt_format::v5::packets::MqttPacket::Publish(_) => todo!(),
- mqtt_format::v5::packets::MqttPacket::Pubrec(_) => todo!(),
- mqtt_format::v5::packets::MqttPacket::Pubrel(_) => todo!(),
- mqtt_format::v5::packets::MqttPacket::Suback(_) => todo!(),
- mqtt_format::v5::packets::MqttPacket::Unsuback(_) => todo!(),
-
- mqtt_format::v5::packets::MqttPacket::Connack(_) |
- mqtt_format::v5::packets::MqttPacket::Connect(_) |
- mqtt_format::v5::packets::MqttPacket::Subscribe(_) |
- mqtt_format::v5::packets::MqttPacket::Unsubscribe(_) => {
- todo!("Handle invalid packet")
- }
- }
- }
-
- tracing::debug!("Finished processing, returning reader");
- if let Err(conn_read) = conn_read_sender.send(conn_read) {
- tracing::error!("Failed to return reader");
- todo!()
- }
-
- Ok(())
- }
- .boxed::<>();
-
- return Ok(Connected {
- connack_prop_view,
- background_task,
- });
- }
-
- // TODO: Do something with error code
-
- todo!()
- }
-
- #[tracing::instrument(skip(self, payload), fields(payload_length = payload.as_ref().len()))]
- pub async fn publish(
- &self,
- topic: crate::topic::MqttTopic,
- qos: QualityOfService,
- retain: bool,
- payload: MqttPayload,
- ) -> Result<(), ()> {
- let mut inner = self.inner.lock().await;
- let inner = &mut *inner;
-
- let Some(conn_state) = &mut inner.connection_state else {
- tracing::error!("No connection state found");
- return Err(());
- };
-
- let Some(sess_state) = &mut inner.session_state else {
- tracing::error!("No session state found");
- return Err(());
- };
-
- if conn_state.retain_available.unwrap_or(true) && retain {
- tracing::warn!("Retain not available, but requested");
- return Err(());
- }
-
- let packet_identifier = if qos > QualityOfService::AtMostOnce {
- get_next_packet_ident(
- &mut conn_state.next_packet_identifier,
- &sess_state.outstanding_packets,
- )
- .map(Some)
- .map_err(|_| ())? // TODO
- } else {
- None
- };
- tracing::debug!(?packet_identifier, "Packet identifier computed");
-
- let publish = MPublish {
- duplicate: false,
- quality_of_service: qos.into(),
- retain,