summaryrefslogtreecommitdiffstats
path: root/tokio
diff options
context:
space:
mode:
authorDirkjan Ochtman <dirkjan@ochtman.nl>2020-10-27 09:30:29 +0100
committerGitHub <noreply@github.com>2020-10-27 09:30:29 +0100
commitfe2b9976755407b85c82b5cdee9d8c5e16e3d6c6 (patch)
tree65d9278870691e102cbcc8122782fe75104d2894 /tokio
parent6d0ba19af51015dcd80558ae768215448e285fdf (diff)
util: upgrade tokio-util to bytes 0.6 (#3052)
Diffstat (limited to 'tokio')
-rw-r--r--tokio/src/io/util/async_read_ext.rs28
-rw-r--r--tokio/src/io/util/read_buf.rs57
-rw-r--r--tokio/tests/io_read_buf.rs36
3 files changed, 96 insertions, 25 deletions
diff --git a/tokio/src/io/util/async_read_ext.rs b/tokio/src/io/util/async_read_ext.rs
index 0ab66c28..96a5f70d 100644
--- a/tokio/src/io/util/async_read_ext.rs
+++ b/tokio/src/io/util/async_read_ext.rs
@@ -1,6 +1,6 @@
use crate::io::util::chain::{chain, Chain};
use crate::io::util::read::{read, Read};
-use crate::io::util::read_buf::{read_buf, ReadBuf};
+use crate::io::util::read_buf::{poll_read_buf, read_buf, ReadBuf};
use crate::io::util::read_exact::{read_exact, ReadExact};
use crate::io::util::read_int::{
ReadI128, ReadI128Le, ReadI16, ReadI16Le, ReadI32, ReadI32Le, ReadI64, ReadI64Le, ReadI8,
@@ -14,6 +14,8 @@ use crate::io::util::take::{take, Take};
use crate::io::AsyncRead;
use bytes::BufMut;
+use std::io;
+use std::task::{Context, Poll};
cfg_io_util! {
/// Defines numeric reader
@@ -185,7 +187,7 @@ cfg_io_util! {
///
/// On a successful read, the number of read bytes is returned. If the
/// supplied buffer is not empty and the function returns `Ok(0)` then
- /// the source as reached an "end-of-file" event.
+ /// the source has reached an "end-of-file" event.
///
/// # Errors
///
@@ -231,6 +233,28 @@ cfg_io_util! {
read_buf(self, buf)
}
+ /// Attempts to pull some bytes from this source into the specified buffer,
+ /// advancing the buffer's internal cursor if the underlying reader is ready.
+ ///
+ /// Usually, only a single `read` syscall is issued, even if there is
+ /// more space in the supplied buffer.
+ ///
+ /// # Return
+ ///
+ /// On a successful read, the number of read bytes is returned. If the
+ /// supplied buffer is not empty and the function returns `Ok(0)` then
+ /// the source has reached an "end-of-file" event.
+ ///
+ /// # Errors
+ ///
+ /// If this function encounters any form of I/O or other error, an error
+ /// variant will be returned. If an error is returned then it must be
+ /// guaranteed that no bytes were read.
+ /// ```
+ fn poll_read_buf<'a, B>(&'a mut self, buf: &'a mut B, cx: &mut Context<'_>) -> Poll<io::Result<usize>> where Self: Unpin + Sized, B: BufMut {
+ poll_read_buf(self, buf, cx)
+ }
+
/// Reads the exact number of bytes required to fill `buf`.
///
/// Equivalent to:
diff --git a/tokio/src/io/util/read_buf.rs b/tokio/src/io/util/read_buf.rs
index 696deefd..7df429d7 100644
--- a/tokio/src/io/util/read_buf.rs
+++ b/tokio/src/io/util/read_buf.rs
@@ -40,33 +40,44 @@ where
type Output = io::Result<usize>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
- use crate::io::ReadBuf;
- use std::mem::MaybeUninit;
-
- let me = self.project();
+ let mut me = self.project();
+ poll_read_buf(&mut me.reader, &mut me.buf, cx)
+ }
+}
- if !me.buf.has_remaining_mut() {
- return Poll::Ready(Ok(0));
- }
+pub(crate) fn poll_read_buf<'a, R, B>(
+ reader: &'a mut R,
+ buf: &'a mut B,
+ cx: &mut Context<'_>,
+) -> Poll<io::Result<usize>>
+where
+ R: AsyncRead + Unpin,
+ B: BufMut,
+{
+ use crate::io::ReadBuf;
+ use std::mem::MaybeUninit;
- let n = {
- let dst = me.buf.bytes_mut();
- let dst = unsafe { &mut *(dst as *mut _ as *mut [MaybeUninit<u8>]) };
- let mut buf = ReadBuf::uninit(dst);
- let ptr = buf.filled().as_ptr();
- ready!(Pin::new(me.reader).poll_read(cx, &mut buf)?);
+ if !buf.has_remaining_mut() {
+ return Poll::Ready(Ok(0));
+ }
- // Ensure the pointer does not change from under us
- assert_eq!(ptr, buf.filled().as_ptr());
- buf.filled().len()
- };
+ let n = {
+ let dst = buf.bytes_mut();
+ let dst = unsafe { &mut *(dst as *mut _ as *mut [MaybeUninit<u8>]) };
+ let mut buf = ReadBuf::uninit(dst);
+ let ptr = buf.filled().as_ptr();
+ ready!(Pin::new(reader).poll_read(cx, &mut buf)?);
- // Safety: This is guaranteed to be the number of initialized (and read)
- // bytes due to the invariants provided by `ReadBuf::filled`.
- unsafe {
- me.buf.advance_mut(n);
- }
+ // Ensure the pointer does not change from under us
+ assert_eq!(ptr, buf.filled().as_ptr());
+ buf.filled().len()
+ };
- Poll::Ready(Ok(n))
+ // Safety: This is guaranteed to be the number of initialized (and read)
+ // bytes due to the invariants provided by `ReadBuf::filled`.
+ unsafe {
+ buf.advance_mut(n);
}
+
+ Poll::Ready(Ok(n))
}
diff --git a/tokio/tests/io_read_buf.rs b/tokio/tests/io_read_buf.rs
index 0328168d..35c12126 100644
--- a/tokio/tests/io_read_buf.rs
+++ b/tokio/tests/io_read_buf.rs
@@ -4,6 +4,7 @@
use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
use tokio_test::assert_ok;
+use futures::future::poll_fn;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
@@ -34,3 +35,38 @@ async fn read_buf() {
assert_eq!(n, 11);
assert_eq!(buf[..], b"hello world"[..]);
}
+
+#[tokio::test]
+async fn poll_read_buf() {
+ struct Rd {
+ cnt: usize,
+ }
+
+ impl AsyncRead for Rd {
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
+ self.cnt += 1;
+ buf.put_slice(b"hello world");
+ Poll::Ready(Ok(()))
+ }
+ }
+
+ let mut buf = vec![];
+ let mut rd = Rd { cnt: 0 };
+
+ let res = tokio::spawn(async move {
+ poll_fn(|cx| {
+ let res = rd.poll_read_buf(&mut buf, cx);
+ assert_eq!(1, rd.cnt);
+ assert_eq!(buf[..], b"hello world"[..]);
+ res
+ })
+ .await
+ })
+ .await;
+
+ assert!(matches!(res, Ok(Ok(11usize))));
+}