summaryrefslogtreecommitdiffstats
path: root/tokio-util/src/codec/framed_read.rs
diff options
context:
space:
mode:
authorPlecra <60934058+Plecra@users.noreply.github.com>2020-05-12 12:47:38 +0100
committerGitHub <noreply@github.com>2020-05-12 13:47:38 +0200
commit221f421464e6672e9cf25629998477946a8b8e1f (patch)
treecdb0898dcb0a3d80177fccd0729cd28525e0b96e /tokio-util/src/codec/framed_read.rs
parent1cc016833569e2dbae3b0431b7c87d5e75ef5de6 (diff)
codec: rewrite of codec::Framed (#2368)
Framed was designed to encapsulate both AsyncRead and AsyncWrite so that it could wrap two-way connections. It used Fuse to manage the pinned io object between the FramedWrite and FramedRead structs. I replaced the Fuse struct by isolating the state used in reading and writing, and making the code generic over that instead. This means the FramedImpl struct now has a parameter for the state, and contains the logic for both directions. The Framed* structs are now simply wrappers around this type Hopefully removing the `Pin` handling made things easier to understand, too.
Diffstat (limited to 'tokio-util/src/codec/framed_read.rs')
-rw-r--r--tokio-util/src/codec/framed_read.rs208
1 files changed, 30 insertions, 178 deletions
diff --git a/tokio-util/src/codec/framed_read.rs b/tokio-util/src/codec/framed_read.rs
index e7798c32..a6844b73 100644
--- a/tokio-util/src/codec/framed_read.rs
+++ b/tokio-util/src/codec/framed_read.rs
@@ -1,11 +1,10 @@
-use crate::codec::framed::{Fuse, ProjectFuse};
+use crate::codec::framed_impl::{FramedImpl, ReadFrame};
use crate::codec::Decoder;
use tokio::{io::AsyncRead, stream::Stream};
use bytes::BytesMut;
use futures_sink::Sink;
-use log::trace;
use pin_project_lite::pin_project;
use std::fmt;
use std::pin::Pin;
@@ -18,22 +17,10 @@ pin_project! {
/// [`AsyncRead`]: tokio::io::AsyncRead
pub struct FramedRead<T, D> {
#[pin]
- inner: FramedRead2<Fuse<T, D>>,
+ inner: FramedImpl<T, D, ReadFrame>,
}
}
-pin_project! {
- pub(crate) struct FramedRead2<T> {
- #[pin]
- inner: T,
- eof: bool,
- is_readable: bool,
- buffer: BytesMut,
- }
-}
-
-const INITIAL_CAPACITY: usize = 8 * 1024;
-
// ===== impl FramedRead =====
impl<T, D> FramedRead<T, D>
@@ -44,10 +31,11 @@ where
/// Creates a new `FramedRead` with the given `decoder`.
pub fn new(inner: T, decoder: D) -> FramedRead<T, D> {
FramedRead {
- inner: framed_read2(Fuse {
- io: inner,
+ inner: FramedImpl {
+ inner,
codec: decoder,
- }),
+ state: Default::default(),
+ },
}
}
@@ -55,13 +43,15 @@ where
/// initial size.
pub fn with_capacity(inner: T, decoder: D, capacity: usize) -> FramedRead<T, D> {
FramedRead {
- inner: framed_read2_with_buffer(
- Fuse {
- io: inner,
- codec: decoder,
+ inner: FramedImpl {
+ inner,
+ codec: decoder,
+ state: ReadFrame {
+ eof: false,
+ is_readable: false,
+ buffer: BytesMut::with_capacity(capacity),
},
- BytesMut::with_capacity(capacity),
- ),
+ },
}
}
}
@@ -74,7 +64,7 @@ impl<T, D> FramedRead<T, D> {
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn get_ref(&self) -> &T {
- &self.inner.inner.io
+ &self.inner.inner
}
/// Returns a mutable reference to the underlying I/O stream wrapped by
@@ -84,7 +74,7 @@ impl<T, D> FramedRead<T, D> {
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn get_mut(&mut self) -> &mut T {
- &mut self.inner.inner.io
+ &mut self.inner.inner
}
/// Consumes the `FramedRead`, returning its underlying I/O stream.
@@ -93,25 +83,26 @@ impl<T, D> FramedRead<T, D> {
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn into_inner(self) -> T {
- self.inner.inner.io
+ self.inner.inner
}
/// Returns a reference to the underlying decoder.
pub fn decoder(&self) -> &D {
- &self.inner.inner.codec
+ &self.inner.codec
}
/// Returns a mutable reference to the underlying decoder.
pub fn decoder_mut(&mut self) -> &mut D {
- &mut self.inner.inner.codec
+ &mut self.inner.codec
}
/// Returns a reference to the read buffer.
pub fn read_buffer(&self) -> &BytesMut {
- &self.inner.buffer
+ &self.inner.state.buffer
}
}
+// This impl just defers to the underlying FramedImpl
impl<T, D> Stream for FramedRead<T, D>
where
T: AsyncRead,
@@ -132,43 +123,19 @@ where
type Error = T::Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- self.project()
- .inner
- .project()
- .inner
- .project()
- .io
- .poll_ready(cx)
+ self.project().inner.project().inner.poll_ready(cx)
}
fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
- self.project()
- .inner
- .project()
- .inner
- .project()
- .io
- .start_send(item)
+ self.project().inner.project().inner.start_send(item)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- self.project()
- .inner
- .project()
- .inner
- .project()
- .io
- .poll_flush(cx)
+ self.project().inner.project().inner.poll_flush(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- self.project()
- .inner
- .project()
- .inner
- .project()
- .io
- .poll_close(cx)
+ self.project().inner.project().inner.poll_close(cx)
}
}
@@ -179,126 +146,11 @@ where
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("FramedRead")
- .field("inner", &self.inner.inner.io)
- .field("decoder", &self.inner.inner.codec)
- .field("eof", &self.inner.eof)
- .field("is_readable", &self.inner.is_readable)
- .field("buffer", &self.inner.buffer)
+ .field("inner", &self.get_ref())
+ .field("decoder", &self.decoder())
+ .field("eof", &self.inner.state.eof)
+ .field("is_readable", &self.inner.state.is_readable)
+ .field("buffer", &self.read_buffer())
.finish()
}
}
-
-// ===== impl FramedRead2 =====
-
-pub(crate) fn framed_read2<T>(inner: T) -> FramedRead2<T> {
- FramedRead2 {
- inner,
- eof: false,
- is_readable: false,
- buffer: BytesMut::with_capacity(INITIAL_CAPACITY),
- }
-}
-
-pub(crate) fn framed_read2_with_buffer<T>(inner: T, mut buf: BytesMut) -> FramedRead2<T> {
- if buf.capacity() < INITIAL_CAPACITY {
- let bytes_to_reserve = INITIAL_CAPACITY - buf.capacity();
- buf.reserve(bytes_to_reserve);
- }
- FramedRead2 {
- inner,
- eof: false,
- is_readable: !buf.is_empty(),
- buffer: buf,
- }
-}
-
-impl<T> FramedRead2<T> {
- pub(crate) fn get_ref(&self) -> &T {
- &self.inner
- }
-
- pub(crate) fn into_inner(self) -> T {
- self.inner
- }
-
- pub(crate) fn into_parts(self) -> (T, BytesMut) {
- (self.inner, self.buffer)
- }
-
- pub(crate) fn get_mut(&mut self) -> &mut T {
- &mut self.inner
- }
-
- pub(crate) fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
- self.project().inner
- }
-
- pub(crate) fn buffer(&self) -> &BytesMut {
- &self.buffer
- }
-}
-
-impl<T> Stream for FramedRead2<T>
-where
- T: ProjectFuse + AsyncRead,
- T::Codec: Decoder,
-{
- type Item = Result<<T::Codec as Decoder>::Item, <T::Codec as Decoder>::Error>;
-
- fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- let mut pinned = self.project();
- loop {
- // Repeatedly call `decode` or `decode_eof` as long as it is
- // "readable". Readable is defined as not having returned `None`. If
- // the upstream has returned EOF, and the decoder is no longer
- // readable, it can be assumed that the decoder will never become
- // readable again, at which point the stream is terminated.
- if *pinned.is_readable {
- if *pinned.eof {
- let frame = pinned
- .inner
- .as_mut()
- .project()
- .codec
- .decode_eof(&mut pinned.buffer)?;
- return Poll::Ready(frame.map(Ok));
- }
-
- trace!("attempting to decode a frame");
-
- if let Some(frame) = pinned
- .inner
- .as_mut()
- .project()
- .codec
- .decode(&mut pinned.buffer)?
- {
- trace!("frame decoded from buffer");
- return Poll::Ready(Some(Ok(frame)));
- }
-
- *pinned.is_readable = false;
- }
-
- assert!(!*pinned.eof);
-
- // Otherwise, try to read more data and try again. Make sure we've
- // got room for at least one byte to read to ensure that we don't
- // get a spurious 0 that looks like EOF
- pinned.buffer.reserve(1);
- let bytect = match pinned
- .inner
- .as_mut()
- .poll_read_buf(cx, &mut pinned.buffer)?
- {
- Poll::Ready(ct) => ct,
- Poll::Pending => return Poll::Pending,
- };
- if bytect == 0 {
- *pinned.eof = true;
- }
-
- *pinned.is_readable = true;
- }
- }
-}