summaryrefslogtreecommitdiffstats
path: root/tokio-codec
diff options
context:
space:
mode:
authorjesskfullwood <38404589+jesskfullwood@users.noreply.github.com>2019-06-27 18:10:29 +0100
committerCarl Lerche <me@carllerche.com>2019-06-27 10:10:29 -0700
commit6b9e7bdace71bf64e12f7ec461a351f7eb188f60 (patch)
tree060df8190bda5ea19366d673704d867b671f0097 /tokio-codec
parented4d4a5353d07d2428072965ea23c9a6eba5d87d (diff)
codec: update to use std-future (#1214)
Strategy was to - copy the old codec code that was temporarily being stashed in `tokio-io` - modify all the type signatures to use Pin, as literal a translation as possible - fix up the tests likewise This is intended just to get things compiling and passing tests. Beyond that there is surely lots of refactoring that can be done to make things more idiomatic. The docs are unchanged. Closes #1189
Diffstat (limited to 'tokio-codec')
-rw-r--r--tokio-codec/Cargo.toml8
-rw-r--r--tokio-codec/src/bytes_codec.rs3
-rw-r--r--tokio-codec/src/decoder.rs117
-rw-r--r--tokio-codec/src/encoder.rs25
-rw-r--r--tokio-codec/src/framed.rs308
-rw-r--r--tokio-codec/src/framed_read.rs225
-rw-r--r--tokio-codec/src/framed_write.rs271
-rw-r--r--tokio-codec/src/lib.rs14
-rw-r--r--tokio-codec/src/lines_codec.rs45
-rw-r--r--tokio-codec/src/macros.rs22
-rw-r--r--tokio-codec/tests/codecs.rs4
-rw-r--r--tokio-codec/tests/framed.rs31
-rw-r--r--tokio-codec/tests/framed_read.rs182
-rw-r--r--tokio-codec/tests/framed_write.rs99
14 files changed, 1255 insertions, 99 deletions
diff --git a/tokio-codec/Cargo.toml b/tokio-codec/Cargo.toml
index bbc29c33..e34e7b2e 100644
--- a/tokio-codec/Cargo.toml
+++ b/tokio-codec/Cargo.toml
@@ -24,4 +24,10 @@ publish = false
[dependencies]
tokio-io = { version = "0.2.0", path = "../tokio-io" }
bytes = "0.4.7"
-futures = "0.1.18"
+tokio-futures = { version = "0.2.0", path = "../tokio-futures" }
+log = "0.4"
+
+[dev-dependencies]
+futures-preview = "0.3.0-alpha.16"
+tokio-current-thread = { version = "0.2.0", path = "../tokio-current-thread" }
+tokio-test = { version = "0.2.0", path = "../tokio-test" } \ No newline at end of file
diff --git a/tokio-codec/src/bytes_codec.rs b/tokio-codec/src/bytes_codec.rs
index 3d6e979d..b4a0fa31 100644
--- a/tokio-codec/src/bytes_codec.rs
+++ b/tokio-codec/src/bytes_codec.rs
@@ -1,6 +1,7 @@
+use crate::decoder::Decoder;
+use crate::encoder::Encoder;
use bytes::{BufMut, Bytes, BytesMut};
use std::io;
-use tokio_io::_tokio_codec::{Decoder, Encoder};
/// A simple `Codec` implementation that just ships bytes around.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
diff --git a/tokio-codec/src/decoder.rs b/tokio-codec/src/decoder.rs
new file mode 100644
index 00000000..f492b0f1
--- /dev/null
+++ b/tokio-codec/src/decoder.rs
@@ -0,0 +1,117 @@
+use bytes::BytesMut;
+use std::io;
+use tokio_io::{AsyncRead, AsyncWrite};
+
+use super::encoder::Encoder;
+
+use super::Framed;
+
+/// Decoding of frames via buffers.
+///
+/// This trait is used when constructing an instance of `Framed` or
+/// `FramedRead`. An implementation of `Decoder` takes a byte stream that has
+/// already been buffered in `src` and decodes the data into a stream of
+/// `Self::Item` frames.
+///
+/// Implementations are able to track state on `self`, which enables
+/// implementing stateful streaming parsers. In many cases, though, this type
+/// will simply be a unit struct (e.g. `struct HttpDecoder`).
+
+// Note: We can't deprecate this trait, because the deprecation carries through to tokio-codec, and
+// there doesn't seem to be a way to un-deprecate the re-export.
+pub trait Decoder {
+ /// The type of decoded frames.
+ type Item;
+
+ /// The type of unrecoverable frame decoding errors.
+ ///
+ /// If an individual message is ill-formed but can be ignored without
+ /// interfering with the processing of future messages, it may be more
+ /// useful to report the failure as an `Item`.
+ ///
+ /// `From<io::Error>` is required in the interest of making `Error` suitable
+ /// for returning directly from a `FramedRead`, and to enable the default
+ /// implementation of `decode_eof` to yield an `io::Error` when the decoder
+ /// fails to consume all available data.
+ ///
+ /// Note that implementors of this trait can simply indicate `type Error =
+ /// io::Error` to use I/O errors as this type.
+ type Error: From<io::Error>;
+
+ /// Attempts to decode a frame from the provided buffer of bytes.
+ ///
+ /// This method is called by `FramedRead` whenever bytes are ready to be
+ /// parsed. The provided buffer of bytes is what's been read so far, and
+ /// this instance of `Decode` can determine whether an entire frame is in
+ /// the buffer and is ready to be returned.
+ ///
+ /// If an entire frame is available, then this instance will remove those
+ /// bytes from the buffer provided and return them as a decoded
+ /// frame. Note that removing bytes from the provided buffer doesn't always
+ /// necessarily copy the bytes, so this should be an efficient operation in
+ /// most circumstances.
+ ///
+ /// If the bytes look valid, but a frame isn't fully available yet, then
+ /// `Ok(None)` is returned. This indicates to the `Framed` instance that
+ /// it needs to read some more bytes before calling this method again.
+ ///
+ /// Note that the bytes provided may be empty. If a previous call to
+ /// `decode` consumed all the bytes in the buffer then `decode` will be
+ /// called again until it returns `Ok(None)`, indicating that more bytes need to
+ /// be read.
+ ///
+ /// Finally, if the bytes in the buffer are malformed then an error is
+ /// returned indicating why. This informs `Framed` that the stream is now
+ /// corrupt and should be terminated.
+ fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error>;
+
+ /// A default method available to be called when there are no more bytes
+ /// available to be read from the underlying I/O.
+ ///
+ /// This method defaults to calling `decode` and returns an error if
+ /// `Ok(None)` is returned while there is unconsumed data in `buf`.
+ /// Typically this doesn't need to be implemented unless the framing
+ /// protocol differs near the end of the stream.
+ ///
+ /// Note that the `buf` argument may be empty. If a previous call to
+ /// `decode_eof` consumed all the bytes in the buffer, `decode_eof` will be
+ /// called again until it returns `None`, indicating that there are no more
+ /// frames to yield. This behavior enables returning finalization frames
+ /// that may not be based on inbound data.
+ fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
+ match self.decode(buf)? {
+ Some(frame) => Ok(Some(frame)),
+ None => {
+ if buf.is_empty() {
+ Ok(None)
+ } else {
+ Err(io::Error::new(io::ErrorKind::Other, "bytes remaining on stream").into())
+ }
+ }
+ }
+ }
+
+ /// Provides a `Stream` and `Sink` interface for reading and writing to this
+ /// `Io` object, using `Decode` and `Encode` to read and write the raw data.
+ ///
+ /// Raw I/O objects work with byte sequences, but higher-level code usually
+ /// wants to batch these into meaningful chunks, called "frames". This
+ /// method layers framing on top of an I/O object, by using the `Codec`
+ /// traits to handle encoding and decoding of messages frames. Note that
+ /// the incoming and outgoing frame types may be distinct.
+ ///
+ /// This function returns a *single* object that is both `Stream` and
+ /// `Sink`; grouping this into a single object is often useful for layering
+ /// things like gzip or TLS, which require both read and write access to the
+ /// underlying object.
+ ///
+ /// If you want to work more directly with the streams and sink, consider
+ /// calling `split` on the `Framed` returned by this method, which will
+ /// break them into separate objects, allowing them to interact more easily.
+ fn framed<T: AsyncRead + AsyncWrite + Sized>(self, io: T) -> Framed<T, Self>
+ where
+ Self: Encoder + Sized,
+ {
+ Framed::new(io, self)
+ }
+}
diff --git a/tokio-codec/src/encoder.rs b/tokio-codec/src/encoder.rs
new file mode 100644
index 00000000..50650803
--- /dev/null
+++ b/tokio-codec/src/encoder.rs
@@ -0,0 +1,25 @@
+use bytes::BytesMut;
+use std::io;
+
+/// Trait of helper objects to write out messages as bytes, for use with
+/// `FramedWrite`.
+
+// Note: We can't deprecate this trait, because the deprecation carries through to tokio-codec, and
+// there doesn't seem to be a way to un-deprecate the re-export.
+pub trait Encoder {
+ /// The type of items consumed by the `Encoder`
+ type Item;
+
+ /// The type of encoding errors.
+ ///
+ /// `FramedWrite` requires `Encoder`s errors to implement `From<io::Error>`
+ /// in the interest letting it return `Error`s directly.
+ type Error: From<io::Error>;
+
+ /// Encodes a frame into the buffer provided.
+ ///
+ /// This method will encode `item` into the byte buffer provided by `dst`.
+ /// The `dst` provided is an internal buffer of the `Framed` instance and
+ /// will be written out when possible.
+ fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error>;
+}
diff --git a/tokio-codec/src/framed.rs b/tokio-codec/src/framed.rs
new file mode 100644
index 00000000..1929b3eb
--- /dev/null
+++ b/tokio-codec/src/framed.rs
@@ -0,0 +1,308 @@
+#![allow(deprecated)]
+
+use std::fmt;
+use std::io::{self, Read, Write};
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+use crate::decoder::Decoder;
+use crate::encoder::Encoder;
+use crate::framed_read::{framed_read2, framed_read2_with_buffer, FramedRead2};
+use crate::framed_write::{framed_write2, framed_write2_with_buffer, FramedWrite2};
+use tokio_futures::{Sink, Stream};
+use tokio_io::{AsyncRead, AsyncWrite};
+
+use bytes::BytesMut;
+
+/// A unified `Stream` and `Sink` interface to an underlying I/O object, using
+/// the `Encoder` and `Decoder` traits to encode and decode frames.
+///
+/// You can create a `Framed` instance by using the `AsyncRead::framed` adapter.
+pub struct Framed<T, U> {
+ inner: FramedRead2<FramedWrite2<Fuse<T, U>>>,
+}
+
+pub struct Fuse<T, U>(pub T, pub U);
+
+impl<T, U> Framed<T, U>
+where
+ T: AsyncRead + AsyncWrite,
+ U: Decoder + Encoder,
+{
+ /// Provides a `Stream` and `Sink` interface for reading and writing to this
+ /// `Io` object, using `Decode` and `Encode` to read and write the raw data.
+ ///
+ /// Raw I/O objects work with byte sequences, but higher-level code usually
+ /// wants to batch these into meaningful chunks, called "frames". This
+ /// method layers framing on top of an I/O object, by using the `Codec`
+ /// traits to handle encoding and decoding of messages frames. Note that
+ /// the incoming and outgoing frame types may be distinct.
+ ///
+ /// This function returns a *single* object that is both `Stream` and
+ /// `Sink`; grouping this into a single object is often useful for layering
+ /// things like gzip or TLS, which require both read and write access to the
+ /// underlying object.
+ ///
+ /// If you want to work more directly with the streams and sink, consider
+ /// calling `split` on the `Framed` returned by this method, which will
+ /// break them into separate objects, allowing them to interact more easily.
+ pub fn new(inner: T, codec: U) -> Framed<T, U> {
+ Framed {
+ inner: framed_read2(framed_write2(Fuse(inner, codec))),
+ }
+ }
+}
+
+impl<T, U> Framed<T, U> {
+ /// Provides a `Stream` and `Sink` interface for reading and writing to this
+ /// `Io` object, using `Decode` and `Encode` to read and write the raw data.
+ ///
+ /// Raw I/O objects work with byte sequences, but higher-level code usually
+ /// wants to batch these into meaningful chunks, called "frames". This
+ /// method layers framing on top of an I/O object, by using the `Codec`
+ /// traits to handle encoding and decoding of messages frames. Note that
+ /// the incoming and outgoing frame types may be distinct.
+ ///
+ /// This function returns a *single* object that is both `Stream` and
+ /// `Sink`; grouping this into a single object is often useful for layering
+ /// things like gzip or TLS, which require both read and write access to the
+ /// underlying object.
+ ///
+ /// This objects takes a stream and a readbuffer and a writebuffer. These field
+ /// can be obtained from an existing `Framed` with the `into_parts` method.
+ ///
+ /// If you want to work more directly with the streams and sink, consider
+ /// calling `split` on the `Framed` returned by this method, which will
+ /// break them into separate objects, allowing them to interact more easily.
+ pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> {
+ Framed {
+ inner: framed_read2_with_buffer(
+ framed_write2_with_buffer(Fuse(parts.io, parts.codec), parts.write_buf),
+ parts.read_buf,
+ ),
+ }
+ }
+
+ /// Returns a reference to the underlying I/O stream wrapped by
+ /// `Frame`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn get_ref(&self) -> &T {
+ &self.inner.get_ref().get_ref().0
+ }
+
+ /// Returns a mutable reference to the underlying I/O stream wrapped by
+ /// `Frame`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.inner.get_mut().get_mut().0
+ }
+
+ /// Returns a reference to the underlying codec wrapped by
+ /// `Frame`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying codec
+ /// as it may corrupt the stream of frames otherwise being worked with.
+ pub fn codec(&self) -> &U {
+ &self.inner.get_ref().get_ref().1
+ }
+
+ /// Returns a mutable reference to the underlying codec wrapped by
+ /// `Frame`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying codec
+ /// as it may corrupt the stream of frames otherwise being worked with.
+ pub fn codec_mut(&mut self) -> &mut U {
+ &mut self.inner.get_mut().get_mut().1
+ }
+
+ /// Consumes the `Frame`, returning its underlying I/O stream.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn into_inner(self) -> T {
+ self.inner.into_inner().into_inner().0
+ }
+
+ /// Consumes the `Frame`, returning its underlying I/O stream, the buffer
+ /// with unprocessed data, and the codec.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn into_parts(self) -> FramedParts<T, U> {
+ let (inner, read_buf) = self.inner.into_parts();
+ let (inner, write_buf) = inner.into_parts();
+
+ FramedParts {
+ io: inner.0,
+ codec: inner.1,
+ read_buf: read_buf,
+ write_buf: write_buf,
+ _priv: (),
+ }
+ }
+}
+
+impl<T, U> Stream for Framed<T, U>
+where
+ T: AsyncRead + Unpin,
+ U: Decoder + Unpin,
+{
+ type Item = Result<U::Item, U::Error>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ pin!(self.get_mut().inner).poll_next(cx)
+ }
+}
+
+impl<T, I, U> Sink<I> for Framed<T, U>
+where
+ T: AsyncWrite + Unpin,
+ U: Encoder<Item = I> + Unpin,
+ U::Error: From<io::Error>,
+{
+ type Error = U::Error;
+
+ fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Pin::new(Pin::get_mut(self).inner.get_mut()).poll_ready(cx)
+ }
+
+ fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
+ Pin::new(Pin::get_mut(self).inner.get_mut()).start_send(item)
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Pin::new(Pin::get_mut(self).inner.get_mut()).poll_flush(cx)
+ }
+
+ fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Pin::new(Pin::get_mut(self).inner.get_mut()).poll_close(cx)
+ }
+}
+
+impl<T, U> fmt::Debug for Framed<T, U>
+where
+ T: fmt::Debug,
+ U: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Framed")
+ .field("io", &self.inner.get_ref().get_ref().0)
+ .field("codec", &self.inner.get_ref().get_ref().1)
+ .finish()
+ }
+}
+
+// ===== impl Fuse =====
+
+impl<T: Read, U> Read for Fuse<T, U> {
+ fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
+ self.0.read(dst)
+ }
+}
+
+impl<T: AsyncRead + Unpin, U: Unpin> AsyncRead for Fuse<T, U> {
+ unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
+ self.0.prepare_uninitialized_buffer(buf)
+ }
+
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<Result<usize, io::Error>> {
+ pin!(self.get_mut().0).poll_read(cx, buf)
+ }
+}
+
+impl<T: Write, U> Write for Fuse<T, U> {
+ fn write(&mut self, src: &[u8]) -> io::Result<usize> {
+ self.0.write(src)
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ self.0.flush()
+ }
+}
+
+impl<T: AsyncWrite + Unpin, U: Unpin> AsyncWrite for Fuse<T, U> {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<Result<usize, io::Error>> {
+ pin!(self.get_mut().0).poll_write(cx, buf)
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
+ pin!(self.get_mut().0).poll_flush(cx)
+ }
+
+ fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
+ pin!(self.get_mut().0).poll_shutdown(cx)
+ }
+}
+
+impl<T, U: Decoder> Decoder for Fuse<T, U> {
+ type Item = U::Item;
+ type Error = U::Error;
+
+ fn decode(&mut self, buffer: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
+ self.1.decode(buffer)
+ }
+
+ fn decode_eof(&mut self, buffer: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
+ self.1.decode_eof(buffer)
+ }
+}
+
+impl<T, U: Encoder> Encoder for Fuse<T, U> {
+ type Item = U::Item;
+ type Error = U::Error;
+
+ fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
+ self.1.encode(item, dst)
+ }
+}
+
+/// `FramedParts` contains an export of the data of a Framed transport.
+/// It can be used to construct a new `Framed` with a different codec.
+/// It contains all current buffers and the inner transport.
+#[derive(Debug)]
+pub struct FramedParts<T, U> {
+ /// The inner transport used to read bytes to and write bytes to
+ pub io: T,
+
+ /// The codec
+ pub codec: U,
+
+ /// The buffer with read but unprocessed data.
+ pub read_buf: BytesMut,
+
+ /// A buffer with unprocessed data which are not written yet.
+ pub write_buf: BytesMut,
+
+ /// This private field allows us to add additional fields in the future in a
+ /// backwards compatible way.
+ _priv: (),
+}
+
+impl<T, U> FramedParts<T, U> {
+ /// Create a new, default, `FramedParts`
+ pub fn new(io: T, codec: U) -> FramedParts<T, U> {
+ FramedParts {
+ io,
+ codec,
+ read_buf: BytesMut::new(),
+ write_buf: BytesMut::new(),
+ _priv: (),
+ }
+ }
+}
diff --git a/tokio-codec/src/framed_read.rs b/tokio-codec/src/framed_read.rs
new file mode 100644
index 00000000..13c47541
--- /dev/null
+++ b/tokio-codec/src/framed_read.rs
@@ -0,0 +1,225 @@
+use std::fmt;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+use super::framed::Fuse;
+use super::Decoder;
+use tokio_futures::{Sink, Stream};
+use tokio_io::AsyncRead;
+
+use bytes::BytesMut;
+use log::trace;
+
+/// A `Stream` of messages decoded from an `AsyncRead`.
+pub struct FramedRead<T, D> {
+ inner: FramedRead2<Fuse<T, D>>,
+}
+
+pub struct FramedRead2<T> {
+ inner: T,
+ eof: bool,
+ is_readable: bool,
+ buffer: BytesMut,
+}
+
+const INITIAL_CAPACITY: usize = 8 * 1024;
+
+// ===== impl FramedRead =====
+
+impl<T, D> FramedRead<T, D>
+where
+ T: AsyncRead,
+ D: Decoder,
+{
+ /// Creates a new `FramedRead` with the given `decoder`.
+ pub fn new(inner: T, decoder: D) -> FramedRead<T, D> {
+ FramedRead {
+ inner: framed_read2(Fuse(inner, decoder)),
+ }
+ }
+}
+
+impl<T, D> FramedRead<T, D> {
+ /// Returns a reference to the underlying I/O stream wrapped by
+ /// `FramedRead`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn get_ref(&self) -> &T {
+ &self.inner.inner.0
+ }
+
+ /// Returns a mutable reference to the underlying I/O stream wrapped by
+ /// `FramedRead`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.inner.inner.0
+ }
+
+ /// Consumes the `FramedRead`, returning its underlying I/O stream.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn into_inner(self) -> T {
+ self.inner.inner.0
+ }
+
+ /// Returns a reference to the underlying decoder.
+ pub fn decoder(&self) -> &D {
+ &self.inner.inner.1
+ }
+
+ /// Returns a mutable reference to the underlying decoder.
+ pub fn decoder_mut(&mut self) -> &mut D {
+ &mut self.inner.inner.1
+ }
+}
+
+impl<T, D> Stream for FramedRead<T, D>
+where
+ T: AsyncRead + Unpin,
+ D: Decoder + Unpin,
+{
+ type Item = Result<D::Item, D::Error>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ pin!(self.get_mut().inner).poll_next(cx)
+ }
+}
+
+// This impl just defers to the underlying T: Sink
+impl<T, I, D> Sink<I> for FramedRead<T, D>
+where
+ T: Sink<I> + Unpin,
+ D: Unpin,
+{
+ type Error = T::Error;
+
+ fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ pin!(Pin::get_mut(self).inner.inner.0).poll_ready(cx)
+ }
+
+ fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
+ pin!(Pin::get_mut(self).inner.inner.0).start_send(item)
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ pin!(Pin::get_mut(self).inner.inner.0).poll_flush(cx)
+ }
+
+ fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ pin!(Pin::get_mut(self).inner.inner.0).poll_close(cx)
+ }
+}
+
+impl<T, D> fmt::Debug for FramedRead<T, D>
+where
+ T: fmt::Debug,
+ D: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("FramedRead")
+ .field("inner", &self.inner.inner.0)
+ .field("decoder", &self.inner.inner.1)
+ .field("eof", &self.inner.eof)
+ .field("is_readable", &self.inner.is_readable)
+ .field("buffer", &self.inner.buffer)
+ .finish()
+ }
+}
+
+// ===== impl FramedRead2 =====
+
+pub fn framed_read2<T>(inner: T) -> FramedRead2<T> {
+ FramedRead2 {
+ inner: inner,
+ eof: false,
+ is_readable: false,
+ buffer: BytesMut::with_capacity(INITIAL_CAPACITY),
+ }
+}
+
+pub fn framed_read2_with_buffer<T>(inner: T, mut buf: BytesMut) -> FramedRead2<T> {
+ if buf.capacity() < INITIAL_CAPACITY {
+ let bytes_to_reserve = INITIAL_CAPACITY - buf.capacity();
+ buf.reserve(bytes_to_reserve);
+ }
+ FramedRead2 {
+ inner: inner,
+ eof: false,
+ is_readable: buf.len() > 0,
+ buffer: buf,
+ }
+}
+
+impl<T> FramedRead2<T> {
+ pub fn get_ref(&self) -> &T {
+ &self.inner
+ }
+
+ pub fn into_inner(self) -> T {
+ self.inner
+ }
+
+ pub fn into_parts(self) -> (T, BytesMut) {
+ (self.inner, self.buffer)
+ }
+
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.inner
+ }
+}
+
+impl<T> Stream for FramedRead2<T>
+where
+ T: AsyncRead + Decoder + Unpin,
+{
+ type Item = Result<T::Item, T::Error>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let pinned = Pin::get_mut(self);
+ loop {
+ // Repeatedly call `decode` or `decode_eof` as long as it is
+ // "readable". Readable is defined as not having returned `None`. If
+ // the upstream has returned EOF, and the decoder is no longer
+ // readable, it can be assumed that the decoder will never become
+ // readable again, at which point the stream is terminated.
+ if pinned.is_readable {
+ if pinned.eof {
+ let frame = pinned.inner.decode_eof(&mut pinned.buffer)?;
+ return Poll::Ready(frame.map(Ok));
+ }
+
+ trace!("attempting to decode a frame");
+
+ if let Some(frame) = pinned.inner.decode(&mut pinned.buffer)? {
+ trace!("frame decoded from buffer");
+ return Poll::Ready(Some(Ok(frame)));
+ }
+
+ pinned.is_readable = false;
+ }
+
+ assert!(!pinned.eof);
+
+ // Otherwise, try to read more data and try again. Make sure we've
+ // got room for at least one byte to read to ensure that we don't
+ // get a spurious 0 that looks like EOF
+ pinned.buffer.reserve(1);
+ let bytect = match pin!(pinned.inner).poll_read_buf(cx, &mut pinned.buffer)? {
+ Poll::Ready(ct) => ct,
+ Poll::Pending => return Poll::Pending,
+ };
+ if bytect == 0 {
+ pinned.eof = true;
+ }
+
+ pinned.is_readable = true;
+ }
+ }
+}
diff --git a/tokio-codec/src/framed_write.rs b/tokio-codec/src/framed_write.rs
new file mode 100644
index 00000000..153f5881
--- /dev/null
+++ b/tokio-codec/src/framed_write.rs
@@ -0,0 +1,271 @@
+#![allow(deprecated)]
+
+use log::trace;
+use std::fmt;
+use std::io::{self, Read};
+
+use super::framed::Fuse;
+use crate::decoder::Decoder;
+use crate::encoder::Encoder;
+use tokio_futures::{Sink, Stream};
+use tokio_io::{AsyncRead, AsyncWrite};
+
+use bytes::BytesMut;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+/// A `Sink` of frames encoded to an `AsyncWrite`.
+pub struct FramedWrite<T, E> {
+ inner: FramedWrite2<Fuse<T, E>>,
+}
+
+pub struct FramedWrite2<T> {
+ inner: T,
+ buffer: BytesMut,
+}
+
+const INITIAL_CAPACITY: usize = 8 * 1024;
+const BACKPRESSURE_BOUNDARY: usize = INITIAL_CAPACITY;
+
+impl<T, E> FramedWrite<T, E>
+where
+ T: AsyncWrite,
+ E: Encoder,
+{
+ /// Creates a new `FramedWrite` with the given `encoder`.
+ pub fn new(inner: T, encoder: E) -> FramedWrite<T, E> {
+ FramedWrite {
+ inner: framed_write2(Fuse(inner, encoder)),
+ }
+ }
+}
+
+impl<T, E> FramedWrite<T, E> {
+ /// Returns a reference to the underlying I/O stream wrapped by
+ /// `FramedWrite`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn get_ref(&self) -> &T {
+ &self.inner.inner.0
+ }
+
+ /// Returns a mutable reference to the underlying I/O stream wrapped by
+ /// `FramedWrite`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.inner.inner.0
+ }
+
+ /// Consumes the `FramedWrite`, returning its underlying I/O stream.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn into_inner(self) -> T {
+ self.inner.inner.0
+ }
+
+ /// Returns a reference to the underlying decoder.
+ pub fn encoder(&self) -> &E {
+ &self.inner.inner.1
+ }
+
+ /// Returns a mutable reference to the underlying decoder.
+ pub fn encoder_mut(&mut self) -> &mut E {
+ &mut self.inner.inner.1
+ }
+}
+
+// This impl just defers to the underlying FramedWrite2
+impl<T, I, E> Sink<I> for FramedWrite<T, E>
+where
+ T: AsyncWrite + Unpin,
+ E: Encoder<Item = I> + Unpin,
+ E::Error: From<io::Error>,
+{
+ type Error = E::Error;
+
+ fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ pin!(Pin::get_mut(self).inner).poll_ready(cx)
+ }
+
+ fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
+ pin!(Pin::get_mut(self).inner).start_send(item)
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ pin!(Pin::get_mut(self).inner).poll_flush(cx)
+ }
+
+ fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ pin!(Pin::get_mut(self).inner).poll_close(cx)
+ }
+}
+
+impl<T, D> Stream for FramedWrite<T, D>
+where
+ T: Stream + Unpin,
+ D: Unpin,
+{
+ type Item = T::Item;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ Pin::new(Pin::get_mut(self).get_mut()).poll_next(cx)
+ }
+}
+
+impl<T, U> fmt::Debug for FramedWrite<T, U>
+where
+ T: fmt::Debug,
+ U: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("FramedWrite")
+ .field("inner", &self.inner.get_ref().0)
+ .field("encoder", &self.inner.get_ref().1)
+ .field("buffer", &self.inner.buffer)
+ .finish()
+ }
+}
+
+// ===== impl FramedWrite2 =====
+
+pub fn framed_write2<T>(inner: T) -> FramedWrite2<T> {
+ FramedWrite2 {
+ inner: inner,
+ buffer: BytesMut::with_capacity(INITIAL_CAPACITY),
+ }
+}
+
+pub fn framed_write2_with_buffer<T>(inner: T, mut buf: BytesMut) -> FramedWrite2<T> {
+ if buf.capacity() < INITIAL_CAPACITY {
+ let bytes_to_reserve = INITIAL_CAPACITY - buf.capacity();
+ buf.reserve(bytes_to_reserve);
+ }
+ FramedWrite2 {
+ inner: inner,
+ buffer: buf,
+ }
+}
+
+impl<T> FramedWrite2<T> {
+ pub fn get_ref(&self) -> &T {
+ &self.inner
+ }
+
+ pub fn into_inner(self) -> T {
+ self.inner
+ }
+
+ pub fn into_parts(self) -> (T, BytesMut) {
+ (self.inner, self.buffer)
+ }
+
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.inner
+ }
+}
+
+impl<I, T> Sink<I> for FramedWrite2<T>
+where
+ T: AsyncWrite + Encoder<Item = I> + Unpin,
+{
+ type Error = T::Error;
+
+ fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ // If the buffer is already over 8KiB, th