From c23c1ecbcbd7ff8e1ee137f691eddad31aa39331 Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Tue, 6 Oct 2020 02:32:11 +0900 Subject: io, stream: make ext trait futures !Unpin (#2910) Make these future `!Unpin` for compatibility with async trait methods. --- tokio/src/io/seek.rs | 32 +++++++++++++--------------- tokio/src/io/util/async_seek_ext.rs | 6 ++++++ tokio/src/io/util/flush.rs | 28 ++++++++++++------------- tokio/src/io/util/read.rs | 32 +++++++++++++--------------- tokio/src/io/util/read_exact.rs | 28 +++++++++++-------------- tokio/src/io/util/read_int.rs | 15 ++++++++++++- tokio/src/io/util/read_line.rs | 40 +++++++++++++---------------------- tokio/src/io/util/read_to_end.rs | 42 +++++++++++++++++-------------------- tokio/src/io/util/read_to_string.rs | 40 +++++++++++++---------------------- tokio/src/io/util/read_until.rs | 34 +++++++++++------------------- tokio/src/io/util/shutdown.rs | 30 +++++++++++++------------- tokio/src/io/util/write.rs | 17 +++++++++++---- tokio/src/io/util/write_all.rs | 34 ++++++++++++++---------------- tokio/src/io/util/write_int.rs | 18 ++++++++++++++-- tokio/src/stream/all.rs | 34 +++++++++++++++++++----------- tokio/src/stream/any.rs | 34 +++++++++++++++++++----------- tokio/src/stream/collect.rs | 12 +++++++++-- tokio/src/stream/fold.rs | 6 ++++++ tokio/src/stream/mod.rs | 25 ++++++++++++++++++++++ tokio/src/stream/next.rs | 29 ++++++++++++++++--------- tokio/src/stream/try_next.rs | 26 +++++++++++++++-------- 21 files changed, 315 insertions(+), 247 deletions(-) (limited to 'tokio') diff --git a/tokio/src/io/seek.rs b/tokio/src/io/seek.rs index 8f071167..c90330a5 100644 --- a/tokio/src/io/seek.rs +++ b/tokio/src/io/seek.rs @@ -1,16 +1,22 @@ use crate::io::AsyncSeek; + +use pin_project_lite::pin_project; use std::future::Future; use std::io::{self, SeekFrom}; +use std::marker::PhantomPinned; use std::pin::Pin; use std::task::{Context, Poll}; -cfg_io_util! { +pin_project! { /// Future for the [`seek`](crate::io::AsyncSeekExt::seek) method. #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Seek<'a, S: ?Sized> { seek: &'a mut S, pos: Option, + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, } } @@ -21,6 +27,7 @@ where Seek { seek, pos: Some(pos), + _pin: PhantomPinned, } } @@ -30,29 +37,18 @@ where { type Output = io::Result; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let me = &mut *self; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let me = self.project(); match me.pos { - Some(pos) => match Pin::new(&mut me.seek).start_seek(cx, pos) { + Some(pos) => match Pin::new(&mut *me.seek).start_seek(cx, *pos) { Poll::Ready(Ok(())) => { - me.pos = None; - Pin::new(&mut me.seek).poll_complete(cx) + *me.pos = None; + Pin::new(&mut *me.seek).poll_complete(cx) } Poll::Ready(Err(e)) => Poll::Ready(Err(e)), Poll::Pending => Poll::Pending, }, - None => Pin::new(&mut me.seek).poll_complete(cx), + None => Pin::new(&mut *me.seek).poll_complete(cx), } } } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn assert_unpin() { - use std::marker::PhantomPinned; - crate::is_unpin::>(); - } -} diff --git a/tokio/src/io/util/async_seek_ext.rs b/tokio/src/io/util/async_seek_ext.rs index 35bc94ee..351900bd 100644 --- a/tokio/src/io/util/async_seek_ext.rs +++ b/tokio/src/io/util/async_seek_ext.rs @@ -37,6 +37,12 @@ cfg_io_util! { /// Creates a future which will seek an IO object, and then yield the /// new position in the object and the object itself. /// + /// Equivalent to: + /// + /// ```ignore + /// async fn seek(&mut self, pos: SeekFrom) -> io::Result; + /// ``` + /// /// In the case of an error the buffer and the object will be discarded, with /// the error yielded. /// diff --git a/tokio/src/io/util/flush.rs b/tokio/src/io/util/flush.rs index 534a5160..88d60b86 100644 --- a/tokio/src/io/util/flush.rs +++ b/tokio/src/io/util/flush.rs @@ -1,18 +1,24 @@ use crate::io::AsyncWrite; +use pin_project_lite::pin_project; use std::future::Future; use std::io; +use std::marker::PhantomPinned; use std::pin::Pin; use std::task::{Context, Poll}; -cfg_io_util! { +pin_project! { /// A future used to fully flush an I/O object. /// /// Created by the [`AsyncWriteExt::flush`][flush] function. /// [flush]: crate::io::AsyncWriteExt::flush #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Flush<'a, A: ?Sized> { a: &'a mut A, + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, } } @@ -21,7 +27,10 @@ pub(super) fn flush(a: &mut A) -> Flush<'_, A> where A: AsyncWrite + Unpin + ?Sized, { - Flush { a } + Flush { + a, + _pin: PhantomPinned, + } } impl Future for Flush<'_, A> @@ -30,19 +39,8 @@ where { type Output = io::Result<()>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let me = &mut *self; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let me = self.project(); Pin::new(&mut *me.a).poll_flush(cx) } } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn assert_unpin() { - use std::marker::PhantomPinned; - crate::is_unpin::>(); - } -} diff --git a/tokio/src/io/util/read.rs b/tokio/src/io/util/read.rs index 28470d5a..edc9d5a9 100644 --- a/tokio/src/io/util/read.rs +++ b/tokio/src/io/util/read.rs @@ -1,7 +1,9 @@ use crate::io::{AsyncRead, ReadBuf}; +use pin_project_lite::pin_project; use std::future::Future; use std::io; +use std::marker::PhantomPinned; use std::marker::Unpin; use std::pin::Pin; use std::task::{Context, Poll}; @@ -15,10 +17,14 @@ pub(crate) fn read<'a, R>(reader: &'a mut R, buf: &'a mut [u8]) -> Read<'a, R> where R: AsyncRead + Unpin + ?Sized, { - Read { reader, buf } + Read { + reader, + buf, + _pin: PhantomPinned, + } } -cfg_io_util! { +pin_project! { /// A future which can be used to easily read available number of bytes to fill /// a buffer. /// @@ -28,6 +34,9 @@ cfg_io_util! { pub struct Read<'a, R: ?Sized> { reader: &'a mut R, buf: &'a mut [u8], + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, } } @@ -37,21 +46,10 @@ where { type Output = io::Result; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let me = &mut *self; - let mut buf = ReadBuf::new(me.buf); - ready!(Pin::new(&mut *me.reader).poll_read(cx, &mut buf))?; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let me = self.project(); + let mut buf = ReadBuf::new(*me.buf); + ready!(Pin::new(me.reader).poll_read(cx, &mut buf))?; Poll::Ready(Ok(buf.filled().len())) } } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn assert_unpin() { - use std::marker::PhantomPinned; - crate::is_unpin::>(); - } -} diff --git a/tokio/src/io/util/read_exact.rs b/tokio/src/io/util/read_exact.rs index 970074aa..1e8150eb 100644 --- a/tokio/src/io/util/read_exact.rs +++ b/tokio/src/io/util/read_exact.rs @@ -1,7 +1,9 @@ use crate::io::{AsyncRead, ReadBuf}; +use pin_project_lite::pin_project; use std::future::Future; use std::io; +use std::marker::PhantomPinned; use std::marker::Unpin; use std::pin::Pin; use std::task::{Context, Poll}; @@ -18,10 +20,11 @@ where ReadExact { reader, buf: ReadBuf::new(buf), + _pin: PhantomPinned, } } -cfg_io_util! { +pin_project! { /// Creates a future which will read exactly enough bytes to fill `buf`, /// returning an error if EOF is hit sooner. /// @@ -31,6 +34,9 @@ cfg_io_util! { pub struct ReadExact<'a, A: ?Sized> { reader: &'a mut A, buf: ReadBuf<'a>, + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, } } @@ -44,30 +50,20 @@ where { type Output = io::Result; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut me = self.project(); + loop { // if our buffer is empty, then we need to read some data to continue. - let rem = self.buf.remaining(); + let rem = me.buf.remaining(); if rem != 0 { - let me = &mut *self; ready!(Pin::new(&mut *me.reader).poll_read(cx, &mut me.buf))?; if me.buf.remaining() == rem { return Err(eof()).into(); } } else { - return Poll::Ready(Ok(self.buf.capacity())); + return Poll::Ready(Ok(me.buf.capacity())); } } } } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn assert_unpin() { - use std::marker::PhantomPinned; - crate::is_unpin::>(); - } -} diff --git a/tokio/src/io/util/read_int.rs b/tokio/src/io/util/read_int.rs index c3dbbd56..5b9fb7bf 100644 --- a/tokio/src/io/util/read_int.rs +++ b/tokio/src/io/util/read_int.rs @@ -5,6 +5,7 @@ use pin_project_lite::pin_project; use std::future::Future; use std::io; use std::io::ErrorKind::UnexpectedEof; +use std::marker::PhantomPinned; use std::mem::size_of; use std::pin::Pin; use std::task::{Context, Poll}; @@ -16,11 +17,15 @@ macro_rules! reader { ($name:ident, $ty:ty, $reader:ident, $bytes:expr) => { pin_project! { #[doc(hidden)] + #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct $name { #[pin] src: R, buf: [u8; $bytes], read: u8, + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, } } @@ -30,6 +35,7 @@ macro_rules! reader { src, buf: [0; $bytes], read: 0, + _pin: PhantomPinned, } } } @@ -77,15 +83,22 @@ macro_rules! reader8 { pin_project! { /// Future returned from `read_u8` #[doc(hidden)] + #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct $name { #[pin] reader: R, + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, } } impl $name { pub(crate) fn new(reader: R) -> $name { - $name { reader } + $name { + reader, + _pin: PhantomPinned, + } } } diff --git a/tokio/src/io/util/read_line.rs b/tokio/src/io/util/read_line.rs index d1f66f38..d38ffaf2 100644 --- a/tokio/src/io/util/read_line.rs +++ b/tokio/src/io/util/read_line.rs @@ -1,27 +1,32 @@ use crate::io::util::read_until::read_until_internal; use crate::io::AsyncBufRead; +use pin_project_lite::pin_project; use std::future::Future; use std::io; +use std::marker::PhantomPinned; use std::mem; use std::pin::Pin; use std::string::FromUtf8Error; use std::task::{Context, Poll}; -cfg_io_util! { +pin_project! { /// Future for the [`read_line`](crate::io::AsyncBufReadExt::read_line) method. #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct ReadLine<'a, R: ?Sized> { reader: &'a mut R, - /// This is the buffer we were provided. It will be replaced with an empty string - /// while reading to postpone utf-8 handling until after reading. + // This is the buffer we were provided. It will be replaced with an empty string + // while reading to postpone utf-8 handling until after reading. output: &'a mut String, - /// The actual allocation of the string is moved into this vector instead. + // The actual allocation of the string is moved into this vector instead. buf: Vec, - /// The number of bytes appended to buf. This can be less than buf.len() if - /// the buffer was not empty when the operation was started. + // The number of bytes appended to buf. This can be less than buf.len() if + // the buffer was not empty when the operation was started. read: usize, + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, } } @@ -34,6 +39,7 @@ where buf: mem::replace(string, String::new()).into_bytes(), output: string, read: 0, + _pin: PhantomPinned, } } @@ -105,25 +111,9 @@ pub(super) fn read_line_internal( impl Future for ReadLine<'_, R> { type Output = io::Result; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let Self { - reader, - output, - buf, - read, - } = &mut *self; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let me = self.project(); - read_line_internal(Pin::new(reader), cx, output, buf, read) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn assert_unpin() { - use std::marker::PhantomPinned; - crate::is_unpin::>(); + read_line_internal(Pin::new(*me.reader), cx, me.output, me.buf, me.read) } } diff --git a/tokio/src/io/util/read_to_end.rs b/tokio/src/io/util/read_to_end.rs index 609af28e..f4fbe631 100644 --- a/tokio/src/io/util/read_to_end.rs +++ b/tokio/src/io/util/read_to_end.rs @@ -1,20 +1,26 @@ use crate::io::{AsyncRead, ReadBuf}; +use pin_project_lite::pin_project; use std::future::Future; use std::io; +use std::marker::PhantomPinned; use std::mem::{self, MaybeUninit}; use std::pin::Pin; use std::task::{Context, Poll}; -#[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] -pub struct ReadToEnd<'a, R: ?Sized> { - reader: &'a mut R, - buf: &'a mut Vec, - /// The number of bytes appended to buf. This can be less than buf.len() if - /// the buffer was not empty when the operation was started. - read: usize, +pin_project! { + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct ReadToEnd<'a, R: ?Sized> { + reader: &'a mut R, + buf: &'a mut Vec, + // The number of bytes appended to buf. This can be less than buf.len() if + // the buffer was not empty when the operation was started. + read: usize, + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, + } } pub(crate) fn read_to_end<'a, R>(reader: &'a mut R, buffer: &'a mut Vec) -> ReadToEnd<'a, R> @@ -25,6 +31,7 @@ where reader, buf: buffer, read: 0, + _pin: PhantomPinned, } } @@ -100,20 +107,9 @@ where { type Output = io::Result; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let Self { reader, buf, read } = &mut *self; - - read_to_end_internal(buf, Pin::new(*reader), read, cx) - } -} - -#[cfg(test)] -mod tests { - use super::*; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let me = self.project(); - #[test] - fn assert_unpin() { - use std::marker::PhantomPinned; - crate::is_unpin::>(); + read_to_end_internal(me.buf, Pin::new(*me.reader), me.read, cx) } } diff --git a/tokio/src/io/util/read_to_string.rs b/tokio/src/io/util/read_to_string.rs index cf00e50d..e463203c 100644 --- a/tokio/src/io/util/read_to_string.rs +++ b/tokio/src/io/util/read_to_string.rs @@ -2,25 +2,30 @@ use crate::io::util::read_line::finish_string_read; use crate::io::util::read_to_end::read_to_end_internal; use crate::io::AsyncRead; +use pin_project_lite::pin_project; use std::future::Future; +use std::marker::PhantomPinned; use std::pin::Pin; use std::task::{Context, Poll}; use std::{io, mem}; -cfg_io_util! { +pin_project! { /// Future for the [`read_to_string`](super::AsyncReadExt::read_to_string) method. #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct ReadToString<'a, R: ?Sized> { reader: &'a mut R, - /// This is the buffer we were provided. It will be replaced with an empty string - /// while reading to postpone utf-8 handling until after reading. + // This is the buffer we were provided. It will be replaced with an empty string + // while reading to postpone utf-8 handling until after reading. output: &'a mut String, - /// The actual allocation of the string is moved into this vector instead. + // The actual allocation of the string is moved into this vector instead. buf: Vec, - /// The number of bytes appended to buf. This can be less than buf.len() if - /// the buffer was not empty when the operation was started. + // The number of bytes appended to buf. This can be less than buf.len() if + // the buffer was not empty when the operation was started. read: usize, + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, } } @@ -37,6 +42,7 @@ where buf, output: string, read: 0, + _pin: PhantomPinned, } } @@ -68,26 +74,10 @@ where { type Output = io::Result; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let Self { - reader, - buf, - output, - read, - } = &mut *self; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let me = self.project(); // safety: The constructor of ReadToString called `prepare_buffer`. - unsafe { read_to_string_internal(Pin::new(*reader), output, buf, read, cx) } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn assert_unpin() { - use std::marker::PhantomPinned; - crate::is_unpin::>(); + unsafe { read_to_string_internal(Pin::new(*me.reader), me.output, me.buf, me.read, cx) } } } diff --git a/tokio/src/io/util/read_until.rs b/tokio/src/io/util/read_until.rs index 78dac8c2..3599cff3 100644 --- a/tokio/src/io/util/read_until.rs +++ b/tokio/src/io/util/read_until.rs @@ -1,12 +1,14 @@ use crate::io::AsyncBufRead; +use pin_project_lite::pin_project; use std::future::Future; use std::io; +use std::marker::PhantomPinned; use std::mem; use std::pin::Pin; use std::task::{Context, Poll}; -cfg_io_util! { +pin_project! { /// Future for the [`read_until`](crate::io::AsyncBufReadExt::read_until) method. /// The delimeter is included in the resulting vector. #[derive(Debug)] @@ -15,9 +17,12 @@ cfg_io_util! { reader: &'a mut R, delimeter: u8, buf: &'a mut Vec, - /// The number of bytes appended to buf. This can be less than buf.len() if - /// the buffer was not empty when the operation was started. + // The number of bytes appended to buf. This can be less than buf.len() if + // the buffer was not empty when the operation was started. read: usize, + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, } } @@ -34,6 +39,7 @@ where delimeter, buf, read: 0, + _pin: PhantomPinned, } } @@ -66,24 +72,8 @@ pub(super) fn read_until_internal( impl Future for ReadUntil<'_, R> { type Output = io::Result; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let Self { - reader, - delimeter, - buf, - read, - } = &mut *self; - read_until_internal(Pin::new(reader), cx, *delimeter, buf, read) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn assert_unpin() { - use std::marker::PhantomPinned; - crate::is_unpin::>(); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let me = self.project(); + read_until_internal(Pin::new(*me.reader), cx, *me.delimeter, me.buf, me.read) } } diff --git a/tokio/src/io/util/shutdown.rs b/tokio/src/io/util/shutdown.rs index 33ac0ac0..6d30b004 100644 --- a/tokio/src/io/util/shutdown.rs +++ b/tokio/src/io/util/shutdown.rs @@ -1,18 +1,24 @@ use crate::io::AsyncWrite; +use pin_project_lite::pin_project; use std::future::Future; use std::io; +use std::marker::PhantomPinned; use std::pin::Pin; use std::task::{Context, Poll}; -cfg_io_util! { +pin_project! { /// A future used to shutdown an I/O object. /// /// Created by the [`AsyncWriteExt::shutdown`][shutdown] function. /// [shutdown]: crate::io::AsyncWriteExt::shutdown + #[must_use = "futures do nothing unless you `.await` or poll them"] #[derive(Debug)] pub struct Shutdown<'a, A: ?Sized> { a: &'a mut A, + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, } } @@ -21,7 +27,10 @@ pub(super) fn shutdown(a: &mut A) -> Shutdown<'_, A> where A: AsyncWrite + Unpin + ?Sized, { - Shutdown { a } + Shutdown { + a, + _pin: PhantomPinned, + } } impl Future for Shutdown<'_, A> @@ -30,19 +39,8 @@ where { type Output = io::Result<()>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let me = &mut *self; - Pin::new(&mut *me.a).poll_shutdown(cx) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn assert_unpin() { - use std::marker::PhantomPinned; - crate::is_unpin::>(); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let me = self.project(); + Pin::new(me.a).poll_shutdown(cx) } } diff --git a/tokio/src/io/util/write.rs b/tokio/src/io/util/write.rs index 433a421d..92169ebc 100644 --- a/tokio/src/io/util/write.rs +++ b/tokio/src/io/util/write.rs @@ -1,17 +1,22 @@ use crate::io::AsyncWrite; +use pin_project_lite::pin_project; use std::future::Future; use std::io; +use std::marker::PhantomPinned; use std::pin::Pin; use std::task::{Context, Poll}; -cfg_io_util! { +pin_project! { /// A future to write some of the buffer to an `AsyncWrite`. #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Write<'a, W: ?Sized> { writer: &'a mut W, buf: &'a [u8], + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, } } @@ -21,7 +26,11 @@ pub(crate) fn write<'a, W>(writer: &'a mut W, buf: &'a [u8]) -> Write<'a, W> where W: AsyncWrite + Unpin + ?Sized, { - Write { writer, buf } + Write { + writer, + buf, + _pin: PhantomPinned, + } } impl Future for Write<'_, W> @@ -30,8 +39,8 @@ where { type Output = io::Result; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let me = &mut *self; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let me = self.project(); Pin::new(&mut *me.writer).poll_write(cx, me.buf) } } diff --git a/tokio/src/io/util/write_all.rs b/tokio/src/io/util/write_all.rs index 898006c5..e59d41e4 100644 --- a/tokio/src/io/util/write_all.rs +++ b/tokio/src/io/util/write_all.rs @@ -1,17 +1,22 @@ use crate::io::AsyncWrite; +use pin_project_lite::pin_project; use std::future::Future; use std::io; +use std::marker::PhantomPinned; use std::mem; use std::pin::Pin; use std::task::{Context, Poll}; -cfg_io_util! { +pin_project! { #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct WriteAll<'a, W: ?Sized> { writer: &'a mut W, buf: &'a [u8], + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, } } @@ -19,7 +24,11 @@ pub(crate) fn write_all<'a, W>(writer: &'a mut W, buf: &'a [u8]) -> WriteAll<'a, where W: AsyncWrite + Unpin + ?Sized, { - WriteAll { writer, buf } + WriteAll { + writer, + buf, + _pin: PhantomPinned, + } } impl Future for WriteAll<'_, W> @@ -28,13 +37,13 @@ where { type Output = io::Result<()>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let me = &mut *self; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let me = self.project(); while !me.buf.is_empty() { - let n = ready!(Pin::new(&mut me.writer).poll_write(cx, me.buf))?; + let n = ready!(Pin::new(&mut *me.writer).poll_write(cx, me.buf))?; { - let (_, rest) = mem::replace(&mut me.buf, &[]).split_at(n); - me.buf = rest; + let (_, rest) = mem::replace(&mut *me.buf, &[]).split_at(n); + *me.buf = rest; } if n == 0 { return Poll::Ready(Err(io::ErrorKind::WriteZero.into())); @@ -44,14 +53,3 @@ where Poll::Ready(Ok(())) } } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn assert_unpin() { - use std::marker::PhantomPinned; - crate::is_unpin::>(); - } -} diff --git a/tokio/src/io/util/write_int.rs b/tokio/src/io/util/write_int.rs index ee992de1..13bc191e 100644 --- a/tokio/src/io/util/write_int.rs +++ b/tokio/src/io/util/write_int.rs @@ -4,6 +4,7 @@ use bytes::BufMut; use pin_project_lite::pin_project; use std::future::Future; use std::io; +use std::marker::PhantomPinned; use std::mem::size_of; use std::pin::Pin; use std::task::{Context, Poll}; @@ -15,20 +16,25 @@ macro_rules! writer { ($name:ident, $ty:ty, $writer:ident, $bytes:expr) => { pin_project! { #[doc(hidden)] + #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct $name { #[pin] dst: W, buf: [u8; $bytes], written: u8, + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, } } impl $name { pub(crate) fn new(w: W, value: $ty) -> Self { - let mut writer = $name { + let mut writer = Self { buf: [0; $bytes], written: 0, dst: w, + _pin: PhantomPinned, }; BufMut::$writer(&mut &mut writer.buf[..], value); writer @@ -72,16 +78,24 @@ macro_rules! writer8 { ($name:ident, $ty:ty) => { pin_project! { #[doc(hidden)] + #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct $name { #[pin] dst: W, byte: $ty, + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, } } impl $name { pub(crate) fn new(dst: W, byte: $ty) -> Self { - Self { dst, byte } + Self { + dst, + byte, + _pin: PhantomPinned, + } } } diff --git a/tokio/src/stream/all.rs b/tokio/src/stream/all.rs index 615665d2..353d61a3 100644 --- a/tokio/src/stream/all.rs +++ b/tokio/src/stream/all.rs @@ -1,25 +1,34 @@ use crate::stream::Stream; use core::future::Future; +use core::marker::PhantomPinned; use core::pin::Pin; use core::task::{Context, Poll}; +use pin_project_lite::pin_project; -/// Future for the [`all`](super::StreamExt::all) method. -#[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct AllFuture<'a, St: ?Sized, F> { - stream: &'a mut St, - f: F, +pin_project! { + /// Future for the [`all`](super::StreamExt::all) method. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct AllFuture<'a, St: ?Sized, F> { + stream: &'a mut St, + f: F, + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, + } } impl<'a, St: ?Sized, F> AllFuture<'a, St, F> { pub(super) fn new(stream: &'a mut St, f: F) -> Self { - Self { stream, f } + Self { + stream, + f, + _pin: PhantomPinned, + } } } -impl Unpin for AllFuture<'_, St, F> {} - impl Future for AllFuture<'_, St, F> where St: ?Sized + Stream + Unpin, @@ -27,12 +36,13 @@ where { type Output = bool; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let next = futures_core::ready!(Pin::new(&mut self.stream).poll_next(cx)); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let me = self.project(); + let next = futures_core::ready!(Pin::new(me.stream).poll_next(cx)); match next { Some(v) => { - if !(&mut self.f)(v) { + if !(me.f)(v) { Poll::Ready(false) } else { cx.waker().wake_by_ref(); diff --git a/tokio/src/stream/any.rs b/tokio/src/stream/any.rs index f2ecad5e..aac0ec75 100644 --- a/tokio/src/stream/any.rs +++ b/tokio/src/stream/any.rs @@ -1,25 +1,34 @@ use crate::stream::Stream; use core::future::Future; +use core::marker::PhantomPinned; use core::pin::Pin; use core::task::{Context, Poll}; +use pin_project_lite::pin_project; -/// Future for the [`any`](super::StreamExt::any) method. -#[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct AnyFuture<'a, St: ?Sized, F> { - stream: &'a mut St, - f: F, +pin_project! { + /// Future for the [`any`](super::StreamExt::any) method. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct AnyFuture<'a, St: ?Sized, F> { + stream: &'a mut St, + f: F, + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, + } } impl<'a, St: ?Sized, F> AnyFuture<'a, St, F> { pub(super) fn new(stream: &'a mut St, f: F) -> Self { - Self { stream, f } + Self { + stream, + f, + _pin: PhantomPinned, + } } } -impl Unpin for AnyFuture<'_, St, F> {} - impl Future for AnyFuture<'_, St, F> where St: ?Sized + Stream + Unpin, @@ -27,12 +36,13 @@ where { type Output = bool; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let next = futures_core::ready!(Pin::new(&mut self.stream).poll_next(cx)); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let me = self.project(); + let next = futures_core::ready!(Pin::new(me.stream).poll_next(cx)); match next { Some(v) => { - if (&mut self.f)(v) { + if (me.f)(v) { Poll::Ready(true) } else { cx.waker().wake_by_ref(); diff --git a/tokio/src/stream/collect.rs b/tokio/src/stream/collect.rs index a2b5169f..3f91a6f2 100644 --- a/tokio/src/stream/collect.rs +++ b/tokio/src/stream/collect.rs @@ -2,6 +2,7 @@ use crate::stream::Stream; use bytes::{Buf, BufMut, Bytes, BytesMut}; use core::future::Future; +use core::marker::PhantomPinned; use core::mem; use core::pin::Pin; use core::task::{Context, Poll}; @@ -10,7 +11,7 @@ use pin_project_lite::pin_project; // Do not export this struct until `FromStream` can be unsealed. pin_project! { /// Future returned by the [`collect`](super::StreamExt::collect) method. - #[must_use = "streams do nothing unless polled"] + #[must_use = "futures do nothing unless you `.await` or poll them"] #[derive(Debug)] pub struct Collect where @@ -20,6 +21,9 @@ pin_project! { #[pin] stream: T, collection: U::InternalCollection, + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, } } @@ -44,7 +48,11 @@ where let (lower, upper) = stream.size_hint(); let collection = U::initialize(sealed::Internal, lower, upper); - Collect { stream, collection } + Collect { + stream, + collection, + _pin: PhantomPinned, + } } } diff --git a/tokio/src/stream/fold.rs b/tokio/src/stream/fold.rs index 7b9fead3..5cf2bfaf 100644 --- a/tokio/src/stream/fold.rs +++ b/tokio/src/stream/fold.rs @@ -1,6 +1,7 @@ use crate::stream::Stream; use core::future::Future; +use core::marker::PhantomPinned; use core::pin::Pin; use core::task::{Context, Poll}; use pin_project_lite::pin_project; @@ -8,11 +9,15 @@ use pin_project_lite::pin_project; pin_project! { /// Future returned by the [`fold`](super::StreamExt::fold) method. #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct FoldFuture { #[pin] stream: St, acc: Option, f: F, + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, } } @@ -22,6 +27,7 @@ impl FoldFuture { stream, acc: Some(init), f, + _pin: PhantomPinned, } } } diff --git a/tokio/src/stream/mod.rs b/tokio/src/stream/mod.rs index ec48392b..308de3f9 100644 --- a/tokio/src/stream/mod.rs +++ b/tokio/src/stream/mod.rs @@ -572,6 +572,12 @@ pub trait StreamExt: Stream { /// Tests if every element of the stream matches a predicate. /// + /// Equivalent to: + /// + /// ```ignore + /// async fn all(&mut self, f: F) -> bool; + /// ``` + /// /// `all()` takes a closure that returns `true` or `false`. It applies /// this closure to each element of the stream, and if they all return /// `true`, then so does `all`. If any of them return `false`, it @@ -627,6 +633,12 @@ pub trait StreamExt: Stream { /// Tests if any element of the stream matches a predicate. /// + /// Equivalent to: + /// + /// ```ignore + /// async fn any(&mut self, f: F) -> bool; + /// ``` + /// /// `any()` takes a closure that returns `true` or `false`. It applies /// this closure to each element of the stream, and if any of them return /// `true`, then so does `any()`. If they all return `false`, it @@ -716,6 +728,12 @@ pub trait StreamExt: Stream { /// A combinator that applies a function to every element in a stream /// producing a single, final value. /// + /// Equivalent to: + /// + /// ```ignore + /// async fn fold(self, init: B, f: F) -> B; + /// ``` + /// /// # Examples /// Basic usage: /// ``` @@ -739,6 +757,12 @@ pub trait StreamExt: Stream { /// Drain stream pushing all emitted values into a collection. /// + /// Equivalent to: + /// + /// ```ignore + /// async fn collect(self) -> T; + /// ``` + /// /// `collect` streams all values, awaiting as needed. Values are pushed into /// a collection. A number of different target collection types are /// supported, including [`Vec`](std::vec::Vec), @@ -871,6 +895,7 @@ pub trait StreamExt: Stream { { Timeout::new(self, duration) } + /// Slows down a stream by enforcing a delay between items. /// /// # Example diff --git a/tokio/src/stream/next.rs b/tokio/src/stream/next.rs index 3909c0c2..d9b1f920 100644 --- a/tokio/src/stream/next.rs +++ b/tokio/src/stream/next.rs @@ -1,28 +1,37 @@ use crate::stream::Stream; use core::future::Future; +use core::marker::PhantomPinned; use core::pin::Pin; use core::task::{Context, Poll}; +use pin_project_lite::pin_project; -/// Future for the [`next`](super::StreamExt::next) method. -#[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct Next<'a, St: ?Sized> { - stream: &'a mut St, +pin_project! { + /// Future for the [`next`](super::StreamExt::next) method. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct Next<'a, St: ?Sized> { + stream: &'a mut St, + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, + } } -impl Unpin for Next<'_, St> {} - impl<'a, St: ?Sized> Next<'a, St> { pub(super) fn new(stream: &'a mut St) -> Self { - Next { stream } + Next { + stream, + _pin: PhantomPinned, + } } } impl Future for Next<'_, St> { type Output = Option; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - Pin::new(&mut self.stream).poll_next(cx) + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let me = self.project(); + Pin::new(me.stream).poll_next(cx) } } diff --git a/tokio/src/stream/try_next.rs b/tokio/src/stream/try_next.rs index 59e0eb1a..b21d279a 100644 --- a/tokio/src/stream/try_next.rs +++ b/tokio/src/stream/try_next.rs @@ -1,22 +1,29 @@ use crate::stream::{Next, Stream}; use core::future::Future; +use core::marker::PhantomPinned; use core::pin::Pin; use core::task::{Context, Poll}; +use pin_project_lite::pin_project; -/// Future for the [`try_next`](super::StreamExt::try_next) method. -#[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct TryNext<'a, St: ?Sized> { - inner: Next<'a, St>, +pin_project! { + /// Future for the [`try_next`](super::StreamExt::try_next) method. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct TryNext<'a, St: ?Sized> { + #[pin] + inner: Next<'a, St>, + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, + } } -impl Unpin for TryNext<'_, St> {} - impl<'a, St: ?Sized> TryNext<'a, St> { pub(super) fn new(stream: &'a mut St) -> Self { Self { inner: Next::new(stream), + _pin: PhantomPinned, } } } @@ -24,7 +31,8 @@ impl<'a, St: ?Sized> TryNext<'a, St> { impl> + Unpin> Future for TryNext<'_, St> { type Output = Result, E>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - Pin::new(&mut self.inner).poll(cx).map(Option::transpose) + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let me = self.project(); + me.inner.poll(cx).map(Option::transpose) } } -- cgit v1.2.3