From d78655337a68bded305782a8a8b4ac7be42aa6a7 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Tue, 27 Oct 2020 13:42:00 -0700 Subject: Revert "util: upgrade tokio-util to bytes 0.6 (#3052)" (#3060) This reverts commit fe2b997. We are avoiding adding poll_read_buf to tokio itself for now. The patch is reverted now in order to not block the v0.3.2 release (#3059). --- tokio-util/Cargo.toml | 8 ++-- tokio-util/src/codec/framed_impl.rs | 6 ++- tokio-util/src/io/mod.rs | 4 ++ tokio-util/src/io/poll_read_buf.rs | 90 +++++++++++++++++++++++++++++++++++++ tokio-util/src/io/read_buf.rs | 65 +++++++++++++++++++++++++++ tokio-util/src/io/reader_stream.rs | 8 ++-- tokio-util/src/lib.rs | 34 ++++++++++++++ 7 files changed, 206 insertions(+), 9 deletions(-) create mode 100644 tokio-util/src/io/poll_read_buf.rs create mode 100644 tokio-util/src/io/read_buf.rs (limited to 'tokio-util') diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index 3c5b1bf9..11419951 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -7,7 +7,7 @@ name = "tokio-util" # - Cargo.toml # - Update CHANGELOG.md. # - Create "v0.2.x" git tag. -version = "0.5.0" +version = "0.4.0" edition = "2018" authors = ["Tokio Contributors "] license = "MIT" @@ -27,15 +27,15 @@ default = [] full = ["codec", "compat", "io", "time"] compat = ["futures-io",] -codec = ["tokio/io-util", "tokio/stream"] +codec = ["tokio/stream"] time = ["tokio/time","slab"] -io = ["tokio/io-util"] +io = [] rt = ["tokio/rt"] [dependencies] tokio = { version = "0.3.0", path = "../tokio" } -bytes = "0.6.0" +bytes = "0.5.0" futures-core = "0.3.0" futures-sink = "0.3.0" futures-io = { version = "0.3.0", optional = true } diff --git a/tokio-util/src/codec/framed_impl.rs b/tokio-util/src/codec/framed_impl.rs index ccb8b3c8..c161808f 100644 --- a/tokio-util/src/codec/framed_impl.rs +++ b/tokio-util/src/codec/framed_impl.rs @@ -2,7 +2,7 @@ use crate::codec::decoder::Decoder; use crate::codec::encoder::Encoder; use tokio::{ - io::{AsyncRead, AsyncReadExt, AsyncWrite}, + io::{AsyncRead, AsyncWrite}, stream::Stream, }; @@ -118,6 +118,8 @@ where type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + use crate::util::poll_read_buf; + let mut pinned = self.project(); let state: &mut ReadFrame = pinned.state.borrow_mut(); loop { @@ -148,7 +150,7 @@ where // got room for at least one byte to read to ensure that we don't // get a spurious 0 that looks like EOF state.buffer.reserve(1); - let bytect = match pinned.inner.as_mut().poll_read_buf(&mut state.buffer, cx)? { + let bytect = match poll_read_buf(cx, pinned.inner.as_mut(), &mut state.buffer)? { Poll::Ready(ct) => ct, Poll::Pending => return Poll::Pending, }; diff --git a/tokio-util/src/io/mod.rs b/tokio-util/src/io/mod.rs index 53066c4e..7cf25989 100644 --- a/tokio-util/src/io/mod.rs +++ b/tokio-util/src/io/mod.rs @@ -6,8 +6,12 @@ //! [`Body`]: https://docs.rs/hyper/0.13/hyper/struct.Body.html //! [`AsyncRead`]: tokio::io::AsyncRead +mod poll_read_buf; +mod read_buf; mod reader_stream; mod stream_reader; +pub use self::poll_read_buf::poll_read_buf; +pub use self::read_buf::read_buf; pub use self::reader_stream::ReaderStream; pub use self::stream_reader::StreamReader; diff --git a/tokio-util/src/io/poll_read_buf.rs b/tokio-util/src/io/poll_read_buf.rs new file mode 100644 index 00000000..efce7ced --- /dev/null +++ b/tokio-util/src/io/poll_read_buf.rs @@ -0,0 +1,90 @@ +use bytes::BufMut; +use futures_core::ready; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::io::{AsyncRead, ReadBuf}; + +/// Try to read data from an `AsyncRead` into an implementer of the [`Buf`] trait. +/// +/// [`Buf`]: bytes::Buf +/// +/// # Example +/// +/// ``` +/// use bytes::{Bytes, BytesMut}; +/// use tokio::stream; +/// use tokio::io::Result; +/// use tokio_util::io::{StreamReader, poll_read_buf}; +/// use futures::future::poll_fn; +/// use std::pin::Pin; +/// # #[tokio::main] +/// # async fn main() -> std::io::Result<()> { +/// +/// // Create a reader from an iterator. This particular reader will always be +/// // ready. +/// let mut read = StreamReader::new(stream::iter(vec![Result::Ok(Bytes::from_static(&[0, 1, 2, 3]))])); +/// +/// let mut buf = BytesMut::new(); +/// let mut reads = 0; +/// +/// loop { +/// reads += 1; +/// let n = poll_fn(|cx| poll_read_buf(Pin::new(&mut read), cx, &mut buf)).await?; +/// +/// if n == 0 { +/// break; +/// } +/// } +/// +/// // one or more reads might be necessary. +/// assert!(reads >= 1); +/// assert_eq!(&buf[..], &[0, 1, 2, 3]); +/// # Ok(()) +/// # } +/// ``` +pub fn poll_read_buf( + read: Pin<&mut R>, + cx: &mut Context<'_>, + buf: &mut B, +) -> Poll> +where + R: AsyncRead, + B: BufMut, +{ + if !buf.has_remaining_mut() { + return Poll::Ready(Ok(0)); + } + + let n = { + let mut buf = ReadBuf::uninit(buf.bytes_mut()); + let before = buf.filled().as_ptr(); + + ready!(read.poll_read(cx, &mut buf)?); + + // This prevents a malicious read implementation from swapping out the + // buffer being read, which would allow `filled` to be advanced without + // actually initializing the provided buffer. + // + // We avoid this by asserting that the `ReadBuf` instance wraps the same + // memory address both before and after the poll. Which will panic in + // case its swapped. + // + // See https://github.com/tokio-rs/tokio/issues/2827 for more info. + assert! { + std::ptr::eq(before, buf.filled().as_ptr()), + "Read buffer must not be changed during a read poll. \ + See https://github.com/tokio-rs/tokio/issues/2827 for more info." + }; + + buf.filled().len() + }; + + // Safety: This is guaranteed to be the number of initialized (and read) + // bytes due to the invariants provided by `ReadBuf::filled`. + unsafe { + buf.advance_mut(n); + } + + Poll::Ready(Ok(n)) +} diff --git a/tokio-util/src/io/read_buf.rs b/tokio-util/src/io/read_buf.rs new file mode 100644 index 00000000..d617fa6f --- /dev/null +++ b/tokio-util/src/io/read_buf.rs @@ -0,0 +1,65 @@ +use bytes::BufMut; +use std::future::Future; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::io::AsyncRead; + +/// Read data from an `AsyncRead` into an implementer of the [`Buf`] trait. +/// +/// [`Buf`]: bytes::Buf +/// +/// # Example +/// +/// ``` +/// use bytes::{Bytes, BytesMut}; +/// use tokio::stream; +/// use tokio::io::Result; +/// use tokio_util::io::{StreamReader, read_buf}; +/// # #[tokio::main] +/// # async fn main() -> std::io::Result<()> { +/// +/// // Create a reader from an iterator. This particular reader will always be +/// // ready. +/// let mut read = StreamReader::new(stream::iter(vec![Result::Ok(Bytes::from_static(&[0, 1, 2, 3]))])); +/// +/// let mut buf = BytesMut::new(); +/// let mut reads = 0; +/// +/// loop { +/// reads += 1; +/// let n = read_buf(&mut read, &mut buf).await?; +/// +/// if n == 0 { +/// break; +/// } +/// } +/// +/// // one or more reads might be necessary. +/// assert!(reads >= 1); +/// assert_eq!(&buf[..], &[0, 1, 2, 3]); +/// # Ok(()) +/// # } +/// ``` +pub async fn read_buf(read: &mut R, buf: &mut B) -> io::Result +where + R: AsyncRead + Unpin, + B: BufMut, +{ + return ReadBufFn(read, buf).await; + + struct ReadBufFn<'a, R, B>(&'a mut R, &'a mut B); + + impl<'a, R, B> Future for ReadBufFn<'a, R, B> + where + R: AsyncRead + Unpin, + B: BufMut, + { + type Output = io::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = &mut *self; + super::poll_read_buf(Pin::new(this.0), cx, this.1) + } + } +} diff --git a/tokio-util/src/io/reader_stream.rs b/tokio-util/src/io/reader_stream.rs index 49288c45..ab0c22fb 100644 --- a/tokio-util/src/io/reader_stream.rs +++ b/tokio-util/src/io/reader_stream.rs @@ -3,7 +3,7 @@ use futures_core::stream::Stream; use pin_project_lite::pin_project; use std::pin::Pin; use std::task::{Context, Poll}; -use tokio::io::{AsyncRead, AsyncReadExt}; +use tokio::io::AsyncRead; const CAPACITY: usize = 4096; @@ -70,9 +70,11 @@ impl ReaderStream { impl Stream for ReaderStream { type Item = std::io::Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + use crate::util::poll_read_buf; + let mut this = self.as_mut().project(); - let mut reader = match this.reader.as_pin_mut() { + let reader = match this.reader.as_pin_mut() { Some(r) => r, None => return Poll::Ready(None), }; @@ -81,7 +83,7 @@ impl Stream for ReaderStream { this.buf.reserve(CAPACITY); } - match reader.poll_read_buf(&mut this.buf, cx) { + match poll_read_buf(cx, reader, &mut this.buf) { Poll::Pending => Poll::Pending, Poll::Ready(Err(err)) => { self.project().reader.set(None); diff --git a/tokio-util/src/lib.rs b/tokio-util/src/lib.rs index 1e4b9d40..10b828ef 100644 --- a/tokio-util/src/lib.rs +++ b/tokio-util/src/lib.rs @@ -57,3 +57,37 @@ pub mod either; #[cfg(feature = "time")] pub mod time; + +#[cfg(any(feature = "io", feature = "codec"))] +mod util { + use tokio::io::{AsyncRead, ReadBuf}; + + use bytes::BufMut; + use futures_core::ready; + use std::io; + use std::pin::Pin; + use std::task::{Context, Poll}; + + pub(crate) fn poll_read_buf( + cx: &mut Context<'_>, + io: Pin<&mut T>, + buf: &mut impl BufMut, + ) -> Poll> { + if !buf.has_remaining_mut() { + return Poll::Ready(Ok(0)); + } + + let orig = buf.bytes_mut().as_ptr() as *const u8; + let mut b = ReadBuf::uninit(buf.bytes_mut()); + + ready!(io.poll_read(cx, &mut b))?; + let n = b.filled().len(); + + // Safety: we can assume `n` bytes were read, since they are in`filled`. + assert_eq!(orig, b.filled().as_ptr()); + unsafe { + buf.advance_mut(n); + } + Poll::Ready(Ok(n)) + } +} -- cgit v1.2.3