From 6b6e76080afc92450238df69c4edc12ee5f7518d Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Fri, 12 Jun 2020 19:49:39 +0900 Subject: chore: reduce pin related unsafe code (#2613) --- tokio-test/src/task.rs | 14 ++------------ tokio/src/runtime/blocking/task.rs | 7 +++++-- tokio/src/runtime/enter.rs | 14 ++++---------- tokio/src/time/timeout.rs | 39 +++++++++++++++++++------------------- tokio/tests/rt_basic.rs | 16 +++++++++------- tokio/tests/tcp_accept.rs | 16 +++++++++------- 6 files changed, 48 insertions(+), 58 deletions(-) diff --git a/tokio-test/src/task.rs b/tokio-test/src/task.rs index 82d29134..728117cc 100644 --- a/tokio-test/src/task.rs +++ b/tokio-test/src/task.rs @@ -46,21 +46,11 @@ const SLEEP: usize = 2; impl Spawn { /// Consumes `self` returning the inner value - pub fn into_inner(mut self) -> T + pub fn into_inner(self) -> T where T: Unpin, { - drop(self.task); - - // Pin::into_inner is unstable, so we work around it - // - // Safety: `T` is bound by `Unpin`. - unsafe { - let ptr = Pin::get_mut(self.future.as_mut()) as *mut T; - let future = Box::from_raw(ptr); - mem::forget(self.future); - *future - } + *Pin::into_inner(self.future) } /// Returns `true` if the inner future has received a wake notification diff --git a/tokio/src/runtime/blocking/task.rs b/tokio/src/runtime/blocking/task.rs index e0ae6e4e..a521af46 100644 --- a/tokio/src/runtime/blocking/task.rs +++ b/tokio/src/runtime/blocking/task.rs @@ -14,14 +14,17 @@ impl BlockingTask { } } +// The closure `F` is never pinned +impl Unpin for BlockingTask {} + impl Future for BlockingTask where T: FnOnce() -> R, { type Output = R; - fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { - let me = unsafe { self.get_unchecked_mut() }; + fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { + let me = &mut *self; let func = me .func .take() diff --git a/tokio/src/runtime/enter.rs b/tokio/src/runtime/enter.rs index ad5580cc..56a7c57b 100644 --- a/tokio/src/runtime/enter.rs +++ b/tokio/src/runtime/enter.rs @@ -142,12 +142,11 @@ cfg_block_on! { impl Enter { /// Blocks the thread on the specified future, returning the value with /// which that future completes. - pub(crate) fn block_on(&mut self, mut f: F) -> Result + pub(crate) fn block_on(&mut self, f: F) -> Result where F: std::future::Future, { use crate::park::{CachedParkThread, Park}; - use std::pin::Pin; use std::task::Context; use std::task::Poll::Ready; @@ -155,9 +154,7 @@ cfg_block_on! { let waker = park.get_unpark()?.into_waker(); let mut cx = Context::from_waker(&waker); - // `block_on` takes ownership of `f`. Once it is pinned here, the original `f` binding can - // no longer be accessed, making the pinning safe. - let mut f = unsafe { Pin::new_unchecked(&mut f) }; + pin!(f); loop { if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) { @@ -179,12 +176,11 @@ cfg_blocking_impl! { /// /// If the future completes before `timeout`, the result is returned. If /// `timeout` elapses, then `Err` is returned. - pub(crate) fn block_on_timeout(&mut self, mut f: F, timeout: Duration) -> Result + pub(crate) fn block_on_timeout(&mut self, f: F, timeout: Duration) -> Result where F: std::future::Future, { use crate::park::{CachedParkThread, Park}; - use std::pin::Pin; use std::task::Context; use std::task::Poll::Ready; use std::time::Instant; @@ -193,9 +189,7 @@ cfg_blocking_impl! { let waker = park.get_unpark()?.into_waker(); let mut cx = Context::from_waker(&waker); - // `block_on` takes ownership of `f`. Once it is pinned here, the original `f` binding can - // no longer be accessed, making the pinning safe. - let mut f = unsafe { Pin::new_unchecked(&mut f) }; + pin!(f); let when = Instant::now() + timeout; loop { diff --git a/tokio/src/time/timeout.rs b/tokio/src/time/timeout.rs index 401856a8..efc3dc5c 100644 --- a/tokio/src/time/timeout.rs +++ b/tokio/src/time/timeout.rs @@ -6,6 +6,7 @@ use crate::time::{delay_until, Delay, Duration, Instant}; +use pin_project_lite::pin_project; use std::fmt; use std::future::Future; use std::pin::Pin; @@ -99,12 +100,16 @@ where } } -/// Future returned by [`timeout`](timeout) and [`timeout_at`](timeout_at). -#[must_use = "futures do nothing unless you `.await` or poll them"] -#[derive(Debug)] -pub struct Timeout { - value: T, - delay: Delay, +pin_project! { + /// Future returned by [`timeout`](timeout) and [`timeout_at`](timeout_at). + #[must_use = "futures do nothing unless you `.await` or poll them"] + #[derive(Debug)] + pub struct Timeout { + #[pin] + value: T, + #[pin] + delay: Delay, + } } /// Error returned by `Timeout`. @@ -146,24 +151,18 @@ where { type Output = Result; - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - // First, try polling the future + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + let me = self.project(); - // Safety: we never move `self.value` - unsafe { - let p = self.as_mut().map_unchecked_mut(|me| &mut me.value); - if let Poll::Ready(v) = p.poll(cx) { - return Poll::Ready(Ok(v)); - } + // First, try polling the future + if let Poll::Ready(v) = me.value.poll(cx) { + return Poll::Ready(Ok(v)); } // Now check the timer - // Safety: X_X! - unsafe { - match self.map_unchecked_mut(|me| &mut me.delay).poll(cx) { - Poll::Ready(()) => Poll::Ready(Err(Elapsed(()))), - Poll::Pending => Poll::Pending, - } + match me.delay.poll(cx) { + Poll::Ready(()) => Poll::Ready(Err(Elapsed(()))), + Poll::Pending => Poll::Pending, } } } diff --git a/tokio/tests/rt_basic.rs b/tokio/tests/rt_basic.rs index 9a1a432e..0885992d 100644 --- a/tokio/tests/rt_basic.rs +++ b/tokio/tests/rt_basic.rs @@ -29,6 +29,7 @@ fn spawned_task_does_not_progress_without_block_on() { #[test] fn no_extra_poll() { + use pin_project_lite::pin_project; use std::pin::Pin; use std::sync::{ atomic::{AtomicUsize, Ordering::SeqCst}, @@ -37,9 +38,12 @@ fn no_extra_poll() { use std::task::{Context, Poll}; use tokio::stream::{Stream, StreamExt}; - struct TrackPolls { - npolls: Arc, - s: S, + pin_project! { + struct TrackPolls { + npolls: Arc, + #[pin] + s: S, + } } impl Stream for TrackPolls @@ -48,11 +52,9 @@ fn no_extra_poll() { { type Item = S::Item; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // safety: we do not move s - let this = unsafe { self.get_unchecked_mut() }; + let this = self.project(); this.npolls.fetch_add(1, SeqCst); - // safety: we are pinned, and so is s - unsafe { Pin::new_unchecked(&mut this.s) }.poll_next(cx) + this.s.poll_next(cx) } } diff --git a/tokio/tests/tcp_accept.rs b/tokio/tests/tcp_accept.rs index f7ccd7f4..9f5b4414 100644 --- a/tokio/tests/tcp_accept.rs +++ b/tokio/tests/tcp_accept.rs @@ -39,6 +39,7 @@ test_accept! { (ip_port_tuple, ("127.0.0.1".parse::().unwrap(), 0)), } +use pin_project_lite::pin_project; use std::pin::Pin; use std::sync::{ atomic::{AtomicUsize, Ordering::SeqCst}, @@ -47,9 +48,12 @@ use std::sync::{ use std::task::{Context, Poll}; use tokio::stream::{Stream, StreamExt}; -struct TrackPolls { - npolls: Arc, - s: S, +pin_project! { + struct TrackPolls { + npolls: Arc, + #[pin] + s: S, + } } impl Stream for TrackPolls @@ -58,11 +62,9 @@ where { type Item = S::Item; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // safety: we do not move s - let this = unsafe { self.get_unchecked_mut() }; + let this = self.project(); this.npolls.fetch_add(1, SeqCst); - // safety: we are pinned, and so is s - unsafe { Pin::new_unchecked(&mut this.s) }.poll_next(cx) + this.s.poll_next(cx) } } -- cgit v1.2.3