diff options
author | Plecra <60934058+Plecra@users.noreply.github.com> | 2020-05-12 12:47:38 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-05-12 13:47:38 +0200 |
commit | 221f421464e6672e9cf25629998477946a8b8e1f (patch) | |
tree | cdb0898dcb0a3d80177fccd0729cd28525e0b96e /tokio-util | |
parent | 1cc016833569e2dbae3b0431b7c87d5e75ef5de6 (diff) |
codec: rewrite of codec::Framed (#2368)
Framed was designed to encapsulate both AsyncRead and AsyncWrite so
that it could wrap two-way connections. It used Fuse to manage the pinned
io object between the FramedWrite and FramedRead structs.
I replaced the Fuse struct by isolating the state used in reading and
writing, and making the code generic over that instead. This means
the FramedImpl struct now has a parameter for the state, and contains
the logic for both directions. The Framed* structs are now simply
wrappers around this type
Hopefully removing the `Pin` handling made things easier to
understand, too.
Diffstat (limited to 'tokio-util')
-rw-r--r-- | tokio-util/src/codec/framed.rs | 208 | ||||
-rw-r--r-- | tokio-util/src/codec/framed_impl.rs | 225 | ||||
-rw-r--r-- | tokio-util/src/codec/framed_read.rs | 208 | ||||
-rw-r--r-- | tokio-util/src/codec/framed_write.rs | 236 | ||||
-rw-r--r-- | tokio-util/src/codec/mod.rs | 4 |
5 files changed, 327 insertions, 554 deletions
diff --git a/tokio-util/src/codec/framed.rs b/tokio-util/src/codec/framed.rs index d2e7659e..bf05a3ae 100644 --- a/tokio-util/src/codec/framed.rs +++ b/tokio-util/src/codec/framed.rs @@ -1,10 +1,9 @@ use crate::codec::decoder::Decoder; use crate::codec::encoder::Encoder; -use crate::codec::framed_read::{framed_read2, framed_read2_with_buffer, FramedRead2}; -use crate::codec::framed_write::{framed_write2, framed_write2_with_buffer, FramedWrite2}; +use crate::codec::framed_impl::{FramedImpl, RWFrames, ReadFrame, WriteFrame}; use tokio::{ - io::{AsyncBufRead, AsyncRead, AsyncWrite}, + io::{AsyncRead, AsyncWrite}, stream::Stream, }; @@ -12,8 +11,7 @@ use bytes::BytesMut; use futures_sink::Sink; use pin_project_lite::pin_project; use std::fmt; -use std::io::{self, BufRead, Read, Write}; -use std::mem::MaybeUninit; +use std::io; use std::pin::Pin; use std::task::{Context, Poll}; @@ -30,37 +28,7 @@ pin_project! { /// [`Decoder::framed`]: crate::codec::Decoder::framed() pub struct Framed<T, U> { #[pin] - inner: FramedRead2<FramedWrite2<Fuse<T, U>>>, - } -} - -pin_project! { - pub(crate) struct Fuse<T, U> { - #[pin] - pub(crate) io: T, - pub(crate) codec: U, - } -} - -/// Abstracts over `FramedRead2` being either `FramedRead2<FramedWrite2<Fuse<T, U>>>` or -/// `FramedRead2<Fuse<T, U>>` and lets the io and codec parts be extracted in either case. -pub(crate) trait ProjectFuse { - type Io; - type Codec; - - fn project(self: Pin<&mut Self>) -> Fuse<Pin<&mut Self::Io>, &mut Self::Codec>; -} - -impl<T, U> ProjectFuse for Fuse<T, U> { - type Io = T; - type Codec = U; - - fn project(self: Pin<&mut Self>) -> Fuse<Pin<&mut Self::Io>, &mut Self::Codec> { - let self_ = self.project(); - Fuse { - io: self_.io, - codec: self_.codec, - } + inner: FramedImpl<T, U, RWFrames> } } @@ -93,7 +61,11 @@ where /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split pub fn new(inner: T, codec: U) -> Framed<T, U> { Framed { - inner: framed_read2(framed_write2(Fuse { io: inner, codec })), + inner: FramedImpl { + inner, + codec, + state: Default::default(), + }, } } @@ -123,10 +95,18 @@ where /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split pub fn with_capacity(inner: T, codec: U, capacity: usize) -> Framed<T, U> { Framed { - inner: framed_read2_with_buffer( - framed_write2(Fuse { io: inner, codec }), - BytesMut::with_capacity(capacity), - ), + inner: FramedImpl { + inner, + codec, + state: RWFrames { + read: ReadFrame { + eof: false, + is_readable: false, + buffer: BytesMut::with_capacity(capacity), + }, + write: WriteFrame::default(), + }, + }, } } } @@ -161,16 +141,14 @@ impl<T, U> Framed<T, U> { /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> { Framed { - inner: framed_read2_with_buffer( - framed_write2_with_buffer( - Fuse { - io: parts.io, - codec: parts.codec, - }, - parts.write_buf, - ), - parts.read_buf, - ), + inner: FramedImpl { + inner: parts.io, + codec: parts.codec, + state: RWFrames { + read: parts.read_buf.into(), + write: parts.write_buf.into(), + }, + }, } } @@ -181,7 +159,7 @@ impl<T, U> Framed<T, U> { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn get_ref(&self) -> &T { - &self.inner.get_ref().get_ref().io + &self.inner.inner } /// Returns a mutable reference to the underlying I/O stream wrapped by @@ -191,7 +169,7 @@ impl<T, U> Framed<T, U> { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn get_mut(&mut self) -> &mut T { - &mut self.inner.get_mut().get_mut().io + &mut self.inner.inner } /// Returns a reference to the underlying codec wrapped by @@ -200,7 +178,7 @@ impl<T, U> Framed<T, U> { /// Note that care should be taken to not tamper with the underlying codec /// as it may corrupt the stream of frames otherwise being worked with. pub fn codec(&self) -> &U { - &self.inner.get_ref().get_ref().codec + &self.inner.codec } /// Returns a mutable reference to the underlying codec wrapped by @@ -209,12 +187,12 @@ impl<T, U> Framed<T, U> { /// Note that care should be taken to not tamper with the underlying codec /// as it may corrupt the stream of frames otherwise being worked with. pub fn codec_mut(&mut self) -> &mut U { - &mut self.inner.get_mut().get_mut().codec + &mut self.inner.codec } /// Returns a reference to the read buffer. pub fn read_buffer(&self) -> &BytesMut { - self.inner.buffer() + &self.inner.state.read.buffer } /// Consumes the `Framed`, returning its underlying I/O stream. @@ -223,7 +201,7 @@ impl<T, U> Framed<T, U> { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn into_inner(self) -> T { - self.inner.into_inner().into_inner().io + self.inner.inner } /// Consumes the `Framed`, returning its underlying I/O stream, the buffer @@ -233,19 +211,17 @@ impl<T, U> Framed<T, U> { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn into_parts(self) -> FramedParts<T, U> { - let (inner, read_buf) = self.inner.into_parts(); - let (inner, write_buf) = inner.into_parts(); - FramedParts { - io: inner.io, - codec: inner.codec, - read_buf, - write_buf, + io: self.inner.inner, + codec: self.inner.codec, + read_buf: self.inner.state.read.buffer, + write_buf: self.inner.state.write.buffer, _priv: (), } } } +// This impl just defers to the underlying FramedImpl impl<T, U> Stream for Framed<T, U> where T: AsyncRead, @@ -258,6 +234,7 @@ where } } +// This impl just defers to the underlying FramedImpl impl<T, I, U> Sink<I> for Framed<T, U> where T: AsyncWrite, @@ -267,19 +244,19 @@ where type Error = U::Error; fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - self.project().inner.get_pin_mut().poll_ready(cx) + self.project().inner.poll_ready(cx) } fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { - self.project().inner.get_pin_mut().start_send(item) + self.project().inner.start_send(item) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - self.project().inner.get_pin_mut().poll_flush(cx) + self.project().inner.poll_flush(cx) } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - self.project().inner.get_pin_mut().poll_close(cx) + self.project().inner.poll_close(cx) } } @@ -290,103 +267,12 @@ where { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Framed") - .field("io", &self.inner.get_ref().get_ref().io) - .field("codec", &self.inner.get_ref().get_ref().codec) + .field("io", self.get_ref()) + .field("codec", self.codec()) .finish() } } -// ===== impl Fuse ===== - -impl<T: Read, U> Read for Fuse<T, U> { - fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> { - self.io.read(dst) - } -} - -impl<T: BufRead, U> BufRead for Fuse<T, U> { - fn fill_buf(&mut self) -> io::Result<&[u8]> { - self.io.fill_buf() - } - - fn consume(&mut self, amt: usize) { - self.io.consume(amt) - } -} - -impl<T: AsyncRead, U> AsyncRead for Fuse<T, U> { - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool { - self.io.prepare_uninitialized_buffer(buf) - } - - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll<Result<usize, io::Error>> { - self.project().io.poll_read(cx, buf) - } -} - -impl<T: AsyncBufRead, U> AsyncBufRead for Fuse<T, U> { - fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { - self.project().io.poll_fill_buf(cx) - } - - fn consume(self: Pin<&mut Self>, amt: usize) { - self.project().io.consume(amt) - } -} - -impl<T: Write, U> Write for Fuse<T, U> { - fn write(&mut self, src: &[u8]) -> io::Result<usize> { - self.io.write(src) - } - - fn flush(&mut self) -> io::Result<()> { - self.io.flush() - } -} - -impl<T: AsyncWrite, U> AsyncWrite for Fuse<T, U> { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll<Result<usize, io::Error>> { - self.project().io.poll_write(cx, buf) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { - self.project().io.poll_flush(cx) - } - - fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { - self.project().io.poll_shutdown(cx) - } -} - -impl<T, U: Decoder> Decoder for Fuse<T, U> { - type Item = U::Item; - type Error = U::Error; - - fn decode(&mut self, buffer: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { - self.codec.decode(buffer) - } - - fn decode_eof(&mut self, buffer: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { - self.codec.decode_eof(buffer) - } -} - -impl<T, I, U: Encoder<I>> Encoder<I> for Fuse<T, U> { - type Error = U::Error; - - fn encode(&mut self, item: I, dst: &mut BytesMut) -> Result<(), Self::Error> { - self.codec.encode(item, dst) - } -} - /// `FramedParts` contains an export of the data of a Framed transport. /// It can be used to construct a new [`Framed`] with a different codec. /// It contains all current buffers and the inner transport. diff --git a/tokio-util/src/codec/framed_impl.rs b/tokio-util/src/codec/framed_impl.rs new file mode 100644 index 00000000..eb2e0d38 --- /dev/null +++ b/tokio-util/src/codec/framed_impl.rs @@ -0,0 +1,225 @@ +use crate::codec::decoder::Decoder; +use crate::codec::encoder::Encoder; + +use tokio::{ + io::{AsyncRead, AsyncWrite}, + stream::Stream, +}; + +use bytes::{Buf, BytesMut}; +use futures_core::ready; +use futures_sink::Sink; +use log::trace; +use pin_project_lite::pin_project; +use std::borrow::{Borrow, BorrowMut}; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +pin_project! { + #[derive(Debug)] + pub(crate) struct FramedImpl<T, U, State> { + #[pin] + pub(crate) inner: T, + pub(crate) state: State, + pub(crate) codec: U, + } +} + +const INITIAL_CAPACITY: usize = 8 * 1024; +const BACKPRESSURE_BOUNDARY: usize = INITIAL_CAPACITY; + +pub(crate) struct ReadFrame { + pub(crate) eof: bool, + pub(crate) is_readable: bool, + pub(crate) buffer: BytesMut, +} + +pub(crate) struct WriteFrame { + pub(crate) buffer: BytesMut, +} + +#[derive(Default)] +pub(crate) struct RWFrames { + pub(crate) read: ReadFrame, + pub(crate) write: WriteFrame, +} + +impl Default for ReadFrame { + fn default() -> Self { + Self { + eof: false, + is_readable: false, + buffer: BytesMut::with_capacity(INITIAL_CAPACITY), + } + } +} + +impl Default for WriteFrame { + fn default() -> Self { + Self { + buffer: BytesMut::with_capacity(INITIAL_CAPACITY), + } + } +} + +impl From<BytesMut> for ReadFrame { + fn from(mut buffer: BytesMut) -> Self { + let size = buffer.capacity(); + if size < INITIAL_CAPACITY { + buffer.reserve(INITIAL_CAPACITY - size); + } + + Self { + buffer, + is_readable: size > 0, + eof: false, + } + } +} + +impl From<BytesMut> for WriteFrame { + fn from(mut buffer: BytesMut) -> Self { + let size = buffer.capacity(); + if size < INITIAL_CAPACITY { + buffer.reserve(INITIAL_CAPACITY - size); + } + + Self { buffer } + } +} + +impl Borrow<ReadFrame> for RWFrames { + fn borrow(&self) -> &ReadFrame { + &self.read + } +} +impl BorrowMut<ReadFrame> for RWFrames { + fn borrow_mut(&mut self) -> &mut ReadFrame { + &mut self.read + } +} +impl Borrow<WriteFrame> for RWFrames { + fn borrow(&self) -> &WriteFrame { + &self.write + } +} +impl BorrowMut<WriteFrame> for RWFrames { + fn borrow_mut(&mut self) -> &mut WriteFrame { + &mut self.write + } +} +impl<T, U, R> Stream for FramedImpl<T, U, R> +where + T: AsyncRead, + U: Decoder, + R: BorrowMut<ReadFrame>, +{ + type Item = Result<U::Item, U::Error>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let mut pinned = self.project(); + let state: &mut ReadFrame = pinned.state.borrow_mut(); + loop { + // Repeatedly call `decode` or `decode_eof` as long as it is + // "readable". Readable is defined as not having returned `None`. If + // the upstream has returned EOF, and the decoder is no longer + // readable, it can be assumed that the decoder will never become + // readable again, at which point the stream is terminated. + if state.is_readable { + if state.eof { + let frame = pinned.codec.decode_eof(&mut state.buffer)?; + return Poll::Ready(frame.map(Ok)); + } + + trace!("attempting to decode a frame"); + + if let Some(frame) = pinned.codec.decode(&mut state.buffer)? { + trace!("frame decoded from buffer"); + return Poll::Ready(Some(Ok(frame))); + } + + state.is_readable = false; + } + + assert!(!state.eof); + + // Otherwise, try to read more data and try again. Make sure we've + // got room for at least one byte to read to ensure that we don't + // get a spurious 0 that looks like EOF + state.buffer.reserve(1); + let bytect = match pinned.inner.as_mut().poll_read_buf(cx, &mut state.buffer)? { + Poll::Ready(ct) => ct, + Poll::Pending => return Poll::Pending, + }; + if bytect == 0 { + state.eof = true; + } + + state.is_readable = true; + } + } +} + +impl<T, I, U, W> Sink<I> for FramedImpl<T, U, W> +where + T: AsyncWrite, + U: Encoder<I>, + U::Error: From<io::Error>, + W: BorrowMut<WriteFrame>, +{ + type Error = U::Error; + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + if self.state.borrow().buffer.len() >= BACKPRESSURE_BOUNDARY { + self.as_mut().poll_flush(cx) + } else { + Poll::Ready(Ok(())) + } + } + + fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { + let pinned = self.project(); + pinned + .codec + .encode(item, &mut pinned.state.borrow_mut().buffer)?; + Ok(()) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + trace!("flushing framed transport"); + let mut pinned = self.project(); + + while !pinned.state.borrow_mut().buffer.is_empty() { + let WriteFrame { buffer } = pinned.state.borrow_mut(); + trace!("writing; remaining={}", buffer.len()); + + let buf = &buffer; + let n = ready!(pinned.inner.as_mut().poll_write(cx, &buf))?; + + if n == 0 { + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::WriteZero, + "failed to \ + write frame to transport", + ) + .into())); + } + + pinned.state.borrow_mut().buffer.advance(n); + } + + // Try flushing the underlying IO + ready!(pinned.inner.poll_flush(cx))?; + + trace!("framed transport flushed"); + Poll::Ready(Ok(())) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + ready!(self.as_mut().poll_flush(cx))?; + ready!(self.project().inner.poll_shutdown(cx))?; + + Poll::Ready(Ok(())) + } +} diff --git a/tokio-util/src/codec/framed_read.rs b/tokio-util/src/codec/framed_read.rs index e7798c32..a6844b73 100644 --- a/tokio-util/src/codec/framed_read.rs +++ b/tokio-util/src/codec/framed_read.rs @@ -1,11 +1,10 @@ -use crate::codec::framed::{Fuse, ProjectFuse}; +use crate::codec::framed_impl::{FramedImpl, ReadFrame}; use crate::codec::Decoder; use tokio::{io::AsyncRead, stream::Stream}; use bytes::BytesMut; use futures_sink::Sink; -use log::trace; use pin_project_lite::pin_project; use std::fmt; use std::pin::Pin; @@ -18,22 +17,10 @@ pin_project! { /// [`AsyncRead`]: tokio::io::AsyncRead pub struct FramedRead<T, D> { #[pin] - inner: FramedRead2<Fuse<T, D>>, + inner: FramedImpl<T, D, ReadFrame>, } } -pin_project! { - pub(crate) struct FramedRead2<T> { - #[pin] - inner: T, - eof: bool, - is_readable: bool, - buffer: BytesMut, - } -} - -const INITIAL_CAPACITY: usize = 8 * 1024; - // ===== impl FramedRead ===== impl<T, D> FramedRead<T, D> @@ -44,10 +31,11 @@ where /// Creates a new `FramedRead` with the given `decoder`. pub fn new(inner: T, decoder: D) -> FramedRead<T, D> { FramedRead { - inner: framed_read2(Fuse { - io: inner, + inner: FramedImpl { + inner, codec: decoder, - }), + state: Default::default(), + }, } } @@ -55,13 +43,15 @@ where /// initial size. pub fn with_capacity(inner: T, decoder: D, capacity: usize) -> FramedRead<T, D> { FramedRead { - inner: framed_read2_with_buffer( - Fuse { - io: inner, - codec: decoder, + inner: FramedImpl { + inner, + codec: decoder, + state: ReadFrame { + eof: false, + is_readable: false, + buffer: BytesMut::with_capacity(capacity), }, - BytesMut::with_capacity(capacity), - ), + }, } } } @@ -74,7 +64,7 @@ impl<T, D> FramedRead<T, D> { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn get_ref(&self) -> &T { - &self.inner.inner.io + &self.inner.inner } /// Returns a mutable reference to the underlying I/O stream wrapped by @@ -84,7 +74,7 @@ impl<T, D> FramedRead<T, D> { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn get_mut(&mut self) -> &mut T { - &mut self.inner.inner.io + &mut self.inner.inner } /// Consumes the `FramedRead`, returning its underlying I/O stream. @@ -93,25 +83,26 @@ impl<T, D> FramedRead<T, D> { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn into_inner(self) -> T { - self.inner.inner.io + self.inner.inner } /// Returns a reference to the underlying decoder. pub fn decoder(&self) -> &D { - &self.inner.inner.codec + &self.inner.codec } /// Returns a mutable reference to the underlying decoder. pub fn decoder_mut(&mut self) -> &mut D { - &mut self.inner.inner.codec + &mut self.inner.codec } /// Returns a reference to the read buffer. pub fn read_buffer(&self) -> &BytesMut { - &self.inner.buffer + &self.inner.state.buffer } } +// This impl just defers to the underlying FramedImpl impl<T, D> Stream for FramedRead<T, D> where T: AsyncRead, @@ -132,43 +123,19 @@ where type Error = T::Error; fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - self.project() - .inner - .project() - .inner - .project() - .io - .poll_ready(cx) + self.project().inner.project().inner.poll_ready(cx) } fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { - self.project() - .inner - .project() - .inner - .project() - .io - .start_send(item) + self.project().inner.project().inner.start_send(item) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - self.project() - .inner - .project() - .inner - .project() - .io - .poll_flush(cx) + self.project().inner.project().inner.poll_flush(cx) } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - self.project() - .inner - .project() - .inner - .project() - .io - .poll_close(cx) + self.project().inner.project().inner.poll_close(cx) } } @@ -179,126 +146,11 @@ where { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("FramedRead") - .field("inner", &self.inner.inner.io) - .field("decoder", &self.inner.inner.codec) - .field("eof", &self.inner.eof) - .field("is_readable", &self.inner.is_readable) - .field("buffer", &self.inner.buffer) + .field("inner", &self.get_ref()) + .field("decoder", &self.decoder()) + .field("eof", &self.inner.state.eof) + .field("is_readable", &self.inner.state.is_readable) + .field("buffer", &self.read_buffer()) .finish() } } - -// ===== impl FramedRead2 ===== - -pub(crate) fn framed_read2<T>(inner: T) -> FramedRead2<T> { - FramedRead2 { - inner, - eof: false, - is_readable: false, - buffer: BytesMut::with_capacity(INITIAL_CAPACITY), - } -} - -pub(crate) fn framed_read2_with_buffer<T>(inner: T, mut buf: BytesMut) -> FramedRead2<T> { - if buf.capacity() < INITIAL_CAPACITY { - let bytes_to_reserve = INITIAL_CAPACITY - buf.capacity(); - buf.reserve(bytes_to_reserve); - } - FramedRead2 { - inner, - eof: false, - is_readable: !buf.is_empty(), - buffer: buf, - } -} - -impl<T> FramedRead2<T> { - pub(crate) fn get_ref(&self) -> &T { - &self.inner - } - - pub(crate) fn into_inner(self) -> T { - self.inner - } - - pub(crate) fn into_parts(self) -> (T, BytesMut) { - (self.inner, self.buffer) - } - - pub(crate) fn get_mut(&mut self) -> &mut T { - &mut self.inner - } - - pub(crate) fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> { - self.project().inner - } - - pub(crate) fn buffer(&self) -> &BytesMut { - &self.buffer - } -} - -impl<T> Stream for FramedRead2<T> -where - T: ProjectFuse + AsyncRead, - T::Codec: Decoder, -{ - type Item = Result<<T::Codec as Decoder>::Item, <T::Codec as Decoder>::Error>; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - let mut pinned = self.project(); - loop { - // Repeatedly call `decode` or `decode_eof` as long as it is - // "readable". Readable is defined as not having returned `None`. If - // the upstream has returned EOF, and the decoder is no longer - // readable, it can be assumed that the decoder will never become - // readable again, at which point the stream is terminated. - if *pinned.is_readable { - if *pinned.eof { - let frame = pinned - .inner - .as_mut() - .project() - .codec - .decode_eof(&mut pinned.buffer)?; - return Poll::Ready(frame.map(Ok)); - } - - trace!("attempting to decode a frame"); - - if let Some(frame) = pinned - .inner - .as_mut() - .project() - .codec - .decode(&mut pinned.buffer)? - { - trace!("frame decoded from buffer"); - return Poll::Ready(Some(Ok(frame))); - } - - *pinned.is_readable = false; - } - - assert!(!*pinned.eof); - - // Otherwise, try to read more data and try again. Make sure we've - // got room for at least one byte to read to ensure that we don't - // get a spurious 0 that looks like EOF - pinned.buffer.reserve(1); - let bytect = match pinned - .inner - .as_mut() - .poll_read_buf(cx, &mut pinned.buffer)? - { - Poll::Ready(ct) => ct, - Poll::Pending => return Poll::Pending, - }; - if bytect == 0 { - *pinned.eof = true; - } - - *pinned.is_readable = true; - } - } -} diff --git a/tokio-util/src/codec/framed_write.rs b/tokio-util/src/codec/framed_write.rs index c0049b2d..834eb6ed 100644 --- a/tokio-util/src/codec/framed_write.rs +++ b/tokio-util/src/codec/framed_write.rs @@ -1,20 +1,12 @@ -use crate::codec::decoder::Decoder; use crate::codec::encoder::Encoder; -use crate::codec::framed::{Fuse, ProjectFuse}; +use crate::codec::framed_impl::{FramedImpl, WriteFrame}; -use tokio::{ - io::{AsyncBufRead, AsyncRead, AsyncWrite}, - stream::Stream, -}; +use tokio::{io::AsyncWrite, stream::Stream}; -use bytes::{Buf, BytesMut}; -use futures_core::ready; use futures_sink::Sink; -use log::trace; use pin_project_lite::pin_project; use std::fmt; -use std::io::{self, BufRead, Read}; -use std::mem::MaybeUninit; +use std::io; use std::pin::Pin; use std::task::{Context, Poll}; @@ -24,21 +16,10 @@ pin_project! { /// [`Sink`]: futures_sink::Sink pub struct FramedWrite<T, E> { #[pin] - inner: FramedWrite2<Fuse<T, E>>, + inner: FramedImpl<T, E, WriteFrame>, } } -pin_project! { - pub(crate) struct FramedWrite2<T> { - #[pin] - inner: T, - buffer: BytesMut, - } -} - -const INITIAL_CAPACITY: usize = 8 * 1024; -const BACKPRESSURE_BOUNDARY: usize = INITIAL_CAPACITY; - impl<T, E> FramedWrite<T, E> where T: AsyncWrite, @@ -46,10 +27,11 @@ where /// Creates a new `FramedWrite` with the given `encoder`. pub fn new(inner: T, encoder: E) -> FramedWrite<T, E> { FramedWrite { - inner: framed_write2(Fuse { - io: inner, + inner: FramedImpl { + inner, codec: encoder, - }), + state: WriteFrame::default(), + }, } } } @@ -62,7 +44,7 @@ impl<T, E> FramedWrite<T, E> { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn get_ref(&self) -> &T { - &self.inner.inner.io + &self.inner.inner } /// Returns a mutable reference to the underlying I/O stream wrapped by @@ -72,7 +54,7 @@ impl<T, E> FramedWrite<T, E> { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn get_mut(&mut self) -> &mut T { - &mut self.inner.inner.io + &mut self.inner.inner } /// Consumes the `FramedWrite`, returning its underlying I/O stream. @@ -81,21 +63,21 @@ impl<T, E> FramedWrite<T, E> { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn into_inner(self) -> T { - self.inner.inner.io + self.inner.inner } - /// Returns a reference to the underlying decoder. + /// Returns a reference to the underlying encoder. pub fn encoder(&self) -> &E { - &self.inner.inner.codec + &self.inner.codec } - /// Returns a mutable reference to the underlying decoder. + /// Returns a mutable reference to the underlying encoder. pub fn encoder_mut(&mut self) -> &mut E { - &mut self.inner.inner.codec + &mut self.inner.codec } } -// This impl just defers to the underlying FramedWrite2 +// This impl just defers to the underlying FramedImpl impl<T, I, E> Sink<I> for FramedWrite<T, E> where T: AsyncWrite, @@ -121,6 +103,7 @@ where } } +// This impl just defers to the underlying T: Stream impl<T, D> Stream for FramedWrite<T, D> where T: Stream, @@ -128,13 +111,7 @@ where type Item = T::Item; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - self.project() - .inner - .project() - .inner - .project() - .io - .poll_next(cx) + self.project().inner.project().inner.poll_next(cx) } } @@ -145,180 +122,9 @@ where { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("FramedWrite") - .field("inner", &self.inner.get_ref().io) - .field("encoder", &self.inner.get_ref().codec) - .field("buffer", &self.inner.buffer) + .field("inner", &self.get_ref()) + .field("encoder", &self.encoder()) + .field("buffer", &self.inner.state.buffer) .finish() } } - -// ===== impl FramedWrite2 ===== - -pub(crate) fn framed_write2<T>(inner: T) -> FramedWrite2<T> { - FramedWrite2 { - inner, - buffer: BytesMut::with_capacity(INITIAL_CAPACITY), - } - |