summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/codecs.rs13
1 files changed, 4 insertions, 9 deletions
diff --git a/src/codecs.rs b/src/codecs.rs
index b4710d6..3b03dc7 100644
--- a/src/codecs.rs
+++ b/src/codecs.rs
@@ -5,7 +5,6 @@
//
use mqtt_format::v5::packets::MqttPacket as FormatMqttPacket;
-use tokio_util::bytes::Bytes;
use tokio_util::codec::Decoder;
use tokio_util::codec::Encoder;
use winnow::Partial;
@@ -81,16 +80,15 @@ impl Decoder for MqttPacketCodec {
}
}
-impl Encoder<Bytes> for MqttPacketCodec {
+impl Encoder<FormatMqttPacket<'_>> for MqttPacketCodec {
type Error = MqttPacketCodecError;
fn encode(
&mut self,
- packet: Bytes,
+ packet: FormatMqttPacket<'_>,
dst: &mut tokio_util::bytes::BytesMut,
) -> Result<(), Self::Error> {
- dst.extend_from_slice(&packet);
-
+ packet.write(&mut crate::packets::MqttWriter(dst))?;
Ok(())
}
}
@@ -117,11 +115,8 @@ mod tests {
let packet = FormatMqttPacket::Pingreq(MPingreq);
- packet.write(&mut MqttWriter(&mut data)).unwrap();
-
- let send_data = data.clone().freeze();
tokio::spawn(async move {
- framed_client.send(send_data).await.unwrap();
+ framed_client.send(packet).await.unwrap();
});
let recv_packet = framed_server.next().await.unwrap().unwrap();