diff options
author | Plecra <60934058+Plecra@users.noreply.github.com> | 2020-05-12 12:47:38 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-05-12 13:47:38 +0200 |
commit | 221f421464e6672e9cf25629998477946a8b8e1f (patch) | |
tree | cdb0898dcb0a3d80177fccd0729cd28525e0b96e /tokio-util/src/codec/framed_read.rs | |
parent | 1cc016833569e2dbae3b0431b7c87d5e75ef5de6 (diff) |
codec: rewrite of codec::Framed (#2368)
Framed was designed to encapsulate both AsyncRead and AsyncWrite so
that it could wrap two-way connections. It used Fuse to manage the pinned
io object between the FramedWrite and FramedRead structs.
I replaced the Fuse struct by isolating the state used in reading and
writing, and making the code generic over that instead. This means
the FramedImpl struct now has a parameter for the state, and contains
the logic for both directions. The Framed* structs are now simply
wrappers around this type
Hopefully removing the `Pin` handling made things easier to
understand, too.
Diffstat (limited to 'tokio-util/src/codec/framed_read.rs')
-rw-r--r-- | tokio-util/src/codec/framed_read.rs | 208 |
1 files changed, 30 insertions, 178 deletions
diff --git a/tokio-util/src/codec/framed_read.rs b/tokio-util/src/codec/framed_read.rs index e7798c32..a6844b73 100644 --- a/tokio-util/src/codec/framed_read.rs +++ b/tokio-util/src/codec/framed_read.rs @@ -1,11 +1,10 @@ -use crate::codec::framed::{Fuse, ProjectFuse}; +use crate::codec::framed_impl::{FramedImpl, ReadFrame}; use crate::codec::Decoder; use tokio::{io::AsyncRead, stream::Stream}; use bytes::BytesMut; use futures_sink::Sink; -use log::trace; use pin_project_lite::pin_project; use std::fmt; use std::pin::Pin; @@ -18,22 +17,10 @@ pin_project! { /// [`AsyncRead`]: tokio::io::AsyncRead pub struct FramedRead<T, D> { #[pin] - inner: FramedRead2<Fuse<T, D>>, + inner: FramedImpl<T, D, ReadFrame>, } } -pin_project! { - pub(crate) struct FramedRead2<T> { - #[pin] - inner: T, - eof: bool, - is_readable: bool, - buffer: BytesMut, - } -} - -const INITIAL_CAPACITY: usize = 8 * 1024; - // ===== impl FramedRead ===== impl<T, D> FramedRead<T, D> @@ -44,10 +31,11 @@ where /// Creates a new `FramedRead` with the given `decoder`. pub fn new(inner: T, decoder: D) -> FramedRead<T, D> { FramedRead { - inner: framed_read2(Fuse { - io: inner, + inner: FramedImpl { + inner, codec: decoder, - }), + state: Default::default(), + }, } } @@ -55,13 +43,15 @@ where /// initial size. pub fn with_capacity(inner: T, decoder: D, capacity: usize) -> FramedRead<T, D> { FramedRead { - inner: framed_read2_with_buffer( - Fuse { - io: inner, - codec: decoder, + inner: FramedImpl { + inner, + codec: decoder, + state: ReadFrame { + eof: false, + is_readable: false, + buffer: BytesMut::with_capacity(capacity), }, - BytesMut::with_capacity(capacity), - ), + }, } } } @@ -74,7 +64,7 @@ impl<T, D> FramedRead<T, D> { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn get_ref(&self) -> &T { - &self.inner.inner.io + &self.inner.inner } /// Returns a mutable reference to the underlying I/O stream wrapped by @@ -84,7 +74,7 @@ impl<T, D> FramedRead<T, D> { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn get_mut(&mut self) -> &mut T { - &mut self.inner.inner.io + &mut self.inner.inner } /// Consumes the `FramedRead`, returning its underlying I/O stream. @@ -93,25 +83,26 @@ impl<T, D> FramedRead<T, D> { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn into_inner(self) -> T { - self.inner.inner.io + self.inner.inner } /// Returns a reference to the underlying decoder. pub fn decoder(&self) -> &D { - &self.inner.inner.codec + &self.inner.codec } /// Returns a mutable reference to the underlying decoder. pub fn decoder_mut(&mut self) -> &mut D { - &mut self.inner.inner.codec + &mut self.inner.codec } /// Returns a reference to the read buffer. pub fn read_buffer(&self) -> &BytesMut { - &self.inner.buffer + &self.inner.state.buffer } } +// This impl just defers to the underlying FramedImpl impl<T, D> Stream for FramedRead<T, D> where T: AsyncRead, @@ -132,43 +123,19 @@ where type Error = T::Error; fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - self.project() - .inner - .project() - .inner - .project() - .io - .poll_ready(cx) + self.project().inner.project().inner.poll_ready(cx) } fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { - self.project() - .inner - .project() - .inner - .project() - .io - .start_send(item) + self.project().inner.project().inner.start_send(item) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - self.project() - .inner - .project() - .inner - .project() - .io - .poll_flush(cx) + self.project().inner.project().inner.poll_flush(cx) } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - self.project() - .inner - .project() - .inner - .project() - .io - .poll_close(cx) + self.project().inner.project().inner.poll_close(cx) } } @@ -179,126 +146,11 @@ where { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("FramedRead") - .field("inner", &self.inner.inner.io) - .field("decoder", &self.inner.inner.codec) - .field("eof", &self.inner.eof) - .field("is_readable", &self.inner.is_readable) - .field("buffer", &self.inner.buffer) + .field("inner", &self.get_ref()) + .field("decoder", &self.decoder()) + .field("eof", &self.inner.state.eof) + .field("is_readable", &self.inner.state.is_readable) + .field("buffer", &self.read_buffer()) .finish() } } - -// ===== impl FramedRead2 ===== - -pub(crate) fn framed_read2<T>(inner: T) -> FramedRead2<T> { - FramedRead2 { - inner, - eof: false, - is_readable: false, - buffer: BytesMut::with_capacity(INITIAL_CAPACITY), - } -} - -pub(crate) fn framed_read2_with_buffer<T>(inner: T, mut buf: BytesMut) -> FramedRead2<T> { - if buf.capacity() < INITIAL_CAPACITY { - let bytes_to_reserve = INITIAL_CAPACITY - buf.capacity(); - buf.reserve(bytes_to_reserve); - } - FramedRead2 { - inner, - eof: false, - is_readable: !buf.is_empty(), - buffer: buf, - } -} - -impl<T> FramedRead2<T> { - pub(crate) fn get_ref(&self) -> &T { - &self.inner - } - - pub(crate) fn into_inner(self) -> T { - self.inner - } - - pub(crate) fn into_parts(self) -> (T, BytesMut) { - (self.inner, self.buffer) - } - - pub(crate) fn get_mut(&mut self) -> &mut T { - &mut self.inner - } - - pub(crate) fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> { - self.project().inner - } - - pub(crate) fn buffer(&self) -> &BytesMut { - &self.buffer - } -} - -impl<T> Stream for FramedRead2<T> -where - T: ProjectFuse + AsyncRead, - T::Codec: Decoder, -{ - type Item = Result<<T::Codec as Decoder>::Item, <T::Codec as Decoder>::Error>; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - let mut pinned = self.project(); - loop { - // Repeatedly call `decode` or `decode_eof` as long as it is - // "readable". Readable is defined as not having returned `None`. If - // the upstream has returned EOF, and the decoder is no longer - // readable, it can be assumed that the decoder will never become - // readable again, at which point the stream is terminated. - if *pinned.is_readable { - if *pinned.eof { - let frame = pinned - .inner - .as_mut() - .project() - .codec - .decode_eof(&mut pinned.buffer)?; - return Poll::Ready(frame.map(Ok)); - } - - trace!("attempting to decode a frame"); - - if let Some(frame) = pinned - .inner - .as_mut() - .project() - .codec - .decode(&mut pinned.buffer)? - { - trace!("frame decoded from buffer"); - return Poll::Ready(Some(Ok(frame))); - } - - *pinned.is_readable = false; - } - - assert!(!*pinned.eof); - - // Otherwise, try to read more data and try again. Make sure we've - // got room for at least one byte to read to ensure that we don't - // get a spurious 0 that looks like EOF - pinned.buffer.reserve(1); - let bytect = match pinned - .inner - .as_mut() - .poll_read_buf(cx, &mut pinned.buffer)? - { - Poll::Ready(ct) => ct, - Poll::Pending => return Poll::Pending, - }; - if bytect == 0 { - *pinned.eof = true; - } - - *pinned.is_readable = true; - } - } -} |