diff options
Diffstat (limited to 'tokio-util/src/io/reader_stream.rs')
-rw-r--r-- | tokio-util/src/io/reader_stream.rs | 8 |
1 files changed, 5 insertions, 3 deletions
diff --git a/tokio-util/src/io/reader_stream.rs b/tokio-util/src/io/reader_stream.rs index 49288c45..ab0c22fb 100644 --- a/tokio-util/src/io/reader_stream.rs +++ b/tokio-util/src/io/reader_stream.rs @@ -3,7 +3,7 @@ use futures_core::stream::Stream; use pin_project_lite::pin_project; use std::pin::Pin; use std::task::{Context, Poll}; -use tokio::io::{AsyncRead, AsyncReadExt}; +use tokio::io::AsyncRead; const CAPACITY: usize = 4096; @@ -70,9 +70,11 @@ impl<R: AsyncRead> ReaderStream<R> { impl<R: AsyncRead> Stream for ReaderStream<R> { type Item = std::io::Result<Bytes>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + use crate::util::poll_read_buf; + let mut this = self.as_mut().project(); - let mut reader = match this.reader.as_pin_mut() { + let reader = match this.reader.as_pin_mut() { Some(r) => r, None => return Poll::Ready(None), }; @@ -81,7 +83,7 @@ impl<R: AsyncRead> Stream for ReaderStream<R> { this.buf.reserve(CAPACITY); } - match reader.poll_read_buf(&mut this.buf, cx) { + match poll_read_buf(cx, reader, &mut this.buf) { Poll::Pending => Poll::Pending, Poll::Ready(Err(err)) => { self.project().reader.set(None); |