diff options
-rw-r--r-- | tokio/src/io/async_read_ext.rs | 9 | ||||
-rw-r--r-- | tokio/src/io/mod.rs | 1 | ||||
-rw-r--r-- | tokio/src/io/read_to_end.rs | 99 | ||||
-rw-r--r-- | tokio/tests/io_read_to_end.rs | 40 |
4 files changed, 149 insertions, 0 deletions
diff --git a/tokio/src/io/async_read_ext.rs b/tokio/src/io/async_read_ext.rs index 62b239a8..2fc2cbb4 100644 --- a/tokio/src/io/async_read_ext.rs +++ b/tokio/src/io/async_read_ext.rs @@ -1,6 +1,7 @@ use crate::io::copy::{copy, Copy}; use crate::io::read::{read, Read}; use crate::io::read_exact::{read_exact, ReadExact}; +use crate::io::read_to_end::{read_to_end, ReadToEnd}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -60,6 +61,14 @@ pub trait AsyncReadExt: AsyncRead { { read_exact(self, dst) } + + /// Read all bytes until EOF in this source, placing them into `dst`. + fn read_to_end<'a>(&'a mut self, dst: &'a mut Vec<u8>) -> ReadToEnd<'a, Self> + where + Self: Unpin, + { + read_to_end(self, dst) + } } impl<R: AsyncRead + ?Sized> AsyncReadExt for R {} diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs index 48dbdf16..e66dc181 100644 --- a/tokio/src/io/mod.rs +++ b/tokio/src/io/mod.rs @@ -41,6 +41,7 @@ mod async_write_ext; mod copy; mod read; mod read_exact; +mod read_to_end; mod write; mod write_all; diff --git a/tokio/src/io/read_to_end.rs b/tokio/src/io/read_to_end.rs new file mode 100644 index 00000000..2433e6ee --- /dev/null +++ b/tokio/src/io/read_to_end.rs @@ -0,0 +1,99 @@ +use tokio_io::AsyncRead; + +use std::future::Future; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct ReadToEnd<'a, R: ?Sized> { + reader: &'a mut R, + buf: &'a mut Vec<u8>, + start_len: usize, +} + +impl<R: ?Sized + Unpin> Unpin for ReadToEnd<'_, R> {} + +pub(crate) fn read_to_end<'a, R>(reader: &'a mut R, buf: &'a mut Vec<u8>) -> ReadToEnd<'a, R> +where + R: AsyncRead + Unpin + ?Sized, +{ + let start_len = buf.len(); + ReadToEnd { + reader, + buf, + start_len, + } +} + +struct Guard<'a> { + buf: &'a mut Vec<u8>, + len: usize, +} + +impl Drop for Guard<'_> { + fn drop(&mut self) { + unsafe { + self.buf.set_len(self.len); + } + } +} + +// This uses an adaptive system to extend the vector when it fills. We want to +// avoid paying to allocate and zero a huge chunk of memory if the reader only +// has 4 bytes while still making large reads if the reader does have a ton +// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every +// time is 4,500 times (!) slower than this if the reader has a very small +// amount of data to return. +// +// Because we're extending the buffer with uninitialized data for trusted +// readers, we need to make sure to truncate that if any of this panics. +pub(super) fn read_to_end_internal<R: AsyncRead + ?Sized>( + mut rd: Pin<&mut R>, + cx: &mut Context<'_>, + buf: &mut Vec<u8>, + start_len: usize, +) -> Poll<io::Result<usize>> { + let mut g = Guard { + len: buf.len(), + buf, + }; + let ret; + loop { + if g.len == g.buf.len() { + unsafe { + g.buf.reserve(32); + let capacity = g.buf.capacity(); + g.buf.set_len(capacity); + rd.prepare_uninitialized_buffer(&mut g.buf[g.len..]); + } + } + + match ready!(rd.as_mut().poll_read(cx, &mut g.buf[g.len..])) { + Ok(0) => { + ret = Poll::Ready(Ok(g.len - start_len)); + break; + } + Ok(n) => g.len += n, + Err(e) => { + ret = Poll::Ready(Err(e)); + break; + } + } + } + + ret +} + +impl<A> Future for ReadToEnd<'_, A> +where + A: AsyncRead + ?Sized + Unpin, +{ + type Output = io::Result<usize>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let this = &mut *self; + read_to_end_internal(Pin::new(&mut this.reader), cx, this.buf, this.start_len) + } +} diff --git a/tokio/tests/io_read_to_end.rs b/tokio/tests/io_read_to_end.rs new file mode 100644 index 00000000..ac089625 --- /dev/null +++ b/tokio/tests/io_read_to_end.rs @@ -0,0 +1,40 @@ +#![deny(warnings, rust_2018_idioms)] +#![feature(async_await)] + +use tokio::io::{AsyncRead, AsyncReadExt}; +use tokio_test::assert_ok; + +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::{cmp, io}; + +#[tokio::test] +async fn read_to_end() { + struct Rd { + val: &'static [u8], + } + + impl AsyncRead for Rd { + fn poll_read( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + let me = &mut *self; + let len = cmp::min(buf.len(), me.val.len()); + + buf[..len].copy_from_slice(&me.val[..len]); + me.val = &me.val[len..]; + Poll::Ready(Ok(len)) + } + } + + let mut buf = vec![]; + let mut rd = Rd { + val: b"hello world", + }; + + let n = assert_ok!(rd.read_to_end(&mut buf).await); + assert_eq!(n, 11); + assert_eq!(buf[..], b"hello world"[..]); +} |