summaryrefslogtreecommitdiffstats
path: root/tokio-util
diff options
context:
space:
mode:
authorPlecra <60934058+Plecra@users.noreply.github.com>2020-05-12 12:47:38 +0100
committerGitHub <noreply@github.com>2020-05-12 13:47:38 +0200
commit221f421464e6672e9cf25629998477946a8b8e1f (patch)
treecdb0898dcb0a3d80177fccd0729cd28525e0b96e /tokio-util
parent1cc016833569e2dbae3b0431b7c87d5e75ef5de6 (diff)
codec: rewrite of codec::Framed (#2368)
Framed was designed to encapsulate both AsyncRead and AsyncWrite so that it could wrap two-way connections. It used Fuse to manage the pinned io object between the FramedWrite and FramedRead structs. I replaced the Fuse struct by isolating the state used in reading and writing, and making the code generic over that instead. This means the FramedImpl struct now has a parameter for the state, and contains the logic for both directions. The Framed* structs are now simply wrappers around this type Hopefully removing the `Pin` handling made things easier to understand, too.
Diffstat (limited to 'tokio-util')
-rw-r--r--tokio-util/src/codec/framed.rs208
-rw-r--r--tokio-util/src/codec/framed_impl.rs225
-rw-r--r--tokio-util/src/codec/framed_read.rs208
-rw-r--r--tokio-util/src/codec/framed_write.rs236
-rw-r--r--tokio-util/src/codec/mod.rs4
5 files changed, 327 insertions, 554 deletions
diff --git a/tokio-util/src/codec/framed.rs b/tokio-util/src/codec/framed.rs
index d2e7659e..bf05a3ae 100644
--- a/tokio-util/src/codec/framed.rs
+++ b/tokio-util/src/codec/framed.rs
@@ -1,10 +1,9 @@
use crate::codec::decoder::Decoder;
use crate::codec::encoder::Encoder;
-use crate::codec::framed_read::{framed_read2, framed_read2_with_buffer, FramedRead2};
-use crate::codec::framed_write::{framed_write2, framed_write2_with_buffer, FramedWrite2};
+use crate::codec::framed_impl::{FramedImpl, RWFrames, ReadFrame, WriteFrame};
use tokio::{
- io::{AsyncBufRead, AsyncRead, AsyncWrite},
+ io::{AsyncRead, AsyncWrite},
stream::Stream,
};
@@ -12,8 +11,7 @@ use bytes::BytesMut;
use futures_sink::Sink;
use pin_project_lite::pin_project;
use std::fmt;
-use std::io::{self, BufRead, Read, Write};
-use std::mem::MaybeUninit;
+use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
@@ -30,37 +28,7 @@ pin_project! {
/// [`Decoder::framed`]: crate::codec::Decoder::framed()
pub struct Framed<T, U> {
#[pin]
- inner: FramedRead2<FramedWrite2<Fuse<T, U>>>,
- }
-}
-
-pin_project! {
- pub(crate) struct Fuse<T, U> {
- #[pin]
- pub(crate) io: T,
- pub(crate) codec: U,
- }
-}
-
-/// Abstracts over `FramedRead2` being either `FramedRead2<FramedWrite2<Fuse<T, U>>>` or
-/// `FramedRead2<Fuse<T, U>>` and lets the io and codec parts be extracted in either case.
-pub(crate) trait ProjectFuse {
- type Io;
- type Codec;
-
- fn project(self: Pin<&mut Self>) -> Fuse<Pin<&mut Self::Io>, &mut Self::Codec>;
-}
-
-impl<T, U> ProjectFuse for Fuse<T, U> {
- type Io = T;
- type Codec = U;
-
- fn project(self: Pin<&mut Self>) -> Fuse<Pin<&mut Self::Io>, &mut Self::Codec> {
- let self_ = self.project();
- Fuse {
- io: self_.io,
- codec: self_.codec,
- }
+ inner: FramedImpl<T, U, RWFrames>
}
}
@@ -93,7 +61,11 @@ where
/// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split
pub fn new(inner: T, codec: U) -> Framed<T, U> {
Framed {
- inner: framed_read2(framed_write2(Fuse { io: inner, codec })),
+ inner: FramedImpl {
+ inner,
+ codec,
+ state: Default::default(),
+ },
}
}
@@ -123,10 +95,18 @@ where
/// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split
pub fn with_capacity(inner: T, codec: U, capacity: usize) -> Framed<T, U> {
Framed {
- inner: framed_read2_with_buffer(
- framed_write2(Fuse { io: inner, codec }),
- BytesMut::with_capacity(capacity),
- ),
+ inner: FramedImpl {
+ inner,
+ codec,
+ state: RWFrames {
+ read: ReadFrame {
+ eof: false,
+ is_readable: false,
+ buffer: BytesMut::with_capacity(capacity),
+ },
+ write: WriteFrame::default(),
+ },
+ },
}
}
}
@@ -161,16 +141,14 @@ impl<T, U> Framed<T, U> {
/// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split
pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> {
Framed {
- inner: framed_read2_with_buffer(
- framed_write2_with_buffer(
- Fuse {
- io: parts.io,
- codec: parts.codec,
- },
- parts.write_buf,
- ),
- parts.read_buf,
- ),
+ inner: FramedImpl {
+ inner: parts.io,
+ codec: parts.codec,
+ state: RWFrames {
+ read: parts.read_buf.into(),
+ write: parts.write_buf.into(),
+ },
+ },
}
}
@@ -181,7 +159,7 @@ impl<T, U> Framed<T, U> {
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn get_ref(&self) -> &T {
- &self.inner.get_ref().get_ref().io
+ &self.inner.inner
}
/// Returns a mutable reference to the underlying I/O stream wrapped by
@@ -191,7 +169,7 @@ impl<T, U> Framed<T, U> {
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn get_mut(&mut self) -> &mut T {
- &mut self.inner.get_mut().get_mut().io
+ &mut self.inner.inner
}
/// Returns a reference to the underlying codec wrapped by
@@ -200,7 +178,7 @@ impl<T, U> Framed<T, U> {
/// Note that care should be taken to not tamper with the underlying codec
/// as it may corrupt the stream of frames otherwise being worked with.
pub fn codec(&self) -> &U {
- &self.inner.get_ref().get_ref().codec
+ &self.inner.codec
}
/// Returns a mutable reference to the underlying codec wrapped by
@@ -209,12 +187,12 @@ impl<T, U> Framed<T, U> {
/// Note that care should be taken to not tamper with the underlying codec
/// as it may corrupt the stream of frames otherwise being worked with.
pub fn codec_mut(&mut self) -> &mut U {
- &mut self.inner.get_mut().get_mut().codec
+ &mut self.inner.codec
}
/// Returns a reference to the read buffer.
pub fn read_buffer(&self) -> &BytesMut {
- self.inner.buffer()
+ &self.inner.state.read.buffer
}
/// Consumes the `Framed`, returning its underlying I/O stream.
@@ -223,7 +201,7 @@ impl<T, U> Framed<T, U> {
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn into_inner(self) -> T {
- self.inner.into_inner().into_inner().io
+ self.inner.inner
}
/// Consumes the `Framed`, returning its underlying I/O stream, the buffer
@@ -233,19 +211,17 @@ impl<T, U> Framed<T, U> {
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn into_parts(self) -> FramedParts<T, U> {
- let (inner, read_buf) = self.inner.into_parts();
- let (inner, write_buf) = inner.into_parts();
-
FramedParts {
- io: inner.io,
- codec: inner.codec,
- read_buf,
- write_buf,
+ io: self.inner.inner,
+ codec: self.inner.codec,
+ read_buf: self.inner.state.read.buffer,
+ write_buf: self.inner.state.write.buffer,
_priv: (),
}
}
}
+// This impl just defers to the underlying FramedImpl
impl<T, U> Stream for Framed<T, U>
where
T: AsyncRead,
@@ -258,6 +234,7 @@ where
}
}
+// This impl just defers to the underlying FramedImpl
impl<T, I, U> Sink<I> for Framed<T, U>
where
T: AsyncWrite,
@@ -267,19 +244,19 @@ where
type Error = U::Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- self.project().inner.get_pin_mut().poll_ready(cx)
+ self.project().inner.poll_ready(cx)
}
fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
- self.project().inner.get_pin_mut().start_send(item)
+ self.project().inner.start_send(item)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- self.project().inner.get_pin_mut().poll_flush(cx)
+ self.project().inner.poll_flush(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- self.project().inner.get_pin_mut().poll_close(cx)
+ self.project().inner.poll_close(cx)
}
}
@@ -290,103 +267,12 @@ where
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Framed")
- .field("io", &self.inner.get_ref().get_ref().io)
- .field("codec", &self.inner.get_ref().get_ref().codec)
+ .field("io", self.get_ref())
+ .field("codec", self.codec())
.finish()
}
}
-// ===== impl Fuse =====
-
-impl<T: Read, U> Read for Fuse<T, U> {
- fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
- self.io.read(dst)
- }
-}
-
-impl<T: BufRead, U> BufRead for Fuse<T, U> {
- fn fill_buf(&mut self) -> io::Result<&[u8]> {
- self.io.fill_buf()
- }
-
- fn consume(&mut self, amt: usize) {
- self.io.consume(amt)
- }
-}
-
-impl<T: AsyncRead, U> AsyncRead for Fuse<T, U> {
- unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool {
- self.io.prepare_uninitialized_buffer(buf)
- }
-
- fn poll_read(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &mut [u8],
- ) -> Poll<Result<usize, io::Error>> {
- self.project().io.poll_read(cx, buf)
- }
-}
-
-impl<T: AsyncBufRead, U> AsyncBufRead for Fuse<T, U> {
- fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
- self.project().io.poll_fill_buf(cx)
- }
-
- fn consume(self: Pin<&mut Self>, amt: usize) {
- self.project().io.consume(amt)
- }
-}
-
-impl<T: Write, U> Write for Fuse<T, U> {
- fn write(&mut self, src: &[u8]) -> io::Result<usize> {
- self.io.write(src)
- }
-
- fn flush(&mut self) -> io::Result<()> {
- self.io.flush()
- }
-}
-
-impl<T: AsyncWrite, U> AsyncWrite for Fuse<T, U> {
- fn poll_write(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &[u8],
- ) -> Poll<Result<usize, io::Error>> {
- self.project().io.poll_write(cx, buf)
- }
-
- fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
- self.project().io.poll_flush(cx)
- }
-
- fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
- self.project().io.poll_shutdown(cx)
- }
-}
-
-impl<T, U: Decoder> Decoder for Fuse<T, U> {
- type Item = U::Item;
- type Error = U::Error;
-
- fn decode(&mut self, buffer: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
- self.codec.decode(buffer)
- }
-
- fn decode_eof(&mut self, buffer: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
- self.codec.decode_eof(buffer)
- }
-}
-
-impl<T, I, U: Encoder<I>> Encoder<I> for Fuse<T, U> {
- type Error = U::Error;
-
- fn encode(&mut self, item: I, dst: &mut BytesMut) -> Result<(), Self::Error> {
- self.codec.encode(item, dst)
- }
-}
-
/// `FramedParts` contains an export of the data of a Framed transport.
/// It can be used to construct a new [`Framed`] with a different codec.
/// It contains all current buffers and the inner transport.
diff --git a/tokio-util/src/codec/framed_impl.rs b/tokio-util/src/codec/framed_impl.rs
new file mode 100644
index 00000000..eb2e0d38
--- /dev/null
+++ b/tokio-util/src/codec/framed_impl.rs
@@ -0,0 +1,225 @@
+use crate::codec::decoder::Decoder;
+use crate::codec::encoder::Encoder;
+
+use tokio::{
+ io::{AsyncRead, AsyncWrite},
+ stream::Stream,
+};
+
+use bytes::{Buf, BytesMut};
+use futures_core::ready;
+use futures_sink::Sink;
+use log::trace;
+use pin_project_lite::pin_project;
+use std::borrow::{Borrow, BorrowMut};
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+pin_project! {
+ #[derive(Debug)]
+ pub(crate) struct FramedImpl<T, U, State> {
+ #[pin]
+ pub(crate) inner: T,
+ pub(crate) state: State,
+ pub(crate) codec: U,
+ }
+}
+
+const INITIAL_CAPACITY: usize = 8 * 1024;
+const BACKPRESSURE_BOUNDARY: usize = INITIAL_CAPACITY;
+
+pub(crate) struct ReadFrame {
+ pub(crate) eof: bool,
+ pub(crate) is_readable: bool,
+ pub(crate) buffer: BytesMut,
+}
+
+pub(crate) struct WriteFrame {
+ pub(crate) buffer: BytesMut,
+}
+
+#[derive(Default)]
+pub(crate) struct RWFrames {
+ pub(crate) read: ReadFrame,
+ pub(crate) write: WriteFrame,
+}
+
+impl Default for ReadFrame {
+ fn default() -> Self {
+ Self {
+ eof: false,
+ is_readable: false,
+ buffer: BytesMut::with_capacity(INITIAL_CAPACITY),
+ }
+ }
+}
+
+impl Default for WriteFrame {
+ fn default() -> Self {
+ Self {
+ buffer: BytesMut::with_capacity(INITIAL_CAPACITY),
+ }
+ }
+}
+
+impl From<BytesMut> for ReadFrame {
+ fn from(mut buffer: BytesMut) -> Self {
+ let size = buffer.capacity();
+ if size < INITIAL_CAPACITY {
+ buffer.reserve(INITIAL_CAPACITY - size);
+ }
+
+ Self {
+ buffer,
+ is_readable: size > 0,
+ eof: false,
+ }
+ }
+}
+
+impl From<BytesMut> for WriteFrame {
+ fn from(mut buffer: BytesMut) -> Self {
+ let size = buffer.capacity();
+ if size < INITIAL_CAPACITY {
+ buffer.reserve(INITIAL_CAPACITY - size);
+ }
+
+ Self { buffer }
+ }
+}
+
+impl Borrow<ReadFrame> for RWFrames {
+ fn borrow(&self) -> &ReadFrame {
+ &self.read
+ }
+}
+impl BorrowMut<ReadFrame> for RWFrames {
+ fn borrow_mut(&mut self) -> &mut ReadFrame {
+ &mut self.read
+ }
+}
+impl Borrow<WriteFrame> for RWFrames {
+ fn borrow(&self) -> &WriteFrame {
+ &self.write
+ }
+}
+impl BorrowMut<WriteFrame> for RWFrames {
+ fn borrow_mut(&mut self) -> &mut WriteFrame {
+ &mut self.write
+ }
+}
+impl<T, U, R> Stream for FramedImpl<T, U, R>
+where
+ T: AsyncRead,
+ U: Decoder,
+ R: BorrowMut<ReadFrame>,
+{
+ type Item = Result<U::Item, U::Error>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let mut pinned = self.project();
+ let state: &mut ReadFrame = pinned.state.borrow_mut();
+ loop {
+ // Repeatedly call `decode` or `decode_eof` as long as it is
+ // "readable". Readable is defined as not having returned `None`. If
+ // the upstream has returned EOF, and the decoder is no longer
+ // readable, it can be assumed that the decoder will never become
+ // readable again, at which point the stream is terminated.
+ if state.is_readable {
+ if state.eof {
+ let frame = pinned.codec.decode_eof(&mut state.buffer)?;
+ return Poll::Ready(frame.map(Ok));
+ }
+
+ trace!("attempting to decode a frame");
+
+ if let Some(frame) = pinned.codec.decode(&mut state.buffer)? {
+ trace!("frame decoded from buffer");
+ return Poll::Ready(Some(Ok(frame)));
+ }
+
+ state.is_readable = false;
+ }
+
+ assert!(!state.eof);
+
+ // Otherwise, try to read more data and try again. Make sure we've
+ // got room for at least one byte to read to ensure that we don't
+ // get a spurious 0 that looks like EOF
+ state.buffer.reserve(1);
+ let bytect = match pinned.inner.as_mut().poll_read_buf(cx, &mut state.buffer)? {
+ Poll::Ready(ct) => ct,
+ Poll::Pending => return Poll::Pending,
+ };
+ if bytect == 0 {
+ state.eof = true;
+ }
+
+ state.is_readable = true;
+ }
+ }
+}
+
+impl<T, I, U, W> Sink<I> for FramedImpl<T, U, W>
+where
+ T: AsyncWrite,
+ U: Encoder<I>,
+ U::Error: From<io::Error>,
+ W: BorrowMut<WriteFrame>,
+{
+ type Error = U::Error;
+
+ fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ if self.state.borrow().buffer.len() >= BACKPRESSURE_BOUNDARY {
+ self.as_mut().poll_flush(cx)
+ } else {
+ Poll::Ready(Ok(()))
+ }
+ }
+
+ fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
+ let pinned = self.project();
+ pinned
+ .codec
+ .encode(item, &mut pinned.state.borrow_mut().buffer)?;
+ Ok(())
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ trace!("flushing framed transport");
+ let mut pinned = self.project();
+
+ while !pinned.state.borrow_mut().buffer.is_empty() {
+ let WriteFrame { buffer } = pinned.state.borrow_mut();
+ trace!("writing; remaining={}", buffer.len());
+
+ let buf = &buffer;
+ let n = ready!(pinned.inner.as_mut().poll_write(cx, &buf))?;
+
+ if n == 0 {
+ return Poll::Ready(Err(io::Error::new(
+ io::ErrorKind::WriteZero,
+ "failed to \
+ write frame to transport",
+ )
+ .into()));
+ }
+
+ pinned.state.borrow_mut().buffer.advance(n);
+ }
+
+ // Try flushing the underlying IO
+ ready!(pinned.inner.poll_flush(cx))?;
+
+ trace!("framed transport flushed");
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ ready!(self.as_mut().poll_flush(cx))?;
+ ready!(self.project().inner.poll_shutdown(cx))?;
+
+ Poll::Ready(Ok(()))
+ }
+}
diff --git a/tokio-util/src/codec/framed_read.rs b/tokio-util/src/codec/framed_read.rs
index e7798c32..a6844b73 100644
--- a/tokio-util/src/codec/framed_read.rs
+++ b/tokio-util/src/codec/framed_read.rs
@@ -1,11 +1,10 @@
-use crate::codec::framed::{Fuse, ProjectFuse};
+use crate::codec::framed_impl::{FramedImpl, ReadFrame};
use crate::codec::Decoder;
use tokio::{io::AsyncRead, stream::Stream};
use bytes::BytesMut;
use futures_sink::Sink;
-use log::trace;
use pin_project_lite::pin_project;
use std::fmt;
use std::pin::Pin;
@@ -18,22 +17,10 @@ pin_project! {
/// [`AsyncRead`]: tokio::io::AsyncRead
pub struct FramedRead<T, D> {
#[pin]
- inner: FramedRead2<Fuse<T, D>>,
+ inner: FramedImpl<T, D, ReadFrame>,
}
}
-pin_project! {
- pub(crate) struct FramedRead2<T> {
- #[pin]
- inner: T,
- eof: bool,
- is_readable: bool,
- buffer: BytesMut,
- }
-}
-
-const INITIAL_CAPACITY: usize = 8 * 1024;
-
// ===== impl FramedRead =====
impl<T, D> FramedRead<T, D>
@@ -44,10 +31,11 @@ where
/// Creates a new `FramedRead` with the given `decoder`.
pub fn new(inner: T, decoder: D) -> FramedRead<T, D> {
FramedRead {
- inner: framed_read2(Fuse {
- io: inner,
+ inner: FramedImpl {
+ inner,
codec: decoder,
- }),
+ state: Default::default(),
+ },
}
}
@@ -55,13 +43,15 @@ where
/// initial size.
pub fn with_capacity(inner: T, decoder: D, capacity: usize) -> FramedRead<T, D> {
FramedRead {
- inner: framed_read2_with_buffer(
- Fuse {
- io: inner,
- codec: decoder,
+ inner: FramedImpl {
+ inner,
+ codec: decoder,
+ state: ReadFrame {
+ eof: false,
+ is_readable: false,
+ buffer: BytesMut::with_capacity(capacity),
},
- BytesMut::with_capacity(capacity),
- ),
+ },
}
}
}
@@ -74,7 +64,7 @@ impl<T, D> FramedRead<T, D> {
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn get_ref(&self) -> &T {
- &self.inner.inner.io
+ &self.inner.inner
}
/// Returns a mutable reference to the underlying I/O stream wrapped by
@@ -84,7 +74,7 @@ impl<T, D> FramedRead<T, D> {
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn get_mut(&mut self) -> &mut T {
- &mut self.inner.inner.io
+ &mut self.inner.inner
}
/// Consumes the `FramedRead`, returning its underlying I/O stream.
@@ -93,25 +83,26 @@ impl<T, D> FramedRead<T, D> {
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn into_inner(self) -> T {
- self.inner.inner.io
+ self.inner.inner
}
/// Returns a reference to the underlying decoder.
pub fn decoder(&self) -> &D {
- &self.inner.inner.codec
+ &self.inner.codec
}
/// Returns a mutable reference to the underlying decoder.
pub fn decoder_mut(&mut self) -> &mut D {
- &mut self.inner.inner.codec
+ &mut self.inner.codec
}
/// Returns a reference to the read buffer.
pub fn read_buffer(&self) -> &BytesMut {
- &self.inner.buffer
+ &self.inner.state.buffer
}
}
+// This impl just defers to the underlying FramedImpl
impl<T, D> Stream for FramedRead<T, D>
where
T: AsyncRead,
@@ -132,43 +123,19 @@ where
type Error = T::Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- self.project()
- .inner
- .project()
- .inner
- .project()
- .io
- .poll_ready(cx)
+ self.project().inner.project().inner.poll_ready(cx)
}
fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
- self.project()
- .inner
- .project()
- .inner
- .project()
- .io
- .start_send(item)
+ self.project().inner.project().inner.start_send(item)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- self.project()
- .inner
- .project()
- .inner
- .project()
- .io
- .poll_flush(cx)
+ self.project().inner.project().inner.poll_flush(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- self.project()
- .inner
- .project()
- .inner
- .project()
- .io
- .poll_close(cx)
+ self.project().inner.project().inner.poll_close(cx)
}
}
@@ -179,126 +146,11 @@ where
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("FramedRead")
- .field("inner", &self.inner.inner.io)
- .field("decoder", &self.inner.inner.codec)
- .field("eof", &self.inner.eof)
- .field("is_readable", &self.inner.is_readable)
- .field("buffer", &self.inner.buffer)
+ .field("inner", &self.get_ref())
+ .field("decoder", &self.decoder())
+ .field("eof", &self.inner.state.eof)
+ .field("is_readable", &self.inner.state.is_readable)
+ .field("buffer", &self.read_buffer())
.finish()
}
}
-
-// ===== impl FramedRead2 =====
-
-pub(crate) fn framed_read2<T>(inner: T) -> FramedRead2<T> {
- FramedRead2 {
- inner,
- eof: false,
- is_readable: false,
- buffer: BytesMut::with_capacity(INITIAL_CAPACITY),
- }
-}
-
-pub(crate) fn framed_read2_with_buffer<T>(inner: T, mut buf: BytesMut) -> FramedRead2<T> {
- if buf.capacity() < INITIAL_CAPACITY {
- let bytes_to_reserve = INITIAL_CAPACITY - buf.capacity();
- buf.reserve(bytes_to_reserve);
- }
- FramedRead2 {
- inner,
- eof: false,
- is_readable: !buf.is_empty(),
- buffer: buf,
- }
-}
-
-impl<T> FramedRead2<T> {
- pub(crate) fn get_ref(&self) -> &T {
- &self.inner
- }
-
- pub(crate) fn into_inner(self) -> T {
- self.inner
- }
-
- pub(crate) fn into_parts(self) -> (T, BytesMut) {
- (self.inner, self.buffer)
- }
-
- pub(crate) fn get_mut(&mut self) -> &mut T {
- &mut self.inner
- }
-
- pub(crate) fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
- self.project().inner
- }
-
- pub(crate) fn buffer(&self) -> &BytesMut {
- &self.buffer
- }
-}
-
-impl<T> Stream for FramedRead2<T>
-where
- T: ProjectFuse + AsyncRead,
- T::Codec: Decoder,
-{
- type Item = Result<<T::Codec as Decoder>::Item, <T::Codec as Decoder>::Error>;
-
- fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- let mut pinned = self.project();
- loop {
- // Repeatedly call `decode` or `decode_eof` as long as it is
- // "readable". Readable is defined as not having returned `None`. If
- // the upstream has returned EOF, and the decoder is no longer
- // readable, it can be assumed that the decoder will never become
- // readable again, at which point the stream is terminated.
- if *pinned.is_readable {
- if *pinned.eof {
- let frame = pinned
- .inner
- .as_mut()
- .project()
- .codec
- .decode_eof(&mut pinned.buffer)?;
- return Poll::Ready(frame.map(Ok));
- }
-
- trace!("attempting to decode a frame");
-
- if let Some(frame) = pinned
- .inner
- .as_mut()
- .project()
- .codec
- .decode(&mut pinned.buffer)?
- {
- trace!("frame decoded from buffer");
- return Poll::Ready(Some(Ok(frame)));
- }
-
- *pinned.is_readable = false;
- }
-
- assert!(!*pinned.eof);
-
- // Otherwise, try to read more data and try again. Make sure we've
- // got room for at least one byte to read to ensure that we don't
- // get a spurious 0 that looks like EOF
- pinned.buffer.reserve(1);
- let bytect = match pinned
- .inner
- .as_mut()
- .poll_read_buf(cx, &mut pinned.buffer)?
- {
- Poll::Ready(ct) => ct,
- Poll::Pending => return Poll::Pending,
- };
- if bytect == 0 {
- *pinned.eof = true;
- }
-
- *pinned.is_readable = true;
- }
- }
-}
diff --git a/tokio-util/src/codec/framed_write.rs b/tokio-util/src/codec/framed_write.rs
index c0049b2d..834eb6ed 100644
--- a/tokio-util/src/codec/framed_write.rs
+++ b/tokio-util/src/codec/framed_write.rs
@@ -1,20 +1,12 @@
-use crate::codec::decoder::Decoder;
use crate::codec::encoder::Encoder;
-use crate::codec::framed::{Fuse, ProjectFuse};
+use crate::codec::framed_impl::{FramedImpl, WriteFrame};
-use tokio::{
- io::{AsyncBufRead, AsyncRead, AsyncWrite},
- stream::Stream,
-};
+use tokio::{io::AsyncWrite, stream::Stream};
-use bytes::{Buf, BytesMut};
-use futures_core::ready;
use futures_sink::Sink;
-use log::trace;
use pin_project_lite::pin_project;
use std::fmt;
-use std::io::{self, BufRead, Read};
-use std::mem::MaybeUninit;
+use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
@@ -24,21 +16,10 @@ pin_project! {
/// [`Sink`]: futures_sink::Sink
pub struct FramedWrite<T, E> {
#[pin]
- inner: FramedWrite2<Fuse<T, E>>,
+ inner: FramedImpl<T, E, WriteFrame>,
}
}
-pin_project! {
- pub(crate) struct FramedWrite2<T> {
- #[pin]
- inner: T,
- buffer: BytesMut,
- }
-}
-
-const INITIAL_CAPACITY: usize = 8 * 1024;
-const BACKPRESSURE_BOUNDARY: usize = INITIAL_CAPACITY;
-
impl<T, E> FramedWrite<T, E>
where
T: AsyncWrite,
@@ -46,10 +27,11 @@ where
/// Creates a new `FramedWrite` with the given `encoder`.
pub fn new(inner: T, encoder: E) -> FramedWrite<T, E> {
FramedWrite {
- inner: framed_write2(Fuse {
- io: inner,
+ inner: FramedImpl {
+ inner,
codec: encoder,
- }),
+ state: WriteFrame::default(),
+ },
}
}
}
@@ -62,7 +44,7 @@ impl<T, E> FramedWrite<T, E> {
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn get_ref(&self) -> &T {
- &self.inner.inner.io
+ &self.inner.inner
}
/// Returns a mutable reference to the underlying I/O stream wrapped by
@@ -72,7 +54,7 @@ impl<T, E> FramedWrite<T, E> {
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn get_mut(&mut self) -> &mut T {
- &mut self.inner.inner.io
+ &mut self.inner.inner
}
/// Consumes the `FramedWrite`, returning its underlying I/O stream.
@@ -81,21 +63,21 @@ impl<T, E> FramedWrite<T, E> {
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn into_inner(self) -> T {
- self.inner.inner.io
+ self.inner.inner
}
- /// Returns a reference to the underlying decoder.
+ /// Returns a reference to the underlying encoder.
pub fn encoder(&self) -> &E {
- &self.inner.inner.codec
+ &self.inner.codec
}
- /// Returns a mutable reference to the underlying decoder.
+ /// Returns a mutable reference to the underlying encoder.
pub fn encoder_mut(&mut self) -> &mut E {
- &mut self.inner.inner.codec
+ &mut self.inner.codec
}
}
-// This impl just defers to the underlying FramedWrite2
+// This impl just defers to the underlying FramedImpl
impl<T, I, E> Sink<I> for FramedWrite<T, E>
where
T: AsyncWrite,
@@ -121,6 +103,7 @@ where
}
}
+// This impl just defers to the underlying T: Stream
impl<T, D> Stream for FramedWrite<T, D>
where
T: Stream,
@@ -128,13 +111,7 @@ where
type Item = T::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- self.project()
- .inner
- .project()
- .inner
- .project()
- .io
- .poll_next(cx)
+ self.project().inner.project().inner.poll_next(cx)
}
}
@@ -145,180 +122,9 @@ where
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("FramedWrite")
- .field("inner", &self.inner.get_ref().io)
- .field("encoder", &self.inner.get_ref().codec)
- .field("buffer", &self.inner.buffer)
+ .field("inner", &self.get_ref())
+ .field("encoder", &self.encoder())
+ .field("buffer", &self.inner.state.buffer)
.finish()
}
}
-
-// ===== impl FramedWrite2 =====
-
-pub(crate) fn framed_write2<T>(inner: T) -> FramedWrite2<T> {
- FramedWrite2 {
- inner,
- buffer: BytesMut::with_capacity(INITIAL_CAPACITY),
- }
-