diff options
author | Marcel Müller <neikos@neikos.email> | 2024-04-04 11:58:00 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-04-04 11:58:00 +0200 |
commit | ae07b4c04a27be8e21e0da36a45e80ed6b9ce498 (patch) | |
tree | a8a49b8e122b18c5422e600c9cba2fd146f4e4e4 | |
parent | d2727bb05ff52539fe019f3d427e2463f4bc616f (diff) | |
parent | 654587ad4efc2b0071826776c41064a96ea3754e (diff) |
Merge pull request #271 from TheNeikos/publish-abstraction
Publish abstraction
-rw-r--r-- | Cargo.lock | 95 | ||||
-rw-r--r-- | Cargo.toml | 3 | ||||
-rw-r--r-- | cloudmqtt-bin/src/bin/client.rs | 27 | ||||
-rw-r--r-- | mqtt-format/src/v5/packets/puback.rs | 30 | ||||
-rw-r--r-- | mqtt-format/src/v5/packets/pubcomp.rs | 15 | ||||
-rw-r--r-- | mqtt-format/src/v5/packets/pubrec.rs | 15 | ||||
-rw-r--r-- | mqtt-format/src/v5/packets/pubrel.rs | 30 | ||||
-rw-r--r-- | src/client.rs | 596 | ||||
-rw-r--r-- | src/client/connect.rs | 287 | ||||
-rw-r--r-- | src/client/mod.rs | 57 | ||||
-rw-r--r-- | src/client/receive.rs | 322 | ||||
-rw-r--r-- | src/client/send.rs | 394 | ||||
-rw-r--r-- | src/client/state.rs | 100 | ||||
-rw-r--r-- | src/lib.rs | 2 | ||||
-rw-r--r-- | src/qos.rs | 2 |
15 files changed, 1333 insertions, 642 deletions
@@ -28,6 +28,21 @@ dependencies = [ [[package]] name = "anstream" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ca84f3628370c59db74ee214b3263d58f9aadd9b4fe7e711fd87dc452b7f163" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon 1.0.2", + "colorchoice", + "is-terminal", + "utf8parse", +] + +[[package]] +name = "anstream" version = "0.6.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d96bd03f33fe50a863e394ee9718a706f988b9079b20c3784fb726e7678b62fb" @@ -35,7 +50,7 @@ dependencies = [ "anstyle", "anstyle-parse", "anstyle-query", - "anstyle-wincon", + "anstyle-wincon 3.0.2", "colorchoice", "utf8parse", ] @@ -66,6 +81,16 @@ dependencies = [ [[package]] name = "anstyle-wincon" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c677ab05e09154296dd37acecd46420c17b9713e8366facafa8fc0885167cf4c" +dependencies = [ + "anstyle", + "windows-sys 0.48.0", +] + +[[package]] +name = "anstyle-wincon" version = "3.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1cd54b81ec8d6180e24654d0b371ad22fc3dd083b6ff8ba325b72e00c87660a7" @@ -150,7 +175,7 @@ version = "4.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae129e2e766ae0ec03484e609954119f123cc1fe650337e155d03b022f24f7b4" dependencies = [ - "anstream", + "anstream 0.6.13", "anstyle", "clap_lex", "strsim", @@ -221,6 +246,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] +name = "errno" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + +[[package]] name = "futures" version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -350,6 +385,28 @@ dependencies = [ ] [[package]] +name = "io-lifetimes" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" +dependencies = [ + "hermit-abi", + "libc", + "windows-sys 0.48.0", +] + +[[package]] +name = "is-terminal" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f23ff5ef2b80d608d61efee834934d862cd92461afc0560dedf493e4c033738b" +dependencies = [ + "hermit-abi", + "libc", + "windows-sys 0.52.0", +] + +[[package]] name = "joinery" version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -368,6 +425,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" [[package]] +name = "linux-raw-sys" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" + +[[package]] name = "lock_api" version = "0.4.11" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -663,6 +726,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" [[package]] +name = "rustix" +version = "0.37.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea8ca367a3a01fe35e6943c400addf443c0f57670e6ec51196f71a4b8762dd2" +dependencies = [ + "bitflags", + "errno", + "io-lifetimes", + "libc", + "linux-raw-sys", + "windows-sys 0.48.0", +] + +[[package]] name = "scopeguard" version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -772,6 +849,16 @@ dependencies = [ ] [[package]] +name = "terminal_size" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e6bf6f19e9f8ed8d4048dc22981458ebcf406d67e94cd422e5ecd73d63b3237" +dependencies = [ + "rustix", + "windows-sys 0.48.0", +] + +[[package]] name = "thiserror" version = "1.0.58" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1137,7 +1224,11 @@ version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dffa400e67ed5a4dd237983829e66475f0a4a26938c4b04c21baede6262215b8" dependencies = [ + "anstream 0.3.2", + "anstyle", + "is-terminal", "memchr", + "terminal_size", ] [[package]] @@ -15,6 +15,9 @@ categories = ["embedded"] [workspace] members = ["cloudmqtt-bin", "mqtt-format"] +[features] +debug = ["winnow/debug"] + [dependencies] futures = "0.3.30" mqtt-format = { version = "0.5.0", path = "mqtt-format", features = [ diff --git a/cloudmqtt-bin/src/bin/client.rs b/cloudmqtt-bin/src/bin/client.rs index 13ee843..7537cc3 100644 --- a/cloudmqtt-bin/src/bin/client.rs +++ b/cloudmqtt-bin/src/bin/client.rs @@ -5,8 +5,9 @@ // use clap::Parser; +use cloudmqtt::client::connect::MqttClientConnector; +use cloudmqtt::client::send::Publish; use cloudmqtt::client::MqttClient; -use cloudmqtt::client::MqttClientConnector; use cloudmqtt::transport::MqttConnectTransport; use tokio::net::TcpStream; use tracing_subscriber::layer::SubscriberExt; @@ -45,24 +46,28 @@ async fn main() { let connector = MqttClientConnector::new( connection, client_id, - cloudmqtt::client::CleanStart::Yes, + cloudmqtt::client::connect::CleanStart::Yes, cloudmqtt::keep_alive::KeepAlive::Disabled, ); - let client = MqttClient::new(); + let client = MqttClient::new_with_default_handlers(); let connected = client.connect(connector).await.unwrap(); let background = tokio::task::spawn(connected.background_task); client - .publish( - "foo/bar".try_into().unwrap(), - cloudmqtt::qos::QualityOfService::AtLeastOnce, - false, - vec![123].try_into().unwrap(), - ) + .publish(Publish { + topic: "foo/bar".try_into().unwrap(), + qos: cloudmqtt::qos::QualityOfService::ExactlyOnce, + retain: false, + payload: vec![123].try_into().unwrap(), + on_packet_recv: None, + }) .await - .unwrap(); + .unwrap() + .acknowledged() + .await; + + client.ping().await.unwrap().response().await; - let _ = background.await; println!("Sent message! Bye"); } diff --git a/mqtt-format/src/v5/packets/puback.rs b/mqtt-format/src/v5/packets/puback.rs index 5007147..d2f86b2 100644 --- a/mqtt-format/src/v5/packets/puback.rs +++ b/mqtt-format/src/v5/packets/puback.rs @@ -55,21 +55,23 @@ impl<'i> MPuback<'i> { winnow::combinator::trace("MPuback", |input: &mut &'i Bytes| { let packet_identifier = PacketIdentifier::parse(input)?; - if input.is_empty() { - Ok(Self { - packet_identifier, - reason: PubackReasonCode::Success, - properties: PubackProperties::new(), - }) + let reason = if input.is_empty() { + PubackReasonCode::Success } else { - let reason = PubackReasonCode::parse(input)?; - let properties = PubackProperties::parse(input)?; - Ok(Self { - packet_identifier, - reason, - properties, - }) - } + PubackReasonCode::parse(input)? + }; + + let properties = if input.is_empty() { + PubackProperties::new() + } else { + PubackProperties::parse(input)? + }; + + Ok(Self { + packet_identifier, + reason, + properties, + }) }) .parse_next(input) } diff --git a/mqtt-format/src/v5/packets/pubcomp.rs b/mqtt-format/src/v5/packets/pubcomp.rs index cac4069..c8fe2b0 100644 --- a/mqtt-format/src/v5/packets/pubcomp.rs +++ b/mqtt-format/src/v5/packets/pubcomp.rs @@ -46,8 +46,19 @@ impl<'i> MPubcomp<'i> { pub fn parse(input: &mut &'i Bytes) -> MResult<Self> { winnow::combinator::trace("MPubcomp", |input: &mut &'i Bytes| { let packet_identifier = PacketIdentifier::parse(input)?; - let reason = PubcompReasonCode::parse(input)?; - let properties = PubcompProperties::parse(input)?; + + let reason = if input.is_empty() { + PubcompReasonCode::Success + } else { + PubcompReasonCode::parse(input)? + }; + + let properties = if input.is_empty() { + PubcompProperties::new() + } else { + PubcompProperties::parse(input)? + }; + Ok(Self { packet_identifier, reason, diff --git a/mqtt-format/src/v5/packets/pubrec.rs b/mqtt-format/src/v5/packets/pubrec.rs index 9af6a14..655219d 100644 --- a/mqtt-format/src/v5/packets/pubrec.rs +++ b/mqtt-format/src/v5/packets/pubrec.rs @@ -54,8 +54,19 @@ impl<'i> MPubrec<'i> { pub fn parse(input: &mut &'i Bytes) -> MResult<Self> { winnow::combinator::trace("MPubrec", |input: &mut &'i Bytes| { let packet_identifier = PacketIdentifier::parse(input)?; - let reason = PubrecReasonCode::parse(input)?; - let properties = PubrecProperties::parse(input)?; + + let reason = if input.is_empty() { + PubrecReasonCode::Success + } else { + PubrecReasonCode::parse(input)? + }; + + let properties = if input.is_empty() { + PubrecProperties::new() + } else { + PubrecProperties::parse(input)? + }; + Ok(Self { packet_identifier, reason, diff --git a/mqtt-format/src/v5/packets/pubrel.rs b/mqtt-format/src/v5/packets/pubrel.rs index 67e7374..d646c71 100644 --- a/mqtt-format/src/v5/packets/pubrel.rs +++ b/mqtt-format/src/v5/packets/pubrel.rs @@ -48,21 +48,23 @@ impl<'i> MPubrel<'i> { winnow::combinator::trace("MPubrel", |input: &mut &'i Bytes| { let packet_identifier = PacketIdentifier::parse(input)?; - if input.is_empty() { - Ok(Self { - packet_identifier, - reason: PubrelReasonCode::Success, - properties: PubrelProperties::new(), - }) + let reason = if input.is_empty() { + PubrelReasonCode::Success } else { - let reason = PubrelReasonCode::parse(input)?; - let properties = PubrelProperties::parse(input)?; - Ok(Self { - packet_identifier, - reason, - properties, - }) - } + PubrelReasonCode::parse(input)? + }; + + let properties = if input.is_empty() { + PubrelProperties::new() + } else { + PubrelProperties::parse(input)? + }; + + Ok(Self { + packet_identifier, + reason, + properties, + }) }) .parse_next(input) } diff --git a/src/client.rs b/src/client.rs deleted file mode 100644 index c6cd911..0000000 --- a/src/client.rs +++ /dev/null @@ -1,596 +0,0 @@ -// -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. -// - -use std::num::NonZeroU16; -use std::sync::Arc; - -use futures::lock::Mutex; -use futures::FutureExt; -use futures::SinkExt; -use futures::StreamExt; -use mqtt_format::v5::integers::VARIABLE_INTEGER_MAX; -use mqtt_format::v5::packets::publish::MPublish; -use tokio_util::codec::FramedRead; -use tokio_util::codec::FramedWrite; -use tracing::Instrument; - -use crate::bytes::MqttBytes; -use crate::client_identifier::ProposedClientIdentifier; -use crate::codecs::MqttPacketCodec; -use crate::codecs::MqttPacketCodecError; -use crate::keep_alive::KeepAlive; -use crate::packets::connack::ConnackPropertiesView; -use crate::payload::MqttPayload; -use crate::qos::QualityOfService; -use crate::string::MqttString; -use crate::transport::MqttConnectTransport; -use crate::transport::MqttConnection; - -#[derive(Debug, PartialEq, Eq)] -pub enum CleanStart { - No, - Yes, -} - -impl CleanStart { - pub fn as_bool(&self) -> bool { - match self { - CleanStart::No => false, - CleanStart::Yes => true, - } - } -} - -#[derive(typed_builder::TypedBuilder)] -pub struct MqttWill { - #[builder(default = crate::packets::connect::ConnectWillProperties::new())] - properties: crate::packets::connect::ConnectWillProperties, - topic: MqttString, - payload: MqttBytes, - qos: mqtt_format::v5::qos::QualityOfService, - retain: bool, -} - -impl MqttWill { - pub fn get_properties_mut(&mut self) -> &mut crate::packets::connect::ConnectWillProperties { - &mut self.properties - } -} - -impl MqttWill { - fn as_ref(&self) -> mqtt_format::v5::packets::connect::Will<'_> { - mqtt_format::v5::packets::connect::Will { - properties: self.properties.as_ref(), - topic: self.topic.as_ref(), - payload: self.payload.as_ref(), - will_qos: self.qos, - will_retain: self.retain, - } - } -} - -#[derive(Debug, thiserror::Error)] -pub enum MqttClientConnectError { - #[error("An error occured while encoding or sending an MQTT Packet")] - Send(#[source] MqttPacketCodecError), - - #[error("An error occured while decoding or receiving an MQTT Packet")] - Receive(#[source] MqttPacketCodecError), - - #[error("The transport unexpectedly closed")] - TransportUnexpectedlyClosed, - - #[error("The server sent a response with a protocol error: {reason}")] - ServerProtocolError { reason: &'static str }, -} - -pub struct MqttClientConnector { - transport: MqttConnectTransport, - client_identifier: ProposedClientIdentifier, - clean_start: CleanStart, - keep_alive: KeepAlive, - properties: crate::packets::connect::ConnectProperties, - username: Option<MqttString>, - password: Option<MqttBytes>, - will: Option<MqttWill>, -} - -impl MqttClientConnector { - pub fn new( - transport: MqttConnectTransport, - client_identifier: ProposedClientIdentifier, - clean_start: CleanStart, - keep_alive: KeepAlive, - ) -> MqttClientConnector { - MqttClientConnector { - transport, - client_identifier, - clean_start, - keep_alive, - properties: crate::packets::connect::ConnectProperties::new(), - username: None, - password: None, - will: None, - } - } - - pub fn with_username(&mut self, username: MqttString) -> &mut Self { - self.username = Some(username); - self - } - - pub fn with_password(&mut self, password: MqttBytes) -> &mut Self { - self.password = Some(password); - self - } - - pub fn with_will(&mut self, will: MqttWill) -> &mut Self { - self.will = Some(will); - self - } - - pub fn properties_mut(&mut self) -> &mut crate::packets::connect::ConnectProperties { - &mut self.properties - } -} - -struct ConnectState { - session_present: bool, - receive_maximum: Option<NonZeroU16>, - maximum_qos: Option<mqtt_format::v5::qos::MaximumQualityOfService>, - retain_available: Option<bool>, - topic_alias_maximum: Option<u16>, - maximum_packet_size: Option<u32>, - conn_write: FramedWrite<tokio::io::WriteHalf<MqttConnection>, MqttPacketCodec>, - - conn_read_recv: futures::channel::oneshot::Receiver< - FramedRead<tokio::io::ReadHalf<MqttConnection>, MqttPacketCodec>, - >, - - next_packet_identifier: std::num::NonZeroU16, -} - -struct SessionState { - client_identifier: MqttString, - outstanding_packets: OutstandingPackets, -} - -struct OutstandingPackets { - packet_ident_order: Vec<std::num::NonZeroU16>, - outstanding_packets: - std::collections::BTreeMap<std::num::NonZeroU16, crate::packets::MqttPacket>, -} - -impl OutstandingPackets { - pub fn empty() -> Self { - Self { - packet_ident_order: Vec::new(), - outstanding_packets: std::collections::BTreeMap::new(), - } - } - - pub fn insert(&mut self, ident: std::num::NonZeroU16, packet: crate::packets::MqttPacket) { - debug_assert_eq!( - self.packet_ident_order.len(), - self.outstanding_packets.len() - ); - - self.packet_ident_order.push(ident); - let removed = self.outstanding_packets.insert(ident, packet); - - debug_assert!(removed.is_none()); - } - - pub fn update_by_id( - &mut self, - ident: std::num::NonZeroU16, - packet: crate::packets::MqttPacket, - ) { - debug_assert_eq!( - self.packet_ident_order.len(), - self.outstanding_packets.len() - ); - - let removed = self.outstanding_packets.insert(ident, packet); - - debug_assert!(removed.is_some()); - } - - pub fn exists_outstanding_packet(&self, ident: std::num::NonZeroU16) -> bool { - self.outstanding_packets.contains_key(&ident) - } - - pub fn iter_in_send_order( - &self, - ) -> impl Iterator<Item = (std::num::NonZeroU16, &crate::packets::MqttPacket)> { - self.packet_ident_order - .iter() - .flat_map(|id| self.outstanding_packets.get(id).map(|p| (*id, p))) - } - - pub fn remove_by_id(&mut self, id: std::num::NonZeroU16) { - // Vec::retain() preserves order - self.packet_ident_order.retain(|&elm| elm != id); - self.outstanding_packets.remove(&id); - - debug_assert_eq!( - self.packet_ident_order.len(), - self.outstanding_packets.len() - ); - } -} - -struct InnerClient { - connection_state: Option<ConnectState>, - session_state: Option<SessionState>, -} - -pub struct MqttClient { - inner: Arc<Mutex<InnerClient>>, -} - -impl MqttClient { - #[allow(clippy::new_without_default)] - pub fn new() -> MqttClient { - MqttClient { - inner: Arc::new(Mutex::new(InnerClient { - connection_state: None, - session_state: None, - })), - } - } - - pub async fn connect( - &self, - connector: MqttClientConnector, - ) -> Result<Connected, MqttClientConnectError> { - type Mcce = MqttClientConnectError; - - let inner_clone = self.inner.clone(); - let mut inner = self.inner.lock().await; - let (read, write) = tokio::io::split(MqttConnection::from(connector.transport)); - let mut conn_write = FramedWrite::new(write, MqttPacketCodec); - let mut conn_read = FramedRead::new(read, MqttPacketCodec); - - let conn_packet = mqtt_format::v5::packets::connect::MConnect { - client_identifier: connector.client_identifier.as_str(), - username: connector.username.as_ref().map(AsRef::as_ref), - password: connector.password.as_ref().map(AsRef::as_ref), - clean_start: connector.clean_start.as_bool(), - will: connector.will.as_ref().map(|w| w.as_ref()), - properties: connector.properties.as_ref(), - keep_alive: connector.keep_alive.as_u16(), - }; - - conn_write - .send(mqtt_format::v5::packets::MqttPacket::Connect(conn_packet)) - .await - .map_err(Mcce::Send)?; - - let Some(maybe_connack) = conn_read.next().await else { - return Err(Mcce::TransportUnexpectedlyClosed); - }; - - let maybe_connack = match maybe_connack { - Ok(maybe_connack) => maybe_connack, - Err(e) => { - return Err(Mcce::Receive(e)); - } - }; - - let connack = loop { - let can_use_auth = connector.properties.authentication_data.is_some(); - let _auth = match maybe_connack.get() { - mqtt_format::v5::packets::MqttPacket::Connack(connack) => break connack, - mqtt_format::v5::packets::MqttPacket::Auth(auth) => { - if can_use_auth { - auth - } else { - // MQTT-4.12.0-6 - return Err(Mcce::ServerProtocolError { - reason: "MQTT-4.12.0-6", - }); - } - } - _ => { - return Err(MqttClientConnectError::ServerProtocolError { - reason: "MQTT-3.1.4-5", - }); - } - }; - - // TODO: Use user-provided method to authenticate further - - todo!() - }; - - // TODO: Timeout here if the server doesn't respond - - if connack.reason_code == mqtt_format::v5::packets::connack::ConnackReasonCode::Success { - // TODO: Read properties, configure client - - if connack.session_present && connector.clean_start == CleanStart::Yes { - return Err(MqttClientConnectError::ServerProtocolError { - reason: "MQTT-3.2.2-2", - }); - } - - let (conn_read_sender, conn_read_recv) = futures::channel::oneshot::channel(); - - let connect_client_state = ConnectState { - session_present: connack.session_present, - receive_maximum: connack.properties.receive_maximum().map(|rm| rm.0), - maximum_qos: connack.properties.maximum_qos().map(|mq| mq.0), - retain_available: connack.properties.retain_available().map(|ra| ra.0), - maximum_packet_size: connack.properties.maximum_packet_size().map(|mps| mps.0), - topic_alias_maximum: connack.properties.topic_alias_maximum().map(|tam| tam.0), - conn_write, - conn_read_recv, - next_packet_identifier: std::num::NonZeroU16::MIN, - }; - - let assigned_client_identifier = connack.properties.assigned_client_identifier(); - - let client_identifier: MqttString; - - if let Some(aci) = assigned_client_identifier { - if connector.client_identifier - == ProposedClientIdentifier::PotentiallyServerProvided - { - client_identifier = MqttString::try_from(aci.0).map_err(|_mse| { - MqttClientConnectError::ServerProtocolError { - reason: "MQTT-1.5.4", - } - })?; - } else { - return Err(MqttClientConnectError::ServerProtocolError { - reason: "MQTT-3.2.2.3.7", - }); - } - } else { - client_identifier = match connector.client_identifier { - ProposedClientIdentifier::PotentiallyServerProvided => { - return Err(MqttClientConnectError::ServerProtocolError { - reason: "MQTT-3.2.2.3.7", - }); - } - ProposedClientIdentifier::MinimalRequired(mr) => mr.into_inner(), - ProposedClientIdentifier::PotentiallyAccepted(pa) => pa.into_inner(), - }; - } - - inner.connection_state = Some(connect_client_state); - inner.session_state = Some(SessionState { - client_identifier, - outstanding_packets: OutstandingPackets::empty(), - }); - - let connack_prop_view = - crate::packets::connack::ConnackPropertiesView::try_from(maybe_connack) - .expect("An already matched value suddenly changed?"); - - let background_task = async move { - tracing::info!("Starting background task"); - let inner: Arc<Mutex<InnerClient>> = inner_clone; - - while let Some(next) = conn_read.next().await { - let process_span = tracing::debug_span!("Processing packet", - packet_kind = tracing::field::Empty, - packet_identifier = tracing::field::Empty); - tracing::debug!(parent: &process_span, valid = next.is_ok(), "Received packet"); - let packet = match next { - Ok(packet) => packet, - Err(_) => todo!(), - }; - process_span.record("packet_kind", tracing::field::debug(packet.get().get_kind())); - - match packet.get() { - mqtt_format::v5::packets::MqttPacket::Auth(_) => todo!(), - mqtt_format::v5::packets::MqttPacket::Disconnect(_) => todo!(), - mqtt_format::v5::packets::MqttPacket::Pingreq(_) => todo!(), - mqtt_format::v5::packets::MqttPacket::Pingresp(_) => todo!(), - mqtt_format::v5::packets::MqttPacket::Puback(mpuback) => { - match mpuback.reason { - mqtt_format::v5::packets::puback::PubackReasonCode::Success | - mqtt_format::v5::packets::puback::PubackReasonCode::NoMatchingSubscribers => { - // happy path - let Some(ref mut session_state) = inner.lock().await.session_state else { - tracing::error!(parent: &process_span, "No session state found"); - todo!() - }; - - let pident = std::num::NonZeroU16::try_from(mpuback.packet_identifier.0) - .expect("Zero PacketIdentifier not valid here"); - process_span.record("packet_identifier", pident); - - if session_state.outstanding_packets.exists_outstanding_packet(pident) { - session_state.outstanding_packets.remove_by_id(pident); - tracing::trace!(parent: &process_span, "Removed packet id from outstanding packets"); - } else { - tracing::error!(parent: &process_span, "Packet id does not exist in outstanding packets"); - todo!() - } - - // TODO: Forward mpuback.properties etc to the user - } - - mqtt_format::v5::packets::puback::PubackReasonCode::ImplementationSpecificError => todo!(), - mqtt_format::v5::packets::puback::PubackReasonCode::NotAuthorized => todo!(), - mqtt_format::v5::packets::puback::PubackReasonCode::PacketIdentifierInUse => todo!(), - mqtt_format::v5::packets::puback::PubackReasonCode::PayloadFormatInvalid => todo!(), - mqtt_format::v5::packets::puback::PubackReasonCode::QuotaExceeded => todo!(), - mqtt_format::v5::packets::puback::PubackReasonCode::TopicNameInvalid => todo!(), - mqtt_format::v5::packets::puback::PubackReasonCode::UnspecifiedError => todo!(), - } - }, - mqtt_format::v5::packets::MqttPacket::Pubcomp(_) => todo!(), - mqtt_format::v5::packets::MqttPacket::Publish(_) => todo!(), - mqtt_format::v5::packets::MqttPacket::Pubrec(_) => todo!(), - mqtt_format::v5::packets::MqttPacket::Pubrel(_) => todo!(), - mqtt_format::v5::packets::MqttPacket::Suback(_) => todo!(), - mqtt_format::v5::packets::MqttPacket::Unsuback(_) => todo!(), - - mqtt_format::v5::packets::MqttPacket::Connack(_) | - mqtt_format::v5::packets::MqttPacket::Connect(_) | - mqtt_format::v5::packets::MqttPacket::Subscribe(_) | - mqtt_format::v5::packets::MqttPacket::Unsubscribe(_) => { - todo!("Handle invalid packet") - } - } - } - - tracing::debug!("Finished processing, returning reader"); - if let Err(conn_read) = conn_read_sender.send(conn_read) { - tracing::error!("Failed to return reader"); - todo!() - } - - Ok(()) - } - .boxed::<>(); - - return Ok(Connected { - connack_prop_view, - background_task, - }); - } - - // TODO: Do something with error code - - todo!() - } - - #[tracing::instrument(skip(self, payload), fields(payload_length = payload.as_ref().len()))] - pub async fn publish( - &self, - topic: crate::topic::MqttTopic, - qos: QualityOfService, - retain: bool, - payload: MqttPayload, - ) -> Result<(), ()> { - let mut inner = self.inner.lock().await; - let inner = &mut *inner; - - let Some(conn_state) = &mut inner.connection_state else { - tracing::error!("No connection state found"); - return Err(()); - }; - - let Some(sess_state) = &mut inner.session_state else { - tracing::error!("No session state found"); - return Err(()); - }; - - if conn_state.retain_available.unwrap_or(true) && retain { - tracing::warn!("Retain not available, but requested"); - return Err(()); - } - - let packet_identifier = if qos > QualityOfService::AtMostOnce { - get_next_packet_ident( - &mut conn_state.next_packet_identifier, - &sess_state.outstanding_packets, - ) - .map(Some) - .map_err(|_| ())? // TODO - } else { - None - }; - tracing::debug!(?packet_identifier, "Packet identifier computed"); - - let publish = MPublish { - duplicate: false, - quality_of_service: qos.into(), - retain, |