From 221f421464e6672e9cf25629998477946a8b8e1f Mon Sep 17 00:00:00 2001 From: Plecra <60934058+Plecra@users.noreply.github.com> Date: Tue, 12 May 2020 12:47:38 +0100 Subject: codec: rewrite of codec::Framed (#2368) Framed was designed to encapsulate both AsyncRead and AsyncWrite so that it could wrap two-way connections. It used Fuse to manage the pinned io object between the FramedWrite and FramedRead structs. I replaced the Fuse struct by isolating the state used in reading and writing, and making the code generic over that instead. This means the FramedImpl struct now has a parameter for the state, and contains the logic for both directions. The Framed* structs are now simply wrappers around this type Hopefully removing the `Pin` handling made things easier to understand, too. --- CONTRIBUTING.md | 6 + tokio-util/src/codec/framed.rs | 208 +++++++----------------------- tokio-util/src/codec/framed_impl.rs | 225 +++++++++++++++++++++++++++++++++ tokio-util/src/codec/framed_read.rs | 208 +++++------------------------- tokio-util/src/codec/framed_write.rs | 236 ++++------------------------------- tokio-util/src/codec/mod.rs | 4 + 6 files changed, 333 insertions(+), 554 deletions(-) create mode 100644 tokio-util/src/codec/framed_impl.rs diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index b338838e..772b1dce 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -132,9 +132,15 @@ RUSTDOCFLAGS="--cfg docsrs" cargo +nightly doc --all-features ``` The `cargo fmt` command does not work on the Tokio codebase. You can use the command below instead: + +#### Bash ``` rustfmt --check --edition 2018 $(find . -name '*.rs' -print) ``` +#### Powershell +``` +Get-ChildItem . -Filter "*.rs" -Recurse | foreach { rustfmt --check --edition 2018 $_.FullName } +``` The `--check` argument prints the things that need to be fixed. If you remove it, `rustfmt` will update your files locally instead. diff --git a/tokio-util/src/codec/framed.rs b/tokio-util/src/codec/framed.rs index d2e7659e..bf05a3ae 100644 --- a/tokio-util/src/codec/framed.rs +++ b/tokio-util/src/codec/framed.rs @@ -1,10 +1,9 @@ use crate::codec::decoder::Decoder; use crate::codec::encoder::Encoder; -use crate::codec::framed_read::{framed_read2, framed_read2_with_buffer, FramedRead2}; -use crate::codec::framed_write::{framed_write2, framed_write2_with_buffer, FramedWrite2}; +use crate::codec::framed_impl::{FramedImpl, RWFrames, ReadFrame, WriteFrame}; use tokio::{ - io::{AsyncBufRead, AsyncRead, AsyncWrite}, + io::{AsyncRead, AsyncWrite}, stream::Stream, }; @@ -12,8 +11,7 @@ use bytes::BytesMut; use futures_sink::Sink; use pin_project_lite::pin_project; use std::fmt; -use std::io::{self, BufRead, Read, Write}; -use std::mem::MaybeUninit; +use std::io; use std::pin::Pin; use std::task::{Context, Poll}; @@ -30,37 +28,7 @@ pin_project! { /// [`Decoder::framed`]: crate::codec::Decoder::framed() pub struct Framed { #[pin] - inner: FramedRead2>>, - } -} - -pin_project! { - pub(crate) struct Fuse { - #[pin] - pub(crate) io: T, - pub(crate) codec: U, - } -} - -/// Abstracts over `FramedRead2` being either `FramedRead2>>` or -/// `FramedRead2>` and lets the io and codec parts be extracted in either case. -pub(crate) trait ProjectFuse { - type Io; - type Codec; - - fn project(self: Pin<&mut Self>) -> Fuse, &mut Self::Codec>; -} - -impl ProjectFuse for Fuse { - type Io = T; - type Codec = U; - - fn project(self: Pin<&mut Self>) -> Fuse, &mut Self::Codec> { - let self_ = self.project(); - Fuse { - io: self_.io, - codec: self_.codec, - } + inner: FramedImpl } } @@ -93,7 +61,11 @@ where /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split pub fn new(inner: T, codec: U) -> Framed { Framed { - inner: framed_read2(framed_write2(Fuse { io: inner, codec })), + inner: FramedImpl { + inner, + codec, + state: Default::default(), + }, } } @@ -123,10 +95,18 @@ where /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split pub fn with_capacity(inner: T, codec: U, capacity: usize) -> Framed { Framed { - inner: framed_read2_with_buffer( - framed_write2(Fuse { io: inner, codec }), - BytesMut::with_capacity(capacity), - ), + inner: FramedImpl { + inner, + codec, + state: RWFrames { + read: ReadFrame { + eof: false, + is_readable: false, + buffer: BytesMut::with_capacity(capacity), + }, + write: WriteFrame::default(), + }, + }, } } } @@ -161,16 +141,14 @@ impl Framed { /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split pub fn from_parts(parts: FramedParts) -> Framed { Framed { - inner: framed_read2_with_buffer( - framed_write2_with_buffer( - Fuse { - io: parts.io, - codec: parts.codec, - }, - parts.write_buf, - ), - parts.read_buf, - ), + inner: FramedImpl { + inner: parts.io, + codec: parts.codec, + state: RWFrames { + read: parts.read_buf.into(), + write: parts.write_buf.into(), + }, + }, } } @@ -181,7 +159,7 @@ impl Framed { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn get_ref(&self) -> &T { - &self.inner.get_ref().get_ref().io + &self.inner.inner } /// Returns a mutable reference to the underlying I/O stream wrapped by @@ -191,7 +169,7 @@ impl Framed { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn get_mut(&mut self) -> &mut T { - &mut self.inner.get_mut().get_mut().io + &mut self.inner.inner } /// Returns a reference to the underlying codec wrapped by @@ -200,7 +178,7 @@ impl Framed { /// Note that care should be taken to not tamper with the underlying codec /// as it may corrupt the stream of frames otherwise being worked with. pub fn codec(&self) -> &U { - &self.inner.get_ref().get_ref().codec + &self.inner.codec } /// Returns a mutable reference to the underlying codec wrapped by @@ -209,12 +187,12 @@ impl Framed { /// Note that care should be taken to not tamper with the underlying codec /// as it may corrupt the stream of frames otherwise being worked with. pub fn codec_mut(&mut self) -> &mut U { - &mut self.inner.get_mut().get_mut().codec + &mut self.inner.codec } /// Returns a reference to the read buffer. pub fn read_buffer(&self) -> &BytesMut { - self.inner.buffer() + &self.inner.state.read.buffer } /// Consumes the `Framed`, returning its underlying I/O stream. @@ -223,7 +201,7 @@ impl Framed { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn into_inner(self) -> T { - self.inner.into_inner().into_inner().io + self.inner.inner } /// Consumes the `Framed`, returning its underlying I/O stream, the buffer @@ -233,19 +211,17 @@ impl Framed { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn into_parts(self) -> FramedParts { - let (inner, read_buf) = self.inner.into_parts(); - let (inner, write_buf) = inner.into_parts(); - FramedParts { - io: inner.io, - codec: inner.codec, - read_buf, - write_buf, + io: self.inner.inner, + codec: self.inner.codec, + read_buf: self.inner.state.read.buffer, + write_buf: self.inner.state.write.buffer, _priv: (), } } } +// This impl just defers to the underlying FramedImpl impl Stream for Framed where T: AsyncRead, @@ -258,6 +234,7 @@ where } } +// This impl just defers to the underlying FramedImpl impl Sink for Framed where T: AsyncWrite, @@ -267,19 +244,19 @@ where type Error = U::Error; fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().inner.get_pin_mut().poll_ready(cx) + self.project().inner.poll_ready(cx) } fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { - self.project().inner.get_pin_mut().start_send(item) + self.project().inner.start_send(item) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().inner.get_pin_mut().poll_flush(cx) + self.project().inner.poll_flush(cx) } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().inner.get_pin_mut().poll_close(cx) + self.project().inner.poll_close(cx) } } @@ -290,103 +267,12 @@ where { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Framed") - .field("io", &self.inner.get_ref().get_ref().io) - .field("codec", &self.inner.get_ref().get_ref().codec) + .field("io", self.get_ref()) + .field("codec", self.codec()) .finish() } } -// ===== impl Fuse ===== - -impl Read for Fuse { - fn read(&mut self, dst: &mut [u8]) -> io::Result { - self.io.read(dst) - } -} - -impl BufRead for Fuse { - fn fill_buf(&mut self) -> io::Result<&[u8]> { - self.io.fill_buf() - } - - fn consume(&mut self, amt: usize) { - self.io.consume(amt) - } -} - -impl AsyncRead for Fuse { - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit]) -> bool { - self.io.prepare_uninitialized_buffer(buf) - } - - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - self.project().io.poll_read(cx, buf) - } -} - -impl AsyncBufRead for Fuse { - fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().io.poll_fill_buf(cx) - } - - fn consume(self: Pin<&mut Self>, amt: usize) { - self.project().io.consume(amt) - } -} - -impl Write for Fuse { - fn write(&mut self, src: &[u8]) -> io::Result { - self.io.write(src) - } - - fn flush(&mut self) -> io::Result<()> { - self.io.flush() - } -} - -impl AsyncWrite for Fuse { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - self.project().io.poll_write(cx, buf) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().io.poll_flush(cx) - } - - fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().io.poll_shutdown(cx) - } -} - -impl Decoder for Fuse { - type Item = U::Item; - type Error = U::Error; - - fn decode(&mut self, buffer: &mut BytesMut) -> Result, Self::Error> { - self.codec.decode(buffer) - } - - fn decode_eof(&mut self, buffer: &mut BytesMut) -> Result, Self::Error> { - self.codec.decode_eof(buffer) - } -} - -impl> Encoder for Fuse { - type Error = U::Error; - - fn encode(&mut self, item: I, dst: &mut BytesMut) -> Result<(), Self::Error> { - self.codec.encode(item, dst) - } -} - /// `FramedParts` contains an export of the data of a Framed transport. /// It can be used to construct a new [`Framed`] with a different codec. /// It contains all current buffers and the inner transport. diff --git a/tokio-util/src/codec/framed_impl.rs b/tokio-util/src/codec/framed_impl.rs new file mode 100644 index 00000000..eb2e0d38 --- /dev/null +++ b/tokio-util/src/codec/framed_impl.rs @@ -0,0 +1,225 @@ +use crate::codec::decoder::Decoder; +use crate::codec::encoder::Encoder; + +use tokio::{ + io::{AsyncRead, AsyncWrite}, + stream::Stream, +}; + +use bytes::{Buf, BytesMut}; +use futures_core::ready; +use futures_sink::Sink; +use log::trace; +use pin_project_lite::pin_project; +use std::borrow::{Borrow, BorrowMut}; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +pin_project! { + #[derive(Debug)] + pub(crate) struct FramedImpl { + #[pin] + pub(crate) inner: T, + pub(crate) state: State, + pub(crate) codec: U, + } +} + +const INITIAL_CAPACITY: usize = 8 * 1024; +const BACKPRESSURE_BOUNDARY: usize = INITIAL_CAPACITY; + +pub(crate) struct ReadFrame { + pub(crate) eof: bool, + pub(crate) is_readable: bool, + pub(crate) buffer: BytesMut, +} + +pub(crate) struct WriteFrame { + pub(crate) buffer: BytesMut, +} + +#[derive(Default)] +pub(crate) struct RWFrames { + pub(crate) read: ReadFrame, + pub(crate) write: WriteFrame, +} + +impl Default for ReadFrame { + fn default() -> Self { + Self { + eof: false, + is_readable: false, + buffer: BytesMut::with_capacity(INITIAL_CAPACITY), + } + } +} + +impl Default for WriteFrame { + fn default() -> Self { + Self { + buffer: BytesMut::with_capacity(INITIAL_CAPACITY), + } + } +} + +impl From for ReadFrame { + fn from(mut buffer: BytesMut) -> Self { + let size = buffer.capacity(); + if size < INITIAL_CAPACITY { + buffer.reserve(INITIAL_CAPACITY - size); + } + + Self { + buffer, + is_readable: size > 0, + eof: false, + } + } +} + +impl From for WriteFrame { + fn from(mut buffer: BytesMut) -> Self { + let size = buffer.capacity(); + if size < INITIAL_CAPACITY { + buffer.reserve(INITIAL_CAPACITY - size); + } + + Self { buffer } + } +} + +impl Borrow for RWFrames { + fn borrow(&self) -> &ReadFrame { + &self.read + } +} +impl BorrowMut for RWFrames { + fn borrow_mut(&mut self) -> &mut ReadFrame { + &mut self.read + } +} +impl Borrow for RWFrames { + fn borrow(&self) -> &WriteFrame { + &self.write + } +} +impl BorrowMut for RWFrames { + fn borrow_mut(&mut self) -> &mut WriteFrame { + &mut self.write + } +} +impl Stream for FramedImpl +where + T: AsyncRead, + U: Decoder, + R: BorrowMut, +{ + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut pinned = self.project(); + let state: &mut ReadFrame = pinned.state.borrow_mut(); + loop { + // Repeatedly call `decode` or `decode_eof` as long as it is + // "readable". Readable is defined as not having returned `None`. If + // the upstream has returned EOF, and the decoder is no longer + // readable, it can be assumed that the decoder will never become + // readable again, at which point the stream is terminated. + if state.is_readable { + if state.eof { + let frame = pinned.codec.decode_eof(&mut state.buffer)?; + return Poll::Ready(frame.map(Ok)); + } + + trace!("attempting to decode a frame"); + + if let Some(frame) = pinned.codec.decode(&mut state.buffer)? { + trace!("frame decoded from buffer"); + return Poll::Ready(Some(Ok(frame))); + } + + state.is_readable = false; + } + + assert!(!state.eof); + + // Otherwise, try to read more data and try again. Make sure we've + // got room for at least one byte to read to ensure that we don't + // get a spurious 0 that looks like EOF + state.buffer.reserve(1); + let bytect = match pinned.inner.as_mut().poll_read_buf(cx, &mut state.buffer)? { + Poll::Ready(ct) => ct, + Poll::Pending => return Poll::Pending, + }; + if bytect == 0 { + state.eof = true; + } + + state.is_readable = true; + } + } +} + +impl Sink for FramedImpl +where + T: AsyncWrite, + U: Encoder, + U::Error: From, + W: BorrowMut, +{ + type Error = U::Error; + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.state.borrow().buffer.len() >= BACKPRESSURE_BOUNDARY { + self.as_mut().poll_flush(cx) + } else { + Poll::Ready(Ok(())) + } + } + + fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { + let pinned = self.project(); + pinned + .codec + .encode(item, &mut pinned.state.borrow_mut().buffer)?; + Ok(()) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + trace!("flushing framed transport"); + let mut pinned = self.project(); + + while !pinned.state.borrow_mut().buffer.is_empty() { + let WriteFrame { buffer } = pinned.state.borrow_mut(); + trace!("writing; remaining={}", buffer.len()); + + let buf = &buffer; + let n = ready!(pinned.inner.as_mut().poll_write(cx, &buf))?; + + if n == 0 { + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::WriteZero, + "failed to \ + write frame to transport", + ) + .into())); + } + + pinned.state.borrow_mut().buffer.advance(n); + } + + // Try flushing the underlying IO + ready!(pinned.inner.poll_flush(cx))?; + + trace!("framed transport flushed"); + Poll::Ready(Ok(())) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + ready!(self.as_mut().poll_flush(cx))?; + ready!(self.project().inner.poll_shutdown(cx))?; + + Poll::Ready(Ok(())) + } +} diff --git a/tokio-util/src/codec/framed_read.rs b/tokio-util/src/codec/framed_read.rs index e7798c32..a6844b73 100644 --- a/tokio-util/src/codec/framed_read.rs +++ b/tokio-util/src/codec/framed_read.rs @@ -1,11 +1,10 @@ -use crate::codec::framed::{Fuse, ProjectFuse}; +use crate::codec::framed_impl::{FramedImpl, ReadFrame}; use crate::codec::Decoder; use tokio::{io::AsyncRead, stream::Stream}; use bytes::BytesMut; use futures_sink::Sink; -use log::trace; use pin_project_lite::pin_project; use std::fmt; use std::pin::Pin; @@ -18,22 +17,10 @@ pin_project! { /// [`AsyncRead`]: tokio::io::AsyncRead pub struct FramedRead { #[pin] - inner: FramedRead2>, + inner: FramedImpl, } } -pin_project! { - pub(crate) struct FramedRead2 { - #[pin] - inner: T, - eof: bool, - is_readable: bool, - buffer: BytesMut, - } -} - -const INITIAL_CAPACITY: usize = 8 * 1024; - // ===== impl FramedRead ===== impl FramedRead @@ -44,10 +31,11 @@ where /// Creates a new `FramedRead` with the given `decoder`. pub fn new(inner: T, decoder: D) -> FramedRead { FramedRead { - inner: framed_read2(Fuse { - io: inner, + inner: FramedImpl { + inner, codec: decoder, - }), + state: Default::default(), + }, } } @@ -55,13 +43,15 @@ where /// initial size. pub fn with_capacity(inner: T, decoder: D, capacity: usize) -> FramedRead { FramedRead { - inner: framed_read2_with_buffer( - Fuse { - io: inner, - codec: decoder, + inner: FramedImpl { + inner, + codec: decoder, + state: ReadFrame { + eof: false, + is_readable: false, + buffer: BytesMut::with_capacity(capacity), }, - BytesMut::with_capacity(capacity), - ), + }, } } } @@ -74,7 +64,7 @@ impl FramedRead { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn get_ref(&self) -> &T { - &self.inner.inner.io + &self.inner.inner } /// Returns a mutable reference to the underlying I/O stream wrapped by @@ -84,7 +74,7 @@ impl FramedRead { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn get_mut(&mut self) -> &mut T { - &mut self.inner.inner.io + &mut self.inner.inner } /// Consumes the `FramedRead`, returning its underlying I/O stream. @@ -93,25 +83,26 @@ impl FramedRead { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn into_inner(self) -> T { - self.inner.inner.io + self.inner.inner } /// Returns a reference to the underlying decoder. pub fn decoder(&self) -> &D { - &self.inner.inner.codec + &self.inner.codec } /// Returns a mutable reference to the underlying decoder. pub fn decoder_mut(&mut self) -> &mut D { - &mut self.inner.inner.codec + &mut self.inner.codec } /// Returns a reference to the read buffer. pub fn read_buffer(&self) -> &BytesMut { - &self.inner.buffer + &self.inner.state.buffer } } +// This impl just defers to the underlying FramedImpl impl Stream for FramedRead where T: AsyncRead, @@ -132,43 +123,19 @@ where type Error = T::Error; fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project() - .inner - .project() - .inner - .project() - .io - .poll_ready(cx) + self.project().inner.project().inner.poll_ready(cx) } fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { - self.project() - .inner - .project() - .inner - .project() - .io - .start_send(item) + self.project().inner.project().inner.start_send(item) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project() - .inner - .project() - .inner - .project() - .io - .poll_flush(cx) + self.project().inner.project().inner.poll_flush(cx) } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project() - .inner - .project() - .inner - .project() - .io - .poll_close(cx) + self.project().inner.project().inner.poll_close(cx) } } @@ -179,126 +146,11 @@ where { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("FramedRead") - .field("inner", &self.inner.inner.io) - .field("decoder", &self.inner.inner.codec) - .field("eof", &self.inner.eof) - .field("is_readable", &self.inner.is_readable) - .field("buffer", &self.inner.buffer) + .field("inner", &self.get_ref()) + .field("decoder", &self.decoder()) + .field("eof", &self.inner.state.eof) + .field("is_readable", &self.inner.state.is_readable) + .field("buffer", &self.read_buffer()) .finish() } } - -// ===== impl FramedRead2 ===== - -pub(crate) fn framed_read2(inner: T) -> FramedRead2 { - FramedRead2 { - inner, - eof: false, - is_readable: false, - buffer: BytesMut::with_capacity(INITIAL_CAPACITY), - } -} - -pub(crate) fn framed_read2_with_buffer(inner: T, mut buf: BytesMut) -> FramedRead2 { - if buf.capacity() < INITIAL_CAPACITY { - let bytes_to_reserve = INITIAL_CAPACITY - buf.capacity(); - buf.reserve(bytes_to_reserve); - } - FramedRead2 { - inner, - eof: false, - is_readable: !buf.is_empty(), - buffer: buf, - } -} - -impl FramedRead2 { - pub(crate) fn get_ref(&self) -> &T { - &self.inner - } - - pub(crate) fn into_inner(self) -> T { - self.inner - } - - pub(crate) fn into_parts(self) -> (T, BytesMut) { - (self.inner, self.buffer) - } - - pub(crate) fn get_mut(&mut self) -> &mut T { - &mut self.inner - } - - pub(crate) fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> { - self.project().inner - } - - pub(crate) fn buffer(&self) -> &BytesMut { - &self.buffer - } -} - -impl Stream for FramedRead2 -where - T: ProjectFuse + AsyncRead, - T::Codec: Decoder, -{ - type Item = Result<::Item, ::Error>; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut pinned = self.project(); - loop { - // Repeatedly call `decode` or `decode_eof` as long as it is - // "readable". Readable is defined as not having returned `None`. If - // the upstream has returned EOF, and the decoder is no longer - // readable, it can be assumed that the decoder will never become - // readable again, at which point the stream is terminated. - if *pinned.is_readable { - if *pinned.eof { - let frame = pinned - .inner - .as_mut() - .project() - .codec - .decode_eof(&mut pinned.buffer)?; - return Poll::Ready(frame.map(Ok)); - } - - trace!("attempting to decode a frame"); - - if let Some(frame) = pinned - .inner - .as_mut() - .project() - .codec - .decode(&mut pinned.buffer)? - { - trace!("frame decoded from buffer"); - return Poll::Ready(Some(Ok(frame))); - } - - *pinned.is_readable = false; - } - - assert!(!*pinned.eof); - - // Otherwise, try to read more data and try again. Make sure we've - // got room for at least one byte to read to ensure that we don't - // get a spurious 0 that looks like EOF - pinned.buffer.reserve(1); - let bytect = match pinned - .inner - .as_mut() - .poll_read_buf(cx, &mut pinned.buffer)? - { - Poll::Ready(ct) => ct, - Poll::Pending => return Poll::Pending, - }; - if bytect == 0 { - *pinned.eof = true; - } - - *pinned.is_readable = true; - } - } -} diff --git a/tokio-util/src/codec/framed_write.rs b/tokio-util/src/codec/framed_write.rs index c0049b2d..834eb6ed 100644 --- a/tokio-util/src/codec/framed_write.rs +++ b/tokio-util/src/codec/framed_write.rs @@ -1,20 +1,12 @@ -use crate::codec::decoder::Decoder; use crate::codec::encoder::Encoder; -use crate::codec::framed::{Fuse, ProjectFuse}; +use crate::codec::framed_impl::{FramedImpl, WriteFrame}; -use tokio::{ - io::{AsyncBufRead, AsyncRead, AsyncWrite}, - stream::Stream, -}; +use tokio::{io::AsyncWrite, stream::Stream}; -use bytes::{Buf, BytesMut}; -use futures_core::ready; use futures_sink::Sink; -use log::trace; use pin_project_lite::pin_project; use std::fmt; -use std::io::{self, BufRead, Read}; -use std::mem::MaybeUninit; +use std::io; use std::pin::Pin; use std::task::{Context, Poll}; @@ -24,21 +16,10 @@ pin_project! { /// [`Sink`]: futures_sink::Sink pub struct FramedWrite { #[pin] - inner: FramedWrite2>, + inner: FramedImpl, } } -pin_project! { - pub(crate) struct FramedWrite2 { - #[pin] - inner: T, - buffer: BytesMut, - } -} - -const INITIAL_CAPACITY: usize = 8 * 1024; -const BACKPRESSURE_BOUNDARY: usize = INITIAL_CAPACITY; - impl FramedWrite where T: AsyncWrite, @@ -46,10 +27,11 @@ where /// Creates a new `FramedWrite` with the given `encoder`. pub fn new(inner: T, encoder: E) -> FramedWrite { FramedWrite { - inner: framed_write2(Fuse { - io: inner, + inner: FramedImpl { + inner, codec: encoder, - }), + state: WriteFrame::default(), + }, } } } @@ -62,7 +44,7 @@ impl FramedWrite { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn get_ref(&self) -> &T { - &self.inner.inner.io + &self.inner.inner } /// Returns a mutable reference to the underlying I/O stream wrapped by @@ -72,7 +54,7 @@ impl FramedWrite { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn get_mut(&mut self) -> &mut T { - &mut self.inner.inner.io + &mut self.inner.inner } /// Consumes the `FramedWrite`, returning its underlying I/O stream. @@ -81,21 +63,21 @@ impl FramedWrite { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn into_inner(self) -> T { - self.inner.inner.io + self.inner.inner } - /// Returns a reference to the underlying decoder. + /// Returns a reference to the underlying encoder. pub fn encoder(&self) -> &E { - &self.inner.inner.codec + &self.inner.codec } - /// Returns a mutable reference to the underlying decoder. + /// Returns a mutable reference to the underlying encoder. pub fn encoder_mut(&mut self) -> &mut E { - &mut self.inner.inner.codec + &mut self.inner.codec } } -// This impl just defers to the underlying FramedWrite2 +// This impl just defers to the underlying FramedImpl impl Sink for FramedWrite where T: AsyncWrite, @@ -121,6 +103,7 @@ where } } +// This impl just defers to the underlying T: Stream impl Stream for FramedWrite where T: Stream, @@ -128,13 +111,7 @@ where type Item = T::Item; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project() - .inner - .project() - .inner - .project() - .io - .poll_next(cx) + self.project().inner.project().inner.poll_next(cx) } } @@ -145,180 +122,9 @@ where { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("FramedWrite") - .field("inner", &self.inner.get_ref().io) - .field("encoder", &self.inner.get_ref().codec) - .field("buffer", &self.inner.buffer) + .field("inner", &self.get_ref()) + .field("encoder", &self.encoder()) + .field("buffer", &self.inner.state.buffer) .finish() } } - -// ===== impl FramedWrite2 ===== - -pub(crate) fn framed_write2(inner: T) -> FramedWrite2 { - FramedWrite2 { - inner, - buffer: BytesMut::with_capacity(INITIAL_CAPACITY), - } -} - -pub(crate) fn framed_write2_with_buffer(inner: T, mut buf: BytesMut) -> FramedWrite2 { - if buf.capacity() < INITIAL_CAPACITY { - let bytes_to_reserve = INITIAL_CAPACITY - buf.capacity(); - buf.reserve(bytes_to_reserve); - } - FramedWrite2 { inner, buffer: buf } -} - -impl FramedWrite2 { - pub(crate) fn get_ref(&self) -> &T { - &self.inner - } - - pub(crate) fn into_inner(self) -> T { - self.inner - } - - pub(crate) fn into_parts(self) -> (T, BytesMut) { - (self.inner, self.buffer) - } - - pub(crate) fn get_mut(&mut self) -> &mut T { - &mut self.inner - } -} - -impl Sink for FramedWrite2 -where - T: ProjectFuse + AsyncWrite, - T::Codec: Encoder, -{ - type Error = >::Error; - - fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // If the buffer is already over 8KiB, then attempt to flush it. If after flushing it's - // *still* over 8KiB, then apply backpressure (reject the send). - if self.buffer.len() >= BACKPRESSURE_BOUNDARY { - match self.as_mut().poll_flush(cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), - Poll::Ready(Ok(())) => (), - }; - - if self.buffer.len() >= BACKPRESSURE_BOUNDARY { - return Poll::Pending; - } - } - Poll::Ready(Ok(())) - } - - fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { - let mut pinned = self.project(); - pinned - .inner - .project() - .codec - .encode(item, &mut pinned.buffer)?; - Ok(()) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - trace!("flushing framed transport"); - let mut pinned = self.project(); - - while !pinned.buffer.is_empty() { - trace!("writing; remaining={}", pinned.buffer.len()); - - let buf = &pinned.buffer; - let n = ready!(pinned.inner.as_mut().poll_write(cx, &buf))?; - - if n == 0 { - return Poll::Ready(Err(io::Error::new( - io::ErrorKind::WriteZero, - "failed to \ - write frame to transport", - ) - .into())); - } - - pinned.buffer.advance(n); - } - - // Try flushing the underlying IO - ready!(pinned.inner.poll_flush(cx))?; - - trace!("framed transport flushed"); - Poll::Ready(Ok(())) - } - - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(self.as_mut().poll_flush(cx))?; - ready!(self.project().inner.poll_shutdown(cx))?; - - Poll::Ready(Ok(())) - } -} - -impl Decoder for FramedWrite2 { - type Item = T::Item; - type Error = T::Error; - - fn decode(&mut self, src: &mut BytesMut) -> Result, T::Error> { - self.inner.decode(src) - } - - fn decode_eof(&mut self, src: &mut BytesMut) -> Result, T::Error> { - self.inner.decode_eof(src) - } -} - -impl Read for FramedWrite2 { - fn read(&mut self, dst: &mut [u8]) -> io::Result { - self.inner.read(dst) - } -} - -impl BufRead for FramedWrite2 { - fn fill_buf(&mut self) -> io::Result<&[u8]> { - self.inner.fill_buf() - } - - fn consume(&mut self, amt: usize) { - self.inner.consume(amt) - } -} - -impl AsyncRead for FramedWrite2 { - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit]) -> bool { - self.inner.prepare_uninitialized_buffer(buf) - } - - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - self.project().inner.poll_read(cx, buf) - } -} - -impl AsyncBufRead for FramedWrite2 { - fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().inner.poll_fill_buf(cx) - } - - fn consume(self: Pin<&mut Self>, amt: usize) { - self.project().inner.consume(amt) - } -} - -impl ProjectFuse for FramedWrite2 -where - T: ProjectFuse, -{ - type Io = T::Io; - type Codec = T::Codec; - - fn project(self: Pin<&mut Self>) -> Fuse, &mut Self::Codec> { - self.project().inner.project() - } -} diff --git a/tokio-util/src/codec/mod.rs b/tokio-util/src/codec/mod.rs index ec76a641..96e9f9f9 100644 --- a/tokio-util/src/codec/mod.rs +++ b/tokio-util/src/codec/mod.rs @@ -18,6 +18,10 @@ pub use self::decoder::Decoder; mod encoder; pub use self::encoder::Encoder; +mod framed_impl; +#[allow(unused_imports)] +pub(crate) use self::framed_impl::{FramedImpl, RWFrames, ReadFrame, WriteFrame}; + mod framed; pub use self::framed::{Framed, FramedParts}; -- cgit v1.2.3