summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMarcel Müller <neikos@neikos.email>2024-03-21 13:26:14 +0100
committerMarcel Müller <neikos@neikos.email>2024-03-21 13:26:14 +0100
commit29916db508a14d0b90eabfa20f84f6795d6036c0 (patch)
tree569ce9668a4f574f846b33dac50e2b2e1df5a05f
parentf31f19c9d122b50ac1edf3956298523f54abdd0e (diff)
Implement MSubscribe::write
Signed-off-by: Marcel Müller <neikos@neikos.email>
-rw-r--r--mqtt-format/src/v5/packets/subscribe.rs35
1 files changed, 34 insertions, 1 deletions
diff --git a/mqtt-format/src/v5/packets/subscribe.rs b/mqtt-format/src/v5/packets/subscribe.rs
index 580bbec..4af717b 100644
--- a/mqtt-format/src/v5/packets/subscribe.rs
+++ b/mqtt-format/src/v5/packets/subscribe.rs
@@ -15,9 +15,12 @@ use winnow::Parser;
use crate::v5::fixed_header::QualityOfService;
use crate::v5::properties::define_properties;
use crate::v5::strings::parse_string;
+use crate::v5::strings::write_string;
use crate::v5::variable_header::PacketIdentifier;
use crate::v5::variable_header::SubscriptionIdentifier;
use crate::v5::variable_header::UserProperties;
+use crate::v5::write::WResult;
+use crate::v5::write::WriteMqttPacket;
use crate::v5::MResult;
define_properties! {
@@ -27,7 +30,7 @@ define_properties! {
}
}
-#[derive(Debug, num_enum::TryFromPrimitive, num_enum::IntoPrimitive)]
+#[derive(Debug, num_enum::TryFromPrimitive, num_enum::IntoPrimitive, Clone, Copy)]
#[repr(u8)]
pub enum RetainHandling {
SendRetainedMessagesAlways = 0,
@@ -69,6 +72,17 @@ impl SubscriptionOptions {
})
.parse_next(input)
}
+
+ pub async fn write<W: WriteMqttPacket>(&self, buffer: &mut W) -> WResult<W> {
+ let qos = self.quality_of_service as u8;
+ let no_local = (self.no_local as u8) << 2;
+ let retain_as_published = (self.retain_as_published as u8) << 3;
+ let retain_handling = (self.retain_handling as u8) << 4;
+
+ let sub_opts = qos & no_local & retain_as_published & retain_handling;
+
+ buffer.write_byte(sub_opts).await
+ }
}
#[derive(Debug)]
@@ -91,6 +105,11 @@ impl<'i> Subscription<'i> {
})
.parse_next(input)
}
+
+ pub async fn write<W: WriteMqttPacket>(&self, buffer: &mut W) -> WResult<W> {
+ write_string(buffer, self.topic_filter).await?;
+ self.options.write(buffer).await
+ }
}
pub struct Subscriptions<'i> {
@@ -119,6 +138,14 @@ impl<'i> Subscriptions<'i> {
.parse_next(input)
}
+ pub async fn write<W: WriteMqttPacket>(&self, buffer: &mut W) -> WResult<W> {
+ for sub in self.iter() {
+ sub.write(buffer).await?;
+ }
+
+ Ok(())
+ }
+
pub fn iter(&self) -> SubscriptionsIter<'i> {
SubscriptionsIter {
current: Bytes::new(self.start),
@@ -169,4 +196,10 @@ impl<'i> MSubscribe<'i> {
})
.parse_next(input)
}
+
+ pub async fn write<W: WriteMqttPacket>(&self, buffer: &mut W) -> WResult<W> {
+ self.packet_identifier.write(buffer).await?;
+ self.properties.write(buffer).await?;
+ self.subscriptions.write(buffer).await
+ }
}