diff options
author | Marcel Müller <neikos@neikos.email> | 2024-03-21 13:26:14 +0100 |
---|---|---|
committer | Marcel Müller <neikos@neikos.email> | 2024-03-21 13:26:14 +0100 |
commit | 29916db508a14d0b90eabfa20f84f6795d6036c0 (patch) | |
tree | 569ce9668a4f574f846b33dac50e2b2e1df5a05f | |
parent | f31f19c9d122b50ac1edf3956298523f54abdd0e (diff) |
Implement MSubscribe::write
Signed-off-by: Marcel Müller <neikos@neikos.email>
-rw-r--r-- | mqtt-format/src/v5/packets/subscribe.rs | 35 |
1 files changed, 34 insertions, 1 deletions
diff --git a/mqtt-format/src/v5/packets/subscribe.rs b/mqtt-format/src/v5/packets/subscribe.rs index 580bbec..4af717b 100644 --- a/mqtt-format/src/v5/packets/subscribe.rs +++ b/mqtt-format/src/v5/packets/subscribe.rs @@ -15,9 +15,12 @@ use winnow::Parser; use crate::v5::fixed_header::QualityOfService; use crate::v5::properties::define_properties; use crate::v5::strings::parse_string; +use crate::v5::strings::write_string; use crate::v5::variable_header::PacketIdentifier; use crate::v5::variable_header::SubscriptionIdentifier; use crate::v5::variable_header::UserProperties; +use crate::v5::write::WResult; +use crate::v5::write::WriteMqttPacket; use crate::v5::MResult; define_properties! { @@ -27,7 +30,7 @@ define_properties! { } } -#[derive(Debug, num_enum::TryFromPrimitive, num_enum::IntoPrimitive)] +#[derive(Debug, num_enum::TryFromPrimitive, num_enum::IntoPrimitive, Clone, Copy)] #[repr(u8)] pub enum RetainHandling { SendRetainedMessagesAlways = 0, @@ -69,6 +72,17 @@ impl SubscriptionOptions { }) .parse_next(input) } + + pub async fn write<W: WriteMqttPacket>(&self, buffer: &mut W) -> WResult<W> { + let qos = self.quality_of_service as u8; + let no_local = (self.no_local as u8) << 2; + let retain_as_published = (self.retain_as_published as u8) << 3; + let retain_handling = (self.retain_handling as u8) << 4; + + let sub_opts = qos & no_local & retain_as_published & retain_handling; + + buffer.write_byte(sub_opts).await + } } #[derive(Debug)] @@ -91,6 +105,11 @@ impl<'i> Subscription<'i> { }) .parse_next(input) } + + pub async fn write<W: WriteMqttPacket>(&self, buffer: &mut W) -> WResult<W> { + write_string(buffer, self.topic_filter).await?; + self.options.write(buffer).await + } } pub struct Subscriptions<'i> { @@ -119,6 +138,14 @@ impl<'i> Subscriptions<'i> { .parse_next(input) } + pub async fn write<W: WriteMqttPacket>(&self, buffer: &mut W) -> WResult<W> { + for sub in self.iter() { + sub.write(buffer).await?; + } + + Ok(()) + } + pub fn iter(&self) -> SubscriptionsIter<'i> { SubscriptionsIter { current: Bytes::new(self.start), @@ -169,4 +196,10 @@ impl<'i> MSubscribe<'i> { }) .parse_next(input) } + + pub async fn write<W: WriteMqttPacket>(&self, buffer: &mut W) -> WResult<W> { + self.packet_identifier.write(buffer).await?; + self.properties.write(buffer).await?; + self.subscriptions.write(buffer).await + } } |