summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--tokio/src/io/async_read_ext.rs9
-rw-r--r--tokio/src/io/mod.rs1
-rw-r--r--tokio/src/io/read_to_end.rs99
-rw-r--r--tokio/tests/io_read_to_end.rs40
4 files changed, 149 insertions, 0 deletions
diff --git a/tokio/src/io/async_read_ext.rs b/tokio/src/io/async_read_ext.rs
index 62b239a8..2fc2cbb4 100644
--- a/tokio/src/io/async_read_ext.rs
+++ b/tokio/src/io/async_read_ext.rs
@@ -1,6 +1,7 @@
use crate::io::copy::{copy, Copy};
use crate::io::read::{read, Read};
use crate::io::read_exact::{read_exact, ReadExact};
+use crate::io::read_to_end::{read_to_end, ReadToEnd};
use tokio_io::{AsyncRead, AsyncWrite};
@@ -60,6 +61,14 @@ pub trait AsyncReadExt: AsyncRead {
{
read_exact(self, dst)
}
+
+ /// Read all bytes until EOF in this source, placing them into `dst`.
+ fn read_to_end<'a>(&'a mut self, dst: &'a mut Vec<u8>) -> ReadToEnd<'a, Self>
+ where
+ Self: Unpin,
+ {
+ read_to_end(self, dst)
+ }
}
impl<R: AsyncRead + ?Sized> AsyncReadExt for R {}
diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs
index 48dbdf16..e66dc181 100644
--- a/tokio/src/io/mod.rs
+++ b/tokio/src/io/mod.rs
@@ -41,6 +41,7 @@ mod async_write_ext;
mod copy;
mod read;
mod read_exact;
+mod read_to_end;
mod write;
mod write_all;
diff --git a/tokio/src/io/read_to_end.rs b/tokio/src/io/read_to_end.rs
new file mode 100644
index 00000000..2433e6ee
--- /dev/null
+++ b/tokio/src/io/read_to_end.rs
@@ -0,0 +1,99 @@
+use tokio_io::AsyncRead;
+
+use std::future::Future;
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct ReadToEnd<'a, R: ?Sized> {
+ reader: &'a mut R,
+ buf: &'a mut Vec<u8>,
+ start_len: usize,
+}
+
+impl<R: ?Sized + Unpin> Unpin for ReadToEnd<'_, R> {}
+
+pub(crate) fn read_to_end<'a, R>(reader: &'a mut R, buf: &'a mut Vec<u8>) -> ReadToEnd<'a, R>
+where
+ R: AsyncRead + Unpin + ?Sized,
+{
+ let start_len = buf.len();
+ ReadToEnd {
+ reader,
+ buf,
+ start_len,
+ }
+}
+
+struct Guard<'a> {
+ buf: &'a mut Vec<u8>,
+ len: usize,
+}
+
+impl Drop for Guard<'_> {
+ fn drop(&mut self) {
+ unsafe {
+ self.buf.set_len(self.len);
+ }
+ }
+}
+
+// This uses an adaptive system to extend the vector when it fills. We want to
+// avoid paying to allocate and zero a huge chunk of memory if the reader only
+// has 4 bytes while still making large reads if the reader does have a ton
+// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
+// time is 4,500 times (!) slower than this if the reader has a very small
+// amount of data to return.
+//
+// Because we're extending the buffer with uninitialized data for trusted
+// readers, we need to make sure to truncate that if any of this panics.
+pub(super) fn read_to_end_internal<R: AsyncRead + ?Sized>(
+ mut rd: Pin<&mut R>,
+ cx: &mut Context<'_>,
+ buf: &mut Vec<u8>,
+ start_len: usize,
+) -> Poll<io::Result<usize>> {
+ let mut g = Guard {
+ len: buf.len(),
+ buf,
+ };
+ let ret;
+ loop {
+ if g.len == g.buf.len() {
+ unsafe {
+ g.buf.reserve(32);
+ let capacity = g.buf.capacity();
+ g.buf.set_len(capacity);
+ rd.prepare_uninitialized_buffer(&mut g.buf[g.len..]);
+ }
+ }
+
+ match ready!(rd.as_mut().poll_read(cx, &mut g.buf[g.len..])) {
+ Ok(0) => {
+ ret = Poll::Ready(Ok(g.len - start_len));
+ break;
+ }
+ Ok(n) => g.len += n,
+ Err(e) => {
+ ret = Poll::Ready(Err(e));
+ break;
+ }
+ }
+ }
+
+ ret
+}
+
+impl<A> Future for ReadToEnd<'_, A>
+where
+ A: AsyncRead + ?Sized + Unpin,
+{
+ type Output = io::Result<usize>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let this = &mut *self;
+ read_to_end_internal(Pin::new(&mut this.reader), cx, this.buf, this.start_len)
+ }
+}
diff --git a/tokio/tests/io_read_to_end.rs b/tokio/tests/io_read_to_end.rs
new file mode 100644
index 00000000..ac089625
--- /dev/null
+++ b/tokio/tests/io_read_to_end.rs
@@ -0,0 +1,40 @@
+#![deny(warnings, rust_2018_idioms)]
+#![feature(async_await)]
+
+use tokio::io::{AsyncRead, AsyncReadExt};
+use tokio_test::assert_ok;
+
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use std::{cmp, io};
+
+#[tokio::test]
+async fn read_to_end() {
+ struct Rd {
+ val: &'static [u8],
+ }
+
+ impl AsyncRead for Rd {
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ let me = &mut *self;
+ let len = cmp::min(buf.len(), me.val.len());
+
+ buf[..len].copy_from_slice(&me.val[..len]);
+ me.val = &me.val[len..];
+ Poll::Ready(Ok(len))
+ }
+ }
+
+ let mut buf = vec![];
+ let mut rd = Rd {
+ val: b"hello world",
+ };
+
+ let n = assert_ok!(rd.read_to_end(&mut buf).await);
+ assert_eq!(n, 11);
+ assert_eq!(buf[..], b"hello world"[..]);
+}