diff options
Diffstat (limited to 'tokio-sync/src/semaphore.rs')
-rw-r--r-- | tokio-sync/src/semaphore.rs | 73 |
1 files changed, 38 insertions, 35 deletions
diff --git a/tokio-sync/src/semaphore.rs b/tokio-sync/src/semaphore.rs index 43e89c37..33fd2318 100644 --- a/tokio-sync/src/semaphore.rs +++ b/tokio-sync/src/semaphore.rs @@ -6,21 +6,23 @@ //! Before accessing the shared resource, callers acquire a permit from the //! semaphore. Once the permit is acquired, the caller then enters the critical //! section. If no permits are available, then acquiring the semaphore returns -//! `NotReady`. The task is notified once a permit becomes available. +//! `Pending`. The task is woken once a permit becomes available. use crate::loom::{ - futures::AtomicTask, + futures::AtomicWaker, sync::{ atomic::{AtomicPtr, AtomicUsize}, CausalCell, }, yield_now, }; -use futures::Poll; + use std::fmt; use std::ptr::{self, NonNull}; use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Relaxed, Release}; use std::sync::Arc; +use std::task::Poll::{Pending, Ready}; +use std::task::{Context, Poll}; use std::usize; /// Futures-aware semaphore. @@ -80,8 +82,8 @@ struct WaiterNode { /// See `NodeState` for more details. state: AtomicUsize, - /// Task to notify when a permit is made available. - task: AtomicTask, + /// Task to wake when a permit is made available. + waker: AtomicWaker, /// Next pointer in the queue of waiting senders. next: AtomicPtr<WaiterNode>, @@ -174,9 +176,10 @@ impl Semaphore { } /// Poll for a permit - fn poll_permit(&self, mut permit: Option<&mut Permit>) -> Poll<(), AcquireError> { - use futures::Async::*; - + fn poll_permit( + &self, + mut permit: Option<(&mut Context<'_>, &mut Permit)>, + ) -> Poll<Result<(), AcquireError>> { // Load the current state let mut curr = SemState::load(&self.state, Acquire); @@ -205,7 +208,7 @@ impl Semaphore { if curr.is_closed() { undo_strong!(); - return Err(AcquireError::closed()); + return Ready(Err(AcquireError::closed())); } if !next.acquire_permit(&self.stub) { @@ -214,13 +217,13 @@ impl Semaphore { debug_assert!(curr.waiter().is_some()); if maybe_strong.is_none() { - if let Some(ref mut permit) = permit { + if let Some((ref mut cx, ref mut permit)) = permit { // Get the Sender's waiter node, or initialize one let waiter = permit .waiter .get_or_insert_with(|| Arc::new(WaiterNode::new())); - waiter.register(); + waiter.register(cx); debug!(" + poll_permit -- to_queued_waiting"); @@ -228,14 +231,14 @@ impl Semaphore { debug!(" + poll_permit; waiter already queued"); // The node is alrady queued, there is no further work // to do. - return Ok(NotReady); + return Pending; } maybe_strong = Some(WaiterNode::into_non_null(waiter.clone())); } else { // If no `waiter`, then the task is not registered and there // is no further work to do. - return Ok(NotReady); + return Pending; } } @@ -261,14 +264,14 @@ impl Semaphore { debug!(" + poll_permit -- waiter pushed"); - return Ok(NotReady); + return Pending; } None => { debug!(" + poll_permit -- permit acquired"); undo_strong!(); - return Ok(Ready(())); + return Ready(Ok(())); } } } @@ -571,42 +574,42 @@ impl Permit { /// Try to acquire the permit. If no permits are available, the current task /// is notified once a new permit becomes available. - pub fn poll_acquire(&mut self, semaphore: &Semaphore) -> Poll<(), AcquireError> { - use futures::Async::*; - + pub fn poll_acquire( + &mut self, + cx: &mut Context<'_>, + semaphore: &Semaphore, + ) -> Poll<Result<(), AcquireError>> { match self.state { PermitState::Idle => {} PermitState::Waiting => { let waiter = self.waiter.as_ref().unwrap(); - if waiter.acquire()? { + if waiter.acquire(cx)? { self.state = PermitState::Acquired; - return Ok(Ready(())); + return Ready(Ok(())); } else { - return Ok(NotReady); + return Pending; } } PermitState::Acquired => { - return Ok(Ready(())); + return Ready(Ok(())); } } - match semaphore.poll_permit(Some(self))? { + match semaphore.poll_permit(Some((cx, self)))? { Ready(v) => { self.state = PermitState::Acquired; - Ok(Ready(v)) + Ready(Ok(v)) } - NotReady => { + Pending => { self.state = PermitState::Waiting; - Ok(NotReady) + Pending } } } /// Try to acquire the permit. pub fn try_acquire(&mut self, semaphore: &Semaphore) -> Result<(), TryAcquireError> { - use futures::Async::*; - match self.state { PermitState::Idle => {} PermitState::Waiting => { @@ -629,7 +632,7 @@ impl Permit { self.state = PermitState::Acquired; Ok(()) } - NotReady => Err(TryAcquireError::no_permits()), + Pending => Err(TryAcquireError::no_permits()), } } @@ -748,17 +751,17 @@ impl WaiterNode { fn new() -> WaiterNode { WaiterNode { state: AtomicUsize::new(NodeState::new().to_usize()), - task: AtomicTask::new(), + waker: AtomicWaker::new(), next: AtomicPtr::new(ptr::null_mut()), } } - fn acquire(&self) -> Result<bool, AcquireError> { + fn acquire(&self, cx: &mut Context<'_>) -> Result<bool, AcquireError> { if self.acquire2()? { return Ok(true); } - self.task.register(); + self.waker.register_by_ref(cx.waker()); self.acquire2() } @@ -773,8 +776,8 @@ impl WaiterNode { } } - fn register(&self) { - self.task.register() + fn register(&self, cx: &mut Context<'_>) { + self.waker.register_by_ref(cx.waker()) } /// Returns `true` if the permit has been acquired @@ -860,7 +863,7 @@ impl WaiterNode { Ok(_) => match curr { QueuedWaiting => { debug!(" + notify -- task notified"); - self.task.notify(); + self.waker.wake(); return true; } other => { |