diff options
Diffstat (limited to 'tokio/src/io/util/read_to_end.rs')
-rw-r--r-- | tokio/src/io/util/read_to_end.rs | 108 |
1 files changed, 108 insertions, 0 deletions
diff --git a/tokio/src/io/util/read_to_end.rs b/tokio/src/io/util/read_to_end.rs new file mode 100644 index 00000000..36eba5bb --- /dev/null +++ b/tokio/src/io/util/read_to_end.rs @@ -0,0 +1,108 @@ +use crate::io::AsyncRead; + +use std::future::Future; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct ReadToEnd<'a, R: ?Sized> { + reader: &'a mut R, + buf: &'a mut Vec<u8>, + start_len: usize, +} + +pub(crate) fn read_to_end<'a, R>(reader: &'a mut R, buf: &'a mut Vec<u8>) -> ReadToEnd<'a, R> +where + R: AsyncRead + Unpin + ?Sized, +{ + let start_len = buf.len(); + ReadToEnd { + reader, + buf, + start_len, + } +} + +struct Guard<'a> { + buf: &'a mut Vec<u8>, + len: usize, +} + +impl Drop for Guard<'_> { + fn drop(&mut self) { + unsafe { + self.buf.set_len(self.len); + } + } +} + +// This uses an adaptive system to extend the vector when it fills. We want to +// avoid paying to allocate and zero a huge chunk of memory if the reader only +// has 4 bytes while still making large reads if the reader does have a ton +// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every +// time is 4,500 times (!) slower than this if the reader has a very small +// amount of data to return. +// +// Because we're extending the buffer with uninitialized data for trusted +// readers, we need to make sure to truncate that if any of this panics. +pub(super) fn read_to_end_internal<R: AsyncRead + ?Sized>( + mut rd: Pin<&mut R>, + cx: &mut Context<'_>, + buf: &mut Vec<u8>, + start_len: usize, +) -> Poll<io::Result<usize>> { + let mut g = Guard { + len: buf.len(), + buf, + }; + let ret; + loop { + if g.len == g.buf.len() { + unsafe { + g.buf.reserve(32); + let capacity = g.buf.capacity(); + g.buf.set_len(capacity); + rd.prepare_uninitialized_buffer(&mut g.buf[g.len..]); + } + } + + match ready!(rd.as_mut().poll_read(cx, &mut g.buf[g.len..])) { + Ok(0) => { + ret = Poll::Ready(Ok(g.len - start_len)); + break; + } + Ok(n) => g.len += n, + Err(e) => { + ret = Poll::Ready(Err(e)); + break; + } + } + } + + ret +} + +impl<A> Future for ReadToEnd<'_, A> +where + A: AsyncRead + ?Sized + Unpin, +{ + type Output = io::Result<usize>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let this = &mut *self; + read_to_end_internal(Pin::new(&mut this.reader), cx, this.buf, this.start_len) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn assert_unpin() { + use std::marker::PhantomPinned; + crate::is_unpin::<ReadToEnd<'_, PhantomPinned>>(); + } +} |