From 4186b0aa38abbec7670d53882d5cdfd4b12add5c Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 24 Sep 2020 17:26:03 -0700 Subject: io: remove poll_{read,write}_buf from traits (#2882) These functions have object safety issues. It also has been decided to avoid vectored operations on the I/O traits. A later PR will bring back vectored operations on specific types that support them. Refs: #2879, #2716 --- tokio-util/src/codec/framed_impl.rs | 4 +++- tokio-util/src/io/reader_stream.rs | 4 +++- tokio-util/src/io/stream_reader.rs | 25 +------------------------ tokio-util/src/lib.rs | 34 ++++++++++++++++++++++++++++++++++ 4 files changed, 41 insertions(+), 26 deletions(-) (limited to 'tokio-util') diff --git a/tokio-util/src/codec/framed_impl.rs b/tokio-util/src/codec/framed_impl.rs index eb2e0d38..c161808f 100644 --- a/tokio-util/src/codec/framed_impl.rs +++ b/tokio-util/src/codec/framed_impl.rs @@ -118,6 +118,8 @@ where type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + use crate::util::poll_read_buf; + let mut pinned = self.project(); let state: &mut ReadFrame = pinned.state.borrow_mut(); loop { @@ -148,7 +150,7 @@ where // got room for at least one byte to read to ensure that we don't // get a spurious 0 that looks like EOF state.buffer.reserve(1); - let bytect = match pinned.inner.as_mut().poll_read_buf(cx, &mut state.buffer)? { + let bytect = match poll_read_buf(cx, pinned.inner.as_mut(), &mut state.buffer)? { Poll::Ready(ct) => ct, Poll::Pending => return Poll::Pending, }; diff --git a/tokio-util/src/io/reader_stream.rs b/tokio-util/src/io/reader_stream.rs index bde7ccee..ab0c22fb 100644 --- a/tokio-util/src/io/reader_stream.rs +++ b/tokio-util/src/io/reader_stream.rs @@ -70,6 +70,8 @@ impl ReaderStream { impl Stream for ReaderStream { type Item = std::io::Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + use crate::util::poll_read_buf; + let mut this = self.as_mut().project(); let reader = match this.reader.as_pin_mut() { @@ -81,7 +83,7 @@ impl Stream for ReaderStream { this.buf.reserve(CAPACITY); } - match reader.poll_read_buf(cx, &mut this.buf) { + match poll_read_buf(cx, reader, &mut this.buf) { Poll::Pending => Poll::Pending, Poll::Ready(Err(err)) => { self.project().reader.set(None); diff --git a/tokio-util/src/io/stream_reader.rs b/tokio-util/src/io/stream_reader.rs index 5c3ab019..def843b1 100644 --- a/tokio-util/src/io/stream_reader.rs +++ b/tokio-util/src/io/stream_reader.rs @@ -1,4 +1,4 @@ -use bytes::{Buf, BufMut}; +use bytes::Buf; use futures_core::stream::Stream; use pin_project_lite::pin_project; use std::io; @@ -119,29 +119,6 @@ where self.consume(len); Poll::Ready(Ok(())) } - fn poll_read_buf( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut BM, - ) -> Poll> - where - Self: Sized, - { - if !buf.has_remaining_mut() { - return Poll::Ready(Ok(0)); - } - - let inner_buf = match self.as_mut().poll_fill_buf(cx) { - Poll::Ready(Ok(buf)) => buf, - Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), - Poll::Pending => return Poll::Pending, - }; - let len = std::cmp::min(inner_buf.len(), buf.remaining_mut()); - buf.put_slice(&inner_buf[..len]); - - self.consume(len); - Poll::Ready(Ok(len)) - } } impl AsyncBufRead for StreamReader diff --git a/tokio-util/src/lib.rs b/tokio-util/src/lib.rs index b96d9044..eb35345e 100644 --- a/tokio-util/src/lib.rs +++ b/tokio-util/src/lib.rs @@ -52,3 +52,37 @@ pub mod context; pub mod sync; pub mod either; + +#[cfg(any(feature = "io", feature = "codec"))] +mod util { + use tokio::io::{AsyncRead, ReadBuf}; + + use bytes::BufMut; + use futures_core::ready; + use std::io; + use std::pin::Pin; + use std::task::{Context, Poll}; + + pub(crate) fn poll_read_buf( + cx: &mut Context<'_>, + io: Pin<&mut T>, + buf: &mut impl BufMut, + ) -> Poll> { + if !buf.has_remaining_mut() { + return Poll::Ready(Ok(0)); + } + + let orig = buf.bytes_mut().as_ptr() as *const u8; + let mut b = ReadBuf::uninit(buf.bytes_mut()); + + ready!(io.poll_read(cx, &mut b))?; + let n = b.filled().len(); + + // Safety: we can assume `n` bytes were read, since they are in`filled`. + assert_eq!(orig, b.filled().as_ptr()); + unsafe { + buf.advance_mut(n); + } + Poll::Ready(Ok(n)) + } +} -- cgit v1.2.3