diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2023-01-23 09:49:19 +0100 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2023-01-23 10:01:39 +0100 |
commit | d60fefbcf7865715db9fdce6d23fb283a1f24ea5 (patch) | |
tree | efc63e953a4afadb4e5a3bc2013d1516a09d451e | |
parent | d6755a59a5356b8cd32d7a14aaf7340bb3f8f832 (diff) |
Let PacketStream end on EOF on first byte
This patch changes the implementation of the PacketStream to
automatically end iff reading the first byte from the connection
results in an EOF.
This is done by extending the PacketIOError with a EofOnFirstByte that
is matched against in the Stream for PacketStream impl.
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
-rw-r--r-- | src/lib.rs | 13 | ||||
-rw-r--r-- | src/packet_stream.rs | 6 |
2 files changed, 17 insertions, 2 deletions
@@ -47,6 +47,8 @@ pub enum PacketIOError { InvalidParsedPacket, #[error("A packet could not be written to its endpoint")] InvalidReceivedPacket(#[from] MPacketWriteError), + #[error("Received EOF on reading first byte")] + EofOnFirstByte, } pub(crate) async fn read_one_packet<W: tokio::io::AsyncRead + Unpin>( @@ -55,7 +57,16 @@ pub(crate) async fn read_one_packet<W: tokio::io::AsyncRead + Unpin>( debug!("Reading a packet"); let mut buffer = BytesMut::new(); - buffer.put_u16(reader.read_u16().await?); + let first_byte = reader.read_u8().await.map_err(|e| { + if e.kind() == std::io::ErrorKind::UnexpectedEof { + PacketIOError::EofOnFirstByte + } else { + PacketIOError::from(e) + } + })?; + let second_byte = reader.read_u8().await?; + buffer.put_u8(first_byte); + buffer.put_u8(second_byte); trace!( "Packet has reported size on first byte: 0b{size:08b} = {size}", diff --git a/src/packet_stream.rs b/src/packet_stream.rs index b8de70c..efcb238 100644 --- a/src/packet_stream.rs +++ b/src/packet_stream.rs @@ -113,7 +113,11 @@ impl<'client, ACK: AckHandler> PacketStream<'client, ACK> { None => return Err(MqttError::ConnectionClosed), }; - crate::read_one_packet(client_stream).await? + match crate::read_one_packet(client_stream).await { + Err(crate::PacketIOError::EofOnFirstByte) => return Ok(None), + Err(other) => return Err(MqttError::from(other)), + Ok(msg) => msg, + } }; let packet = next_message.get_packet(); |