use crate::io::util::DEFAULT_BUF_SIZE; use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf}; use pin_project_lite::pin_project; use std::io; use std::pin::Pin; use std::task::{Context, Poll}; use std::{cmp, fmt}; pin_project! { /// The `BufReader` struct adds buffering to any reader. /// /// It can be excessively inefficient to work directly with a [`AsyncRead`] /// instance. A `BufReader` performs large, infrequent reads on the underlying /// [`AsyncRead`] and maintains an in-memory buffer of the results. /// /// `BufReader` can improve the speed of programs that make *small* and /// *repeated* read calls to the same file or network socket. It does not /// help when reading very large amounts at once, or reading just one or a few /// times. It also provides no advantage when reading from a source that is /// already in memory, like a `Vec`. /// /// When the `BufReader` is dropped, the contents of its buffer will be /// discarded. Creating multiple instances of a `BufReader` on the same /// stream can cause data loss. #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] pub struct BufReader { #[pin] pub(super) inner: R, pub(super) buf: Box<[u8]>, pub(super) pos: usize, pub(super) cap: usize, } } impl BufReader { /// Creates a new `BufReader` with a default buffer capacity. The default is currently 8 KB, /// but may change in the future. pub fn new(inner: R) -> Self { Self::with_capacity(DEFAULT_BUF_SIZE, inner) } /// Creates a new `BufReader` with the specified buffer capacity. pub fn with_capacity(capacity: usize, inner: R) -> Self { let buffer = vec![0; capacity]; Self { inner, buf: buffer.into_boxed_slice(), pos: 0, cap: 0, } } /// Gets a reference to the underlying reader. /// /// It is inadvisable to directly read from the underlying reader. pub fn get_ref(&self) -> &R { &self.inner } /// Gets a mutable reference to the underlying reader. /// /// It is inadvisable to directly read from the underlying reader. pub fn get_mut(&mut self) -> &mut R { &mut self.inner } /// Gets a pinned mutable reference to the underlying reader. /// /// It is inadvisable to directly read from the underlying reader. pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> { self.project().inner } /// Consumes this `BufReader`, returning the underlying reader. /// /// Note that any leftover data in the internal buffer is lost. pub fn into_inner(self) -> R { self.inner } /// Returns a reference to the internally buffered data. /// /// Unlike `fill_buf`, this will not attempt to fill the buffer if it is empty. pub fn buffer(&self) -> &[u8] { &self.buf[self.pos..self.cap] } /// Invalidates all data in the internal buffer. #[inline] fn discard_buffer(self: Pin<&mut Self>) { let me = self.project(); *me.pos = 0; *me.cap = 0; } } impl AsyncRead for BufReader { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { // If we don't have any buffered data and we're doing a massive read // (larger than our internal buffer), bypass our internal buffer // entirely. if self.pos == self.cap && buf.remaining() >= self.buf.len() { let res = ready!(self.as_mut().get_pin_mut().poll_read(cx, buf)); self.discard_buffer(); return Poll::Ready(res); } let rem = ready!(self.as_mut().poll_fill_buf(cx))?; let amt = std::cmp::min(rem.len(), buf.remaining()); buf.put_slice(&rem[..amt]); self.consume(amt); Poll::Ready(Ok(())) } } impl AsyncBufRead for BufReader { fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let me = self.project(); // If we've reached the end of our internal buffer then we need to fetch // some more data from the underlying reader. // Branch using `>=` instead of the more correct `==` // to tell the compiler that the pos..cap slice is always valid. if *me.pos >= *me.cap { debug_assert!(*me.pos == *me.cap); let mut buf = ReadBuf::new(me.buf); ready!(me.inner.poll_read(cx, &mut buf))?; *me.cap = buf.filled().len(); *me.pos = 0; } Poll::Ready(Ok(&me.buf[*me.pos..*me.cap])) } fn consume(self: Pin<&mut Self>, amt: usize) { let me = self.project(); *me.pos = cmp::min(*me.pos + amt, *me.cap); } } impl AsyncWrite for BufReader { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { self.get_pin_mut().poll_write(cx, buf) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.get_pin_mut().poll_flush(cx) } fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.get_pin_mut().poll_shutdown(cx) } } impl fmt::Debug for BufReader { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("BufReader") .field("reader", &self.inner) .field( "buffer", &format_args!("{}/{}", self.cap - self.pos, self.buf.len()), ) .finish() } } #[cfg(test)] mod tests { use super::*; #[test] fn assert_unpin() { crate::is_unpin::>(); } }