diff options
author | Carl Lerche <me@carllerche.com> | 2019-10-22 10:13:49 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-10-22 10:13:49 -0700 |
commit | cfc15617a5247ea780c32c85b7134b88b6de5845 (patch) | |
tree | ef0a46c61c51505a60f386c9760acac9d1f9b7b1 /tokio-util/src/codec | |
parent | b8cee1a60ad99ef28ec494ae4230e2ef4399fcf9 (diff) |
codec: move into tokio-util (#1675)
Related to #1318, Tokio APIs that are "less stable" are moved into a new
`tokio-util` crate. This crate will mirror `tokio` and provide
additional APIs that may require a greater rate of breaking changes.
As examples require `tokio-util`, they are moved into a separate
crate (`examples`). This has the added advantage of being able to avoid
example only dependencies in the `tokio` crate.
Diffstat (limited to 'tokio-util/src/codec')
-rw-r--r-- | tokio-util/src/codec/bytes_codec.rs | 41 | ||||
-rw-r--r-- | tokio-util/src/codec/decoder.rs | 154 | ||||
-rw-r--r-- | tokio-util/src/codec/encoder.rs | 22 | ||||
-rw-r--r-- | tokio-util/src/codec/framed.rs | 327 | ||||
-rw-r--r-- | tokio-util/src/codec/framed_read.rs | 226 | ||||
-rw-r--r-- | tokio-util/src/codec/framed_write.rs | 288 | ||||
-rw-r--r-- | tokio-util/src/codec/length_delimited.rs | 962 | ||||
-rw-r--r-- | tokio-util/src/codec/lines_codec.rs | 224 | ||||
-rw-r--r-- | tokio-util/src/codec/macros.rs | 7 | ||||
-rw-r--r-- | tokio-util/src/codec/mod.rs | 37 |
10 files changed, 2288 insertions, 0 deletions
diff --git a/tokio-util/src/codec/bytes_codec.rs b/tokio-util/src/codec/bytes_codec.rs new file mode 100644 index 00000000..a7d424e9 --- /dev/null +++ b/tokio-util/src/codec/bytes_codec.rs @@ -0,0 +1,41 @@ +use crate::codec::decoder::Decoder; +use crate::codec::encoder::Encoder; + +use bytes::{BufMut, Bytes, BytesMut}; +use std::io; + +/// A simple `Codec` implementation that just ships bytes around. +#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Default)] +pub struct BytesCodec(()); + +impl BytesCodec { + /// Creates a new `BytesCodec` for shipping around raw bytes. + pub fn new() -> BytesCodec { + BytesCodec(()) + } +} + +impl Decoder for BytesCodec { + type Item = BytesMut; + type Error = io::Error; + + fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<BytesMut>, io::Error> { + if !buf.is_empty() { + let len = buf.len(); + Ok(Some(buf.split_to(len))) + } else { + Ok(None) + } + } +} + +impl Encoder for BytesCodec { + type Item = Bytes; + type Error = io::Error; + + fn encode(&mut self, data: Bytes, buf: &mut BytesMut) -> Result<(), io::Error> { + buf.reserve(data.len()); + buf.put(data); + Ok(()) + } +} diff --git a/tokio-util/src/codec/decoder.rs b/tokio-util/src/codec/decoder.rs new file mode 100644 index 00000000..720e0b6e --- /dev/null +++ b/tokio-util/src/codec/decoder.rs @@ -0,0 +1,154 @@ +use crate::codec::encoder::Encoder; +use crate::codec::Framed; + +use tokio_io::{AsyncRead, AsyncWrite}; + +use bytes::BytesMut; +use std::io; + +/// Decoding of frames via buffers. +/// +/// This trait is used when constructing an instance of `Framed` or +/// `FramedRead`. An implementation of `Decoder` takes a byte stream that has +/// already been buffered in `src` and decodes the data into a stream of +/// `Self::Item` frames. +/// +/// Implementations are able to track state on `self`, which enables +/// implementing stateful streaming parsers. In many cases, though, this type +/// will simply be a unit struct (e.g. `struct HttpDecoder`). +pub trait Decoder { + /// The type of decoded frames. + type Item; + + /// The type of unrecoverable frame decoding errors. + /// + /// If an individual message is ill-formed but can be ignored without + /// interfering with the processing of future messages, it may be more + /// useful to report the failure as an `Item`. + /// + /// `From<io::Error>` is required in the interest of making `Error` suitable + /// for returning directly from a `FramedRead`, and to enable the default + /// implementation of `decode_eof` to yield an `io::Error` when the decoder + /// fails to consume all available data. + /// + /// Note that implementors of this trait can simply indicate `type Error = + /// io::Error` to use I/O errors as this type. + type Error: From<io::Error>; + + /// Attempts to decode a frame from the provided buffer of bytes. + /// + /// This method is called by `FramedRead` whenever bytes are ready to be + /// parsed. The provided buffer of bytes is what's been read so far, and + /// this instance of `Decode` can determine whether an entire frame is in + /// the buffer and is ready to be returned. + /// + /// If an entire frame is available, then this instance will remove those + /// bytes from the buffer provided and return them as a decoded + /// frame. Note that removing bytes from the provided buffer doesn't always + /// necessarily copy the bytes, so this should be an efficient operation in + /// most circumstances. + /// + /// If the bytes look valid, but a frame isn't fully available yet, then + /// `Ok(None)` is returned. This indicates to the `Framed` instance that + /// it needs to read some more bytes before calling this method again. + /// + /// Note that the bytes provided may be empty. If a previous call to + /// `decode` consumed all the bytes in the buffer then `decode` will be + /// called again until it returns `Ok(None)`, indicating that more bytes need to + /// be read. + /// + /// Finally, if the bytes in the buffer are malformed then an error is + /// returned indicating why. This informs `Framed` that the stream is now + /// corrupt and should be terminated. + /// + /// # Buffer management + /// + /// Before returning from the function, implementations should ensure that + /// the buffer has appropriate capacity in anticipation of future calls to + /// `decode`. Failing to do so leads to inefficiency. + /// + /// For example, if frames have a fixed length, or if the length of the + /// current frame is known from a header, a possible buffer management + /// strategy is: + /// + /// ```no_run + /// # use std::io; + /// # + /// # use bytes::BytesMut; + /// # use tokio_util::codec::Decoder; + /// # + /// # struct MyCodec; + /// # + /// impl Decoder for MyCodec { + /// // ... + /// # type Item = BytesMut; + /// # type Error = io::Error; + /// + /// fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { + /// // ... + /// + /// // Reserve enough to complete decoding of the current frame. + /// let current_frame_len: usize = 1000; // Example. + /// // And to start decoding the next frame. + /// let next_frame_header_len: usize = 10; // Example. + /// src.reserve(current_frame_len + next_frame_header_len); + /// + /// return Ok(None); + /// } + /// } + /// ``` + /// + /// An optimal buffer management strategy minimizes reallocations and + /// over-allocations. + fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error>; + + /// A default method available to be called when there are no more bytes + /// available to be read from the underlying I/O. + /// + /// This method defaults to calling `decode` and returns an error if + /// `Ok(None)` is returned while there is unconsumed data in `buf`. + /// Typically this doesn't need to be implemented unless the framing + /// protocol differs near the end of the stream. + /// + /// Note that the `buf` argument may be empty. If a previous call to + /// `decode_eof` consumed all the bytes in the buffer, `decode_eof` will be + /// called again until it returns `None`, indicating that there are no more + /// frames to yield. This behavior enables returning finalization frames + /// that may not be based on inbound data. + fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { + match self.decode(buf)? { + Some(frame) => Ok(Some(frame)), + None => { + if buf.is_empty() { + Ok(None) + } else { + Err(io::Error::new(io::ErrorKind::Other, "bytes remaining on stream").into()) + } + } + } + } + + /// Provides a `Stream` and `Sink` interface for reading and writing to this + /// `Io` object, using `Decode` and `Encode` to read and write the raw data. + /// + /// Raw I/O objects work with byte sequences, but higher-level code usually + /// wants to batch these into meaningful chunks, called "frames". This + /// method layers framing on top of an I/O object, by using the `Codec` + /// traits to handle encoding and decoding of messages frames. Note that + /// the incoming and outgoing frame types may be distinct. + /// + /// This function returns a *single* object that is both `Stream` and + /// `Sink`; grouping this into a single object is often useful for layering + /// things like gzip or TLS, which require both read and write access to the + /// underlying object. + /// + /// If you want to work more directly with the streams and sink, consider + /// calling `split` on the `Framed` returned by this method, which will + /// break them into separate objects, allowing them to interact more easily. + fn framed<T: AsyncRead + AsyncWrite + Sized>(self, io: T) -> Framed<T, Self> + where + Self: Encoder + Sized, + { + Framed::new(io, self) + } +} diff --git a/tokio-util/src/codec/encoder.rs b/tokio-util/src/codec/encoder.rs new file mode 100644 index 00000000..76fa9dba --- /dev/null +++ b/tokio-util/src/codec/encoder.rs @@ -0,0 +1,22 @@ +use bytes::BytesMut; +use std::io; + +/// Trait of helper objects to write out messages as bytes, for use with +/// `FramedWrite`. +pub trait Encoder { + /// The type of items consumed by the `Encoder` + type Item; + + /// The type of encoding errors. + /// + /// `FramedWrite` requires `Encoder`s errors to implement `From<io::Error>` + /// in the interest letting it return `Error`s directly. + type Error: From<io::Error>; + + /// Encodes a frame into the buffer provided. + /// + /// This method will encode `item` into the byte buffer provided by `dst`. + /// The `dst` provided is an internal buffer of the `Framed` instance and + /// will be written out when possible. + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error>; +} diff --git a/tokio-util/src/codec/framed.rs b/tokio-util/src/codec/framed.rs new file mode 100644 index 00000000..e2eb82cb --- /dev/null +++ b/tokio-util/src/codec/framed.rs @@ -0,0 +1,327 @@ +use crate::codec::decoder::Decoder; +use crate::codec::encoder::Encoder; +use crate::codec::framed_read::{framed_read2, framed_read2_with_buffer, FramedRead2}; +use crate::codec::framed_write::{framed_write2, framed_write2_with_buffer, FramedWrite2}; + +use tokio_io::{AsyncBufRead, AsyncRead, AsyncWrite}; + +use bytes::BytesMut; +use futures_core::Stream; +use futures_sink::Sink; +use std::fmt; +use std::io::{self, BufRead, Read, Write}; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// A unified `Stream` and `Sink` interface to an underlying I/O object, using +/// the `Encoder` and `Decoder` traits to encode and decode frames. +/// +/// You can create a `Framed` instance by using the `AsyncRead::framed` adapter. +pub struct Framed<T, U> { + inner: FramedRead2<FramedWrite2<Fuse<T, U>>>, +} + +pub(crate) struct Fuse<T, U>(pub(crate) T, pub(crate) U); + +impl<T, U> Framed<T, U> +where + T: AsyncRead + AsyncWrite, + U: Decoder + Encoder, +{ + /// Provides a `Stream` and `Sink` interface for reading and writing to this + /// `Io` object, using `Decode` and `Encode` to read and write the raw data. + /// + /// Raw I/O objects work with byte sequences, but higher-level code usually + /// wants to batch these into meaningful chunks, called "frames". This + /// method layers framing on top of an I/O object, by using the `Codec` + /// traits to handle encoding and decoding of messages frames. Note that + /// the incoming and outgoing frame types may be distinct. + /// + /// This function returns a *single* object that is both `Stream` and + /// `Sink`; grouping this into a single object is often useful for layering + /// things like gzip or TLS, which require both read and write access to the + /// underlying object. + /// + /// If you want to work more directly with the streams and sink, consider + /// calling `split` on the `Framed` returned by this method, which will + /// break them into separate objects, allowing them to interact more easily. + pub fn new(inner: T, codec: U) -> Framed<T, U> { + Framed { + inner: framed_read2(framed_write2(Fuse(inner, codec))), + } + } +} + +impl<T, U> Framed<T, U> { + /// Provides a `Stream` and `Sink` interface for reading and writing to this + /// `Io` object, using `Decode` and `Encode` to read and write the raw data. + /// + /// Raw I/O objects work with byte sequences, but higher-level code usually + /// wants to batch these into meaningful chunks, called "frames". This + /// method layers framing on top of an I/O object, by using the `Codec` + /// traits to handle encoding and decoding of messages frames. Note that + /// the incoming and outgoing frame types may be distinct. + /// + /// This function returns a *single* object that is both `Stream` and + /// `Sink`; grouping this into a single object is often useful for layering + /// things like gzip or TLS, which require both read and write access to the + /// underlying object. + /// + /// This objects takes a stream and a readbuffer and a writebuffer. These field + /// can be obtained from an existing `Framed` with the `into_parts` method. + /// + /// If you want to work more directly with the streams and sink, consider + /// calling `split` on the `Framed` returned by this method, which will + /// break them into separate objects, allowing them to interact more easily. + pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> { + Framed { + inner: framed_read2_with_buffer( + framed_write2_with_buffer(Fuse(parts.io, parts.codec), parts.write_buf), + parts.read_buf, + ), + } + } + + /// Returns a reference to the underlying I/O stream wrapped by + /// `Frame`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_ref(&self) -> &T { + &self.inner.get_ref().get_ref().0 + } + + /// Returns a mutable reference to the underlying I/O stream wrapped by + /// `Frame`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner.get_mut().get_mut().0 + } + + /// Returns a reference to the underlying codec wrapped by + /// `Frame`. + /// + /// Note that care should be taken to not tamper with the underlying codec + /// as it may corrupt the stream of frames otherwise being worked with. + pub fn codec(&self) -> &U { + &self.inner.get_ref().get_ref().1 + } + + /// Returns a mutable reference to the underlying codec wrapped by + /// `Frame`. + /// + /// Note that care should be taken to not tamper with the underlying codec + /// as it may corrupt the stream of frames otherwise being worked with. + pub fn codec_mut(&mut self) -> &mut U { + &mut self.inner.get_mut().get_mut().1 + } + + /// Consumes the `Frame`, returning its underlying I/O stream. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn into_inner(self) -> T { + self.inner.into_inner().into_inner().0 + } + + /// Consumes the `Frame`, returning its underlying I/O stream, the buffer + /// with unprocessed data, and the codec. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn into_parts(self) -> FramedParts<T, U> { + let (inner, read_buf) = self.inner.into_parts(); + let (inner, write_buf) = inner.into_parts(); + + FramedParts { + io: inner.0, + codec: inner.1, + read_buf, + write_buf, + _priv: (), + } + } +} + +impl<T, U> Stream for Framed<T, U> +where + T: AsyncRead + Unpin, + U: Decoder + Unpin, +{ + type Item = Result<U::Item, U::Error>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + pin!(self.get_mut().inner).poll_next(cx) + } +} + +impl<T, I, U> Sink<I> for Framed<T, U> +where + T: AsyncWrite + Unpin, + U: Encoder<Item = I> + Unpin, + U::Error: From<io::Error>, +{ + type Error = U::Error; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Pin::new(Pin::get_mut(self).inner.get_mut()).poll_ready(cx) + } + + fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { + Pin::new(Pin::get_mut(self).inner.get_mut()).start_send(item) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Pin::new(Pin::get_mut(self).inner.get_mut()).poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Pin::new(Pin::get_mut(self).inner.get_mut()).poll_close(cx) + } +} + +impl<T, U> fmt::Debug for Framed<T, U> +where + T: fmt::Debug, + U: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Framed") + .field("io", &self.inner.get_ref().get_ref().0) + .field("codec", &self.inner.get_ref().get_ref().1) + .finish() + } +} + +// ===== impl Fuse ===== + +impl<T: Read, U> Read for Fuse<T, U> { + fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> { + self.0.read(dst) + } +} + +impl<T: BufRead, U> BufRead for Fuse<T, U> { + fn fill_buf(&mut self) -> io::Result<&[u8]> { + self.0.fill_buf() + } + + fn consume(&mut self, amt: usize) { + self.0.consume(amt) + } +} + +impl<T: AsyncRead + Unpin, U: Unpin> AsyncRead for Fuse<T, U> { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + self.0.prepare_uninitialized_buffer(buf) + } + + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<Result<usize, io::Error>> { + pin!(self.get_mut().0).poll_read(cx, buf) + } +} + +impl<T: AsyncBufRead + Unpin, U: Unpin> AsyncBufRead for Fuse<T, U> { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { + pin!(self.get_mut().0).poll_fill_buf(cx) + } + + fn consume(self: Pin<&mut Self>, amt: usize) { + pin!(self.get_mut().0).consume(amt) + } +} + +impl<T: Write, U> Write for Fuse<T, U> { + fn write(&mut self, src: &[u8]) -> io::Result<usize> { + self.0.write(src) + } + + fn flush(&mut self) -> io::Result<()> { + self.0.flush() + } +} + +impl<T: AsyncWrite + Unpin, U: Unpin> AsyncWrite for Fuse<T, U> { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<Result<usize, io::Error>> { + pin!(self.get_mut().0).poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { + pin!(self.get_mut().0).poll_flush(cx) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { + pin!(self.get_mut().0).poll_shutdown(cx) + } +} + +impl<T, U: Decoder> Decoder for Fuse<T, U> { + type Item = U::Item; + type Error = U::Error; + + fn decode(&mut self, buffer: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { + self.1.decode(buffer) + } + + fn decode_eof(&mut self, buffer: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { + self.1.decode_eof(buffer) + } +} + +impl<T, U: Encoder> Encoder for Fuse<T, U> { + type Item = U::Item; + type Error = U::Error; + + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + self.1.encode(item, dst) + } +} + +/// `FramedParts` contains an export of the data of a Framed transport. +/// It can be used to construct a new `Framed` with a different codec. +/// It contains all current buffers and the inner transport. +#[derive(Debug)] +pub struct FramedParts<T, U> { + /// The inner transport used to read bytes to and write bytes to + pub io: T, + + /// The codec + pub codec: U, + + /// The buffer with read but unprocessed data. + pub read_buf: BytesMut, + + /// A buffer with unprocessed data which are not written yet. + pub write_buf: BytesMut, + + /// This private field allows us to add additional fields in the future in a + /// backwards compatible way. + _priv: (), +} + +impl<T, U> FramedParts<T, U> { + /// Create a new, default, `FramedParts` + pub fn new(io: T, codec: U) -> FramedParts<T, U> { + FramedParts { + io, + codec, + read_buf: BytesMut::new(), + write_buf: BytesMut::new(), + _priv: (), + } + } +} diff --git a/tokio-util/src/codec/framed_read.rs b/tokio-util/src/codec/framed_read.rs new file mode 100644 index 00000000..71f22150 --- /dev/null +++ b/tokio-util/src/codec/framed_read.rs @@ -0,0 +1,226 @@ +use crate::codec::framed::Fuse; +use crate::codec::Decoder; + +use tokio_io::AsyncRead; + +use bytes::BytesMut; +use futures_core::Stream; +use futures_sink::Sink; +use log::trace; +use std::fmt; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// A `Stream` of messages decoded from an `AsyncRead`. +pub struct FramedRead<T, D> { + inner: FramedRead2<Fuse<T, D>>, +} + +pub(crate) struct FramedRead2<T> { + inner: T, + eof: bool, + is_readable: bool, + buffer: BytesMut, +} + +const INITIAL_CAPACITY: usize = 8 * 1024; + +// ===== impl FramedRead ===== + +impl<T, D> FramedRead<T, D> +where + T: AsyncRead, + D: Decoder, +{ + /// Creates a new `FramedRead` with the given `decoder`. + pub fn new(inner: T, decoder: D) -> FramedRead<T, D> { + FramedRead { + inner: framed_read2(Fuse(inner, decoder)), + } + } +} + +impl<T, D> FramedRead<T, D> { + /// Returns a reference to the underlying I/O stream wrapped by + /// `FramedRead`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_ref(&self) -> &T { + &self.inner.inner.0 + } + + /// Returns a mutable reference to the underlying I/O stream wrapped by + /// `FramedRead`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner.inner.0 + } + + /// Consumes the `FramedRead`, returning its underlying I/O stream. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn into_inner(self) -> T { + self.inner.inner.0 + } + + /// Returns a reference to the underlying decoder. + pub fn decoder(&self) -> &D { + &self.inner.inner.1 + } + + /// Returns a mutable reference to the underlying decoder. + pub fn decoder_mut(&mut self) -> &mut D { + &mut self.inner.inner.1 + } +} + +impl<T, D> Stream for FramedRead<T, D> +where + T: AsyncRead + Unpin, + D: Decoder + Unpin, +{ + type Item = Result<D::Item, D::Error>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + pin!(self.get_mut().inner).poll_next(cx) + } +} + +// This impl just defers to the underlying T: Sink +impl<T, I, D> Sink<I> for FramedRead<T, D> +where + T: Sink<I> + Unpin, + D: Unpin, +{ + type Error = T::Error; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + pin!(Pin::get_mut(self).inner.inner.0).poll_ready(cx) + } + + fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { + pin!(Pin::get_mut(self).inner.inner.0).start_send(item) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + pin!(Pin::get_mut(self).inner.inner.0).poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + pin!(Pin::get_mut(self).inner.inner.0).poll_close(cx) + } +} + +impl<T, D> fmt::Debug for FramedRead<T, D> +where + T: fmt::Debug, + D: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("FramedRead") + .field("inner", &self.inner.inner.0) + .field("decoder", &self.inner.inner.1) + .field("eof", &self.inner.eof) + .field("is_readable", &self.inner.is_readable) + .field("buffer", &self.inner.buffer) + .finish() + } +} + +// ===== impl FramedRead2 ===== + +pub(crate) fn framed_read2<T>(inner: T) -> FramedRead2<T> { + FramedRead2 { + inner, + eof: false, + is_readable: false, + buffer: BytesMut::with_capacity(INITIAL_CAPACITY), + } +} + +pub(crate) fn framed_read2_with_buffer<T>(inner: T, mut buf: BytesMut) -> FramedRead2<T> { + if buf.capacity() < INITIAL_CAPACITY { + let bytes_to_reserve = INITIAL_CAPACITY - buf.capacity(); + buf.reserve(bytes_to_reserve); + } + FramedRead2 { + inner, + eof: false, + is_readable: !buf.is_empty(), + buffer: buf, + } +} + +impl<T> FramedRead2<T> { + pub(crate) fn get_ref(&self) -> &T { + &self.inner + } + + pub(crate) fn into_inner(self) -> T { + self.inner + } + + pub(crate) fn into_parts(self) -> (T, BytesMut) { + (self.inner, self.buffer) + } + + pub(crate) fn get_mut(&mut self) -> &mut T { + &mut self.inner + } +} + +impl<T> Stream for FramedRead2<T> +where + T: AsyncRead + Decoder + Unpin, +{ + type Item = Result<T::Item, T::Error>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let pinned = Pin::get_mut(self); + loop { + // Repeatedly call `decode` or `decode_eof` as long as it is + // "readable". Readable is defined as not having returned `None`. If + // the upstream has returned EOF, and the decoder is no longer + // readable, it can be assumed that the decoder will never become + // readable again, at which point the stream is terminated. + if pinned.is_readable { + if pinned.eof { + let frame = pinned.inner.decode_eof(&mut pinned.buffer)?; + return Poll::Ready(frame.map(Ok)); + } + + trace!("attempting to decode a frame"); + + if let Some(frame) = pinned.inner.decode(&mut pinned.buffer)? { + trace!("frame decoded from buffer"); + return Poll::Ready(Some(Ok(frame))); + } + + pinned.is_readable = false; + } + + assert!(!pinned.eof); + + // Otherwise, try to read more data and try again. Make sure we've + // got room for at least one byte to read to ensure that we don't + // get a spurious 0 that looks like EOF + pinned.buffer.reserve(1); + let bytect = match pin!(pinned.inner).poll_read_buf(cx, &mut pinned.buffer)? { + Poll::Ready(ct) => ct, + Poll::Pending => return Poll::Pending, + }; + if bytect == 0 { + pinned.eof = true; + } + + pinned.is_readable = true; + } + } +} diff --git a/tokio-util/src/codec/framed_write.rs b/tokio-util/src/codec/framed_write.rs new file mode 100644 index 00000000..fa8aba17 --- /dev/null +++ b/tokio-util/src/codec/framed_write.rs @@ -0,0 +1,288 @@ +use crate::codec::decoder::Decoder; +use crate::codec::encoder::Encoder; +use crate::codec::framed::Fuse; + +use tokio_io::{AsyncBufRead, AsyncRead, AsyncWrite}; + +use bytes::BytesMut; +use futures_core::{ready, Stream}; +use futures_sink::Sink; +use log::trace; +use std::fmt; +use std::io::{self, BufRead, Read}; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// A `Sink` of frames encoded to an `AsyncWrite`. +pub struct FramedWrite<T, E> { + inner: FramedWrite2<Fuse<T, E>>, +} + +pub(crate) struct FramedWrite2<T> { + inner: T, + buffer: BytesMut, +} + +const INITIAL_CAPACITY: usize = 8 * 1024; +const BACKPRESSURE_BOUNDARY: usize = INITIAL_CAPACITY; + +impl<T, E> FramedWrite<T, E> +where + T: AsyncWrite, + E: Encoder, +{ + /// Creates a new `FramedWrite` with the given `encoder`. + pub fn new(inner: T, encoder: E) -> FramedWrite<T, E> { + FramedWrite { + inner: framed_write2(Fuse(inner, encoder)), + } + } +} + +impl<T, E> FramedWrite<T, E> { + /// Returns a reference to the underlying I/O stream wrapped by + /// `FramedWrite`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_ref(&self) -> &T { + &self.inner.inner.0 + } + + /// Returns a mutable reference to the underlying I/O stream wrapped by + /// `FramedWrite`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner.inner.0 + } + + /// Consumes the `FramedWrite`, returning its underlying I/O stream. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn into_inner(self) -> T { + self.inner.inner.0 + } + + /// Returns a reference to the underlying decoder. + pub fn encoder(&self) -> &E { + &self.inner.inner.1 + } + + /// Returns a mutable reference to the underlying decoder. + pub fn encoder_mut(&mut self) -> &mut E { + &mut self.inner.inner.1 + } +} + +// This impl just defers to the underlying FramedWrite2 +impl<T, I, E> Sink<I> for FramedWrite<T, E> +where + T: AsyncWrite + Unpin, + E: Encoder<Item = I> + Unpin, + E::Error: From<io::Error>, +{ + type Error = E::Error; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + pin!(Pin::get_mut(self).inner).poll_ready(cx) + } + + fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { + pin!(Pin::get_mut(self).inner).start_send(item) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + pin!(Pin::get_mut(self).inner).poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + pin!(Pin::get_mut(self).inner).poll_close(cx) + } +} + +impl<T, D> Stream for FramedWrite<T, D> +where + T: Stream + Unpin, + D: Unpin, +{ + type Item = T::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + Pin::new(Pin::get_mut(self).get_mut()).poll_next(cx) + } +} + +impl<T, U> fmt::Debug for FramedWrite<T, U> +where + T: fmt::Debug, + U: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("FramedWrite") + .field("inner", &self.inner.get_ref().0) + .field("encoder", &self.inner.get_ref().1) + .field("buffer", &self.inner.buffer) + .finish() + } +} + |