summaryrefslogtreecommitdiffstats
path: root/tokio/src/io
diff options
context:
space:
mode:
authorAlice Ryhl <alice@ryhl.io>2020-07-29 00:45:02 +0200
committerGitHub <noreply@github.com>2020-07-28 15:45:02 -0700
commit03b68f4e7564a3a78fa29bacfe5e306c59145aaa (patch)
tree7d9f9fa8452d03c3a36969eb853ca7569c9bdc03 /tokio/src/io
parent084fcd7954ef13eae4f7d344ec69e17c2a8ce06a (diff)
io: rewrite read_to_end and read_to_string (#2560)
The new implementation changes the behavior such that set_len is called after poll_read. The motivation of this change is that it makes it much more obvious that a rouge panic won't give the caller access to a vector containing exposed uninitialized memory. The new implementation also makes sure to not zero memory twice. Additionally, it makes the various implementations more consistent with each other regarding the naming of variables, and whether we store how many bytes we have read, or how many were in the container originally. Fixes: #2544
Diffstat (limited to 'tokio/src/io')
-rw-r--r--tokio/src/io/util/read_line.rs50
-rw-r--r--tokio/src/io/util/read_to_end.rs171
-rw-r--r--tokio/src/io/util/read_to_string.rs77
3 files changed, 189 insertions, 109 deletions
diff --git a/tokio/src/io/util/read_line.rs b/tokio/src/io/util/read_line.rs
index d625a76b..d1f66f38 100644
--- a/tokio/src/io/util/read_line.rs
+++ b/tokio/src/io/util/read_line.rs
@@ -5,6 +5,7 @@ use std::future::Future;
use std::io;
use std::mem;
use std::pin::Pin;
+use std::string::FromUtf8Error;
use std::task::{Context, Poll};
cfg_io_util! {
@@ -16,7 +17,7 @@ cfg_io_util! {
/// This is the buffer we were provided. It will be replaced with an empty string
/// while reading to postpone utf-8 handling until after reading.
output: &'a mut String,
- /// The actual allocation of the string is moved into a vector instead.
+ /// The actual allocation of the string is moved into this vector instead.
buf: Vec<u8>,
/// The number of bytes appended to buf. This can be less than buf.len() if
/// the buffer was not empty when the operation was started.
@@ -42,31 +43,33 @@ fn put_back_original_data(output: &mut String, mut vector: Vec<u8>, num_bytes_re
*output = String::from_utf8(vector).expect("The original data must be valid utf-8.");
}
-pub(super) fn read_line_internal<R: AsyncBufRead + ?Sized>(
- reader: Pin<&mut R>,
- cx: &mut Context<'_>,
+/// This handles the various failure cases and puts the string back into `output`.
+///
+/// The `truncate_on_io_error` bool is necessary because `read_to_string` and `read_line`
+/// disagree on what should happen when an IO error occurs.
+pub(super) fn finish_string_read(
+ io_res: io::Result<usize>,
+ utf8_res: Result<String, FromUtf8Error>,
+ read: usize,
output: &mut String,
- buf: &mut Vec<u8>,
- read: &mut usize,
+ truncate_on_io_error: bool,
) -> Poll<io::Result<usize>> {
- let io_res = ready!(read_until_internal(reader, cx, b'\n', buf, read));
- let utf8_res = String::from_utf8(mem::replace(buf, Vec::new()));
-
- // At this point both buf and output are empty. The allocation is in utf8_res.
-
- debug_assert!(buf.is_empty());
match (io_res, utf8_res) {
(Ok(num_bytes), Ok(string)) => {
- debug_assert_eq!(*read, 0);
+ debug_assert_eq!(read, 0);
*output = string;
Poll::Ready(Ok(num_bytes))
}
(Err(io_err), Ok(string)) => {
*output = string;
+ if truncate_on_io_error {
+ let original_len = output.len() - read;
+ output.truncate(original_len);
+ }
Poll::Ready(Err(io_err))
}
(Ok(num_bytes), Err(utf8_err)) => {
- debug_assert_eq!(*read, 0);
+ debug_assert_eq!(read, 0);
put_back_original_data(output, utf8_err.into_bytes(), num_bytes);
Poll::Ready(Err(io::Error::new(
@@ -75,13 +78,30 @@ pub(super) fn read_line_internal<R: AsyncBufRead + ?Sized>(
)))
}
(Err(io_err), Err(utf8_err)) => {
- put_back_original_data(output, utf8_err.into_bytes(), *read);
+ put_back_original_data(output, utf8_err.into_bytes(), read);
Poll::Ready(Err(io_err))
}
}
}
+pub(super) fn read_line_internal<R: AsyncBufRead + ?Sized>(
+ reader: Pin<&mut R>,
+ cx: &mut Context<'_>,
+ output: &mut String,
+ buf: &mut Vec<u8>,
+ read: &mut usize,
+) -> Poll<io::Result<usize>> {
+ let io_res = ready!(read_until_internal(reader, cx, b'\n', buf, read));
+ let utf8_res = String::from_utf8(mem::replace(buf, Vec::new()));
+
+ // At this point both buf and output are empty. The allocation is in utf8_res.
+
+ debug_assert!(buf.is_empty());
+ debug_assert!(output.is_empty());
+ finish_string_read(io_res, utf8_res, *read, output, false)
+}
+
impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadLine<'_, R> {
type Output = io::Result<usize>;
diff --git a/tokio/src/io/util/read_to_end.rs b/tokio/src/io/util/read_to_end.rs
index a2cd99be..29b8b811 100644
--- a/tokio/src/io/util/read_to_end.rs
+++ b/tokio/src/io/util/read_to_end.rs
@@ -2,7 +2,7 @@ use crate::io::AsyncRead;
use std::future::Future;
use std::io;
-use std::mem::MaybeUninit;
+use std::mem::{self, MaybeUninit};
use std::pin::Pin;
use std::task::{Context, Poll};
@@ -12,81 +12,136 @@ use std::task::{Context, Poll};
pub struct ReadToEnd<'a, R: ?Sized> {
reader: &'a mut R,
buf: &'a mut Vec<u8>,
- start_len: usize,
+ /// The number of bytes appended to buf. This can be less than buf.len() if
+ /// the buffer was not empty when the operation was started.
+ read: usize,
}
-pub(crate) fn read_to_end<'a, R>(reader: &'a mut R, buf: &'a mut Vec<u8>) -> ReadToEnd<'a, R>
+pub(crate) fn read_to_end<'a, R>(reader: &'a mut R, buffer: &'a mut Vec<u8>) -> ReadToEnd<'a, R>
where
R: AsyncRead + Unpin + ?Sized,
{
- let start_len = buf.len();
+ prepare_buffer(buffer, reader);
ReadToEnd {
reader,
- buf,
- start_len,
+ buf: buffer,
+ read: 0,
}
}
-struct Guard<'a> {
- buf: &'a mut Vec<u8>,
- len: usize,
-}
-
-impl Drop for Guard<'_> {
- fn drop(&mut self) {
- unsafe {
- self.buf.set_len(self.len);
+/// # Safety
+///
+/// Before first calling this method, the unused capacity must have been
+/// prepared for use with the provided AsyncRead. This can be done using the
+/// `prepare_buffer` function later in this file.
+pub(super) unsafe fn read_to_end_internal<R: AsyncRead + ?Sized>(
+ buf: &mut Vec<u8>,
+ mut reader: Pin<&mut R>,
+ num_read: &mut usize,
+ cx: &mut Context<'_>,
+) -> Poll<io::Result<usize>> {
+ loop {
+ // safety: The caller promised to prepare the buffer.
+ let ret = ready!(poll_read_to_end(buf, reader.as_mut(), cx));
+ match ret {
+ Err(err) => return Poll::Ready(Err(err)),
+ Ok(0) => return Poll::Ready(Ok(mem::replace(num_read, 0))),
+ Ok(num) => {
+ *num_read += num;
+ }
}
}
}
-// This uses an adaptive system to extend the vector when it fills. We want to
-// avoid paying to allocate and zero a huge chunk of memory if the reader only
-// has 4 bytes while still making large reads if the reader does have a ton
-// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
-// time is 4,500 times (!) slower than this if the reader has a very small
-// amount of data to return.
-//
-// Because we're extending the buffer with uninitialized data for trusted
-// readers, we need to make sure to truncate that if any of this panics.
-pub(super) fn read_to_end_internal<R: AsyncRead + ?Sized>(
- mut rd: Pin<&mut R>,
- cx: &mut Context<'_>,
+/// Tries to read from the provided AsyncRead.
+///
+/// The length of the buffer is increased by the number of bytes read.
+///
+/// # Safety
+///
+/// The caller ensures that the buffer has been prepared for use with the
+/// AsyncRead before calling this function. This can be done using the
+/// `prepare_buffer` function later in this file.
+unsafe fn poll_read_to_end<R: AsyncRead + ?Sized>(
buf: &mut Vec<u8>,
- start_len: usize,
+ read: Pin<&mut R>,
+ cx: &mut Context<'_>,
) -> Poll<io::Result<usize>> {
- let mut g = Guard {
- len: buf.len(),
- buf,
- };
- let ret;
- loop {
- if g.len == g.buf.len() {
- unsafe {
- g.buf.reserve(32);
- let capacity = g.buf.capacity();
- g.buf.set_len(capacity);
+ // This uses an adaptive system to extend the vector when it fills. We want to
+ // avoid paying to allocate and zero a huge chunk of memory if the reader only
+ // has 4 bytes while still making large reads if the reader does have a ton
+ // of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
+ // time is 4,500 times (!) slower than this if the reader has a very small
+ // amount of data to return.
+ reserve(buf, &*read, 32);
- let b = &mut *(&mut g.buf[g.len..] as *mut [u8] as *mut [MaybeUninit<u8>]);
+ let unused_capacity: &mut [MaybeUninit<u8>] = get_unused_capacity(buf);
- rd.prepare_uninitialized_buffer(b);
- }
- }
+ // safety: The buffer has been prepared for use with the AsyncRead before
+ // calling this function.
+ let slice: &mut [u8] = &mut *(unused_capacity as *mut [MaybeUninit<u8>] as *mut [u8]);
- match ready!(rd.as_mut().poll_read(cx, &mut g.buf[g.len..])) {
- Ok(0) => {
- ret = Poll::Ready(Ok(g.len - start_len));
- break;
- }
- Ok(n) => g.len += n,
- Err(e) => {
- ret = Poll::Ready(Err(e));
- break;
- }
- }
+ let res = ready!(read.poll_read(cx, slice));
+ if let Ok(num) = res {
+ // safety: There are two situations:
+ //
+ // 1. The AsyncRead has not overriden `prepare_uninitialized_buffer`.
+ //
+ // In this situation, the default implementation of that method will have
+ // zeroed the unused capacity. This means that setting the length will
+ // never expose uninitialized memory in the vector.
+ //
+ // Note that the assert! below ensures that we don't set the length to
+ // something larger than the capacity, which malicious implementors might
+ // try to have us do.
+ //
+ // 2. The AsyncRead has overriden `prepare_uninitialized_buffer`.
+ //
+ // In this case, the safety of the `set_len` call below relies on this
+ // guarantee from the documentation on `prepare_uninitialized_buffer`:
+ //
+ // > This function isn't actually unsafe to call but unsafe to implement.
+ // > The implementer must ensure that either the whole buf has been zeroed
+ // > or poll_read() overwrites the buffer without reading it and returns
+ // > correct value.
+ //
+ // Note that `prepare_uninitialized_buffer` is unsafe to implement, so this
+ // is a guarantee we can rely on in unsafe code.
+ //
+ // The assert!() is technically only necessary in the first case.
+ let new_len = buf.len() + num;
+ assert!(new_len <= buf.capacity());
+
+ buf.set_len(new_len);
}
+ Poll::Ready(res)
+}
+
+/// This function prepares the unused capacity for use with the provided AsyncRead.
+pub(super) fn prepare_buffer<R: AsyncRead + ?Sized>(buf: &mut Vec<u8>, read: &R) {
+ let buffer = get_unused_capacity(buf);
- ret
+ // safety: This function is only unsafe to implement.
+ unsafe {
+ read.prepare_uninitialized_buffer(buffer);
+ }
+}
+
+/// Allocates more memory and ensures that the unused capacity is prepared for use
+/// with the `AsyncRead`.
+fn reserve<R: AsyncRead + ?Sized>(buf: &mut Vec<u8>, read: &R, bytes: usize) {
+ if buf.capacity() - buf.len() >= bytes {
+ return;
+ }
+ buf.reserve(bytes);
+ // The call above has reallocated the buffer, so we must reinitialize the entire
+ // unused capacity, even if we already initialized some of it before the resize.
+ prepare_buffer(buf, read);
+}
+
+/// Returns the unused capacity of the provided vector.
+fn get_unused_capacity(buf: &mut Vec<u8>) -> &mut [MaybeUninit<u8>] {
+ bytes::BufMut::bytes_mut(buf)
}
impl<A> Future for ReadToEnd<'_, A>
@@ -96,8 +151,10 @@ where
type Output = io::Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- let this = &mut *self;
- read_to_end_internal(Pin::new(&mut this.reader), cx, this.buf, this.start_len)
+ let Self { reader, buf, read } = &mut *self;
+
+ // safety: The constructor of ReadToEnd calls `prepare_buffer`
+ unsafe { read_to_end_internal(buf, Pin::new(*reader), read, cx) }
}
}
diff --git a/tokio/src/io/util/read_to_string.rs b/tokio/src/io/util/read_to_string.rs
index cab0505a..4ef50be3 100644
--- a/tokio/src/io/util/read_to_string.rs
+++ b/tokio/src/io/util/read_to_string.rs
@@ -1,4 +1,5 @@
-use crate::io::util::read_to_end::read_to_end_internal;
+use crate::io::util::read_line::finish_string_read;
+use crate::io::util::read_to_end::{prepare_buffer, read_to_end_internal};
use crate::io::AsyncRead;
use std::future::Future;
@@ -12,47 +13,54 @@ cfg_io_util! {
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadToString<'a, R: ?Sized> {
reader: &'a mut R,
- buf: &'a mut String,
- bytes: Vec<u8>,
- start_len: usize,
+ /// This is the buffer we were provided. It will be replaced with an empty string
+ /// while reading to postpone utf-8 handling until after reading.
+ output: &'a mut String,
+ /// The actual allocation of the string is moved into this vector instead.
+ buf: Vec<u8>,
+ /// The number of bytes appended to buf. This can be less than buf.len() if
+ /// the buffer was not empty when the operation was started.
+ read: usize,
}
}
-pub(crate) fn read_to_string<'a, R>(reader: &'a mut R, buf: &'a mut String) -> ReadToString<'a, R>
+pub(crate) fn read_to_string<'a, R>(
+ reader: &'a mut R,
+ string: &'a mut String,
+) -> ReadToString<'a, R>
where
R: AsyncRead + ?Sized + Unpin,
{
- let start_len = buf.len();
+ let mut buf = mem::replace(string, String::new()).into_bytes();
+ prepare_buffer(&mut buf, reader);
ReadToString {
reader,
- bytes: mem::replace(buf, String::new()).into_bytes(),
buf,
- start_len,
+ output: string,
+ read: 0,
}
}
-fn read_to_string_internal<R: AsyncRead + ?Sized>(
+/// # Safety
+///
+/// Before first calling this method, the unused capacity must have been
+/// prepared for use with the provided AsyncRead. This can be done using the
+/// `prepare_buffer` function in `read_to_end.rs`.
+unsafe fn read_to_string_internal<R: AsyncRead + ?Sized>(
reader: Pin<&mut R>,
+ output: &mut String,
+ buf: &mut Vec<u8>,
+ read: &mut usize,
cx: &mut Context<'_>,
- buf: &mut String,
- bytes: &mut Vec<u8>,
- start_len: usize,
) -> Poll<io::Result<usize>> {
- let ret = ready!(read_to_end_internal(reader, cx, bytes, start_len))?;
- match String::from_utf8(mem::replace(bytes, Vec::new())) {
- Ok(string) => {
- debug_assert!(buf.is_empty());
- *buf = string;
- Poll::Ready(Ok(ret))
- }
- Err(e) => {
- *bytes = e.into_bytes();
- Poll::Ready(Err(io::Error::new(
- io::ErrorKind::InvalidData,
- "stream did not contain valid UTF-8",
- )))
- }
- }
+ let io_res = ready!(read_to_end_internal(buf, reader, read, cx));
+ let utf8_res = String::from_utf8(mem::replace(buf, Vec::new()));
+
+ // At this point both buf and output are empty. The allocation is in utf8_res.
+
+ debug_assert!(buf.is_empty());
+ debug_assert!(output.is_empty());
+ finish_string_read(io_res, utf8_res, *read, output, true)
}
impl<A> Future for ReadToString<'_, A>
@@ -65,17 +73,12 @@ where
let Self {
reader,
buf,
- bytes,
- start_len,
+ output,
+ read,
} = &mut *self;
- let ret = read_to_string_internal(Pin::new(reader), cx, buf, bytes, *start_len);
- if let Poll::Ready(Err(_)) = ret {
- // Put back the original string.
- bytes.truncate(*start_len);
- **buf = String::from_utf8(mem::replace(bytes, Vec::new()))
- .expect("original string no longer utf-8");
- }
- ret
+
+ // safety: The constructor of ReadToString called `prepare_buffer`.
+ unsafe { read_to_string_internal(Pin::new(*reader), output, buf, read, cx) }
}
}