diff options
Diffstat (limited to 'tokio-sync')
25 files changed, 1159 insertions, 1102 deletions
diff --git a/tokio-sync/Cargo.toml b/tokio-sync/Cargo.toml index 7217b8a1..aedd4c62 100644 --- a/tokio-sync/Cargo.toml +++ b/tokio-sync/Cargo.toml @@ -21,12 +21,18 @@ Synchronization utilities. categories = ["asynchronous"] publish = false +[features] +async-traits = ["async-sink", "futures-core-preview"] + [dependencies] fnv = "1.0.6" -futures = "0.1.19" +async-sink = { git = "https://github.com/tokio-rs/async", optional = true } +futures-core-preview = { version = "0.3.0-alpha.16", optional = true } [dev-dependencies] +async-util = { git = "https://github.com/tokio-rs/async" } env_logger = { version = "0.5", default-features = false } -tokio = { version = "0.2.0", path = "../tokio" } -tokio-mock-task = "0.1.1" -loom = { version = "0.1.1", features = ["futures"] } +pin-utils = "0.1.0-alpha.4" +# tokio = { version = "0.2.0", path = "../tokio" } +tokio-test = { version = "0.2.0", path = "../tokio-test" } +loom = { git = "https://github.com/carllerche/loom", branch = "std-future2", features = ["futures"] } diff --git a/tokio-sync/src/lib.rs b/tokio-sync/src/lib.rs index 1131899c..67903ace 100644 --- a/tokio-sync/src/lib.rs +++ b/tokio-sync/src/lib.rs @@ -20,6 +20,19 @@ macro_rules! debug { } } +/// Unwrap a ready value or propagate `Poll::Pending`. +#[macro_export] +macro_rules! ready { + ($e:expr) => {{ + use std::task::Poll::{Pending, Ready}; + + match $e { + Ready(v) => v, + Pending => return Pending, + } + }}; +} + macro_rules! if_fuzz { ($($t:tt)*) => {{ if false { $($t)* } diff --git a/tokio-sync/src/lock.rs b/tokio-sync/src/lock.rs index 3cfa1beb..3b8ce36f 100644 --- a/tokio-sync/src/lock.rs +++ b/tokio-sync/src/lock.rs @@ -41,11 +41,13 @@ //! [`LockGuard`]: struct.LockGuard.html use crate::semaphore; -use futures::Async; + use std::cell::UnsafeCell; use std::fmt; use std::ops::{Deref, DerefMut}; use std::sync::Arc; +use std::task::Poll::Ready; +use std::task::{Context, Poll}; /// An asynchronous mutual exclusion primitive useful for protecting shared data /// @@ -103,14 +105,12 @@ impl<T> Lock<T> { /// Try to acquire the lock. /// /// If the lock is already held, the current task is notified when it is released. - pub fn poll_lock(&mut self) -> Async<LockGuard<T>> { - if let Async::NotReady = self.permit.poll_acquire(&self.inner.s).unwrap_or_else(|_| { + pub fn poll_lock(&mut self, cx: &mut Context<'_>) -> Poll<LockGuard<T>> { + ready!(self.permit.poll_acquire(cx, &self.inner.s)).unwrap_or_else(|_| { // The semaphore was closed. but, we never explicitly close it, and we have a // handle to it through the Arc, which means that this can never happen. unreachable!() - }) { - return Async::NotReady; - } + }); // We want to move the acquired permit into the guard, // and leave an unacquired one in self. @@ -118,7 +118,7 @@ impl<T> Lock<T> { inner: self.inner.clone(), permit: ::std::mem::replace(&mut self.permit, semaphore::Permit::new()), }; - Async::Ready(LockGuard(acquired)) + Ready(LockGuard(acquired)) } } diff --git a/tokio-sync/src/loom.rs b/tokio-sync/src/loom.rs index c46a5f54..92a21e5b 100644 --- a/tokio-sync/src/loom.rs +++ b/tokio-sync/src/loom.rs @@ -1,6 +1,5 @@ pub(crate) mod futures { - pub(crate) use crate::task::AtomicTask; - pub(crate) use futures::task; + pub(crate) use crate::task::AtomicWaker; } pub(crate) mod sync { diff --git a/tokio-sync/src/mpsc/bounded.rs b/tokio-sync/src/mpsc/bounded.rs index b2fce168..7f8cb905 100644 --- a/tokio-sync/src/mpsc/bounded.rs +++ b/tokio-sync/src/mpsc/bounded.rs @@ -1,6 +1,10 @@ use super::chan; -use futures::{Poll, Sink, StartSend, Stream}; + use std::fmt; +use std::task::{Context, Poll}; + +#[cfg(feature = "async-traits")] +use std::pin::Pin; /// Send values to the associated `Receiver`. /// @@ -127,6 +131,11 @@ impl<T> Receiver<T> { Receiver { chan } } + /// TODO: Dox + pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { + self.chan.recv(cx) + } + /// Closes the receiving half of a channel, without dropping it. /// /// This prevents any further messages from being sent on the channel while @@ -136,12 +145,12 @@ impl<T> Receiver<T> { } } -impl<T> Stream for Receiver<T> { +#[cfg(feature = "async-traits")] +impl<T> futures_core::Stream for Receiver<T> { type Item = T; - type Error = RecvError; - fn poll(&mut self) -> Poll<Option<T>, Self::Error> { - self.chan.recv().map_err(|_| RecvError(())) + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { + Receiver::poll_next(self.get_mut(), cx) } } @@ -165,13 +174,13 @@ impl<T> Sender<T> { /// /// This method returns: /// - /// - `Ok(Async::Ready(_))` if capacity is reserved for a single message. - /// - `Ok(Async::NotReady)` if the channel may not have capacity, in which + /// - `Poll::Ready(Ok(_))` if capacity is reserved for a single message. + /// - `Poll::Pending` if the channel may not have capacity, in which /// case the current task is queued to be notified once /// capacity is available; - /// - `Err(SendError)` if the receiver has been dropped. - pub fn poll_ready(&mut self) -> Poll<(), SendError> { - self.chan.poll_ready().map_err(|_| SendError(())) + /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped. + pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> { + self.chan.poll_ready(cx).map_err(|_| SendError(())) } /// Attempts to send a message on this `Sender`, returning the message @@ -182,31 +191,29 @@ impl<T> Sender<T> { } } -impl<T> Sink for Sender<T> { - type SinkItem = T; - type SinkError = SendError; +#[cfg(feature = "async-traits")] +impl<T> async_sink::Sink<T> for Sender<T> { + type Error = SendError; - fn start_send(&mut self, msg: T) -> StartSend<T, Self::SinkError> { - use futures::Async::*; - use futures::AsyncSink; + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Sender::poll_ready(self.get_mut(), cx) + } - match self.poll_ready()? { - Ready(_) => { - self.try_send(msg).map_err(|_| SendError(()))?; - Ok(AsyncSink::Ready) - } - NotReady => Ok(AsyncSink::NotReady(msg)), - } + fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { + self.as_mut() + .try_send(msg) + .map_err(|err| { + assert!(err.is_full(), "call `poll_ready` before sending"); + SendError(()) + }) } - fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { - use futures::Async::Ready; - Ok(Ready(())) + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) } - fn close(&mut self) -> Poll<(), Self::SinkError> { - use futures::Async::Ready; - Ok(Ready(())) + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) } } diff --git a/tokio-sync/src/mpsc/chan.rs b/tokio-sync/src/mpsc/chan.rs index fae3159a..93e62824 100644 --- a/tokio-sync/src/mpsc/chan.rs +++ b/tokio-sync/src/mpsc/chan.rs @@ -1,13 +1,14 @@ use super::list; use crate::loom::{ - futures::AtomicTask, + futures::AtomicWaker, sync::atomic::AtomicUsize, sync::{Arc, CausalCell}, }; -use futures::Poll; use std::fmt; use std::process; use std::sync::atomic::Ordering::{AcqRel, Relaxed}; +use std::task::Poll::{Pending, Ready}; +use std::task::{Context, Poll}; /// Channel sender pub(crate) struct Tx<T, S: Semaphore> { @@ -61,7 +62,8 @@ pub(crate) trait Semaphore { fn add_permit(&self); - fn poll_acquire(&self, permit: &mut Self::Permit) -> Poll<(), ()>; + fn poll_acquire(&self, cx: &mut Context<'_>, permit: &mut Self::Permit) + -> Poll<Result<(), ()>>; fn try_acquire(&self, permit: &mut Self::Permit) -> Result<(), TrySendError>; @@ -81,8 +83,8 @@ struct Chan<T, S> { /// Coordinates access to channel's capacity. semaphore: S, - /// Receiver task. Notified when a value is pushed into the channel. - rx_task: AtomicTask, + /// Receiver waker. Notified when a value is pushed into the channel. + rx_waker: AtomicWaker, /// Tracks the number of outstanding sender handles. /// @@ -101,7 +103,7 @@ where fmt.debug_struct("Chan") .field("tx", &self.tx) .field("semaphore", &self.semaphore) - .field("rx_task", &self.rx_task) + .field("rx_waker", &self.rx_waker) .field("tx_count", &self.tx_count) .field("rx_fields", &"...") .finish() @@ -138,7 +140,7 @@ where let chan = Arc::new(Chan { tx, semaphore, - rx_task: AtomicTask::new(), + rx_waker: AtomicWaker::new(), tx_count: AtomicUsize::new(1), rx_fields: CausalCell::new(RxFields { list: rx, @@ -163,8 +165,8 @@ where } /// TODO: Docs - pub(crate) fn poll_ready(&mut self) -> Poll<(), ()> { - self.inner.semaphore.poll_acquire(&mut self.permit) + pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ()>> { + self.inner.semaphore.poll_acquire(cx, &mut self.permit) } /// Send a message and notify the receiver. @@ -177,7 +179,7 @@ where self.inner.tx.push(value); // Notify the rx task - self.inner.rx_task.notify(); + self.inner.rx_waker.wake(); // Release the permit self.inner.semaphore.forget(&mut self.permit); @@ -217,7 +219,7 @@ where self.inner.tx.close(); // Notify the receiver - self.inner.rx_task.notify(); + self.inner.rx_waker.wake(); } } @@ -246,9 +248,8 @@ where } /// Receive the next value - pub(crate) fn recv(&mut self) -> Poll<Option<T>, ()> { + pub(crate) fn recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { use super::block::Read::*; - use futures::Async::*; self.inner.rx_fields.with_mut(|rx_fields_ptr| { let rx_fields = unsafe { &mut *rx_fields_ptr }; @@ -258,7 +259,7 @@ where match rx_fields.list.pop(&self.inner.tx) { Some(Value(value)) => { self.inner.semaphore.add_permit(); - return Ok(Ready(Some(value))); + return Ready(Some(value)); } Some(Closed) => { // TODO: This check may not be required as it most @@ -268,7 +269,7 @@ where // which ensures that if dropping the tx handle is // visible, then all messages sent are also visible. assert!(self.inner.semaphore.is_idle()); - return Ok(Ready(None)); + return Ready(None); } None => {} // fall through } @@ -277,7 +278,7 @@ where try_recv!(); - self.inner.rx_task.register(); + self.inner.rx_waker.register_by_ref(cx.waker()); // It is possible that a value was pushed between attempting to read // and registering the task, so we have to check the channel a @@ -291,9 +292,9 @@ where ); if rx_fields.rx_closed && self.inner.semaphore.is_idle() { - Ok(Ready(None)) + Ready(None) } else { - Ok(NotReady) + Pending } }) } @@ -372,8 +373,8 @@ impl Semaphore for (crate::semaphore::Semaphore, usize) { self.0.available_permits() == self.1 } - fn poll_acquire(&self, permit: &mut Permit) -> Poll<(), ()> { - permit.poll_acquire(&self.0).map_err(|_| ()) + fn poll_acquire(&self, cx: &mut Context<'_>, permit: &mut Permit) -> Poll<Result<(), ()>> { + permit.poll_acquire(cx, &self.0).map_err(|_| ()) } fn try_acquire(&self, permit: &mut Permit) -> Result<(), TrySendError> { @@ -415,9 +416,8 @@ impl Semaphore for AtomicUsize { self.load(Acquire) >> 1 == 0 } - fn poll_acquire(&self, permit: &mut ()) -> Poll<(), ()> { - use futures::Async::Ready; - self.try_acquire(permit).map(Ready).map_err(|_| ()) + fn poll_acquire(&self, _cx: &mut Context<'_>, permit: &mut ()) -> Poll<Result<(), ()>> { + Ready(self.try_acquire(permit).map_err(|_| ())) } fn try_acquire(&self, _permit: &mut ()) -> Result<(), TrySendError> { diff --git a/tokio-sync/src/mpsc/unbounded.rs b/tokio-sync/src/mpsc/unbounded.rs index 58967c91..960bee41 100644 --- a/tokio-sync/src/mpsc/unbounded.rs +++ b/tokio-sync/src/mpsc/unbounded.rs @@ -1,7 +1,11 @@ use super::chan; use crate::loom::sync::atomic::AtomicUsize; -use futures::{Poll, Sink, StartSend, Stream}; + use std::fmt; +use std::task::{Context, Poll}; + +#[cfg(feature = "async-traits")] +use std::pin::Pin; /// Send values to the associated `UnboundedReceiver`. /// @@ -83,6 +87,11 @@ impl<T> UnboundedReceiver<T> { UnboundedReceiver { chan } } + /// TODO: dox + pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { + self.chan.recv(cx) + } + /// Closes the receiving half of a channel, without dropping it. /// /// This prevents any further messages from being sent on the channel while @@ -92,12 +101,12 @@ impl<T> UnboundedReceiver<T> { } } -impl<T> Stream for UnboundedReceiver<T> { +#[cfg(feature = "async-traits")] +impl<T> futures_core::Stream for UnboundedReceiver<T> { type Item = T; - type Error = UnboundedRecvError; - fn poll(&mut self) -> Poll<Option<T>, Self::Error> { - self.chan.recv().map_err(|_| UnboundedRecvError(())) + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { + self.chan.recv(cx) } } @@ -113,25 +122,24 @@ impl<T> UnboundedSender<T> { } } -impl<T> Sink for UnboundedSender<T> { - type SinkItem = T; - type SinkError = UnboundedSendError; +#[cfg(feature = "async-traits")] +impl<T> async_sink::Sink<T> for UnboundedSender<T> { + type Error = UnboundedSendError; - fn start_send(&mut self, msg: T) -> StartSend<T, Self::SinkError> { - use futures::AsyncSink; + fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } - self.try_send(msg).map_err(|_| UnboundedSendError(()))?; - Ok(AsyncSink::Ready) + fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { + self.try_send(msg).map_err(|_| UnboundedSendError(())) } - fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { - use futures::Async::Ready; - Ok(Ready(())) + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) } - fn close(&mut self) -> Poll<(), Self::SinkError> { - use futures::Async::Ready; - Ok(Ready(())) + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) } } diff --git a/tokio-sync/src/oneshot.rs b/tokio-sync/src/oneshot.rs index d3531bd9..c38bb0ce 100644 --- a/tokio-sync/src/oneshot.rs +++ b/tokio-sync/src/oneshot.rs @@ -1,15 +1,15 @@ //! A channel for sending a single message between asynchronous tasks. -use crate::loom::{ - futures::task::{self, Task}, - sync::atomic::AtomicUsize, - sync::CausalCell, -}; -use futures::{Async, Future, Poll}; +use crate::loom::{sync::atomic::AtomicUsize, sync::CausalCell}; + use std::fmt; +use std::future::Future; use std::mem::{self, ManuallyDrop}; +use std::pin::Pin; use std::sync::atomic::Ordering::{self, AcqRel, Acquire}; use std::sync::Arc; +use std::task::Poll::{Pending, Ready}; +use std::task::{Context, Poll, Waker}; /// Sends a value to the associated `Receiver`. /// @@ -82,10 +82,10 @@ struct Inner<T> { value: CausalCell<Option<T>>, /// The task to notify when the receiver drops without consuming the value. - tx_task: CausalCell<ManuallyDrop<Task>>, + tx_task: CausalCell<ManuallyDrop<Waker>>, /// The task to notify when the value is sent. - rx_task: CausalCell<ManuallyDrop<Task>>, + rx_task: CausalCell<ManuallyDrop<Waker>>, } #[derive(Clone, Copy)] @@ -167,33 +167,33 @@ impl<T> Sender<T> { /// /// # Return values /// - /// If `Ok(Ready)` is returned then the associated `Receiver` has been + /// If `Ready(Ok(_))` is returned then the associated `Receiver` has been /// dropped, which means any work required for sending should be canceled. /// - /// If `Ok(NotReady)` is returned then the associated `Receiver` is still + /// If `Pending` is returned then the associated `Receiver` is still /// alive and may be able to receive a message if sent. The current task is /// registered to receive a notification if the `Receiver` handle goes away. /// /// [`Receiver`]: struct.Receiver.html - pub fn poll_close(&mut self) -> Poll<(), ()> { + pub fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<()> { let inner = self.inner.as_ref().unwrap(); let mut state = State::load(&inner.state, Acquire); if state.is_closed() { - return Ok(Async::Ready(())); + return Poll::Ready(()); } if state.is_tx_task_set() { let will_notify = inner .tx_task - .with(|ptr| unsafe { (&*ptr).will_notify_current() }); + .with(|ptr| unsafe { (&*ptr).will_wake(cx.waker()) }); if !will_notify { state = State::unset_tx_task(&inner.state); if state.is_closed() { - return Ok(Async::Ready(())); + return Ready(()); } else { unsafe { inner.drop_tx_task() }; } @@ -203,18 +203,18 @@ impl<T> Sender<T> { if !state.is_tx_task_set() { // Attempt to set the task unsafe { - inner.set_tx_task(); + inner.set_tx_task(cx); } // Update the state state = State::set_tx_task(&inner.state); if state.is_closed() { - return Ok(Async::Ready(())); + return Ready(()); } } - Ok(Async::NotReady) + Pending } /// Check if the associated [`Receiver`] handle has been dropped. @@ -297,25 +297,18 @@ impl<T> Drop for Receiver<T> { } impl<T> Future for Receiver<T> { - type Item = T; - type Error = RecvError; - - fn poll(&mut self) -> Poll<T, RecvError> { - use futures::Async::{NotReady, Ready}; + type Output = Result<T, RecvError>; + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { // If `inner` is `None`, then `poll()` has already completed. - let ret = if let Some(inner) = self.inner.as_ref() { - match inner.poll_recv() { - Ok(Ready(v)) => Ok(Ready(v)), - Ok(NotReady) => return Ok(NotReady), - Err(e) => Err(e), - } + let ret = if let Some(inner) = self.as_ref().get_ref().inner.as_ref() { + ready!(inner.poll_recv(cx))? } else { panic!("called after complete"); }; self.inner = None; - ret + Ready(Ok(ret)) } } @@ -328,30 +321,29 @@ impl<T> Inner<T> { } if prev.is_rx_task_set() { - self.rx_task.with(|ptr| unsafe { (&*ptr).notify() }); + // TODO: Consume waker? + self.rx_task.with(|ptr| unsafe { (&*ptr).wake_by_ref() }); } true } - fn poll_recv(&self) -> Poll<T, RecvError> { - use futures::Async::{NotReady, Ready}; - + fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> { // Load the state let mut state = State::load(&self.state, Acquire); if state.is_complete() { match unsafe { self.consume_value() } { - Some(value) => Ok(Ready(value)), - None => Err(RecvError(())), + Some(value) => Ready(Ok(value)), + None => Ready(Err(RecvError(()))), } } else if state.is_closed() { - Err(RecvError(())) + Ready(Err(RecvError(()))) } else { if state.is_rx_task_set() { let will_notify = self .rx_task - .with(|ptr| unsafe { (&*ptr).will_notify_current() }); + .with(|ptr| unsafe { (&*ptr).will_wake(cx.waker()) }); // Check if the task is still the same if !will_notify { @@ -359,8 +351,8 @@ impl<T> Inner<T> { state = State::unset_rx_task(&self.state); if state.is_complete() { return match unsafe { self.consume_value() } { - Some(value) => Ok(Ready(value)), - None => Err(RecvError(())), + Some(value) => Ready(Ok(value)), + None => Ready(Err(RecvError(()))), }; } else { unsafe { self.drop_rx_task() }; @@ -371,7 +363,7 @@ impl<T> Inner<T> { if !state.is_rx_task_set() { // Attempt to set the task unsafe { - self.set_rx_task(); + self.set_rx_task(cx); } // Update the state @@ -379,14 +371,14 @@ impl<T> Inner<T> { if state.is_complete() { match unsafe { self.consume_value() } { - Some(value) => Ok(Ready(value)), - None => Err(RecvError(())), + Some(value) => Ready(Ok(value)), + None => Ready(Err(RecvError(()))), } } else { - return Ok(NotReady); + return Pending; } } else { - return Ok(NotReady); + return Pending; } } } @@ -396,7 +388,7 @@ impl<T> Inner<T> { let prev = State::set_closed(&self.state); if prev.is_tx_task_set() && !prev.is_complete() { - self.tx_task.with(|ptr| unsafe { (&*ptr).notify() }); + self.tx_task.with(|ptr| unsafe { (&*ptr).wake_by_ref() }); } } @@ -413,14 +405,14 @@ impl<T> Inner<T> { self.tx_task.with_mut(|ptr| ManuallyDrop::drop(&mut *ptr)) } - unsafe fn set_rx_task(&self) { + unsafe fn set_rx_task(&self, cx: &mut Context<'_>) { self.rx_task - .with_mut(|ptr| *ptr = ManuallyDrop::new(task::current())); + .with_mut(|ptr| *ptr = ManuallyDrop::new(cx.waker().clone())); } - unsafe fn set_tx_task(&self) { + unsafe fn set_tx_task(&self, cx: &mut Context<'_>) { self.tx_task - .with_mut(|ptr| *ptr = ManuallyDrop::new(task::current())); + .with_mut(|ptr| *ptr = ManuallyDrop::new(cx.waker().clone())); } } diff --git a/tokio-sync/src/semaphore.rs b/tokio-sync/src/semaphore.rs index 43e89c37..33fd2318 100644 --- a/tokio-sync/src/semaphore.rs +++ b/tokio-sync/src/semaphore.rs @@ -6,21 +6,23 @@ //! Before accessing the shared resource, callers acquire a permit from the //! semaphore. Once the permit is acquired, the caller then enters the critical //! section. If no permits are available, then acquiring the semaphore returns -//! `NotReady`. The task is notified once a permit becomes available. +//! `Pending`. The task is woken once a permit becomes available. use crate::loom::{ - futures::AtomicTask, + futures::AtomicWaker, sync::{ atomic::{AtomicPtr, AtomicUsize}, CausalCell, }, yield_now, }; -use futures::Poll; + use std::fmt; use std::ptr::{self, NonNull}; use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Relaxed, Release}; use std::sync::Arc; +use std::task::Poll::{Pending, Ready}; +use std::task::{Context, Poll}; use std::usize; /// Futures-aware semaphore. @@ -80,8 +82,8 @@ struct WaiterNode { /// See `NodeState` for more details. state: AtomicUsize, - /// Task to notify when a permit is made available. - task: AtomicTask, + /// Task to wake when a permit is made available. + waker: AtomicWaker, /// Next pointer in the queue of waiting senders. next: AtomicPtr<WaiterNode>, @@ -174,9 +176,10 @@ impl Semaphore { } /// Poll for a permit - fn poll_permit(&self, mut permit: Option<&mut Permit>) -> Poll<(), AcquireError> { - use futures::Async::*; - + fn poll_permit( + &self, + mut permit: Option<(&mut Context<'_>, &mut Permit)>, + ) -> Poll<Result<(), AcquireError>> { // Load the current state let mut curr = SemState::load(&self.state, Acquire); @@ -205,7 +208,7 @@ impl Semaphore { if curr.is_closed() { undo_strong!(); - return Err(AcquireError::closed()); + return Ready(Err(AcquireError::closed())); } if !next.acquire_permit(&self.stub) { @@ -214,13 +217,13 @@ impl Semaphore { debug_assert!(curr.waiter().is_some()); if maybe_strong.is_none() { - if let Some(ref mut permit) = permit { + if let Some((ref mut cx, ref mut permit)) = permit { // Get the Sender's waiter node, or initialize one let waiter = permit .waiter .get_or_insert_with(|| Arc::new(WaiterNode::new())); - waiter.register(); + waiter.register(cx); debug!(" + poll_permit -- to_queued_waiting"); @@ -228,14 +231,14 @@ impl Semaphore { debug!(" + poll_permit; waiter already queued"); // The node is alrady queued, there is no further work |