diff options
author | Dirkjan Ochtman <dirkjan@ochtman.nl> | 2020-10-27 09:30:29 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-10-27 09:30:29 +0100 |
commit | fe2b9976755407b85c82b5cdee9d8c5e16e3d6c6 (patch) | |
tree | 65d9278870691e102cbcc8122782fe75104d2894 /tokio | |
parent | 6d0ba19af51015dcd80558ae768215448e285fdf (diff) |
util: upgrade tokio-util to bytes 0.6 (#3052)
Diffstat (limited to 'tokio')
-rw-r--r-- | tokio/src/io/util/async_read_ext.rs | 28 | ||||
-rw-r--r-- | tokio/src/io/util/read_buf.rs | 57 | ||||
-rw-r--r-- | tokio/tests/io_read_buf.rs | 36 |
3 files changed, 96 insertions, 25 deletions
diff --git a/tokio/src/io/util/async_read_ext.rs b/tokio/src/io/util/async_read_ext.rs index 0ab66c28..96a5f70d 100644 --- a/tokio/src/io/util/async_read_ext.rs +++ b/tokio/src/io/util/async_read_ext.rs @@ -1,6 +1,6 @@ use crate::io::util::chain::{chain, Chain}; use crate::io::util::read::{read, Read}; -use crate::io::util::read_buf::{read_buf, ReadBuf}; +use crate::io::util::read_buf::{poll_read_buf, read_buf, ReadBuf}; use crate::io::util::read_exact::{read_exact, ReadExact}; use crate::io::util::read_int::{ ReadI128, ReadI128Le, ReadI16, ReadI16Le, ReadI32, ReadI32Le, ReadI64, ReadI64Le, ReadI8, @@ -14,6 +14,8 @@ use crate::io::util::take::{take, Take}; use crate::io::AsyncRead; use bytes::BufMut; +use std::io; +use std::task::{Context, Poll}; cfg_io_util! { /// Defines numeric reader @@ -185,7 +187,7 @@ cfg_io_util! { /// /// On a successful read, the number of read bytes is returned. If the /// supplied buffer is not empty and the function returns `Ok(0)` then - /// the source as reached an "end-of-file" event. + /// the source has reached an "end-of-file" event. /// /// # Errors /// @@ -231,6 +233,28 @@ cfg_io_util! { read_buf(self, buf) } + /// Attempts to pull some bytes from this source into the specified buffer, + /// advancing the buffer's internal cursor if the underlying reader is ready. + /// + /// Usually, only a single `read` syscall is issued, even if there is + /// more space in the supplied buffer. + /// + /// # Return + /// + /// On a successful read, the number of read bytes is returned. If the + /// supplied buffer is not empty and the function returns `Ok(0)` then + /// the source has reached an "end-of-file" event. + /// + /// # Errors + /// + /// If this function encounters any form of I/O or other error, an error + /// variant will be returned. If an error is returned then it must be + /// guaranteed that no bytes were read. + /// ``` + fn poll_read_buf<'a, B>(&'a mut self, buf: &'a mut B, cx: &mut Context<'_>) -> Poll<io::Result<usize>> where Self: Unpin + Sized, B: BufMut { + poll_read_buf(self, buf, cx) + } + /// Reads the exact number of bytes required to fill `buf`. /// /// Equivalent to: diff --git a/tokio/src/io/util/read_buf.rs b/tokio/src/io/util/read_buf.rs index 696deefd..7df429d7 100644 --- a/tokio/src/io/util/read_buf.rs +++ b/tokio/src/io/util/read_buf.rs @@ -40,33 +40,44 @@ where type Output = io::Result<usize>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> { - use crate::io::ReadBuf; - use std::mem::MaybeUninit; - - let me = self.project(); + let mut me = self.project(); + poll_read_buf(&mut me.reader, &mut me.buf, cx) + } +} - if !me.buf.has_remaining_mut() { - return Poll::Ready(Ok(0)); - } +pub(crate) fn poll_read_buf<'a, R, B>( + reader: &'a mut R, + buf: &'a mut B, + cx: &mut Context<'_>, +) -> Poll<io::Result<usize>> +where + R: AsyncRead + Unpin, + B: BufMut, +{ + use crate::io::ReadBuf; + use std::mem::MaybeUninit; - let n = { - let dst = me.buf.bytes_mut(); - let dst = unsafe { &mut *(dst as *mut _ as *mut [MaybeUninit<u8>]) }; - let mut buf = ReadBuf::uninit(dst); - let ptr = buf.filled().as_ptr(); - ready!(Pin::new(me.reader).poll_read(cx, &mut buf)?); + if !buf.has_remaining_mut() { + return Poll::Ready(Ok(0)); + } - // Ensure the pointer does not change from under us - assert_eq!(ptr, buf.filled().as_ptr()); - buf.filled().len() - }; + let n = { + let dst = buf.bytes_mut(); + let dst = unsafe { &mut *(dst as *mut _ as *mut [MaybeUninit<u8>]) }; + let mut buf = ReadBuf::uninit(dst); + let ptr = buf.filled().as_ptr(); + ready!(Pin::new(reader).poll_read(cx, &mut buf)?); - // Safety: This is guaranteed to be the number of initialized (and read) - // bytes due to the invariants provided by `ReadBuf::filled`. - unsafe { - me.buf.advance_mut(n); - } + // Ensure the pointer does not change from under us + assert_eq!(ptr, buf.filled().as_ptr()); + buf.filled().len() + }; - Poll::Ready(Ok(n)) + // Safety: This is guaranteed to be the number of initialized (and read) + // bytes due to the invariants provided by `ReadBuf::filled`. + unsafe { + buf.advance_mut(n); } + + Poll::Ready(Ok(n)) } diff --git a/tokio/tests/io_read_buf.rs b/tokio/tests/io_read_buf.rs index 0328168d..35c12126 100644 --- a/tokio/tests/io_read_buf.rs +++ b/tokio/tests/io_read_buf.rs @@ -4,6 +4,7 @@ use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf}; use tokio_test::assert_ok; +use futures::future::poll_fn; use std::io; use std::pin::Pin; use std::task::{Context, Poll}; @@ -34,3 +35,38 @@ async fn read_buf() { assert_eq!(n, 11); assert_eq!(buf[..], b"hello world"[..]); } + +#[tokio::test] +async fn poll_read_buf() { + struct Rd { + cnt: usize, + } + + impl AsyncRead for Rd { + fn poll_read( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll<io::Result<()>> { + self.cnt += 1; + buf.put_slice(b"hello world"); + Poll::Ready(Ok(())) + } + } + + let mut buf = vec![]; + let mut rd = Rd { cnt: 0 }; + + let res = tokio::spawn(async move { + poll_fn(|cx| { + let res = rd.poll_read_buf(&mut buf, cx); + assert_eq!(1, rd.cnt); + assert_eq!(buf[..], b"hello world"[..]); + res + }) + .await + }) + .await; + + assert!(matches!(res, Ok(Ok(11usize)))); +} |