summaryrefslogtreecommitdiffstats
path: root/tokio-sync
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-06-24 12:34:30 -0700
committerGitHub <noreply@github.com>2019-06-24 12:34:30 -0700
commit06c473e62842d257ed275497ce906710ea3f8e19 (patch)
tree4ca6d337a892aa23266a761b35dc61e988e57954 /tokio-sync
parentaa99950b9c983b842bd2107bb771c277d09d495d (diff)
Update Tokio to use `std::future`. (#1120)
A first pass at updating Tokio to use `std::future`. Implementations of `Future` from the futures crate are updated to implement `Future` from std. Implementations of `Stream` are moved to a feature flag. This commits disables a number of crates that have not yet been updated.
Diffstat (limited to 'tokio-sync')
-rw-r--r--tokio-sync/Cargo.toml14
-rw-r--r--tokio-sync/src/lib.rs13
-rw-r--r--tokio-sync/src/lock.rs14
-rw-r--r--tokio-sync/src/loom.rs3
-rw-r--r--tokio-sync/src/mpsc/bounded.rs65
-rw-r--r--tokio-sync/src/mpsc/chan.rs46
-rw-r--r--tokio-sync/src/mpsc/unbounded.rs44
-rw-r--r--tokio-sync/src/oneshot.rs90
-rw-r--r--tokio-sync/src/semaphore.rs73
-rw-r--r--tokio-sync/src/task/atomic_task.rs336
-rw-r--r--tokio-sync/src/task/atomic_waker.rs317
-rw-r--r--tokio-sync/src/task/mod.rs4
-rw-r--r--tokio-sync/src/watch.rs108
-rw-r--r--tokio-sync/tests/atomic_task.rs52
-rw-r--r--tokio-sync/tests/atomic_waker.rs37
-rw-r--r--tokio-sync/tests/errors.rs2
-rw-r--r--tokio-sync/tests/fuzz_atomic_waker.rs (renamed from tokio-sync/tests/fuzz_atomic_task.rs)27
-rw-r--r--tokio-sync/tests/fuzz_mpsc.rs7
-rw-r--r--tokio-sync/tests/fuzz_oneshot.rs65
-rw-r--r--tokio-sync/tests/fuzz_semaphore.rs63
-rw-r--r--tokio-sync/tests/lock.rs73
-rw-r--r--tokio-sync/tests/mpsc.rs351
-rw-r--r--tokio-sync/tests/oneshot.rs160
-rw-r--r--tokio-sync/tests/semaphore.rs102
-rw-r--r--tokio-sync/tests/watch.rs195
25 files changed, 1159 insertions, 1102 deletions
diff --git a/tokio-sync/Cargo.toml b/tokio-sync/Cargo.toml
index 7217b8a1..aedd4c62 100644
--- a/tokio-sync/Cargo.toml
+++ b/tokio-sync/Cargo.toml
@@ -21,12 +21,18 @@ Synchronization utilities.
categories = ["asynchronous"]
publish = false
+[features]
+async-traits = ["async-sink", "futures-core-preview"]
+
[dependencies]
fnv = "1.0.6"
-futures = "0.1.19"
+async-sink = { git = "https://github.com/tokio-rs/async", optional = true }
+futures-core-preview = { version = "0.3.0-alpha.16", optional = true }
[dev-dependencies]
+async-util = { git = "https://github.com/tokio-rs/async" }
env_logger = { version = "0.5", default-features = false }
-tokio = { version = "0.2.0", path = "../tokio" }
-tokio-mock-task = "0.1.1"
-loom = { version = "0.1.1", features = ["futures"] }
+pin-utils = "0.1.0-alpha.4"
+# tokio = { version = "0.2.0", path = "../tokio" }
+tokio-test = { version = "0.2.0", path = "../tokio-test" }
+loom = { git = "https://github.com/carllerche/loom", branch = "std-future2", features = ["futures"] }
diff --git a/tokio-sync/src/lib.rs b/tokio-sync/src/lib.rs
index 1131899c..67903ace 100644
--- a/tokio-sync/src/lib.rs
+++ b/tokio-sync/src/lib.rs
@@ -20,6 +20,19 @@ macro_rules! debug {
}
}
+/// Unwrap a ready value or propagate `Poll::Pending`.
+#[macro_export]
+macro_rules! ready {
+ ($e:expr) => {{
+ use std::task::Poll::{Pending, Ready};
+
+ match $e {
+ Ready(v) => v,
+ Pending => return Pending,
+ }
+ }};
+}
+
macro_rules! if_fuzz {
($($t:tt)*) => {{
if false { $($t)* }
diff --git a/tokio-sync/src/lock.rs b/tokio-sync/src/lock.rs
index 3cfa1beb..3b8ce36f 100644
--- a/tokio-sync/src/lock.rs
+++ b/tokio-sync/src/lock.rs
@@ -41,11 +41,13 @@
//! [`LockGuard`]: struct.LockGuard.html
use crate::semaphore;
-use futures::Async;
+
use std::cell::UnsafeCell;
use std::fmt;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
+use std::task::Poll::Ready;
+use std::task::{Context, Poll};
/// An asynchronous mutual exclusion primitive useful for protecting shared data
///
@@ -103,14 +105,12 @@ impl<T> Lock<T> {
/// Try to acquire the lock.
///
/// If the lock is already held, the current task is notified when it is released.
- pub fn poll_lock(&mut self) -> Async<LockGuard<T>> {
- if let Async::NotReady = self.permit.poll_acquire(&self.inner.s).unwrap_or_else(|_| {
+ pub fn poll_lock(&mut self, cx: &mut Context<'_>) -> Poll<LockGuard<T>> {
+ ready!(self.permit.poll_acquire(cx, &self.inner.s)).unwrap_or_else(|_| {
// The semaphore was closed. but, we never explicitly close it, and we have a
// handle to it through the Arc, which means that this can never happen.
unreachable!()
- }) {
- return Async::NotReady;
- }
+ });
// We want to move the acquired permit into the guard,
// and leave an unacquired one in self.
@@ -118,7 +118,7 @@ impl<T> Lock<T> {
inner: self.inner.clone(),
permit: ::std::mem::replace(&mut self.permit, semaphore::Permit::new()),
};
- Async::Ready(LockGuard(acquired))
+ Ready(LockGuard(acquired))
}
}
diff --git a/tokio-sync/src/loom.rs b/tokio-sync/src/loom.rs
index c46a5f54..92a21e5b 100644
--- a/tokio-sync/src/loom.rs
+++ b/tokio-sync/src/loom.rs
@@ -1,6 +1,5 @@
pub(crate) mod futures {
- pub(crate) use crate::task::AtomicTask;
- pub(crate) use futures::task;
+ pub(crate) use crate::task::AtomicWaker;
}
pub(crate) mod sync {
diff --git a/tokio-sync/src/mpsc/bounded.rs b/tokio-sync/src/mpsc/bounded.rs
index b2fce168..7f8cb905 100644
--- a/tokio-sync/src/mpsc/bounded.rs
+++ b/tokio-sync/src/mpsc/bounded.rs
@@ -1,6 +1,10 @@
use super::chan;
-use futures::{Poll, Sink, StartSend, Stream};
+
use std::fmt;
+use std::task::{Context, Poll};
+
+#[cfg(feature = "async-traits")]
+use std::pin::Pin;
/// Send values to the associated `Receiver`.
///
@@ -127,6 +131,11 @@ impl<T> Receiver<T> {
Receiver { chan }
}
+ /// TODO: Dox
+ pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ self.chan.recv(cx)
+ }
+
/// Closes the receiving half of a channel, without dropping it.
///
/// This prevents any further messages from being sent on the channel while
@@ -136,12 +145,12 @@ impl<T> Receiver<T> {
}
}
-impl<T> Stream for Receiver<T> {
+#[cfg(feature = "async-traits")]
+impl<T> futures_core::Stream for Receiver<T> {
type Item = T;
- type Error = RecvError;
- fn poll(&mut self) -> Poll<Option<T>, Self::Error> {
- self.chan.recv().map_err(|_| RecvError(()))
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ Receiver::poll_next(self.get_mut(), cx)
}
}
@@ -165,13 +174,13 @@ impl<T> Sender<T> {
///
/// This method returns:
///
- /// - `Ok(Async::Ready(_))` if capacity is reserved for a single message.
- /// - `Ok(Async::NotReady)` if the channel may not have capacity, in which
+ /// - `Poll::Ready(Ok(_))` if capacity is reserved for a single message.
+ /// - `Poll::Pending` if the channel may not have capacity, in which
/// case the current task is queued to be notified once
/// capacity is available;
- /// - `Err(SendError)` if the receiver has been dropped.
- pub fn poll_ready(&mut self) -> Poll<(), SendError> {
- self.chan.poll_ready().map_err(|_| SendError(()))
+ /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
+ pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
+ self.chan.poll_ready(cx).map_err(|_| SendError(()))
}
/// Attempts to send a message on this `Sender`, returning the message
@@ -182,31 +191,29 @@ impl<T> Sender<T> {
}
}
-impl<T> Sink for Sender<T> {
- type SinkItem = T;
- type SinkError = SendError;
+#[cfg(feature = "async-traits")]
+impl<T> async_sink::Sink<T> for Sender<T> {
+ type Error = SendError;
- fn start_send(&mut self, msg: T) -> StartSend<T, Self::SinkError> {
- use futures::Async::*;
- use futures::AsyncSink;
+ fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Sender::poll_ready(self.get_mut(), cx)
+ }
- match self.poll_ready()? {
- Ready(_) => {
- self.try_send(msg).map_err(|_| SendError(()))?;
- Ok(AsyncSink::Ready)
- }
- NotReady => Ok(AsyncSink::NotReady(msg)),
- }
+ fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
+ self.as_mut()
+ .try_send(msg)
+ .map_err(|err| {
+ assert!(err.is_full(), "call `poll_ready` before sending");
+ SendError(())
+ })
}
- fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
- use futures::Async::Ready;
- Ok(Ready(()))
+ fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
}
- fn close(&mut self) -> Poll<(), Self::SinkError> {
- use futures::Async::Ready;
- Ok(Ready(()))
+ fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
}
}
diff --git a/tokio-sync/src/mpsc/chan.rs b/tokio-sync/src/mpsc/chan.rs
index fae3159a..93e62824 100644
--- a/tokio-sync/src/mpsc/chan.rs
+++ b/tokio-sync/src/mpsc/chan.rs
@@ -1,13 +1,14 @@
use super::list;
use crate::loom::{
- futures::AtomicTask,
+ futures::AtomicWaker,
sync::atomic::AtomicUsize,
sync::{Arc, CausalCell},
};
-use futures::Poll;
use std::fmt;
use std::process;
use std::sync::atomic::Ordering::{AcqRel, Relaxed};
+use std::task::Poll::{Pending, Ready};
+use std::task::{Context, Poll};
/// Channel sender
pub(crate) struct Tx<T, S: Semaphore> {
@@ -61,7 +62,8 @@ pub(crate) trait Semaphore {
fn add_permit(&self);
- fn poll_acquire(&self, permit: &mut Self::Permit) -> Poll<(), ()>;
+ fn poll_acquire(&self, cx: &mut Context<'_>, permit: &mut Self::Permit)
+ -> Poll<Result<(), ()>>;
fn try_acquire(&self, permit: &mut Self::Permit) -> Result<(), TrySendError>;
@@ -81,8 +83,8 @@ struct Chan<T, S> {
/// Coordinates access to channel's capacity.
semaphore: S,
- /// Receiver task. Notified when a value is pushed into the channel.
- rx_task: AtomicTask,
+ /// Receiver waker. Notified when a value is pushed into the channel.
+ rx_waker: AtomicWaker,
/// Tracks the number of outstanding sender handles.
///
@@ -101,7 +103,7 @@ where
fmt.debug_struct("Chan")
.field("tx", &self.tx)
.field("semaphore", &self.semaphore)
- .field("rx_task", &self.rx_task)
+ .field("rx_waker", &self.rx_waker)
.field("tx_count", &self.tx_count)
.field("rx_fields", &"...")
.finish()
@@ -138,7 +140,7 @@ where
let chan = Arc::new(Chan {
tx,
semaphore,
- rx_task: AtomicTask::new(),
+ rx_waker: AtomicWaker::new(),
tx_count: AtomicUsize::new(1),
rx_fields: CausalCell::new(RxFields {
list: rx,
@@ -163,8 +165,8 @@ where
}
/// TODO: Docs
- pub(crate) fn poll_ready(&mut self) -> Poll<(), ()> {
- self.inner.semaphore.poll_acquire(&mut self.permit)
+ pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ()>> {
+ self.inner.semaphore.poll_acquire(cx, &mut self.permit)
}
/// Send a message and notify the receiver.
@@ -177,7 +179,7 @@ where
self.inner.tx.push(value);
// Notify the rx task
- self.inner.rx_task.notify();
+ self.inner.rx_waker.wake();
// Release the permit
self.inner.semaphore.forget(&mut self.permit);
@@ -217,7 +219,7 @@ where
self.inner.tx.close();
// Notify the receiver
- self.inner.rx_task.notify();
+ self.inner.rx_waker.wake();
}
}
@@ -246,9 +248,8 @@ where
}
/// Receive the next value
- pub(crate) fn recv(&mut self) -> Poll<Option<T>, ()> {
+ pub(crate) fn recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
use super::block::Read::*;
- use futures::Async::*;
self.inner.rx_fields.with_mut(|rx_fields_ptr| {
let rx_fields = unsafe { &mut *rx_fields_ptr };
@@ -258,7 +259,7 @@ where
match rx_fields.list.pop(&self.inner.tx) {
Some(Value(value)) => {
self.inner.semaphore.add_permit();
- return Ok(Ready(Some(value)));
+ return Ready(Some(value));
}
Some(Closed) => {
// TODO: This check may not be required as it most
@@ -268,7 +269,7 @@ where
// which ensures that if dropping the tx handle is
// visible, then all messages sent are also visible.
assert!(self.inner.semaphore.is_idle());
- return Ok(Ready(None));
+ return Ready(None);
}
None => {} // fall through
}
@@ -277,7 +278,7 @@ where
try_recv!();
- self.inner.rx_task.register();
+ self.inner.rx_waker.register_by_ref(cx.waker());
// It is possible that a value was pushed between attempting to read
// and registering the task, so we have to check the channel a
@@ -291,9 +292,9 @@ where
);
if rx_fields.rx_closed && self.inner.semaphore.is_idle() {
- Ok(Ready(None))
+ Ready(None)
} else {
- Ok(NotReady)
+ Pending
}
})
}
@@ -372,8 +373,8 @@ impl Semaphore for (crate::semaphore::Semaphore, usize) {
self.0.available_permits() == self.1
}
- fn poll_acquire(&self, permit: &mut Permit) -> Poll<(), ()> {
- permit.poll_acquire(&self.0).map_err(|_| ())
+ fn poll_acquire(&self, cx: &mut Context<'_>, permit: &mut Permit) -> Poll<Result<(), ()>> {
+ permit.poll_acquire(cx, &self.0).map_err(|_| ())
}
fn try_acquire(&self, permit: &mut Permit) -> Result<(), TrySendError> {
@@ -415,9 +416,8 @@ impl Semaphore for AtomicUsize {
self.load(Acquire) >> 1 == 0
}
- fn poll_acquire(&self, permit: &mut ()) -> Poll<(), ()> {
- use futures::Async::Ready;
- self.try_acquire(permit).map(Ready).map_err(|_| ())
+ fn poll_acquire(&self, _cx: &mut Context<'_>, permit: &mut ()) -> Poll<Result<(), ()>> {
+ Ready(self.try_acquire(permit).map_err(|_| ()))
}
fn try_acquire(&self, _permit: &mut ()) -> Result<(), TrySendError> {
diff --git a/tokio-sync/src/mpsc/unbounded.rs b/tokio-sync/src/mpsc/unbounded.rs
index 58967c91..960bee41 100644
--- a/tokio-sync/src/mpsc/unbounded.rs
+++ b/tokio-sync/src/mpsc/unbounded.rs
@@ -1,7 +1,11 @@
use super::chan;
use crate::loom::sync::atomic::AtomicUsize;
-use futures::{Poll, Sink, StartSend, Stream};
+
use std::fmt;
+use std::task::{Context, Poll};
+
+#[cfg(feature = "async-traits")]
+use std::pin::Pin;
/// Send values to the associated `UnboundedReceiver`.
///
@@ -83,6 +87,11 @@ impl<T> UnboundedReceiver<T> {
UnboundedReceiver { chan }
}
+ /// TODO: dox
+ pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ self.chan.recv(cx)
+ }
+
/// Closes the receiving half of a channel, without dropping it.
///
/// This prevents any further messages from being sent on the channel while
@@ -92,12 +101,12 @@ impl<T> UnboundedReceiver<T> {
}
}
-impl<T> Stream for UnboundedReceiver<T> {
+#[cfg(feature = "async-traits")]
+impl<T> futures_core::Stream for UnboundedReceiver<T> {
type Item = T;
- type Error = UnboundedRecvError;
- fn poll(&mut self) -> Poll<Option<T>, Self::Error> {
- self.chan.recv().map_err(|_| UnboundedRecvError(()))
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ self.chan.recv(cx)
}
}
@@ -113,25 +122,24 @@ impl<T> UnboundedSender<T> {
}
}
-impl<T> Sink for UnboundedSender<T> {
- type SinkItem = T;
- type SinkError = UnboundedSendError;
+#[cfg(feature = "async-traits")]
+impl<T> async_sink::Sink<T> for UnboundedSender<T> {
+ type Error = UnboundedSendError;
- fn start_send(&mut self, msg: T) -> StartSend<T, Self::SinkError> {
- use futures::AsyncSink;
+ fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
- self.try_send(msg).map_err(|_| UnboundedSendError(()))?;
- Ok(AsyncSink::Ready)
+ fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
+ self.try_send(msg).map_err(|_| UnboundedSendError(()))
}
- fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
- use futures::Async::Ready;
- Ok(Ready(()))
+ fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
}
- fn close(&mut self) -> Poll<(), Self::SinkError> {
- use futures::Async::Ready;
- Ok(Ready(()))
+ fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
}
}
diff --git a/tokio-sync/src/oneshot.rs b/tokio-sync/src/oneshot.rs
index d3531bd9..c38bb0ce 100644
--- a/tokio-sync/src/oneshot.rs
+++ b/tokio-sync/src/oneshot.rs
@@ -1,15 +1,15 @@
//! A channel for sending a single message between asynchronous tasks.
-use crate::loom::{
- futures::task::{self, Task},
- sync::atomic::AtomicUsize,
- sync::CausalCell,
-};
-use futures::{Async, Future, Poll};
+use crate::loom::{sync::atomic::AtomicUsize, sync::CausalCell};
+
use std::fmt;
+use std::future::Future;
use std::mem::{self, ManuallyDrop};
+use std::pin::Pin;
use std::sync::atomic::Ordering::{self, AcqRel, Acquire};
use std::sync::Arc;
+use std::task::Poll::{Pending, Ready};
+use std::task::{Context, Poll, Waker};
/// Sends a value to the associated `Receiver`.
///
@@ -82,10 +82,10 @@ struct Inner<T> {
value: CausalCell<Option<T>>,
/// The task to notify when the receiver drops without consuming the value.
- tx_task: CausalCell<ManuallyDrop<Task>>,
+ tx_task: CausalCell<ManuallyDrop<Waker>>,
/// The task to notify when the value is sent.
- rx_task: CausalCell<ManuallyDrop<Task>>,
+ rx_task: CausalCell<ManuallyDrop<Waker>>,
}
#[derive(Clone, Copy)]
@@ -167,33 +167,33 @@ impl<T> Sender<T> {
///
/// # Return values
///
- /// If `Ok(Ready)` is returned then the associated `Receiver` has been
+ /// If `Ready(Ok(_))` is returned then the associated `Receiver` has been
/// dropped, which means any work required for sending should be canceled.
///
- /// If `Ok(NotReady)` is returned then the associated `Receiver` is still
+ /// If `Pending` is returned then the associated `Receiver` is still
/// alive and may be able to receive a message if sent. The current task is
/// registered to receive a notification if the `Receiver` handle goes away.
///
/// [`Receiver`]: struct.Receiver.html
- pub fn poll_close(&mut self) -> Poll<(), ()> {
+ pub fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<()> {
let inner = self.inner.as_ref().unwrap();
let mut state = State::load(&inner.state, Acquire);
if state.is_closed() {
- return Ok(Async::Ready(()));
+ return Poll::Ready(());
}
if state.is_tx_task_set() {
let will_notify = inner
.tx_task
- .with(|ptr| unsafe { (&*ptr).will_notify_current() });
+ .with(|ptr| unsafe { (&*ptr).will_wake(cx.waker()) });
if !will_notify {
state = State::unset_tx_task(&inner.state);
if state.is_closed() {
- return Ok(Async::Ready(()));
+ return Ready(());
} else {
unsafe { inner.drop_tx_task() };
}
@@ -203,18 +203,18 @@ impl<T> Sender<T> {
if !state.is_tx_task_set() {
// Attempt to set the task
unsafe {
- inner.set_tx_task();
+ inner.set_tx_task(cx);
}
// Update the state
state = State::set_tx_task(&inner.state);
if state.is_closed() {
- return Ok(Async::Ready(()));
+ return Ready(());
}
}
- Ok(Async::NotReady)
+ Pending
}
/// Check if the associated [`Receiver`] handle has been dropped.
@@ -297,25 +297,18 @@ impl<T> Drop for Receiver<T> {
}
impl<T> Future for Receiver<T> {
- type Item = T;
- type Error = RecvError;
-
- fn poll(&mut self) -> Poll<T, RecvError> {
- use futures::Async::{NotReady, Ready};
+ type Output = Result<T, RecvError>;
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// If `inner` is `None`, then `poll()` has already completed.
- let ret = if let Some(inner) = self.inner.as_ref() {
- match inner.poll_recv() {
- Ok(Ready(v)) => Ok(Ready(v)),
- Ok(NotReady) => return Ok(NotReady),
- Err(e) => Err(e),
- }
+ let ret = if let Some(inner) = self.as_ref().get_ref().inner.as_ref() {
+ ready!(inner.poll_recv(cx))?
} else {
panic!("called after complete");
};
self.inner = None;
- ret
+ Ready(Ok(ret))
}
}
@@ -328,30 +321,29 @@ impl<T> Inner<T> {
}
if prev.is_rx_task_set() {
- self.rx_task.with(|ptr| unsafe { (&*ptr).notify() });
+ // TODO: Consume waker?
+ self.rx_task.with(|ptr| unsafe { (&*ptr).wake_by_ref() });
}
true
}
- fn poll_recv(&self) -> Poll<T, RecvError> {
- use futures::Async::{NotReady, Ready};
-
+ fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
// Load the state
let mut state = State::load(&self.state, Acquire);
if state.is_complete() {
match unsafe { self.consume_value() } {
- Some(value) => Ok(Ready(value)),
- None => Err(RecvError(())),
+ Some(value) => Ready(Ok(value)),
+ None => Ready(Err(RecvError(()))),
}
} else if state.is_closed() {
- Err(RecvError(()))
+ Ready(Err(RecvError(())))
} else {
if state.is_rx_task_set() {
let will_notify = self
.rx_task
- .with(|ptr| unsafe { (&*ptr).will_notify_current() });
+ .with(|ptr| unsafe { (&*ptr).will_wake(cx.waker()) });
// Check if the task is still the same
if !will_notify {
@@ -359,8 +351,8 @@ impl<T> Inner<T> {
state = State::unset_rx_task(&self.state);
if state.is_complete() {
return match unsafe { self.consume_value() } {
- Some(value) => Ok(Ready(value)),
- None => Err(RecvError(())),
+ Some(value) => Ready(Ok(value)),
+ None => Ready(Err(RecvError(()))),
};
} else {
unsafe { self.drop_rx_task() };
@@ -371,7 +363,7 @@ impl<T> Inner<T> {
if !state.is_rx_task_set() {
// Attempt to set the task
unsafe {
- self.set_rx_task();
+ self.set_rx_task(cx);
}
// Update the state
@@ -379,14 +371,14 @@ impl<T> Inner<T> {
if state.is_complete() {
match unsafe { self.consume_value() } {
- Some(value) => Ok(Ready(value)),
- None => Err(RecvError(())),
+ Some(value) => Ready(Ok(value)),
+ None => Ready(Err(RecvError(()))),
}
} else {
- return Ok(NotReady);
+ return Pending;
}
} else {
- return Ok(NotReady);
+ return Pending;
}
}
}
@@ -396,7 +388,7 @@ impl<T> Inner<T> {
let prev = State::set_closed(&self.state);
if prev.is_tx_task_set() && !prev.is_complete() {
- self.tx_task.with(|ptr| unsafe { (&*ptr).notify() });
+ self.tx_task.with(|ptr| unsafe { (&*ptr).wake_by_ref() });
}
}
@@ -413,14 +405,14 @@ impl<T> Inner<T> {
self.tx_task.with_mut(|ptr| ManuallyDrop::drop(&mut *ptr))
}
- unsafe fn set_rx_task(&self) {
+ unsafe fn set_rx_task(&self, cx: &mut Context<'_>) {
self.rx_task
- .with_mut(|ptr| *ptr = ManuallyDrop::new(task::current()));
+ .with_mut(|ptr| *ptr = ManuallyDrop::new(cx.waker().clone()));
}
- unsafe fn set_tx_task(&self) {
+ unsafe fn set_tx_task(&self, cx: &mut Context<'_>) {
self.tx_task
- .with_mut(|ptr| *ptr = ManuallyDrop::new(task::current()));
+ .with_mut(|ptr| *ptr = ManuallyDrop::new(cx.waker().clone()));
}
}
diff --git a/tokio-sync/src/semaphore.rs b/tokio-sync/src/semaphore.rs
index 43e89c37..33fd2318 100644
--- a/tokio-sync/src/semaphore.rs
+++ b/tokio-sync/src/semaphore.rs
@@ -6,21 +6,23 @@
//! Before accessing the shared resource, callers acquire a permit from the
//! semaphore. Once the permit is acquired, the caller then enters the critical
//! section. If no permits are available, then acquiring the semaphore returns
-//! `NotReady`. The task is notified once a permit becomes available.
+//! `Pending`. The task is woken once a permit becomes available.
use crate::loom::{
- futures::AtomicTask,
+ futures::AtomicWaker,
sync::{
atomic::{AtomicPtr, AtomicUsize},
CausalCell,
},
yield_now,
};
-use futures::Poll;
+
use std::fmt;
use std::ptr::{self, NonNull};
use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Relaxed, Release};
use std::sync::Arc;
+use std::task::Poll::{Pending, Ready};
+use std::task::{Context, Poll};
use std::usize;
/// Futures-aware semaphore.
@@ -80,8 +82,8 @@ struct WaiterNode {
/// See `NodeState` for more details.
state: AtomicUsize,
- /// Task to notify when a permit is made available.
- task: AtomicTask,
+ /// Task to wake when a permit is made available.
+ waker: AtomicWaker,
/// Next pointer in the queue of waiting senders.
next: AtomicPtr<WaiterNode>,
@@ -174,9 +176,10 @@ impl Semaphore {
}
/// Poll for a permit
- fn poll_permit(&self, mut permit: Option<&mut Permit>) -> Poll<(), AcquireError> {
- use futures::Async::*;
-
+ fn poll_permit(
+ &self,
+ mut permit: Option<(&mut Context<'_>, &mut Permit)>,
+ ) -> Poll<Result<(), AcquireError>> {
// Load the current state
let mut curr = SemState::load(&self.state, Acquire);
@@ -205,7 +208,7 @@ impl Semaphore {
if curr.is_closed() {
undo_strong!();
- return Err(AcquireError::closed());
+ return Ready(Err(AcquireError::closed()));
}
if !next.acquire_permit(&self.stub) {
@@ -214,13 +217,13 @@ impl Semaphore {
debug_assert!(curr.waiter().is_some());
if maybe_strong.is_none() {
- if let Some(ref mut permit) = permit {
+ if let Some((ref mut cx, ref mut permit)) = permit {
// Get the Sender's waiter node, or initialize one
let waiter = permit
.waiter
.get_or_insert_with(|| Arc::new(WaiterNode::new()));
- waiter.register();
+ waiter.register(cx);
debug!(" + poll_permit -- to_queued_waiting");
@@ -228,14 +231,14 @@ impl Semaphore {
debug!(" + poll_permit; waiter already queued");
// The node is alrady queued, there is no further work
// to do.
-