From fe2b9976755407b85c82b5cdee9d8c5e16e3d6c6 Mon Sep 17 00:00:00 2001 From: Dirkjan Ochtman Date: Tue, 27 Oct 2020 09:30:29 +0100 Subject: util: upgrade tokio-util to bytes 0.6 (#3052) --- tokio-util/src/codec/framed_impl.rs | 6 +-- tokio-util/src/io/mod.rs | 4 -- tokio-util/src/io/poll_read_buf.rs | 90 ------------------------------------- tokio-util/src/io/read_buf.rs | 65 --------------------------- tokio-util/src/io/reader_stream.rs | 8 ++-- tokio-util/src/lib.rs | 34 -------------- 6 files changed, 5 insertions(+), 202 deletions(-) delete mode 100644 tokio-util/src/io/poll_read_buf.rs delete mode 100644 tokio-util/src/io/read_buf.rs (limited to 'tokio-util/src') diff --git a/tokio-util/src/codec/framed_impl.rs b/tokio-util/src/codec/framed_impl.rs index c161808f..ccb8b3c8 100644 --- a/tokio-util/src/codec/framed_impl.rs +++ b/tokio-util/src/codec/framed_impl.rs @@ -2,7 +2,7 @@ use crate::codec::decoder::Decoder; use crate::codec::encoder::Encoder; use tokio::{ - io::{AsyncRead, AsyncWrite}, + io::{AsyncRead, AsyncReadExt, AsyncWrite}, stream::Stream, }; @@ -118,8 +118,6 @@ where type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - use crate::util::poll_read_buf; - let mut pinned = self.project(); let state: &mut ReadFrame = pinned.state.borrow_mut(); loop { @@ -150,7 +148,7 @@ where // got room for at least one byte to read to ensure that we don't // get a spurious 0 that looks like EOF state.buffer.reserve(1); - let bytect = match poll_read_buf(cx, pinned.inner.as_mut(), &mut state.buffer)? { + let bytect = match pinned.inner.as_mut().poll_read_buf(&mut state.buffer, cx)? { Poll::Ready(ct) => ct, Poll::Pending => return Poll::Pending, }; diff --git a/tokio-util/src/io/mod.rs b/tokio-util/src/io/mod.rs index 7cf25989..53066c4e 100644 --- a/tokio-util/src/io/mod.rs +++ b/tokio-util/src/io/mod.rs @@ -6,12 +6,8 @@ //! [`Body`]: https://docs.rs/hyper/0.13/hyper/struct.Body.html //! [`AsyncRead`]: tokio::io::AsyncRead -mod poll_read_buf; -mod read_buf; mod reader_stream; mod stream_reader; -pub use self::poll_read_buf::poll_read_buf; -pub use self::read_buf::read_buf; pub use self::reader_stream::ReaderStream; pub use self::stream_reader::StreamReader; diff --git a/tokio-util/src/io/poll_read_buf.rs b/tokio-util/src/io/poll_read_buf.rs deleted file mode 100644 index efce7ced..00000000 --- a/tokio-util/src/io/poll_read_buf.rs +++ /dev/null @@ -1,90 +0,0 @@ -use bytes::BufMut; -use futures_core::ready; -use std::io; -use std::pin::Pin; -use std::task::{Context, Poll}; -use tokio::io::{AsyncRead, ReadBuf}; - -/// Try to read data from an `AsyncRead` into an implementer of the [`Buf`] trait. -/// -/// [`Buf`]: bytes::Buf -/// -/// # Example -/// -/// ``` -/// use bytes::{Bytes, BytesMut}; -/// use tokio::stream; -/// use tokio::io::Result; -/// use tokio_util::io::{StreamReader, poll_read_buf}; -/// use futures::future::poll_fn; -/// use std::pin::Pin; -/// # #[tokio::main] -/// # async fn main() -> std::io::Result<()> { -/// -/// // Create a reader from an iterator. This particular reader will always be -/// // ready. -/// let mut read = StreamReader::new(stream::iter(vec![Result::Ok(Bytes::from_static(&[0, 1, 2, 3]))])); -/// -/// let mut buf = BytesMut::new(); -/// let mut reads = 0; -/// -/// loop { -/// reads += 1; -/// let n = poll_fn(|cx| poll_read_buf(Pin::new(&mut read), cx, &mut buf)).await?; -/// -/// if n == 0 { -/// break; -/// } -/// } -/// -/// // one or more reads might be necessary. -/// assert!(reads >= 1); -/// assert_eq!(&buf[..], &[0, 1, 2, 3]); -/// # Ok(()) -/// # } -/// ``` -pub fn poll_read_buf( - read: Pin<&mut R>, - cx: &mut Context<'_>, - buf: &mut B, -) -> Poll> -where - R: AsyncRead, - B: BufMut, -{ - if !buf.has_remaining_mut() { - return Poll::Ready(Ok(0)); - } - - let n = { - let mut buf = ReadBuf::uninit(buf.bytes_mut()); - let before = buf.filled().as_ptr(); - - ready!(read.poll_read(cx, &mut buf)?); - - // This prevents a malicious read implementation from swapping out the - // buffer being read, which would allow `filled` to be advanced without - // actually initializing the provided buffer. - // - // We avoid this by asserting that the `ReadBuf` instance wraps the same - // memory address both before and after the poll. Which will panic in - // case its swapped. - // - // See https://github.com/tokio-rs/tokio/issues/2827 for more info. - assert! { - std::ptr::eq(before, buf.filled().as_ptr()), - "Read buffer must not be changed during a read poll. \ - See https://github.com/tokio-rs/tokio/issues/2827 for more info." - }; - - buf.filled().len() - }; - - // Safety: This is guaranteed to be the number of initialized (and read) - // bytes due to the invariants provided by `ReadBuf::filled`. - unsafe { - buf.advance_mut(n); - } - - Poll::Ready(Ok(n)) -} diff --git a/tokio-util/src/io/read_buf.rs b/tokio-util/src/io/read_buf.rs deleted file mode 100644 index d617fa6f..00000000 --- a/tokio-util/src/io/read_buf.rs +++ /dev/null @@ -1,65 +0,0 @@ -use bytes::BufMut; -use std::future::Future; -use std::io; -use std::pin::Pin; -use std::task::{Context, Poll}; -use tokio::io::AsyncRead; - -/// Read data from an `AsyncRead` into an implementer of the [`Buf`] trait. -/// -/// [`Buf`]: bytes::Buf -/// -/// # Example -/// -/// ``` -/// use bytes::{Bytes, BytesMut}; -/// use tokio::stream; -/// use tokio::io::Result; -/// use tokio_util::io::{StreamReader, read_buf}; -/// # #[tokio::main] -/// # async fn main() -> std::io::Result<()> { -/// -/// // Create a reader from an iterator. This particular reader will always be -/// // ready. -/// let mut read = StreamReader::new(stream::iter(vec![Result::Ok(Bytes::from_static(&[0, 1, 2, 3]))])); -/// -/// let mut buf = BytesMut::new(); -/// let mut reads = 0; -/// -/// loop { -/// reads += 1; -/// let n = read_buf(&mut read, &mut buf).await?; -/// -/// if n == 0 { -/// break; -/// } -/// } -/// -/// // one or more reads might be necessary. -/// assert!(reads >= 1); -/// assert_eq!(&buf[..], &[0, 1, 2, 3]); -/// # Ok(()) -/// # } -/// ``` -pub async fn read_buf(read: &mut R, buf: &mut B) -> io::Result -where - R: AsyncRead + Unpin, - B: BufMut, -{ - return ReadBufFn(read, buf).await; - - struct ReadBufFn<'a, R, B>(&'a mut R, &'a mut B); - - impl<'a, R, B> Future for ReadBufFn<'a, R, B> - where - R: AsyncRead + Unpin, - B: BufMut, - { - type Output = io::Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = &mut *self; - super::poll_read_buf(Pin::new(this.0), cx, this.1) - } - } -} diff --git a/tokio-util/src/io/reader_stream.rs b/tokio-util/src/io/reader_stream.rs index ab0c22fb..49288c45 100644 --- a/tokio-util/src/io/reader_stream.rs +++ b/tokio-util/src/io/reader_stream.rs @@ -3,7 +3,7 @@ use futures_core::stream::Stream; use pin_project_lite::pin_project; use std::pin::Pin; use std::task::{Context, Poll}; -use tokio::io::AsyncRead; +use tokio::io::{AsyncRead, AsyncReadExt}; const CAPACITY: usize = 4096; @@ -70,11 +70,9 @@ impl ReaderStream { impl Stream for ReaderStream { type Item = std::io::Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - use crate::util::poll_read_buf; - let mut this = self.as_mut().project(); - let reader = match this.reader.as_pin_mut() { + let mut reader = match this.reader.as_pin_mut() { Some(r) => r, None => return Poll::Ready(None), }; @@ -83,7 +81,7 @@ impl Stream for ReaderStream { this.buf.reserve(CAPACITY); } - match poll_read_buf(cx, reader, &mut this.buf) { + match reader.poll_read_buf(&mut this.buf, cx) { Poll::Pending => Poll::Pending, Poll::Ready(Err(err)) => { self.project().reader.set(None); diff --git a/tokio-util/src/lib.rs b/tokio-util/src/lib.rs index 10b828ef..1e4b9d40 100644 --- a/tokio-util/src/lib.rs +++ b/tokio-util/src/lib.rs @@ -57,37 +57,3 @@ pub mod either; #[cfg(feature = "time")] pub mod time; - -#[cfg(any(feature = "io", feature = "codec"))] -mod util { - use tokio::io::{AsyncRead, ReadBuf}; - - use bytes::BufMut; - use futures_core::ready; - use std::io; - use std::pin::Pin; - use std::task::{Context, Poll}; - - pub(crate) fn poll_read_buf( - cx: &mut Context<'_>, - io: Pin<&mut T>, - buf: &mut impl BufMut, - ) -> Poll> { - if !buf.has_remaining_mut() { - return Poll::Ready(Ok(0)); - } - - let orig = buf.bytes_mut().as_ptr() as *const u8; - let mut b = ReadBuf::uninit(buf.bytes_mut()); - - ready!(io.poll_read(cx, &mut b))?; - let n = b.filled().len(); - - // Safety: we can assume `n` bytes were read, since they are in`filled`. - assert_eq!(orig, b.filled().as_ptr()); - unsafe { - buf.advance_mut(n); - } - Poll::Ready(Ok(n)) - } -} -- cgit v1.2.3