summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2023-01-23 09:49:19 +0100
committerMatthias Beyer <mail@beyermatthias.de>2023-01-23 10:01:39 +0100
commitd60fefbcf7865715db9fdce6d23fb283a1f24ea5 (patch)
treeefc63e953a4afadb4e5a3bc2013d1516a09d451e
parentd6755a59a5356b8cd32d7a14aaf7340bb3f8f832 (diff)
Let PacketStream end on EOF on first byte
This patch changes the implementation of the PacketStream to automatically end iff reading the first byte from the connection results in an EOF. This is done by extending the PacketIOError with a EofOnFirstByte that is matched against in the Stream for PacketStream impl. Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
-rw-r--r--src/lib.rs13
-rw-r--r--src/packet_stream.rs6
2 files changed, 17 insertions, 2 deletions
diff --git a/src/lib.rs b/src/lib.rs
index 0708d68..408e3a5 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -47,6 +47,8 @@ pub enum PacketIOError {
InvalidParsedPacket,
#[error("A packet could not be written to its endpoint")]
InvalidReceivedPacket(#[from] MPacketWriteError),
+ #[error("Received EOF on reading first byte")]
+ EofOnFirstByte,
}
pub(crate) async fn read_one_packet<W: tokio::io::AsyncRead + Unpin>(
@@ -55,7 +57,16 @@ pub(crate) async fn read_one_packet<W: tokio::io::AsyncRead + Unpin>(
debug!("Reading a packet");
let mut buffer = BytesMut::new();
- buffer.put_u16(reader.read_u16().await?);
+ let first_byte = reader.read_u8().await.map_err(|e| {
+ if e.kind() == std::io::ErrorKind::UnexpectedEof {
+ PacketIOError::EofOnFirstByte
+ } else {
+ PacketIOError::from(e)
+ }
+ })?;
+ let second_byte = reader.read_u8().await?;
+ buffer.put_u8(first_byte);
+ buffer.put_u8(second_byte);
trace!(
"Packet has reported size on first byte: 0b{size:08b} = {size}",
diff --git a/src/packet_stream.rs b/src/packet_stream.rs
index b8de70c..efcb238 100644
--- a/src/packet_stream.rs
+++ b/src/packet_stream.rs
@@ -113,7 +113,11 @@ impl<'client, ACK: AckHandler> PacketStream<'client, ACK> {
None => return Err(MqttError::ConnectionClosed),
};
- crate::read_one_packet(client_stream).await?
+ match crate::read_one_packet(client_stream).await {
+ Err(crate::PacketIOError::EofOnFirstByte) => return Ok(None),
+ Err(other) => return Err(MqttError::from(other)),
+ Ok(msg) => msg,
+ }
};
let packet = next_message.get_packet();