diff options
-rw-r--r-- | tokio-util/CHANGELOG.md | 6 | ||||
-rw-r--r-- | tokio-util/src/codec/framed_impl.rs | 2 | ||||
-rw-r--r-- | tokio-util/src/io/mod.rs | 1 | ||||
-rw-r--r-- | tokio-util/src/io/read_buf.rs | 2 | ||||
-rw-r--r-- | tokio-util/src/io/reader_stream.rs | 2 | ||||
-rw-r--r-- | tokio-util/src/lib.rs | 45 |
6 files changed, 49 insertions, 9 deletions
diff --git a/tokio-util/CHANGELOG.md b/tokio-util/CHANGELOG.md index 64228895..48dfe778 100644 --- a/tokio-util/CHANGELOG.md +++ b/tokio-util/CHANGELOG.md @@ -1,11 +1,11 @@ +### Added +- io: `poll_read_buf` util fn (#2972). + # 0.5.0 (October 30, 2020) ### Changed - io: update `bytes` to 0.6 (#3071). -### Added -- io: `poll_read_buf` util fn (#2972). - # 0.4.0 (October 15, 2020) ### Added diff --git a/tokio-util/src/codec/framed_impl.rs b/tokio-util/src/codec/framed_impl.rs index c161808f..e8b29999 100644 --- a/tokio-util/src/codec/framed_impl.rs +++ b/tokio-util/src/codec/framed_impl.rs @@ -150,7 +150,7 @@ where // got room for at least one byte to read to ensure that we don't // get a spurious 0 that looks like EOF state.buffer.reserve(1); - let bytect = match poll_read_buf(cx, pinned.inner.as_mut(), &mut state.buffer)? { + let bytect = match poll_read_buf(pinned.inner.as_mut(), cx, &mut state.buffer)? { Poll::Ready(ct) => ct, Poll::Pending => return Poll::Pending, }; diff --git a/tokio-util/src/io/mod.rs b/tokio-util/src/io/mod.rs index 6f181ab1..eefd65a5 100644 --- a/tokio-util/src/io/mod.rs +++ b/tokio-util/src/io/mod.rs @@ -13,3 +13,4 @@ mod stream_reader; pub use self::read_buf::read_buf; pub use self::reader_stream::ReaderStream; pub use self::stream_reader::StreamReader; +pub use crate::util::poll_read_buf; diff --git a/tokio-util/src/io/read_buf.rs b/tokio-util/src/io/read_buf.rs index 5bc0d586..cc3c505f 100644 --- a/tokio-util/src/io/read_buf.rs +++ b/tokio-util/src/io/read_buf.rs @@ -59,7 +59,7 @@ where fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let this = &mut *self; - crate::util::poll_read_buf(cx, Pin::new(this.0), this.1) + crate::util::poll_read_buf(Pin::new(this.0), cx, this.1) } } } diff --git a/tokio-util/src/io/reader_stream.rs b/tokio-util/src/io/reader_stream.rs index ab0c22fb..3e6a05ef 100644 --- a/tokio-util/src/io/reader_stream.rs +++ b/tokio-util/src/io/reader_stream.rs @@ -83,7 +83,7 @@ impl<R: AsyncRead> Stream for ReaderStream<R> { this.buf.reserve(CAPACITY); } - match poll_read_buf(cx, reader, &mut this.buf) { + match poll_read_buf(reader, cx, &mut this.buf) { Poll::Pending => Poll::Pending, Poll::Ready(Err(err)) => { self.project().reader.set(None); diff --git a/tokio-util/src/lib.rs b/tokio-util/src/lib.rs index 253f4437..09dd5a10 100644 --- a/tokio-util/src/lib.rs +++ b/tokio-util/src/lib.rs @@ -69,10 +69,49 @@ mod util { use std::pin::Pin; use std::task::{Context, Poll}; - pub(crate) fn poll_read_buf<T: AsyncRead>( - cx: &mut Context<'_>, + /// Try to read data from an `AsyncRead` into an implementer of the [`Buf`] trait. + /// + /// [`Buf`]: bytes::Buf + /// + /// # Example + /// + /// ``` + /// use bytes::{Bytes, BytesMut}; + /// use tokio::stream; + /// use tokio::io::Result; + /// use tokio_util::io::{StreamReader, poll_read_buf}; + /// use futures::future::poll_fn; + /// use std::pin::Pin; + /// # #[tokio::main] + /// # async fn main() -> std::io::Result<()> { + /// + /// // Create a reader from an iterator. This particular reader will always be + /// // ready. + /// let mut read = StreamReader::new(stream::iter(vec![Result::Ok(Bytes::from_static(&[0, 1, 2, 3]))])); + /// + /// let mut buf = BytesMut::new(); + /// let mut reads = 0; + /// + /// loop { + /// reads += 1; + /// let n = poll_fn(|cx| poll_read_buf(Pin::new(&mut read), cx, &mut buf)).await?; + /// + /// if n == 0 { + /// break; + /// } + /// } + /// + /// // one or more reads might be necessary. + /// assert!(reads >= 1); + /// assert_eq!(&buf[..], &[0, 1, 2, 3]); + /// # Ok(()) + /// # } + /// ``` + #[cfg_attr(not(feature = "io"), allow(unreachable_pub))] + pub fn poll_read_buf<T: AsyncRead, B: BufMut>( io: Pin<&mut T>, - buf: &mut impl BufMut, + cx: &mut Context<'_>, + buf: &mut B, ) -> Poll<io::Result<usize>> { if !buf.has_remaining_mut() { return Poll::Ready(Ok(0)); |