From 06c473e62842d257ed275497ce906710ea3f8e19 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Mon, 24 Jun 2019 12:34:30 -0700 Subject: Update Tokio to use `std::future`. (#1120) A first pass at updating Tokio to use `std::future`. Implementations of `Future` from the futures crate are updated to implement `Future` from std. Implementations of `Stream` are moved to a feature flag. This commits disables a number of crates that have not yet been updated. --- tokio-sync/Cargo.toml | 14 +- tokio-sync/src/lib.rs | 13 ++ tokio-sync/src/lock.rs | 14 +- tokio-sync/src/loom.rs | 3 +- tokio-sync/src/mpsc/bounded.rs | 65 ++++--- tokio-sync/src/mpsc/chan.rs | 46 ++--- tokio-sync/src/mpsc/unbounded.rs | 44 +++-- tokio-sync/src/oneshot.rs | 90 ++++----- tokio-sync/src/semaphore.rs | 73 +++---- tokio-sync/src/task/atomic_task.rs | 336 -------------------------------- tokio-sync/src/task/atomic_waker.rs | 317 ++++++++++++++++++++++++++++++ tokio-sync/src/task/mod.rs | 4 +- tokio-sync/src/watch.rs | 108 ++++++----- tokio-sync/tests/atomic_task.rs | 52 ----- tokio-sync/tests/atomic_waker.rs | 37 ++++ tokio-sync/tests/errors.rs | 2 - tokio-sync/tests/fuzz_atomic_task.rs | 54 ------ tokio-sync/tests/fuzz_atomic_waker.rs | 53 +++++ tokio-sync/tests/fuzz_mpsc.rs | 7 +- tokio-sync/tests/fuzz_oneshot.rs | 65 +++++-- tokio-sync/tests/fuzz_semaphore.rs | 63 +++--- tokio-sync/tests/lock.rs | 73 +++---- tokio-sync/tests/mpsc.rs | 351 ++++++++++++++++++++-------------- tokio-sync/tests/oneshot.rs | 160 ++++++---------- tokio-sync/tests/semaphore.rs | 102 ++++------ tokio-sync/tests/watch.rs | 195 +++++++++++-------- 26 files changed, 1199 insertions(+), 1142 deletions(-) delete mode 100644 tokio-sync/src/task/atomic_task.rs create mode 100644 tokio-sync/src/task/atomic_waker.rs delete mode 100644 tokio-sync/tests/atomic_task.rs create mode 100644 tokio-sync/tests/atomic_waker.rs delete mode 100644 tokio-sync/tests/fuzz_atomic_task.rs create mode 100644 tokio-sync/tests/fuzz_atomic_waker.rs (limited to 'tokio-sync') diff --git a/tokio-sync/Cargo.toml b/tokio-sync/Cargo.toml index 7217b8a1..aedd4c62 100644 --- a/tokio-sync/Cargo.toml +++ b/tokio-sync/Cargo.toml @@ -21,12 +21,18 @@ Synchronization utilities. categories = ["asynchronous"] publish = false +[features] +async-traits = ["async-sink", "futures-core-preview"] + [dependencies] fnv = "1.0.6" -futures = "0.1.19" +async-sink = { git = "https://github.com/tokio-rs/async", optional = true } +futures-core-preview = { version = "0.3.0-alpha.16", optional = true } [dev-dependencies] +async-util = { git = "https://github.com/tokio-rs/async" } env_logger = { version = "0.5", default-features = false } -tokio = { version = "0.2.0", path = "../tokio" } -tokio-mock-task = "0.1.1" -loom = { version = "0.1.1", features = ["futures"] } +pin-utils = "0.1.0-alpha.4" +# tokio = { version = "0.2.0", path = "../tokio" } +tokio-test = { version = "0.2.0", path = "../tokio-test" } +loom = { git = "https://github.com/carllerche/loom", branch = "std-future2", features = ["futures"] } diff --git a/tokio-sync/src/lib.rs b/tokio-sync/src/lib.rs index 1131899c..67903ace 100644 --- a/tokio-sync/src/lib.rs +++ b/tokio-sync/src/lib.rs @@ -20,6 +20,19 @@ macro_rules! debug { } } +/// Unwrap a ready value or propagate `Poll::Pending`. +#[macro_export] +macro_rules! ready { + ($e:expr) => {{ + use std::task::Poll::{Pending, Ready}; + + match $e { + Ready(v) => v, + Pending => return Pending, + } + }}; +} + macro_rules! if_fuzz { ($($t:tt)*) => {{ if false { $($t)* } diff --git a/tokio-sync/src/lock.rs b/tokio-sync/src/lock.rs index 3cfa1beb..3b8ce36f 100644 --- a/tokio-sync/src/lock.rs +++ b/tokio-sync/src/lock.rs @@ -41,11 +41,13 @@ //! [`LockGuard`]: struct.LockGuard.html use crate::semaphore; -use futures::Async; + use std::cell::UnsafeCell; use std::fmt; use std::ops::{Deref, DerefMut}; use std::sync::Arc; +use std::task::Poll::Ready; +use std::task::{Context, Poll}; /// An asynchronous mutual exclusion primitive useful for protecting shared data /// @@ -103,14 +105,12 @@ impl Lock { /// Try to acquire the lock. /// /// If the lock is already held, the current task is notified when it is released. - pub fn poll_lock(&mut self) -> Async> { - if let Async::NotReady = self.permit.poll_acquire(&self.inner.s).unwrap_or_else(|_| { + pub fn poll_lock(&mut self, cx: &mut Context<'_>) -> Poll> { + ready!(self.permit.poll_acquire(cx, &self.inner.s)).unwrap_or_else(|_| { // The semaphore was closed. but, we never explicitly close it, and we have a // handle to it through the Arc, which means that this can never happen. unreachable!() - }) { - return Async::NotReady; - } + }); // We want to move the acquired permit into the guard, // and leave an unacquired one in self. @@ -118,7 +118,7 @@ impl Lock { inner: self.inner.clone(), permit: ::std::mem::replace(&mut self.permit, semaphore::Permit::new()), }; - Async::Ready(LockGuard(acquired)) + Ready(LockGuard(acquired)) } } diff --git a/tokio-sync/src/loom.rs b/tokio-sync/src/loom.rs index c46a5f54..92a21e5b 100644 --- a/tokio-sync/src/loom.rs +++ b/tokio-sync/src/loom.rs @@ -1,6 +1,5 @@ pub(crate) mod futures { - pub(crate) use crate::task::AtomicTask; - pub(crate) use futures::task; + pub(crate) use crate::task::AtomicWaker; } pub(crate) mod sync { diff --git a/tokio-sync/src/mpsc/bounded.rs b/tokio-sync/src/mpsc/bounded.rs index b2fce168..7f8cb905 100644 --- a/tokio-sync/src/mpsc/bounded.rs +++ b/tokio-sync/src/mpsc/bounded.rs @@ -1,6 +1,10 @@ use super::chan; -use futures::{Poll, Sink, StartSend, Stream}; + use std::fmt; +use std::task::{Context, Poll}; + +#[cfg(feature = "async-traits")] +use std::pin::Pin; /// Send values to the associated `Receiver`. /// @@ -127,6 +131,11 @@ impl Receiver { Receiver { chan } } + /// TODO: Dox + pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll> { + self.chan.recv(cx) + } + /// Closes the receiving half of a channel, without dropping it. /// /// This prevents any further messages from being sent on the channel while @@ -136,12 +145,12 @@ impl Receiver { } } -impl Stream for Receiver { +#[cfg(feature = "async-traits")] +impl futures_core::Stream for Receiver { type Item = T; - type Error = RecvError; - fn poll(&mut self) -> Poll, Self::Error> { - self.chan.recv().map_err(|_| RecvError(())) + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Receiver::poll_next(self.get_mut(), cx) } } @@ -165,13 +174,13 @@ impl Sender { /// /// This method returns: /// - /// - `Ok(Async::Ready(_))` if capacity is reserved for a single message. - /// - `Ok(Async::NotReady)` if the channel may not have capacity, in which + /// - `Poll::Ready(Ok(_))` if capacity is reserved for a single message. + /// - `Poll::Pending` if the channel may not have capacity, in which /// case the current task is queued to be notified once /// capacity is available; - /// - `Err(SendError)` if the receiver has been dropped. - pub fn poll_ready(&mut self) -> Poll<(), SendError> { - self.chan.poll_ready().map_err(|_| SendError(())) + /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped. + pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.chan.poll_ready(cx).map_err(|_| SendError(())) } /// Attempts to send a message on this `Sender`, returning the message @@ -182,31 +191,29 @@ impl Sender { } } -impl Sink for Sender { - type SinkItem = T; - type SinkError = SendError; +#[cfg(feature = "async-traits")] +impl async_sink::Sink for Sender { + type Error = SendError; - fn start_send(&mut self, msg: T) -> StartSend { - use futures::Async::*; - use futures::AsyncSink; + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Sender::poll_ready(self.get_mut(), cx) + } - match self.poll_ready()? { - Ready(_) => { - self.try_send(msg).map_err(|_| SendError(()))?; - Ok(AsyncSink::Ready) - } - NotReady => Ok(AsyncSink::NotReady(msg)), - } + fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { + self.as_mut() + .try_send(msg) + .map_err(|err| { + assert!(err.is_full(), "call `poll_ready` before sending"); + SendError(()) + }) } - fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { - use futures::Async::Ready; - Ok(Ready(())) + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) } - fn close(&mut self) -> Poll<(), Self::SinkError> { - use futures::Async::Ready; - Ok(Ready(())) + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) } } diff --git a/tokio-sync/src/mpsc/chan.rs b/tokio-sync/src/mpsc/chan.rs index fae3159a..93e62824 100644 --- a/tokio-sync/src/mpsc/chan.rs +++ b/tokio-sync/src/mpsc/chan.rs @@ -1,13 +1,14 @@ use super::list; use crate::loom::{ - futures::AtomicTask, + futures::AtomicWaker, sync::atomic::AtomicUsize, sync::{Arc, CausalCell}, }; -use futures::Poll; use std::fmt; use std::process; use std::sync::atomic::Ordering::{AcqRel, Relaxed}; +use std::task::Poll::{Pending, Ready}; +use std::task::{Context, Poll}; /// Channel sender pub(crate) struct Tx { @@ -61,7 +62,8 @@ pub(crate) trait Semaphore { fn add_permit(&self); - fn poll_acquire(&self, permit: &mut Self::Permit) -> Poll<(), ()>; + fn poll_acquire(&self, cx: &mut Context<'_>, permit: &mut Self::Permit) + -> Poll>; fn try_acquire(&self, permit: &mut Self::Permit) -> Result<(), TrySendError>; @@ -81,8 +83,8 @@ struct Chan { /// Coordinates access to channel's capacity. semaphore: S, - /// Receiver task. Notified when a value is pushed into the channel. - rx_task: AtomicTask, + /// Receiver waker. Notified when a value is pushed into the channel. + rx_waker: AtomicWaker, /// Tracks the number of outstanding sender handles. /// @@ -101,7 +103,7 @@ where fmt.debug_struct("Chan") .field("tx", &self.tx) .field("semaphore", &self.semaphore) - .field("rx_task", &self.rx_task) + .field("rx_waker", &self.rx_waker) .field("tx_count", &self.tx_count) .field("rx_fields", &"...") .finish() @@ -138,7 +140,7 @@ where let chan = Arc::new(Chan { tx, semaphore, - rx_task: AtomicTask::new(), + rx_waker: AtomicWaker::new(), tx_count: AtomicUsize::new(1), rx_fields: CausalCell::new(RxFields { list: rx, @@ -163,8 +165,8 @@ where } /// TODO: Docs - pub(crate) fn poll_ready(&mut self) -> Poll<(), ()> { - self.inner.semaphore.poll_acquire(&mut self.permit) + pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.semaphore.poll_acquire(cx, &mut self.permit) } /// Send a message and notify the receiver. @@ -177,7 +179,7 @@ where self.inner.tx.push(value); // Notify the rx task - self.inner.rx_task.notify(); + self.inner.rx_waker.wake(); // Release the permit self.inner.semaphore.forget(&mut self.permit); @@ -217,7 +219,7 @@ where self.inner.tx.close(); // Notify the receiver - self.inner.rx_task.notify(); + self.inner.rx_waker.wake(); } } @@ -246,9 +248,8 @@ where } /// Receive the next value - pub(crate) fn recv(&mut self) -> Poll, ()> { + pub(crate) fn recv(&mut self, cx: &mut Context<'_>) -> Poll> { use super::block::Read::*; - use futures::Async::*; self.inner.rx_fields.with_mut(|rx_fields_ptr| { let rx_fields = unsafe { &mut *rx_fields_ptr }; @@ -258,7 +259,7 @@ where match rx_fields.list.pop(&self.inner.tx) { Some(Value(value)) => { self.inner.semaphore.add_permit(); - return Ok(Ready(Some(value))); + return Ready(Some(value)); } Some(Closed) => { // TODO: This check may not be required as it most @@ -268,7 +269,7 @@ where // which ensures that if dropping the tx handle is // visible, then all messages sent are also visible. assert!(self.inner.semaphore.is_idle()); - return Ok(Ready(None)); + return Ready(None); } None => {} // fall through } @@ -277,7 +278,7 @@ where try_recv!(); - self.inner.rx_task.register(); + self.inner.rx_waker.register_by_ref(cx.waker()); // It is possible that a value was pushed between attempting to read // and registering the task, so we have to check the channel a @@ -291,9 +292,9 @@ where ); if rx_fields.rx_closed && self.inner.semaphore.is_idle() { - Ok(Ready(None)) + Ready(None) } else { - Ok(NotReady) + Pending } }) } @@ -372,8 +373,8 @@ impl Semaphore for (crate::semaphore::Semaphore, usize) { self.0.available_permits() == self.1 } - fn poll_acquire(&self, permit: &mut Permit) -> Poll<(), ()> { - permit.poll_acquire(&self.0).map_err(|_| ()) + fn poll_acquire(&self, cx: &mut Context<'_>, permit: &mut Permit) -> Poll> { + permit.poll_acquire(cx, &self.0).map_err(|_| ()) } fn try_acquire(&self, permit: &mut Permit) -> Result<(), TrySendError> { @@ -415,9 +416,8 @@ impl Semaphore for AtomicUsize { self.load(Acquire) >> 1 == 0 } - fn poll_acquire(&self, permit: &mut ()) -> Poll<(), ()> { - use futures::Async::Ready; - self.try_acquire(permit).map(Ready).map_err(|_| ()) + fn poll_acquire(&self, _cx: &mut Context<'_>, permit: &mut ()) -> Poll> { + Ready(self.try_acquire(permit).map_err(|_| ())) } fn try_acquire(&self, _permit: &mut ()) -> Result<(), TrySendError> { diff --git a/tokio-sync/src/mpsc/unbounded.rs b/tokio-sync/src/mpsc/unbounded.rs index 58967c91..960bee41 100644 --- a/tokio-sync/src/mpsc/unbounded.rs +++ b/tokio-sync/src/mpsc/unbounded.rs @@ -1,7 +1,11 @@ use super::chan; use crate::loom::sync::atomic::AtomicUsize; -use futures::{Poll, Sink, StartSend, Stream}; + use std::fmt; +use std::task::{Context, Poll}; + +#[cfg(feature = "async-traits")] +use std::pin::Pin; /// Send values to the associated `UnboundedReceiver`. /// @@ -83,6 +87,11 @@ impl UnboundedReceiver { UnboundedReceiver { chan } } + /// TODO: dox + pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll> { + self.chan.recv(cx) + } + /// Closes the receiving half of a channel, without dropping it. /// /// This prevents any further messages from being sent on the channel while @@ -92,12 +101,12 @@ impl UnboundedReceiver { } } -impl Stream for UnboundedReceiver { +#[cfg(feature = "async-traits")] +impl futures_core::Stream for UnboundedReceiver { type Item = T; - type Error = UnboundedRecvError; - fn poll(&mut self) -> Poll, Self::Error> { - self.chan.recv().map_err(|_| UnboundedRecvError(())) + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.chan.recv(cx) } } @@ -113,25 +122,24 @@ impl UnboundedSender { } } -impl Sink for UnboundedSender { - type SinkItem = T; - type SinkError = UnboundedSendError; +#[cfg(feature = "async-traits")] +impl async_sink::Sink for UnboundedSender { + type Error = UnboundedSendError; - fn start_send(&mut self, msg: T) -> StartSend { - use futures::AsyncSink; + fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } - self.try_send(msg).map_err(|_| UnboundedSendError(()))?; - Ok(AsyncSink::Ready) + fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { + self.try_send(msg).map_err(|_| UnboundedSendError(())) } - fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { - use futures::Async::Ready; - Ok(Ready(())) + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) } - fn close(&mut self) -> Poll<(), Self::SinkError> { - use futures::Async::Ready; - Ok(Ready(())) + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) } } diff --git a/tokio-sync/src/oneshot.rs b/tokio-sync/src/oneshot.rs index d3531bd9..c38bb0ce 100644 --- a/tokio-sync/src/oneshot.rs +++ b/tokio-sync/src/oneshot.rs @@ -1,15 +1,15 @@ //! A channel for sending a single message between asynchronous tasks. -use crate::loom::{ - futures::task::{self, Task}, - sync::atomic::AtomicUsize, - sync::CausalCell, -}; -use futures::{Async, Future, Poll}; +use crate::loom::{sync::atomic::AtomicUsize, sync::CausalCell}; + use std::fmt; +use std::future::Future; use std::mem::{self, ManuallyDrop}; +use std::pin::Pin; use std::sync::atomic::Ordering::{self, AcqRel, Acquire}; use std::sync::Arc; +use std::task::Poll::{Pending, Ready}; +use std::task::{Context, Poll, Waker}; /// Sends a value to the associated `Receiver`. /// @@ -82,10 +82,10 @@ struct Inner { value: CausalCell>, /// The task to notify when the receiver drops without consuming the value. - tx_task: CausalCell>, + tx_task: CausalCell>, /// The task to notify when the value is sent. - rx_task: CausalCell>, + rx_task: CausalCell>, } #[derive(Clone, Copy)] @@ -167,33 +167,33 @@ impl Sender { /// /// # Return values /// - /// If `Ok(Ready)` is returned then the associated `Receiver` has been + /// If `Ready(Ok(_))` is returned then the associated `Receiver` has been /// dropped, which means any work required for sending should be canceled. /// - /// If `Ok(NotReady)` is returned then the associated `Receiver` is still + /// If `Pending` is returned then the associated `Receiver` is still /// alive and may be able to receive a message if sent. The current task is /// registered to receive a notification if the `Receiver` handle goes away. /// /// [`Receiver`]: struct.Receiver.html - pub fn poll_close(&mut self) -> Poll<(), ()> { + pub fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<()> { let inner = self.inner.as_ref().unwrap(); let mut state = State::load(&inner.state, Acquire); if state.is_closed() { - return Ok(Async::Ready(())); + return Poll::Ready(()); } if state.is_tx_task_set() { let will_notify = inner .tx_task - .with(|ptr| unsafe { (&*ptr).will_notify_current() }); + .with(|ptr| unsafe { (&*ptr).will_wake(cx.waker()) }); if !will_notify { state = State::unset_tx_task(&inner.state); if state.is_closed() { - return Ok(Async::Ready(())); + return Ready(()); } else { unsafe { inner.drop_tx_task() }; } @@ -203,18 +203,18 @@ impl Sender { if !state.is_tx_task_set() { // Attempt to set the task unsafe { - inner.set_tx_task(); + inner.set_tx_task(cx); } // Update the state state = State::set_tx_task(&inner.state); if state.is_closed() { - return Ok(Async::Ready(())); + return Ready(()); } } - Ok(Async::NotReady) + Pending } /// Check if the associated [`Receiver`] handle has been dropped. @@ -297,25 +297,18 @@ impl Drop for Receiver { } impl Future for Receiver { - type Item = T; - type Error = RecvError; - - fn poll(&mut self) -> Poll { - use futures::Async::{NotReady, Ready}; + type Output = Result; + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { // If `inner` is `None`, then `poll()` has already completed. - let ret = if let Some(inner) = self.inner.as_ref() { - match inner.poll_recv() { - Ok(Ready(v)) => Ok(Ready(v)), - Ok(NotReady) => return Ok(NotReady), - Err(e) => Err(e), - } + let ret = if let Some(inner) = self.as_ref().get_ref().inner.as_ref() { + ready!(inner.poll_recv(cx))? } else { panic!("called after complete"); }; self.inner = None; - ret + Ready(Ok(ret)) } } @@ -328,30 +321,29 @@ impl Inner { } if prev.is_rx_task_set() { - self.rx_task.with(|ptr| unsafe { (&*ptr).notify() }); + // TODO: Consume waker? + self.rx_task.with(|ptr| unsafe { (&*ptr).wake_by_ref() }); } true } - fn poll_recv(&self) -> Poll { - use futures::Async::{NotReady, Ready}; - + fn poll_recv(&self, cx: &mut Context<'_>) -> Poll> { // Load the state let mut state = State::load(&self.state, Acquire); if state.is_complete() { match unsafe { self.consume_value() } { - Some(value) => Ok(Ready(value)), - None => Err(RecvError(())), + Some(value) => Ready(Ok(value)), + None => Ready(Err(RecvError(()))), } } else if state.is_closed() { - Err(RecvError(())) + Ready(Err(RecvError(()))) } else { if state.is_rx_task_set() { let will_notify = self .rx_task - .with(|ptr| unsafe { (&*ptr).will_notify_current() }); + .with(|ptr| unsafe { (&*ptr).will_wake(cx.waker()) }); // Check if the task is still the same if !will_notify { @@ -359,8 +351,8 @@ impl Inner { state = State::unset_rx_task(&self.state); if state.is_complete() { return match unsafe { self.consume_value() } { - Some(value) => Ok(Ready(value)), - None => Err(RecvError(())), + Some(value) => Ready(Ok(value)), + None => Ready(Err(RecvError(()))), }; } else { unsafe { self.drop_rx_task() }; @@ -371,7 +363,7 @@ impl Inner { if !state.is_rx_task_set() { // Attempt to set the task unsafe { - self.set_rx_task(); + self.set_rx_task(cx); } // Update the state @@ -379,14 +371,14 @@ impl Inner { if state.is_complete() { match unsafe { self.consume_value() } { - Some(value) => Ok(Ready(value)), - None => Err(RecvError(())), + Some(value) => Ready(Ok(value)), + None => Ready(Err(RecvError(()))), } } else { - return Ok(NotReady); + return Pending; } } else { - return Ok(NotReady); + return Pending; } } } @@ -396,7 +388,7 @@ impl Inner { let prev = State::set_closed(&self.state); if prev.is_tx_task_set() && !prev.is_complete() { - self.tx_task.with(|ptr| unsafe { (&*ptr).notify() }); + self.tx_task.with(|ptr| unsafe { (&*ptr).wake_by_ref() }); } } @@ -413,14 +405,14 @@ impl Inner { self.tx_task.with_mut(|ptr| ManuallyDrop::drop(&mut *ptr)) } - unsafe fn set_rx_task(&self) { + unsafe fn set_rx_task(&self, cx: &mut Context<'_>) { self.rx_task - .with_mut(|ptr| *ptr = ManuallyDrop::new(task::current())); + .with_mut(|ptr| *ptr = ManuallyDrop::new(cx.waker().clone())); } - unsafe fn set_tx_task(&self) { + unsafe fn set_tx_task(&self, cx: &mut Context<'_>) { self.tx_task - .with_mut(|ptr| *ptr = ManuallyDrop::new(task::current())); + .with_mut(|ptr| *ptr = ManuallyDrop::new(cx.waker().clone())); } } diff --git a/tokio-sync/src/semaphore.rs b/tokio-sync/src/semaphore.rs index 43e89c37..33fd2318 100644 --- a/tokio-sync/src/semaphore.rs +++ b/tokio-sync/src/semaphore.rs @@ -6,21 +6,23 @@ //! Before accessing the shared resource, callers acquire a permit from the //! semaphore. Once the permit is acquired, the caller then enters the critical //! section. If no permits are available, then acquiring the semaphore returns -//! `NotReady`. The task is notified once a permit becomes available. +//! `Pending`. The task is woken once a permit becomes available. use crate::loom::{ - futures::AtomicTask, + futures::AtomicWaker, sync::{ atomic::{AtomicPtr, AtomicUsize}, CausalCell, }, yield_now, }; -use futures::Poll; + use std::fmt; use std::ptr::{self, NonNull}; use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Relaxed, Release}; use std::sync::Arc; +use std::task::Poll::{Pending, Ready}; +use std::task::{Context, Poll}; use std::usize; /// Futures-aware semaphore. @@ -80,8 +82,8 @@ struct WaiterNode { /// See `NodeState` for more details. state: AtomicUsize, - /// Task to notify when a permit is made available. - task: AtomicTask, + /// Task to wake when a permit is made available. + waker: AtomicWaker, /// Next pointer in the queue of waiting senders. next: AtomicPtr, @@ -174,9 +176,10 @@ impl Semaphore { } /// Poll for a permit - fn poll_permit(&self, mut permit: Option<&mut Permit>) -> Poll<(), AcquireError> { - use futures::Async::*; - + fn poll_permit( + &self, + mut permit: Option<(&mut Context<'_>, &mut Permit)>, + ) -> Poll> { // Load the current state let mut curr = SemState::load(&self.state, Acquire); @@ -205,7 +208,7 @@ impl Semaphore { if curr.is_closed() { undo_strong!(); - return Err(AcquireError::closed()); + return Ready(Err(AcquireError::closed())); } if !next.acquire_permit(&self.stub) { @@ -214,13 +217,13 @@ impl Semaphore { debug_assert!(curr.waiter().is_some()); if maybe_strong.is_none() { - if let Some(ref mut permit) = permit { + if let Some((ref mut cx, ref mut permit)) = permit { // Get the Sender's waiter node, or initialize one let waiter = permit .waiter .get_or_insert_with(|| Arc::new(WaiterNode::new())); - waiter.register(); + waiter.register(cx); debug!(" + poll_permit -- to_queued_waiting"); @@ -228,14 +231,14 @@ impl Semaphore { debug!(" + poll_permit; waiter already queued"); // The node is alrady queued, there is no further work // to do. - return Ok(NotReady); + return Pending; } maybe_strong = Some(WaiterNode::into_non_null(waiter.clone())); } else { // If no `waiter`, then the task is not registered and there // is no further work to do. - return Ok(NotReady); + return Pending; } } @@ -261,14 +264,14 @@ impl Semaphore { debug!(" + poll_permit -- waiter pushed"); - return Ok(NotReady); + return Pending; } None => { debug!(" + poll_permit -- permit acquired"); undo_strong!(); - return Ok(Ready(())); + return Ready(Ok(())); } } } @@ -571,42 +574,42 @@ impl Permit { /// Try to acquire the permit. If no permits are available, the current task /// is notified once a new permit becomes available. - pub fn poll_acquire(&mut self, semaphore: &Semaphore) -> Poll<(), AcquireError> { - use futures::Async::*; - + pub fn poll_acquire( + &mut self, + cx: &mut Context<'_>, + semaphore: &Semaphore, + ) -> Poll> { match self.state { PermitState::Idle => {} PermitState::Waiting => { let waiter = self.waiter.as_ref().unwrap(); - if waiter.acquire()? { + if waiter.acquire(cx)? { self.state = PermitState::Acquired; - return Ok(Ready(())); + return Ready(Ok(())); } else { - return Ok(NotReady); + return Pending; } } PermitState::Acquired => { - return Ok(Ready(())); + return Ready(Ok(())); } } - match semaphore.poll_permit(Some(self))? { + match semaphore.poll_permit(Some((cx, self)))? { Ready(v) => { self.state = PermitState::Acquired; - Ok(Ready(v)) + Ready(Ok(v)) } - NotReady => { + Pending => { self.state = PermitState::Waiting; - Ok(NotReady) + Pending } } } /// Try to acquire the permit. pub fn try_acquire(&mut self, semaphore: &Semaphore) -> Result<(), TryAcquireError> { - use futures::Async::*; - match self.state { PermitState::Idle => {} PermitState::Waiting => { @@ -629,7 +632,7 @@ impl Permit { self.state = PermitState::Acquired; Ok(()) } - NotReady => Err(TryAcquireError::no_permits()), + Pending => Err(TryAcquireError::no_permits()), } } @@ -748,17 +751,17 @@ impl WaiterNode { fn new() -> WaiterNode { WaiterNode { state: AtomicUsize::new(NodeState::new().to_usize()), - task: AtomicTask::new(), + waker: AtomicWaker::new(), next: AtomicPtr::new(ptr::null_mut()), } } - fn acquire(&self) -> Result { + fn acquire(&self, cx: &mut Context<'_>) -> Result { if self.acquire2()? { return Ok(true); } - self.task.register(); + self.waker.register_by_ref(cx.waker()); self.acquire2() } @@ -773,8 +776,8 @@ impl WaiterNode { } } - fn register(&self) { - self.task.register() + fn register(&self, cx: &mut Context<'_>) { + self.waker.register_by_ref(cx.waker()) } /// Returns `true` if the permit has been acquired @@ -860,7 +863,7 @@ impl WaiterNode { Ok(_) => match curr { QueuedWaiting => { debug!(" + notify -- task notified"); - self.task.notify(); + self.waker.wake(); return true; } other => { diff --git a/tokio-sync/src/task/atomic_task.rs b/tokio-sync/src/task/atomic_task.rs deleted file mode 100644 index 73110da2..00000000 --- a/tokio-sync/src/task/atomic_task.rs +++ /dev/null @@ -1,336 +0,0 @@ -use crate::loom::{ - futures::task::{self, Task}, - sync::atomic::AtomicUsize, - sync::CausalCell, -}; -use std::fmt; -use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; - -/// A synchronization primitive for task notification. -/// -/// `AtomicTask` will coordinate concurrent notifications with the consumer -/// potentially "updating" the underlying task to notify. This is useful in -/// scenarios where a computation completes in another thread and wants to -/// notify the consumer, but the consumer is in the process of being migrated to -/// a new logical task. -/// -/// Consumers should call `register` before checking the result of a computation -/// and producers should call `notify` after producing the computation (this -/// differs from the usual `thread::park` pattern). It is also permitted for -/// `notify` to be called **before** `register`. This results in a no-op. -/// -/// A single `AtomicTask` may be reused for any number of calls to `register` or -/// `notify`. -/// -/// `AtomicTask` does not provide any memory ordering guarantees, as such the -/// user should use caution and use other synchronization primitives to guard -/// the result of the underlying computation. -pub struct AtomicTask { - state: AtomicUsize, - task: CausalCell>, -} - -// `AtomicTask` is a multi-consumer, single-producer transfer cell. The cell -// stores a `Task` value produced by calls to `register` and many threads can -// race to take the task (to notify it) by calling `notify. -// -// If a new `Task` instance is produced by calling `register` before an existing -// one is consumed, then the existing one is overwritten. -// -// While `AtomicTask` is single-producer, the implementation ensures memory -// safety. In the event of concurrent calls to `register`, there will be a -// single winner whose task will get stored in the cell. The losers will not -// have their tasks notified. As such, callers should ensure to add -// synchronization to calls to `register`. -// -// The implementation uses a single `AtomicUsize` value to coordinate access to -// the `Task` cell. There are two bits that are operated on independently. These -// are represented by `REGISTERING` and `NOTIFYING`. -// -// The `REGISTERING` bit is set when a producer enters the critical section. The -// `NOTIFYING` bit is set when a consumer enters the critical section. Neither -// bit being set is represented by `WAITING`. -// -// A thread obtains an exclusive lock on the task cell by transitioning the -// state from `WAITING` to `REGISTERING` or `NOTIFYING`, depending on the -// operation the thread wishes to perform. When this transition is made, it is -// guaranteed that no other thread will access the task cell. -// -// # Registering -// -// On a call to `register`, an attempt to transition the state from WAITING to -// REGISTERING is made. On success, the caller obtains a lock on the task cell. -// -// If the lock is obtained, then the thread sets the task cell to the task -// provided as an argument. Then it attempts to transition the state back from -// `REGISTERING` -> `WAITING`. -// -// If this transition is successful, then the registering process is complete -// and the next call to `notify` will observe the task. -// -// If the transition fails, then there was a concurrent call to `notify` that -// was unable to access the task cell (due to the registering thread holding the -// lock). To handle this, the registering thread removes the task it just set -// from the cell and calls `notify` on it. This call to notify represents the -// attempt to notify by the other thread (that set the `NOTIFYING` bit). The -// state is then transitioned from `REGISTERING | NOTIFYING` back to `WAITING`. -// This transition must succeed because, at this point, the state cannot be -// transitioned by another thread. -// -// # Notifying -// -// On a call to `notify`, an attempt to transition the state from `WAITING` to -// `NOTIFYING` is made. On success, the caller obtains a lock on the task cell. -// -// If the lock is obtained, then the thread takes ownership of the current value -// in teh task cell, and calls `notify` on it. The state is then transitioned -// back to `WAITING`. This transition must succeed as, at this point, the state -// cannot be transitioned by another thread. -// -// If the thread is unable to obtain the lock, the `NOTIFYING` bit is still. -// This is because it has either been set by the current thread but the previous -// value included the `REGISTERING` bit **or** a concurrent thread is in the -// `NOTIFYING` critical section. Either way, no action must be taken. -// -// If the current thread is the only concurrent call to `notify` and another -// thread is in the `register` critical section, when the other thread **exits** -// the `register` critical section, it will observe the `NOTIFYING` bit and -// handle the notify itself. -// -// If another thread is in the `notify` critical section, then it will handle -// notifying the task. -// -// # A potential race (is safely handled). -// -// Imagine the following situation: -// -// * Thread A obtains the `notify` lock and notifies a task. -// -// * Before thread A releases the `notify` lock, the notified task is scheduled. -// -// * Thread B attempts to notify the task. In theory this should result in the -// task being notified, but it cannot because thread A still holds the notify -// lock. -// -// This case is handled by requiring users of `AtomicTask` to call `register` -// **before** attempting to observe the application state change that resulted -// in the task being notified. The notifiers also change the application state -// before calling notify. -// -// Because of this, the task will do one of two things. -// -// 1) Observe the application state change that Thread B is notifying on. In -// this case, it is OK for Thread B's notification to be lost. -// -// 2) Call register before attempting to observe the application state. Since -// Thread A still holds the `notify` lock, the call to `register` will result -// in the task notifying itself and get scheduled again. - -/// Idle state -const WAITING: usize = 0; - -/// A new task value is being registered with the `AtomicTask` cell. -const REGISTERING: usize = 0b01; - -/// The task currently registered with the `AtomicTask` cell is being notified. -const NOTIFYING: usize = 0b10; - -impl AtomicTask { - /// Create an `AtomicTask` initialized with the given `Task` - pub fn new() -> AtomicTask { - AtomicTask { - state: AtomicUsize::new(WAITING), - task: CausalCell::new(None), - } - } - - /// Registers the current task to be notified on calls to `notify`. - /// - /// This is the same as calling `register_task` with `task::current()`. - pub fn register(&self) { - self.do_register(CurrentTask); - } - - /// Registers the provided task to be notified on calls to `notify`. - /// - /// The new task will take place of any previous tasks that were registered - /// by previous calls to `register`. Any calls to `notify` that happen after - /// a call to `register` (as defined by the memory ordering rules), will - /// notify the `register` caller's task. - /// - /// It is safe to call `register` with multiple other threads concurrently - /// calling `notify`. This will result in the `register` caller's current - /// task being notified once. - /// - /// This function is safe to call concurrently, but this is generally a bad - /// idea. Concurrent calls to `register` will attempt to register different - /// tasks to be notified. One of the callers will win and have its task set, - /// but there is no guarantee as to which caller will succeed. - pub fn register_task(&self, task: Task) { - self.do_register(ExactTask(task)); - } - - fn do_register(&self, reg: R) - where - R: Register, - { - debug!(" + register_task"); - match self.state.compare_and_swap(WAITING, REGISTERING, Acquire) { - WAITING => { - unsafe { - // Locked acquired, update the waker cell - self.task.with_mut(|t| reg.register(&mut *t)); - - // Release the lock. If the state transitioned to include - // the `NOTIFYING` bit, this means that a notify has been - // called concurrently, so we have to remove the task and - // notify it.` - // - // Start by assuming that the state is `REGISTERING` as this - // is what we jut set it to. - let res = self - .state - .compare_exchange(REGISTERING, WAITING, AcqRel, Acquire); - - match res { - Ok(_) => {} - Err(actual) => { - // This branch can only be reached if a - // concurrent thread called `notify`. In this - // case, `actual` **must** be `REGISTERING | - // `NOTIFYING`. - debug_assert_eq!(actual, REGISTERING | NOTIFYING); - - // Take the task to notify once the atomic operation has - // completed. - let notify = self.task.with_mut(|t| (*t).take()).unwrap(); - - // Just swap, because no one could change state - // while state == `Registering | `Waking` - self.state.swap(WAITING, AcqRel); - - // The atomic swap was complete, now - // notify the task and return. - notify.notify(); - } - } - } - } - NOTIFYING => { - // Currently in the process of notifying the task, i.e., - // `notify` is currently being called on the old task handle. - // So, we call notify on the new task handle - reg.notify(); - } - state => { - // In this case, a concurrent thread is holding the - // "registering" lock. This probably indicates a bug in the - // caller's code as racing to call `register` doesn't make much - // sense. - // - // We just want to maintain memory safety. It is ok to drop the - // call to `register`. - debug_assert!(state == REGISTERING || state == REGISTERING | NOTIFYING); - } - } - } - - /// Notifies the task that last called `register`. - /// - /// If `register` has not been called yet, then this does nothing. - pub fn notify(&self) { - debug!(" + notify"); - if let Some(task) = self.take_task() { - task.notify(); - } - } - - /// Attempts to take the `Task` value out of the `AtomicTask` with the - /// intention that the caller will notify the task later. - pub fn take_task(&self) -> Option { - debug!(" + take_task"); - // AcqRel ordering is used in order to acquire the value of the `task` - // cell as well as to establish a `release` ordering with whatever - // memory the `AtomicTask` is associated with. - match self.state.fetch_or(NOTIFYING, AcqRel) { - WAITING => { - debug!(" + WAITING"); - // The notifying lock has been acquired. - let task = unsafe { self.task.with_mut(|t| (*t).take()) }; - - // Release the lock - self.state.fetch_and(!NOTIFYING, Release); - debug!(" + Done taking"); - - task - } - state => { - debug!(" + state = {:?}", state); - // There is a concurrent thread currently updating the - // associated task. - // - // Nothing more to do as the `NOTIFYING` bit has been set. It - // doesn't matter if there are concurrent registering threads or - // not. - // - debug_assert!( - state == REGISTERING || state == REGISTERING | NOTIFYING || state == NOTIFYING - ); - None - } - } - } -} - -impl Default for AtomicTask { - fn default() -> Self { - AtomicTask::new() - } -} - -impl fmt::Debug for AtomicTask { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "AtomicTask") - } -} - -unsafe impl Send for AtomicTask {} -unsafe impl Sync for AtomicTask {} - -trait Register { - fn register(self, slot: &mut Option); - fn notify(self); -} - -struct CurrentTask; - -impl Register for CurrentTask { - fn register(self, slot: &mut Option) { - let should_update = (&*slot) - .as_ref() - .map(|prev| !prev.will_notify_current()) - .unwrap_or(true); - if should_update { - *slot = Some(task::current()); - } - } - - fn notify(self) { - task::current().notify(); - } -} - -struct ExactTask(Task); - -impl Register for ExactTask { - fn register(self, slot: &mut Option) { - // When calling register_task with an exact task, it doesn't matter - // if the previous task would have notified current. We *always* want - // to save that exact task. - *slot = Some(self.0); - } - - fn notify(self) { - self.0.notify(); - } -} diff --git a/tokio-sync/src/task/atomic_waker.rs b/tokio-sync/src/task/atomic_waker.rs new file mode 100644 index 00000000..6f741d38 --- /dev/null +++ b/tokio-sync/src/task/atomic_waker.rs @@ -0,0 +1,317 @@ +use crate::loom::{sync::atomic::AtomicUsize, sync::CausalCell}; + +use std::fmt; +use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; +use std::task::Waker; + +/// A synchronization primitive for task waking. +/// +/// `AtomicWaker` will coordinate concurrent wakes with the consumer +/// potentially "waking" the underlying task. This is useful in scenarios +/// where a computation completes in another thread and wants to wake the +/// consumer, but the consumer is in the process of being migrated to a new +/// logical task. +/// +/// Consumers should call `register` before checking the result of a computation +/// and producers should call `wake` after producing the computation (this +/// differs from the usual `thread::park` pattern). It is also permitted for +/// `wake` to be called **before** `register`. This results in a no-op. +/// +/// A single `AtomicWaker` may be reused for any number of calls to `register` or +/// `wake`. +pub struct AtomicWaker { + state: AtomicUsize, + waker: CausalCell>, +} + +// `AtomicWaker` is a multi-consumer, single-producer transfer cell. The cell +// stores a `Waker` value produced by calls to `register` and many threads can +// race to take the waker by calling `wake. +// +// If a new `Waker` instance is produced by calling `register` before an existing +// one is consumed, then the existing one is overwritten. +// +// While `AtomicWaker` is single-producer, the implementation ensures memory +// safety. In the event of concurrent calls to `register`, there will be a +// single winner whose waker will get stored in the cell. The losers will not +// have their tasks woken. As such, callers should ensure to add synchronization +// to calls to `register`. +// +// The implementation uses a single `AtomicUsize` value to coordinate access to +// the `Waker` cell. There are two bits that are operated on independently. These +// are represented by `REGISTERING` and `WAKING`. +// +// The `REGISTERING` bit is set when a producer enters the critical section. The +// `WAKING` bit is set when a consumer enters the critical section. Neither +// bit being set is represented by `WAITING`. +// +// A thread obtains an exclusive lock on the waker cell by transitioning the +// state from `WAITING` to `REGISTERING` or `WAKING`, depending on the +// operation the thread wishes to perform. When this transition is made, it is +// guaranteed that no other thread will access the waker cell. +// +// # Registering +// +// On a call to `register`, an attempt to transition the state from WAITING to +// REGISTERING is made. On success, the caller obtains a lock on the waker cell. +// +// If the lock is obtained, then the thread sets the waker cell to the waker +// provided as an argument. Then it attempts to transition the state back from +// `REGISTERING` -> `WAITING`. +// +// If this transition is successful, then the registering process is complete +// and the next call to `wake` will observe the waker. +// +// If the transition fails, then there was a concurrent call to `wake` that +// was unable to access the waker cell (due to the registering thread holding the +// lock). To handle this, the registering thread removes the waker it just set +// from the cell and calls `wake` on it. This call to wake represents the +// attempt to wake by the other thread (that set the `WAKING` bit). The +// state is then transitioned from `REGISTERING | WAKING` back to `WAITING`. +// This transition must succeed because, at this point, the state cannot be +// transitioned by another thread. +// +// # Waking +// +// On a call to `wake`, an attempt to transition the state from `WAITING` to +// `WAKING` is made. On success, the caller obtains a lock on the waker cell. +// +// If the lock is obtained, then the thread takes ownership of the current value +// in the waker cell, and calls `wake` on it. The state is then transitioned +// back to `WAITING`. This transition must succeed as, at this point, the state +// cannot be transitioned by another thread. +// +// If the thread is unable to obtain the lock, the `WAKING` bit is still. +// This is because it has either been set by the current thread but the previous +// value included the `REGISTERING` bit **or** a concurrent thread is in the +// `WAKING` critical section. Either way, no action must be taken. +// +// If the current thread is the only concurrent call to `wake` and another +// thread is in the `register` critical section, when the other thread **exits** +// the `register` critical section, it will observe the `WAKING` bit and +// handle the waker itself. +// +// If another thread is in the `waker` critical section, then it will handle +// waking the caller task. +// +// # A potential race (is safely handled). +// +// Imagine the following situation: +// +// * Thread A obtains the `wake` lock and wakes a task. +// +// * Before thread A releases the `wake` lock, the woken task is scheduled. +// +// * Thread B attempts to wake the task. In theory this should result in the +// task being woken, but it cannot because thread A still holds the wake +// lock. +// +// This case is handled by requiring users of `AtomicWaker` to call `register` +// **before** attempting to observe the application state change that resulted +// in the task being woken. The wakers also change the application state +// before calling wake. +// +// Because of this, the task will do one of two things. +// +// 1) Observe the application state change that Thread B is waking on. In +// this case, it is OK for Thread B's wake to be lost. +// +// 2) Call register before attempting to observe the application state. Since +// Thread A still holds the `wake` lock, the call to `register` will result +// in the task waking itself and get scheduled again. + +/// Idle state +const WAITING: usize = 0; + +/// A new waker value is being registered with the `AtomicWaker` cell. +const REGISTERING: usize = 0b01; + +/// The task currently registered with the `AtomicWaker` cell is being woken. +const WAKING: usize = 0b10; + +impl AtomicWaker { + /// Create an `AtomicWaker` + pub fn new() -> AtomicWaker { + AtomicWaker { + state: AtomicUsize::new(WAITING), + waker: CausalCell::new(None), + } + } + + /// Registers the current waker to be notified on calls to `wake`. + /// + /// This is the same as calling `register_task` with `task::current()`. + pub fn register(&self, waker: Waker) { + self.do_register(waker); + } + + /// Registers the provided waker to be notified on calls to `wake`. + /// + /// The new waker will take place of any previous wakers that were registered + /// by previous calls to `register`. Any calls to `wake` that happen after + /// a call to `register` (as defined by the memory ordering rules), will + /// wake the `register` caller's task. + /// + /// It is safe to call `register` with multiple other threads concurrently + /// calling `wake`. This will result in the `register` caller's current + /// task being woken once. + /// + /// This function is safe to call concurrently, but this is generally a bad + /// idea. Concurrent calls to `register` will attempt to register different + /// tasks to be woken. One of the callers will win and have its task set, + /// but there is no guarantee as to which caller will succeed. + pub fn register_by_ref(&self, waker: &Waker) { + self.do_register(waker); + } + + fn do_register(&self, waker: W) + where + W: WakerRef, + { + debug!(" + register_task"); + match self.state.compare_and_swap(WAITING, REGISTERING, Acquire) { + WAITING => { + unsafe { + // Locked acquired, update the waker cell + self.waker.with_mut(|t| *t = Some(waker.into_waker())); + + // Release the lock. If the state transitioned to include + // the `WAKING` bit, this means that a wake has been + // called concurrently, so we have to remove the waker and + // wake it.` + // + // Start by assuming that the state is `REGISTERING` as this + // is what we jut set it to. + let res = self + .state + .compare_exchange(REGISTERING, WAITING, AcqRel, Acquire); + + match res { + Ok(_) => {} + Err(actual) => { + // This branch can only be reached if a + // concurrent thread called `wake`. In this + // case, `actual` **must** be `REGISTERING | + // `WAKING`. + debug_assert_eq!(actual, REGISTERING | WAKING); + + // Take the waker to wake once the atomic operation has + // completed. + let waker = self.waker.with_mut(|t| (*t).take()).unwrap(); + + // Just swap, because no one could change state + // while state == `Registering | `Waking` + self.state.swap(WAITING, AcqRel); + + // The atomic swap was complete, now + // wake the waker and return. + waker.wake(); + } + } + } + } + WAKING => { + // Currently in the process of waking the task, i.e., + // `wake` is currently being called on the old waker. + // So, we call wake on the new waker. + waker.wake(); + } + state => { + // In this case, a concurrent thread is holding the + // "registering" lock. This probably indicates a bug in the + // caller's code as racing to call `register` doesn't make much + // sense. + // + // We just want to maintain memory safety. It is ok to drop the + // call to `register`. + debug_assert!(state == REGISTERING || state == REGISTERING | WAKING); + } + } + } + + /// Wakes the task that last called `register`. + /// + /// If `register` has not been called yet, then this does nothing. + pub fn wake(&self) { + debug!(" + wake"); + if let Some(waker) = self.take_waker() { + waker.wake(); + } + } + + /// Attempts to take the `Waker` value out of the `AtomicWaker` with the + /// intention that the caller will wake the task later. + pub fn take_waker(&self) -> Option { + debug!(" + take_waker"); + // AcqRel ordering is used in order to acquire the value of the `waker` + // cell as well as to establish a `release` ordering with whatever + // memory the `AtomicWaker` is associated with. + match self.state.fetch_or(WAKING, AcqRel) { + WAITING => { + debug!(" + WAITING"); + // The waking lock has been acquired. + let waker = unsafe { self.waker.with_mut(|t| (*t).take()) }; + + // Release the lock + self.state.fetch_and(!WAKING, Release); + debug!(" + Done taking"); + + waker + } + state => { + debug!(" + state = {:?}", state); + // There is a concurrent thread currently updating the + // associated waker. + // + // Nothing more to do as the `WAKING` bit has been set. It + // doesn't matter if there are concurrent registering threads or + // not. + // + debug_assert!( + state == REGISTERING || state == REGISTERING | WAKING || state == WAKING + ); + None + } + } + } +} + +impl Default for AtomicWaker { + fn default() -> Self { + AtomicWaker::new() + } +} + +impl fmt::Debug for AtomicWaker { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "AtomicWaker") + } +} + +unsafe impl Send for AtomicWaker {} +unsafe impl Sync for AtomicWaker {} + +trait WakerRef { + fn wake(self); + fn into_waker(self) -> Waker; +} + +impl WakerRef for Waker { + fn wake(self) { + self.wake() + } + + fn into_waker(self) -> Waker { + self + } +} + +impl<'a> WakerRef for &'a Waker { + fn wake(self) { + self.wake_by_ref() + } + + fn into_waker(self) -> Waker { + self.clone() + } +} diff --git a/tokio-sync/src/task/mod.rs b/tokio-sync/src/task/mod.rs index 42c40de5..cff96656 100644 --- a/tokio-sync/src/task/mod.rs +++ b/tokio-sync/src/task/mod.rs @@ -1,5 +1,5 @@ //! Thread-safe task notification primitives. -mod atomic_task; +mod atomic_waker; -pub use self::atomic_task::AtomicTask; +pub use self::atomic_waker::AtomicWaker; diff --git a/tokio-sync/src/watch.rs b/tokio-sync/src/watch.rs index aab016b5..ce275b4e 100644 --- a/tokio-sync/src/watch.rs +++ b/tokio-sync/src/watch.rs @@ -53,14 +53,19 @@ //! [`Receiver::poll`]: struct.Receiver.html#method.poll //! [`Receiver::poll_ref`]: struct.Receiver.html#method.poll_ref +use crate::task::AtomicWaker; + +use core::task::Poll::{Pending, Ready}; +use core::task::{Context, Poll}; use fnv::FnvHashMap; -use futures::task::AtomicTask; -use futures::{try_ready, Async, AsyncSink, Poll, Sink, StartSend, Stream}; use std::ops; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::SeqCst; use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, Weak}; +#[cfg(feature = "async-traits")] +use std::pin::Pin; + /// Receives values from the associated `Sender`. /// /// Instances are created by the [`channel`](fn.channel.html) function. @@ -102,33 +107,12 @@ pub mod error { use std::fmt; - /// Error produced when receiving a value fails. - #[derive(Debug)] - pub struct RecvError { - pub(crate) _p: (), - } - /// Error produced when sending a value fails. #[derive(Debug)] pub struct SendError { pub(crate) inner: T, } - // ===== impl RecvError ===== - - impl fmt::Display for RecvError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - use std::error::Error; - write!(fmt, "{}", self.description()) - } - } - - impl ::std::error::Error for RecvError { - fn description(&self) -> &str { - "channel closed" - } - } - // ===== impl SendError ===== impl fmt::Display for SendError { @@ -160,7 +144,7 @@ struct Shared { watchers: Mutex, /// Task to notify when all watchers drop - cancel: AtomicTask, + cancel: AtomicWaker, } #[derive(Debug)] @@ -171,7 +155,7 @@ struct Watchers { #[derive(Debug)] struct WatchInner { - task: AtomicTask, + waker: AtomicWaker, } const CLOSED: usize = 1; @@ -216,7 +200,7 @@ pub fn channel(init: T) -> (Sender, Receiver) { next_id: INIT_ID + 1, watchers, }), - cancel: AtomicTask::new(), + cancel: AtomicWaker::new(), }); let tx = Sender { @@ -256,14 +240,14 @@ impl Receiver { /// Attempts to receive the latest value sent via the channel. /// /// If a new, unobserved, value has been sent, a reference to it is - /// returned. If no new value has been sent, then `NotReady` is returned and + /// returned. If no new value has been sent, then `Pending` is returned and /// the current task is notified once a new value is sent. /// /// Only the **most recent** value is returned. If the receiver is falling /// behind the sender, intermediate values are dropped. - pub fn poll_ref(&mut self) -> Poll>, error::RecvError> { + pub fn poll_ref(&mut self, cx: &mut Context<'_>) -> Poll>> { // Make sure the task is up to date - self.inner.task.register(); + self.inner.waker.register_by_ref(cx.waker()); let state = self.shared.version.load(SeqCst); let version = state & !CLOSED; @@ -274,25 +258,35 @@ impl Receiver { let inner = self.shared.value.read().unwrap(); - return Ok(Some(Ref { inner }).into()); + return Ready(Some(Ref { inner })); } if CLOSED == state & CLOSED { // The `Store` handle has been dropped. - return Ok(None.into()); + return Ready(None); } - Ok(Async::NotReady) + Pending } } -impl Str