summaryrefslogtreecommitdiffstats
path: root/tokio-util
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-09-24 17:26:03 -0700
committerGitHub <noreply@github.com>2020-09-24 17:26:03 -0700
commit4186b0aa38abbec7670d53882d5cdfd4b12add5c (patch)
treeb067117fcb1a4c479cd274465bcac0431c2e59f7 /tokio-util
parent760ae89401d9addb71ebf19674980577b5501edd (diff)
io: remove poll_{read,write}_buf from traits (#2882)
These functions have object safety issues. It also has been decided to avoid vectored operations on the I/O traits. A later PR will bring back vectored operations on specific types that support them. Refs: #2879, #2716
Diffstat (limited to 'tokio-util')
-rw-r--r--tokio-util/src/codec/framed_impl.rs4
-rw-r--r--tokio-util/src/io/reader_stream.rs4
-rw-r--r--tokio-util/src/io/stream_reader.rs25
-rw-r--r--tokio-util/src/lib.rs34
4 files changed, 41 insertions, 26 deletions
diff --git a/tokio-util/src/codec/framed_impl.rs b/tokio-util/src/codec/framed_impl.rs
index eb2e0d38..c161808f 100644
--- a/tokio-util/src/codec/framed_impl.rs
+++ b/tokio-util/src/codec/framed_impl.rs
@@ -118,6 +118,8 @@ where
type Item = Result<U::Item, U::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ use crate::util::poll_read_buf;
+
let mut pinned = self.project();
let state: &mut ReadFrame = pinned.state.borrow_mut();
loop {
@@ -148,7 +150,7 @@ where
// got room for at least one byte to read to ensure that we don't
// get a spurious 0 that looks like EOF
state.buffer.reserve(1);
- let bytect = match pinned.inner.as_mut().poll_read_buf(cx, &mut state.buffer)? {
+ let bytect = match poll_read_buf(cx, pinned.inner.as_mut(), &mut state.buffer)? {
Poll::Ready(ct) => ct,
Poll::Pending => return Poll::Pending,
};
diff --git a/tokio-util/src/io/reader_stream.rs b/tokio-util/src/io/reader_stream.rs
index bde7ccee..ab0c22fb 100644
--- a/tokio-util/src/io/reader_stream.rs
+++ b/tokio-util/src/io/reader_stream.rs
@@ -70,6 +70,8 @@ impl<R: AsyncRead> ReaderStream<R> {
impl<R: AsyncRead> Stream for ReaderStream<R> {
type Item = std::io::Result<Bytes>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ use crate::util::poll_read_buf;
+
let mut this = self.as_mut().project();
let reader = match this.reader.as_pin_mut() {
@@ -81,7 +83,7 @@ impl<R: AsyncRead> Stream for ReaderStream<R> {
this.buf.reserve(CAPACITY);
}
- match reader.poll_read_buf(cx, &mut this.buf) {
+ match poll_read_buf(cx, reader, &mut this.buf) {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(err)) => {
self.project().reader.set(None);
diff --git a/tokio-util/src/io/stream_reader.rs b/tokio-util/src/io/stream_reader.rs
index 5c3ab019..def843b1 100644
--- a/tokio-util/src/io/stream_reader.rs
+++ b/tokio-util/src/io/stream_reader.rs
@@ -1,4 +1,4 @@
-use bytes::{Buf, BufMut};
+use bytes::Buf;
use futures_core::stream::Stream;
use pin_project_lite::pin_project;
use std::io;
@@ -119,29 +119,6 @@ where
self.consume(len);
Poll::Ready(Ok(()))
}
- fn poll_read_buf<BM: BufMut>(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &mut BM,
- ) -> Poll<io::Result<usize>>
- where
- Self: Sized,
- {
- if !buf.has_remaining_mut() {
- return Poll::Ready(Ok(0));
- }
-
- let inner_buf = match self.as_mut().poll_fill_buf(cx) {
- Poll::Ready(Ok(buf)) => buf,
- Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
- Poll::Pending => return Poll::Pending,
- };
- let len = std::cmp::min(inner_buf.len(), buf.remaining_mut());
- buf.put_slice(&inner_buf[..len]);
-
- self.consume(len);
- Poll::Ready(Ok(len))
- }
}
impl<S, B, E> AsyncBufRead for StreamReader<S, B>
diff --git a/tokio-util/src/lib.rs b/tokio-util/src/lib.rs
index b96d9044..eb35345e 100644
--- a/tokio-util/src/lib.rs
+++ b/tokio-util/src/lib.rs
@@ -52,3 +52,37 @@ pub mod context;
pub mod sync;
pub mod either;
+
+#[cfg(any(feature = "io", feature = "codec"))]
+mod util {
+ use tokio::io::{AsyncRead, ReadBuf};
+
+ use bytes::BufMut;
+ use futures_core::ready;
+ use std::io;
+ use std::pin::Pin;
+ use std::task::{Context, Poll};
+
+ pub(crate) fn poll_read_buf<T: AsyncRead>(
+ cx: &mut Context<'_>,
+ io: Pin<&mut T>,
+ buf: &mut impl BufMut,
+ ) -> Poll<io::Result<usize>> {
+ if !buf.has_remaining_mut() {
+ return Poll::Ready(Ok(0));
+ }
+
+ let orig = buf.bytes_mut().as_ptr() as *const u8;
+ let mut b = ReadBuf::uninit(buf.bytes_mut());
+
+ ready!(io.poll_read(cx, &mut b))?;
+ let n = b.filled().len();
+
+ // Safety: we can assume `n` bytes were read, since they are in`filled`.
+ assert_eq!(orig, b.filled().as_ptr());
+ unsafe {
+ buf.advance_mut(n);
+ }
+ Poll::Ready(Ok(n))
+ }
+}