use bytes::{Bytes, BytesMut}; use futures_core::stream::Stream; use pin_project_lite::pin_project; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::io::AsyncRead; const CAPACITY: usize = 4096; pin_project! { /// Convert an [`AsyncRead`] into a [`Stream`] of byte chunks. /// /// This stream is fused. It performs the inverse operation of /// [`StreamReader`]. /// /// # Example /// /// ``` /// # #[tokio::main] /// # async fn main() -> std::io::Result<()> { /// use tokio::stream::StreamExt; /// use tokio_util::io::ReaderStream; /// /// // Create a stream of data. /// let data = b"hello, world!"; /// let mut stream = ReaderStream::new(&data[..]); /// /// // Read all of the chunks into a vector. /// let mut stream_contents = Vec::new(); /// while let Some(chunk) = stream.next().await { /// stream_contents.extend_from_slice(&chunk?); /// } /// /// // Once the chunks are concatenated, we should have the /// // original data. /// assert_eq!(stream_contents, data); /// # Ok(()) /// # } /// ``` /// /// [`AsyncRead`]: tokio::io::AsyncRead /// [`StreamReader`]: crate::io::StreamReader /// [`Stream`]: tokio::stream::Stream #[derive(Debug)] pub struct ReaderStream { // Reader itself. // // This value is `None` if the stream has terminated. #[pin] reader: Option, // Working buffer, used to optimize allocations. buf: BytesMut, } } impl ReaderStream { /// Convert an [`AsyncRead`] into a [`Stream`] with item type /// `Result`. /// /// [`AsyncRead`]: tokio::io::AsyncRead /// [`Stream`]: tokio::stream::Stream pub fn new(reader: R) -> Self { ReaderStream { reader: Some(reader), buf: BytesMut::new(), } } } impl Stream for ReaderStream { type Item = std::io::Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { use crate::util::poll_read_buf; let mut this = self.as_mut().project(); let reader = match this.reader.as_pin_mut() { Some(r) => r, None => return Poll::Ready(None), }; if this.buf.capacity() == 0 { this.buf.reserve(CAPACITY); } match poll_read_buf(reader, cx, &mut this.buf) { Poll::Pending => Poll::Pending, Poll::Ready(Err(err)) => { self.project().reader.set(None); Poll::Ready(Some(Err(err))) } Poll::Ready(Ok(0)) => { self.project().reader.set(None); Poll::Ready(None) } Poll::Ready(Ok(_)) => { let chunk = this.buf.split(); Poll::Ready(Some(Ok(chunk.freeze()))) } } } }