summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTaiki Endo <te316e89@gmail.com>2020-10-06 02:32:11 +0900
committerGitHub <noreply@github.com>2020-10-05 10:32:11 -0700
commitc23c1ecbcbd7ff8e1ee137f691eddad31aa39331 (patch)
tree75796657a11bbe4f71152383e8d1103cdc1c2386
parent561a71ad63e5e9fde7900c700596952372e4c5ac (diff)
io, stream: make ext trait futures !Unpin (#2910)
Make these future `!Unpin` for compatibility with async trait methods.
-rw-r--r--tokio/src/io/seek.rs32
-rw-r--r--tokio/src/io/util/async_seek_ext.rs6
-rw-r--r--tokio/src/io/util/flush.rs28
-rw-r--r--tokio/src/io/util/read.rs32
-rw-r--r--tokio/src/io/util/read_exact.rs28
-rw-r--r--tokio/src/io/util/read_int.rs15
-rw-r--r--tokio/src/io/util/read_line.rs40
-rw-r--r--tokio/src/io/util/read_to_end.rs42
-rw-r--r--tokio/src/io/util/read_to_string.rs40
-rw-r--r--tokio/src/io/util/read_until.rs34
-rw-r--r--tokio/src/io/util/shutdown.rs30
-rw-r--r--tokio/src/io/util/write.rs17
-rw-r--r--tokio/src/io/util/write_all.rs34
-rw-r--r--tokio/src/io/util/write_int.rs18
-rw-r--r--tokio/src/stream/all.rs34
-rw-r--r--tokio/src/stream/any.rs34
-rw-r--r--tokio/src/stream/collect.rs12
-rw-r--r--tokio/src/stream/fold.rs6
-rw-r--r--tokio/src/stream/mod.rs25
-rw-r--r--tokio/src/stream/next.rs29
-rw-r--r--tokio/src/stream/try_next.rs26
21 files changed, 315 insertions, 247 deletions
diff --git a/tokio/src/io/seek.rs b/tokio/src/io/seek.rs
index 8f071167..c90330a5 100644
--- a/tokio/src/io/seek.rs
+++ b/tokio/src/io/seek.rs
@@ -1,16 +1,22 @@
use crate::io::AsyncSeek;
+
+use pin_project_lite::pin_project;
use std::future::Future;
use std::io::{self, SeekFrom};
+use std::marker::PhantomPinned;
use std::pin::Pin;
use std::task::{Context, Poll};
-cfg_io_util! {
+pin_project! {
/// Future for the [`seek`](crate::io::AsyncSeekExt::seek) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Seek<'a, S: ?Sized> {
seek: &'a mut S,
pos: Option<SeekFrom>,
+ // Make this future `!Unpin` for compatibility with async trait methods.
+ #[pin]
+ _pin: PhantomPinned,
}
}
@@ -21,6 +27,7 @@ where
Seek {
seek,
pos: Some(pos),
+ _pin: PhantomPinned,
}
}
@@ -30,29 +37,18 @@ where
{
type Output = io::Result<u64>;
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- let me = &mut *self;
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let me = self.project();
match me.pos {
- Some(pos) => match Pin::new(&mut me.seek).start_seek(cx, pos) {
+ Some(pos) => match Pin::new(&mut *me.seek).start_seek(cx, *pos) {
Poll::Ready(Ok(())) => {
- me.pos = None;
- Pin::new(&mut me.seek).poll_complete(cx)
+ *me.pos = None;
+ Pin::new(&mut *me.seek).poll_complete(cx)
}
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Pending => Poll::Pending,
},
- None => Pin::new(&mut me.seek).poll_complete(cx),
+ None => Pin::new(&mut *me.seek).poll_complete(cx),
}
}
}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn assert_unpin() {
- use std::marker::PhantomPinned;
- crate::is_unpin::<Seek<'_, PhantomPinned>>();
- }
-}
diff --git a/tokio/src/io/util/async_seek_ext.rs b/tokio/src/io/util/async_seek_ext.rs
index 35bc94ee..351900bd 100644
--- a/tokio/src/io/util/async_seek_ext.rs
+++ b/tokio/src/io/util/async_seek_ext.rs
@@ -37,6 +37,12 @@ cfg_io_util! {
/// Creates a future which will seek an IO object, and then yield the
/// new position in the object and the object itself.
///
+ /// Equivalent to:
+ ///
+ /// ```ignore
+ /// async fn seek(&mut self, pos: SeekFrom) -> io::Result<u64>;
+ /// ```
+ ///
/// In the case of an error the buffer and the object will be discarded, with
/// the error yielded.
///
diff --git a/tokio/src/io/util/flush.rs b/tokio/src/io/util/flush.rs
index 534a5160..88d60b86 100644
--- a/tokio/src/io/util/flush.rs
+++ b/tokio/src/io/util/flush.rs
@@ -1,18 +1,24 @@
use crate::io::AsyncWrite;
+use pin_project_lite::pin_project;
use std::future::Future;
use std::io;
+use std::marker::PhantomPinned;
use std::pin::Pin;
use std::task::{Context, Poll};
-cfg_io_util! {
+pin_project! {
/// A future used to fully flush an I/O object.
///
/// Created by the [`AsyncWriteExt::flush`][flush] function.
/// [flush]: crate::io::AsyncWriteExt::flush
#[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Flush<'a, A: ?Sized> {
a: &'a mut A,
+ // Make this future `!Unpin` for compatibility with async trait methods.
+ #[pin]
+ _pin: PhantomPinned,
}
}
@@ -21,7 +27,10 @@ pub(super) fn flush<A>(a: &mut A) -> Flush<'_, A>
where
A: AsyncWrite + Unpin + ?Sized,
{
- Flush { a }
+ Flush {
+ a,
+ _pin: PhantomPinned,
+ }
}
impl<A> Future for Flush<'_, A>
@@ -30,19 +39,8 @@ where
{
type Output = io::Result<()>;
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- let me = &mut *self;
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let me = self.project();
Pin::new(&mut *me.a).poll_flush(cx)
}
}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn assert_unpin() {
- use std::marker::PhantomPinned;
- crate::is_unpin::<Flush<'_, PhantomPinned>>();
- }
-}
diff --git a/tokio/src/io/util/read.rs b/tokio/src/io/util/read.rs
index 28470d5a..edc9d5a9 100644
--- a/tokio/src/io/util/read.rs
+++ b/tokio/src/io/util/read.rs
@@ -1,7 +1,9 @@
use crate::io::{AsyncRead, ReadBuf};
+use pin_project_lite::pin_project;
use std::future::Future;
use std::io;
+use std::marker::PhantomPinned;
use std::marker::Unpin;
use std::pin::Pin;
use std::task::{Context, Poll};
@@ -15,10 +17,14 @@ pub(crate) fn read<'a, R>(reader: &'a mut R, buf: &'a mut [u8]) -> Read<'a, R>
where
R: AsyncRead + Unpin + ?Sized,
{
- Read { reader, buf }
+ Read {
+ reader,
+ buf,
+ _pin: PhantomPinned,
+ }
}
-cfg_io_util! {
+pin_project! {
/// A future which can be used to easily read available number of bytes to fill
/// a buffer.
///
@@ -28,6 +34,9 @@ cfg_io_util! {
pub struct Read<'a, R: ?Sized> {
reader: &'a mut R,
buf: &'a mut [u8],
+ // Make this future `!Unpin` for compatibility with async trait methods.
+ #[pin]
+ _pin: PhantomPinned,
}
}
@@ -37,21 +46,10 @@ where
{
type Output = io::Result<usize>;
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
- let me = &mut *self;
- let mut buf = ReadBuf::new(me.buf);
- ready!(Pin::new(&mut *me.reader).poll_read(cx, &mut buf))?;
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
+ let me = self.project();
+ let mut buf = ReadBuf::new(*me.buf);
+ ready!(Pin::new(me.reader).poll_read(cx, &mut buf))?;
Poll::Ready(Ok(buf.filled().len()))
}
}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn assert_unpin() {
- use std::marker::PhantomPinned;
- crate::is_unpin::<Read<'_, PhantomPinned>>();
- }
-}
diff --git a/tokio/src/io/util/read_exact.rs b/tokio/src/io/util/read_exact.rs
index 970074aa..1e8150eb 100644
--- a/tokio/src/io/util/read_exact.rs
+++ b/tokio/src/io/util/read_exact.rs
@@ -1,7 +1,9 @@
use crate::io::{AsyncRead, ReadBuf};
+use pin_project_lite::pin_project;
use std::future::Future;
use std::io;
+use std::marker::PhantomPinned;
use std::marker::Unpin;
use std::pin::Pin;
use std::task::{Context, Poll};
@@ -18,10 +20,11 @@ where
ReadExact {
reader,
buf: ReadBuf::new(buf),
+ _pin: PhantomPinned,
}
}
-cfg_io_util! {
+pin_project! {
/// Creates a future which will read exactly enough bytes to fill `buf`,
/// returning an error if EOF is hit sooner.
///
@@ -31,6 +34,9 @@ cfg_io_util! {
pub struct ReadExact<'a, A: ?Sized> {
reader: &'a mut A,
buf: ReadBuf<'a>,
+ // Make this future `!Unpin` for compatibility with async trait methods.
+ #[pin]
+ _pin: PhantomPinned,
}
}
@@ -44,30 +50,20 @@ where
{
type Output = io::Result<usize>;
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
+ let mut me = self.project();
+
loop {
// if our buffer is empty, then we need to read some data to continue.
- let rem = self.buf.remaining();
+ let rem = me.buf.remaining();
if rem != 0 {
- let me = &mut *self;
ready!(Pin::new(&mut *me.reader).poll_read(cx, &mut me.buf))?;
if me.buf.remaining() == rem {
return Err(eof()).into();
}
} else {
- return Poll::Ready(Ok(self.buf.capacity()));
+ return Poll::Ready(Ok(me.buf.capacity()));
}
}
}
}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn assert_unpin() {
- use std::marker::PhantomPinned;
- crate::is_unpin::<ReadExact<'_, PhantomPinned>>();
- }
-}
diff --git a/tokio/src/io/util/read_int.rs b/tokio/src/io/util/read_int.rs
index c3dbbd56..5b9fb7bf 100644
--- a/tokio/src/io/util/read_int.rs
+++ b/tokio/src/io/util/read_int.rs
@@ -5,6 +5,7 @@ use pin_project_lite::pin_project;
use std::future::Future;
use std::io;
use std::io::ErrorKind::UnexpectedEof;
+use std::marker::PhantomPinned;
use std::mem::size_of;
use std::pin::Pin;
use std::task::{Context, Poll};
@@ -16,11 +17,15 @@ macro_rules! reader {
($name:ident, $ty:ty, $reader:ident, $bytes:expr) => {
pin_project! {
#[doc(hidden)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct $name<R> {
#[pin]
src: R,
buf: [u8; $bytes],
read: u8,
+ // Make this future `!Unpin` for compatibility with async trait methods.
+ #[pin]
+ _pin: PhantomPinned,
}
}
@@ -30,6 +35,7 @@ macro_rules! reader {
src,
buf: [0; $bytes],
read: 0,
+ _pin: PhantomPinned,
}
}
}
@@ -77,15 +83,22 @@ macro_rules! reader8 {
pin_project! {
/// Future returned from `read_u8`
#[doc(hidden)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct $name<R> {
#[pin]
reader: R,
+ // Make this future `!Unpin` for compatibility with async trait methods.
+ #[pin]
+ _pin: PhantomPinned,
}
}
impl<R> $name<R> {
pub(crate) fn new(reader: R) -> $name<R> {
- $name { reader }
+ $name {
+ reader,
+ _pin: PhantomPinned,
+ }
}
}
diff --git a/tokio/src/io/util/read_line.rs b/tokio/src/io/util/read_line.rs
index d1f66f38..d38ffaf2 100644
--- a/tokio/src/io/util/read_line.rs
+++ b/tokio/src/io/util/read_line.rs
@@ -1,27 +1,32 @@
use crate::io::util::read_until::read_until_internal;
use crate::io::AsyncBufRead;
+use pin_project_lite::pin_project;
use std::future::Future;
use std::io;
+use std::marker::PhantomPinned;
use std::mem;
use std::pin::Pin;
use std::string::FromUtf8Error;
use std::task::{Context, Poll};
-cfg_io_util! {
+pin_project! {
/// Future for the [`read_line`](crate::io::AsyncBufReadExt::read_line) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadLine<'a, R: ?Sized> {
reader: &'a mut R,
- /// This is the buffer we were provided. It will be replaced with an empty string
- /// while reading to postpone utf-8 handling until after reading.
+ // This is the buffer we were provided. It will be replaced with an empty string
+ // while reading to postpone utf-8 handling until after reading.
output: &'a mut String,
- /// The actual allocation of the string is moved into this vector instead.
+ // The actual allocation of the string is moved into this vector instead.
buf: Vec<u8>,
- /// The number of bytes appended to buf. This can be less than buf.len() if
- /// the buffer was not empty when the operation was started.
+ // The number of bytes appended to buf. This can be less than buf.len() if
+ // the buffer was not empty when the operation was started.
read: usize,
+ // Make this future `!Unpin` for compatibility with async trait methods.
+ #[pin]
+ _pin: PhantomPinned,
}
}
@@ -34,6 +39,7 @@ where
buf: mem::replace(string, String::new()).into_bytes(),
output: string,
read: 0,
+ _pin: PhantomPinned,
}
}
@@ -105,25 +111,9 @@ pub(super) fn read_line_internal<R: AsyncBufRead + ?Sized>(
impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadLine<'_, R> {
type Output = io::Result<usize>;
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- let Self {
- reader,
- output,
- buf,
- read,
- } = &mut *self;
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let me = self.project();
- read_line_internal(Pin::new(reader), cx, output, buf, read)
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn assert_unpin() {
- use std::marker::PhantomPinned;
- crate::is_unpin::<ReadLine<'_, PhantomPinned>>();
+ read_line_internal(Pin::new(*me.reader), cx, me.output, me.buf, me.read)
}
}
diff --git a/tokio/src/io/util/read_to_end.rs b/tokio/src/io/util/read_to_end.rs
index 609af28e..f4fbe631 100644
--- a/tokio/src/io/util/read_to_end.rs
+++ b/tokio/src/io/util/read_to_end.rs
@@ -1,20 +1,26 @@
use crate::io::{AsyncRead, ReadBuf};
+use pin_project_lite::pin_project;
use std::future::Future;
use std::io;
+use std::marker::PhantomPinned;
use std::mem::{self, MaybeUninit};
use std::pin::Pin;
use std::task::{Context, Poll};
-#[derive(Debug)]
-#[must_use = "futures do nothing unless you `.await` or poll them"]
-#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
-pub struct ReadToEnd<'a, R: ?Sized> {
- reader: &'a mut R,
- buf: &'a mut Vec<u8>,
- /// The number of bytes appended to buf. This can be less than buf.len() if
- /// the buffer was not empty when the operation was started.
- read: usize,
+pin_project! {
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct ReadToEnd<'a, R: ?Sized> {
+ reader: &'a mut R,
+ buf: &'a mut Vec<u8>,
+ // The number of bytes appended to buf. This can be less than buf.len() if
+ // the buffer was not empty when the operation was started.
+ read: usize,
+ // Make this future `!Unpin` for compatibility with async trait methods.
+ #[pin]
+ _pin: PhantomPinned,
+ }
}
pub(crate) fn read_to_end<'a, R>(reader: &'a mut R, buffer: &'a mut Vec<u8>) -> ReadToEnd<'a, R>
@@ -25,6 +31,7 @@ where
reader,
buf: buffer,
read: 0,
+ _pin: PhantomPinned,
}
}
@@ -100,20 +107,9 @@ where
{
type Output = io::Result<usize>;
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- let Self { reader, buf, read } = &mut *self;
-
- read_to_end_internal(buf, Pin::new(*reader), read, cx)
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let me = self.project();
- #[test]
- fn assert_unpin() {
- use std::marker::PhantomPinned;
- crate::is_unpin::<ReadToEnd<'_, PhantomPinned>>();
+ read_to_end_internal(me.buf, Pin::new(*me.reader), me.read, cx)
}
}
diff --git a/tokio/src/io/util/read_to_string.rs b/tokio/src/io/util/read_to_string.rs
index cf00e50d..e463203c 100644
--- a/tokio/src/io/util/read_to_string.rs
+++ b/tokio/src/io/util/read_to_string.rs
@@ -2,25 +2,30 @@ use crate::io::util::read_line::finish_string_read;
use crate::io::util::read_to_end::read_to_end_internal;
use crate::io::AsyncRead;
+use pin_project_lite::pin_project;
use std::future::Future;
+use std::marker::PhantomPinned;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{io, mem};
-cfg_io_util! {
+pin_project! {
/// Future for the [`read_to_string`](super::AsyncReadExt::read_to_string) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadToString<'a, R: ?Sized> {
reader: &'a mut R,
- /// This is the buffer we were provided. It will be replaced with an empty string
- /// while reading to postpone utf-8 handling until after reading.
+ // This is the buffer we were provided. It will be replaced with an empty string
+ // while reading to postpone utf-8 handling until after reading.
output: &'a mut String,
- /// The actual allocation of the string is moved into this vector instead.
+ // The actual allocation of the string is moved into this vector instead.
buf: Vec<u8>,
- /// The number of bytes appended to buf. This can be less than buf.len() if
- /// the buffer was not empty when the operation was started.
+ // The number of bytes appended to buf. This can be less than buf.len() if
+ // the buffer was not empty when the operation was started.
read: usize,
+ // Make this future `!Unpin` for compatibility with async trait methods.
+ #[pin]
+ _pin: PhantomPinned,
}
}
@@ -37,6 +42,7 @@ where
buf,
output: string,
read: 0,
+ _pin: PhantomPinned,
}
}
@@ -68,26 +74,10 @@ where
{
type Output = io::Result<usize>;
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- let Self {
- reader,
- buf,
- output,
- read,
- } = &mut *self;
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let me = self.project();
// safety: The constructor of ReadToString called `prepare_buffer`.
- unsafe { read_to_string_internal(Pin::new(*reader), output, buf, read, cx) }
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn assert_unpin() {
- use std::marker::PhantomPinned;
- crate::is_unpin::<ReadToString<'_, PhantomPinned>>();
+ unsafe { read_to_string_internal(Pin::new(*me.reader), me.output, me.buf, me.read, cx) }
}
}
diff --git a/tokio/src/io/util/read_until.rs b/tokio/src/io/util/read_until.rs
index 78dac8c2..3599cff3 100644
--- a/tokio/src/io/util/read_until.rs
+++ b/tokio/src/io/util/read_until.rs
@@ -1,12 +1,14 @@
use crate::io::AsyncBufRead;
+use pin_project_lite::pin_project;
use std::future::Future;
use std::io;
+use std::marker::PhantomPinned;
use std::mem;
use std::pin::Pin;
use std::task::{Context, Poll};
-cfg_io_util! {
+pin_project! {
/// Future for the [`read_until`](crate::io::AsyncBufReadExt::read_until) method.
/// The delimeter is included in the resulting vector.
#[derive(Debug)]
@@ -15,9 +17,12 @@ cfg_io_util! {
reader: &'a mut R,
delimeter: u8,
buf: &'a mut Vec<u8>,
- /// The number of bytes appended to buf. This can be less than buf.len() if
- /// the buffer was not empty when the operation was started.
+ // The number of bytes appended to buf. This can be less than buf.len() if
+ // the buffer was not empty when the operation was started.
read: usize,
+ // Make this future `!Unpin` for compatibility with async trait methods.
+ #[pin]
+ _pin: PhantomPinned,
}
}
@@ -34,6 +39,7 @@ where
delimeter,
buf,
read: 0,
+ _pin: PhantomPinned,
}
}
@@ -66,24 +72,8 @@ pub(super) fn read_until_internal<R: AsyncBufRead + ?Sized>(
impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadUntil<'_, R> {
type Output = io::Result<usize>;
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- let Self {
- reader,
- delimeter,
- buf,
- read,
- } = &mut *self;
- read_until_internal(Pin::new(reader), cx, *delimeter, buf, read)
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn assert_unpin() {
- use std::marker::PhantomPinned;
- crate::is_unpin::<ReadUntil<'_, PhantomPinned>>();
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let me = self.project();
+ read_until_internal(Pin::new(*me.reader), cx, *me.delimeter, me.buf, me.read)
}
}
diff --git a/tokio/src/io/util/shutdown.rs b/tokio/src/io/util/shutdown.rs
index 33ac0ac0..6d30b004 100644
--- a/tokio/src/io/util/shutdown.rs
+++ b/tokio/src/io/util/shutdown.rs
@@ -1,18 +1,24 @@
use crate::io::AsyncWrite;
+use pin_project_lite::pin_project;
use std::future::Future;
use std::io;
+use std::marker::PhantomPinned;
use std::pin::Pin;
use std::task::{Context, Poll};
-cfg_io_util! {
+pin_project! {
/// A future used to shutdown an I/O object.
///
/// Created by the [`AsyncWriteExt::shutdown`][shutdown] function.
/// [shutdown]: crate::io::AsyncWriteExt::shutdown
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
pub struct Shutdown<'a, A: ?Sized> {
a: &'a mut A,
+ // Make this future `!Unpin` for compatibility with async trait methods.
+ #[pin]
+ _pin: PhantomPinned,
}
}
@@ -21,7 +27,10 @@ pub(super) fn shutdown<A>(a: &mut A) -> Shutdown<'_, A>
where
A: AsyncWrite + Unpin + ?Sized,
{
- Shutdown { a }
+ Shutdown {
+ a,
+ _pin: PhantomPinned,
+ }
}
impl<A> Future for Shutdown<'_, A>
@@ -30,19 +39,8 @@ where
{
type Output = io::Result<()>;
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- let me = &mut *self;
- Pin::new(&mut *me.a).poll_shutdown(cx)
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn assert_unpin() {
- use std::marker::PhantomPinned;
- crate::is_unpin::<Shutdown<'_, PhantomPinned>>();
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let me = self.project();
+ Pin::new(me.a).poll_shutdown(cx)
}
}
diff --git a/tokio/src/io/util/write.rs b/tokio/src/io/util/write.rs
index 433a421d..92169ebc 100644
--- a/tokio/src/io/util/write.rs
+++ b/tokio/src/io/util/write.rs
@@ -1,17 +1,22 @@
use crate::io::AsyncWrite;
+use pin_project_lite::pin_project;
use std::future::Future;
use std::io;
+use std::marker::PhantomPinned;
use std::pin::Pin;
use std::task::{Context, Poll};
-cfg_io_util! {
+pin_project! {
/// A future to write some of the buffer to an `AsyncWrite`.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Write<'a, W: ?Sized> {
writer: &'a mut W,
buf: &'a [u8],
+ // Make this future `!Unpin` for compatibility with async trait methods.
+ #[pin]
+ _pin: PhantomPinned,
}
}
@@ -21,7 +26,11 @@ pub(crate) fn write<'a, W>(writer: &'a mut W, buf: &'a [u8]) -> Write<'a, W>
where
W: AsyncWrite + Unpin + ?Sized,
{
- Write { writer, buf }
+ Write {
+ writer,
+ buf,
+ _pin: PhantomPinned,
+ }
}
impl<W> Future for Write<'_, W>
@@ -30,8 +39,8 @@ where
{
type Output = io::Result<usize>;
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
- let me = &mut *self;
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
+ let me = self.project();
Pin::new(&mut *me.writer).poll_write(cx, me.buf)
}
}
diff --git a/tokio/src/io/util/write_all.rs b/tokio/src/io/util/write_all.rs
index 898006c5..e59d41e4 100644
--- a/tokio/src/io/util/write_all.rs
+++ b/tokio/src/io/util/write_all.rs
@@ -1,17 +1,22 @@
use crate::io::AsyncWrite;
+use pin_project_lite::pin_project;
use std::future::Future;
use std::io;
+use std::marker::PhantomPinned;
use std::mem;
use std::pin::Pin;
use std::task::{Context, Poll};
-cfg_io_util! {
+pin_project! {
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct WriteAll<'a, W: ?Sized> {
writer: &'a mut W,
buf: &'a [u8],
+ // Make this future `!Unpin` for compatibility with async trait methods.
+ #[pin]
+ _pin: PhantomPinned,
}
}
@@ -19,7 +24,11 @@ pub(crate) fn write_all<'a, W>(writer: &'a mut W, buf: &'a [u8]) -> WriteAll<'a,
where
W: AsyncWrite + Unpin + ?Sized,
{
- WriteAll { writer, buf }
+ WriteAll {
+ writer,
+ buf,
+ _pin: PhantomPinned,
+ }
}
impl<W> Future for WriteAll<'_, W>
@@ -28,13 +37,13 @@ where
{
type Output = io::Result<()>;
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
- let me = &mut *self;
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ let me = self.project();
while !me.buf.is_empty() {
- let n = ready!(Pin::new(&mut me.writer).poll_write(cx, me.buf))?;
+ let n = ready!(Pin::new(&mut *me.writer).poll_write(cx, me.buf))?;
{
- let (_, rest) = mem::replace(&mut me.buf, &[]).split_at(n);
- me.buf = rest;
+ let (_, rest) = mem::replace(&mut *me.buf, &[]).split_at(n);
+ *me.buf = rest;
}
if n == 0 {
return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
@@ -44,14 +53,3 @@ where
Poll::Ready(Ok(()))
}
}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn assert_unpin() {
- use std::marker::PhantomPinned;
- crate::is_unpin::<WriteAll<'_, PhantomPinned>>();
- }
-}
diff --git a/tokio/src/io/util/write_int.rs b/tokio/src/io/util/write_int.rs
index ee992de1..13bc191e 100644
--- a/tokio/src/io/util/write_int.rs
+++ b/tokio/src/io/util/write_int.rs
@@ -4,6 +4,7 @@ use bytes::BufMut;
use pin_project_lite::pin_project;
use std::future::Future;
use std::io;
+use std::marker::PhantomPinned;
use std::mem::size_of;
use std::pin::Pin;
use std::task::{Context, Poll};
@@ -15,20 +16,25 @@ macro_rules! writer {
($name:ident, $ty:ty, $writer:ident, $bytes:expr) => {
pin_project! {
#[doc(hidden)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct $name<W> {
#[pin]
dst: W,
buf: [u8; $bytes],
written: u8,
+ // Make this future `!Unpin` for compatibility with async trait methods.
+ #[pin]
+ _pin: PhantomPinned,
}
}
impl<W> $name<W> {
pub(crate) fn new(w: W, value