diff options
author | John-John Tedro <udoprog@tedro.se> | 2020-10-19 11:06:06 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-10-19 11:06:06 +0200 |
commit | 8d17261a4b0e83487f6503816c45376bd82eb41d (patch) | |
tree | 037767371fadbffc6ef0688ce9d5889b78016990 | |
parent | 423ecc187a1c68cfd3d7288ce19f4c9a780e2060 (diff) |
util: add a poll_read_buf shim to tokio-util (#2972)
-rw-r--r-- | tokio-util/src/io/mod.rs | 4 | ||||
-rw-r--r-- | tokio-util/src/io/poll_read_buf.rs | 72 | ||||
-rw-r--r-- | tokio-util/src/io/read_buf.rs | 65 |
3 files changed, 141 insertions, 0 deletions
diff --git a/tokio-util/src/io/mod.rs b/tokio-util/src/io/mod.rs index 53066c4e..7cf25989 100644 --- a/tokio-util/src/io/mod.rs +++ b/tokio-util/src/io/mod.rs @@ -6,8 +6,12 @@ //! [`Body`]: https://docs.rs/hyper/0.13/hyper/struct.Body.html //! [`AsyncRead`]: tokio::io::AsyncRead +mod poll_read_buf; +mod read_buf; mod reader_stream; mod stream_reader; +pub use self::poll_read_buf::poll_read_buf; +pub use self::read_buf::read_buf; pub use self::reader_stream::ReaderStream; pub use self::stream_reader::StreamReader; diff --git a/tokio-util/src/io/poll_read_buf.rs b/tokio-util/src/io/poll_read_buf.rs new file mode 100644 index 00000000..fe7d14ca --- /dev/null +++ b/tokio-util/src/io/poll_read_buf.rs @@ -0,0 +1,72 @@ +use bytes::BufMut; +use futures_core::ready; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::io::{AsyncRead, ReadBuf}; + +/// Try to read data from an `AsyncRead` into an implementer of the [`Buf`] trait. +/// +/// [`Buf`]: bytes::Buf +/// +/// # Example +/// +/// ``` +/// use bytes::{Bytes, BytesMut}; +/// use tokio::stream; +/// use tokio::io::Result; +/// use tokio_util::io::{StreamReader, poll_read_buf}; +/// use futures::future::poll_fn; +/// use std::pin::Pin; +/// # #[tokio::main] +/// # async fn main() -> std::io::Result<()> { +/// +/// // Create a reader from an iterator. This particular reader will always be +/// // ready. +/// let mut read = StreamReader::new(stream::iter(vec![Result::Ok(Bytes::from_static(&[0, 1, 2, 3]))])); +/// +/// let mut buf = BytesMut::new(); +/// let mut reads = 0; +/// +/// loop { +/// reads += 1; +/// let n = poll_fn(|cx| poll_read_buf(Pin::new(&mut read), cx, &mut buf)).await?; +/// +/// if n == 0 { +/// break; +/// } +/// } +/// +/// // one or more reads might be necessary. +/// assert!(reads >= 1); +/// assert_eq!(&buf[..], &[0, 1, 2, 3]); +/// # Ok(()) +/// # } +/// ``` +pub fn poll_read_buf<R, B>( + read: Pin<&mut R>, + cx: &mut Context<'_>, + buf: &mut B, +) -> Poll<io::Result<usize>> +where + R: AsyncRead, + B: BufMut, +{ + if !buf.has_remaining_mut() { + return Poll::Ready(Ok(0)); + } + + let n = { + let mut buf = ReadBuf::uninit(buf.bytes_mut()); + ready!(read.poll_read(cx, &mut buf)?); + buf.filled().len() + }; + + // Safety: This is guaranteed to be the number of initialized (and read) + // bytes due to the invariants provided by `ReadBuf::filled`. + unsafe { + buf.advance_mut(n); + } + + Poll::Ready(Ok(n)) +} diff --git a/tokio-util/src/io/read_buf.rs b/tokio-util/src/io/read_buf.rs new file mode 100644 index 00000000..d617fa6f --- /dev/null +++ b/tokio-util/src/io/read_buf.rs @@ -0,0 +1,65 @@ +use bytes::BufMut; +use std::future::Future; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::io::AsyncRead; + +/// Read data from an `AsyncRead` into an implementer of the [`Buf`] trait. +/// +/// [`Buf`]: bytes::Buf +/// +/// # Example +/// +/// ``` +/// use bytes::{Bytes, BytesMut}; +/// use tokio::stream; +/// use tokio::io::Result; +/// use tokio_util::io::{StreamReader, read_buf}; +/// # #[tokio::main] +/// # async fn main() -> std::io::Result<()> { +/// +/// // Create a reader from an iterator. This particular reader will always be +/// // ready. +/// let mut read = StreamReader::new(stream::iter(vec![Result::Ok(Bytes::from_static(&[0, 1, 2, 3]))])); +/// +/// let mut buf = BytesMut::new(); +/// let mut reads = 0; +/// +/// loop { +/// reads += 1; +/// let n = read_buf(&mut read, &mut buf).await?; +/// +/// if n == 0 { +/// break; +/// } +/// } +/// +/// // one or more reads might be necessary. +/// assert!(reads >= 1); +/// assert_eq!(&buf[..], &[0, 1, 2, 3]); +/// # Ok(()) +/// # } +/// ``` +pub async fn read_buf<R, B>(read: &mut R, buf: &mut B) -> io::Result<usize> +where + R: AsyncRead + Unpin, + B: BufMut, +{ + return ReadBufFn(read, buf).await; + + struct ReadBufFn<'a, R, B>(&'a mut R, &'a mut B); + + impl<'a, R, B> Future for ReadBufFn<'a, R, B> + where + R: AsyncRead + Unpin, + B: BufMut, + { + type Output = io::Result<usize>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let this = &mut *self; + super::poll_read_buf(Pin::new(this.0), cx, this.1) + } + } +} |