diff options
Diffstat (limited to 'tokio')
-rw-r--r-- | tokio/src/sync/mod.rs | 6 | ||||
-rw-r--r-- | tokio/src/sync/mpsc/bounded.rs | 2 | ||||
-rw-r--r-- | tokio/src/sync/mpsc/chan.rs | 6 | ||||
-rw-r--r-- | tokio/src/sync/mutex.rs | 6 | ||||
-rw-r--r-- | tokio/src/sync/semaphore.rs | 1089 | ||||
-rw-r--r-- | tokio/src/sync/semaphore_ll.rs | 1070 | ||||
-rw-r--r-- | tokio/src/sync/tests/loom_semaphore_ll.rs (renamed from tokio/src/sync/tests/loom_semaphore.rs) | 2 | ||||
-rw-r--r-- | tokio/src/sync/tests/mod.rs | 4 | ||||
-rw-r--r-- | tokio/src/sync/tests/semaphore_ll.rs (renamed from tokio/src/sync/tests/semaphore.rs) | 2 | ||||
-rw-r--r-- | tokio/tests/sync_semaphore.rs | 81 |
10 files changed, 1221 insertions, 1047 deletions
diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs index df31f8df..a05accbf 100644 --- a/tokio/src/sync/mod.rs +++ b/tokio/src/sync/mod.rs @@ -26,7 +26,9 @@ cfg_sync! { pub mod oneshot; - pub(crate) mod semaphore; + pub(crate) mod semaphore_ll; + mod semaphore; + pub use semaphore::{Semaphore, SemaphorePermit}; mod task; pub(crate) use task::AtomicWaker; @@ -48,7 +50,7 @@ cfg_not_sync! { cfg_signal! { pub(crate) mod mpsc; - pub(crate) mod semaphore; + pub(crate) mod semaphore_ll; } } diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 5cca1596..7294e4d5 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -1,6 +1,6 @@ use crate::sync::mpsc::chan; use crate::sync::mpsc::error::{ClosedError, SendError, TryRecvError, TrySendError}; -use crate::sync::semaphore; +use crate::sync::semaphore_ll as semaphore; use std::fmt; use std::task::{Context, Poll}; diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index b6e94d5a..7a15e8b3 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -382,7 +382,7 @@ impl<T, S> Drop for Chan<T, S> { } } -use crate::sync::semaphore::TryAcquireError; +use crate::sync::semaphore_ll::TryAcquireError; impl From<TryAcquireError> for TrySendError { fn from(src: TryAcquireError) -> TrySendError { @@ -398,9 +398,9 @@ impl From<TryAcquireError> for TrySendError { // ===== impl Semaphore for (::Semaphore, capacity) ===== -use crate::sync::semaphore::Permit; +use crate::sync::semaphore_ll::Permit; -impl Semaphore for (crate::sync::semaphore::Semaphore, usize) { +impl Semaphore for (crate::sync::semaphore_ll::Semaphore, usize) { type Permit = Permit; fn new_permit() -> Permit { diff --git a/tokio/src/sync/mutex.rs b/tokio/src/sync/mutex.rs index bee00df4..ec59d695 100644 --- a/tokio/src/sync/mutex.rs +++ b/tokio/src/sync/mutex.rs @@ -35,7 +35,7 @@ //! [`MutexGuard`]: struct.MutexGuard.html use crate::future::poll_fn; -use crate::sync::semaphore; +use crate::sync::semaphore_ll as semaphore; use std::cell::UnsafeCell; use std::error::Error; @@ -74,7 +74,7 @@ unsafe impl<T> Sync for Mutex<T> where T: Send {} unsafe impl<'a, T> Sync for MutexGuard<'a, T> where T: Send + Sync {} /// An enumeration of possible errors associated with a `TryLockResult` -/// which can occur while trying to aquire a lock from the `try_lock` +/// which can occur while trying to acquire a lock from the `try_lock` /// method on a `Mutex`. #[derive(Debug)] pub enum TryLockError { @@ -129,7 +129,7 @@ impl<T> Mutex<T> { guard } - /// Try to aquire the lock + /// Try to acquire the lock pub fn try_lock(&self) -> Result<MutexGuard<'_, T>, TryLockError> { let mut permit = semaphore::Permit::new(); match permit.try_acquire(&self.s) { diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index dab73a09..c1e9ef3e 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -1,1070 +1,91 @@ -#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))] +use super::semaphore_ll as ll; // low level implementation +use crate::future::poll_fn; -//! Thread-safe, asynchronous counting semaphore. -//! -//! A `Semaphore` instance holds a set of permits. Permits are used to -//! synchronize access to a shared resource. -//! -//! Before accessing the shared resource, callers acquire a permit from the -//! semaphore. Once the permit is acquired, the caller then enters the critical -//! section. If no permits are available, then acquiring the semaphore returns -//! `Pending`. The task is woken once a permit becomes available. - -use crate::loom::{ - cell::CausalCell, - future::AtomicWaker, - sync::atomic::{AtomicPtr, AtomicUsize}, - thread, -}; - -use std::fmt; -use std::ptr::{self, NonNull}; -use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Relaxed, Release}; -use std::sync::Arc; -use std::task::Poll::{Pending, Ready}; -use std::task::{Context, Poll}; -use std::usize; - -/// Futures-aware semaphore. -pub(crate) struct Semaphore { - /// Tracks both the waiter queue tail pointer and the number of remaining - /// permits. - state: AtomicUsize, - - /// waiter queue head pointer. - head: CausalCell<NonNull<WaiterNode>>, - - /// Coordinates access to the queue head. - rx_lock: AtomicUsize, - - /// Stub waiter node used as part of the MPSC channel algorithm. - stub: Box<WaiterNode>, -} - -/// A semaphore permit -/// -/// Tracks the lifecycle of a semaphore permit. +/// Counting semaphore performing asynchronous permit aquisition. /// -/// An instance of `Permit` is intended to be used with a **single** instance of -/// `Semaphore`. Using a single instance of `Permit` with multiple semaphore -/// instances will result in unexpected behavior. +/// A semaphore maintains a set of permits. Permits are used to synchronize +/// access to a shared resource. A semaphore differs from a mutex in that it +/// can allow more than one concurrent caller to access the shared resource at a +/// time. /// -/// `Permit` does **not** release the permit back to the semaphore on drop. It -/// is the user's responsibility to ensure that `Permit::release` is called -/// before dropping the permit. +/// When `acquire` is called and the semaphore has remaining permits, the +/// function immediately returns a permit. However, if no remaining permits are +/// available, `acquire` (asynchronously) waits until an outstanding permit is +/// dropped. At this point, the freed permit is assigned to the caller. #[derive(Debug)] -pub(crate) struct Permit { - waiter: Option<Arc<WaiterNode>>, - state: PermitState, +pub struct Semaphore { + /// The low level semaphore + ll_sem: ll::Semaphore, } -/// Error returned by `Permit::poll_acquire`. +/// A permit from the semaphore +#[must_use] #[derive(Debug)] -pub(crate) struct AcquireError(()); - -/// Error returned by `Permit::try_acquire`. -#[derive(Debug)] -pub(crate) struct TryAcquireError { - kind: ErrorKind, -} - -#[derive(Debug)] -enum ErrorKind { - Closed, - NoPermits, -} - -/// Node used to notify the semaphore waiter when permit is available. -#[derive(Debug)] -struct WaiterNode { - /// Stores waiter state. - /// - /// See `NodeState` for more details. - state: AtomicUsize, - - /// Task to wake when a permit is made available. - waker: AtomicWaker, - - /// Next pointer in the queue of waiting senders. - next: AtomicPtr<WaiterNode>, +pub struct SemaphorePermit<'a> { + sem: &'a Semaphore, + // the low level permit + ll_permit: ll::Permit, } -/// Semaphore state -/// -/// The 2 low bits track the modes. +/// Error returned from the [`Semaphore::try_acquire`] function. /// -/// - Closed -/// - Full +/// A `try_acquire` operation can only fail if the semaphore has no available +/// permits. /// -/// When not full, the rest of the `usize` tracks the total number of messages -/// in the channel. When full, the rest of the `usize` is a pointer to the tail -/// of the "waiting senders" queue. -#[derive(Copy, Clone)] -struct SemState(usize); - -/// Permit state -#[derive(Debug, Copy, Clone, Eq, PartialEq)] -enum PermitState { - /// The permit has not been requested. - Idle, - - /// Currently waiting for a permit to be made available and assigned to the - /// waiter. - Waiting, - - /// The permit has been acquired. - Acquired, -} - -/// Waiter node state -#[derive(Debug, Copy, Clone, Eq, PartialEq)] -#[repr(usize)] -enum NodeState { - /// Not waiting for a permit and the node is not in the wait queue. - /// - /// This is the initial state. - Idle = 0, - - /// Not waiting for a permit but the node is in the wait queue. - /// - /// This happens when the waiter has previously requested a permit, but has - /// since canceled the request. The node cannot be removed by the waiter, so - /// this state informs the receiver to skip the node when it pops it from - /// the wait queue. - Queued = 1, - - /// Waiting for a permit and the node is in the wait queue. - QueuedWaiting = 2, - - /// The waiter has been assigned a permit and the node has been removed from - /// the queue. - Assigned = 3, - - /// The semaphore has been closed. No more permits will be issued. - Closed = 4, -} - -// ===== impl Semaphore ===== +/// [`Semaphore::try_acquire`]: Semaphore::try_acquire +#[derive(Debug)] +pub struct TryAcquireError(()); impl Semaphore { /// Creates a new semaphore with the initial number of permits - /// - /// # Panics - /// - /// Panics if `permits` is zero. - pub(crate) fn new(permits: usize) -> Semaphore { - let stub = Box::new(WaiterNode::new()); - let ptr = NonNull::new(&*stub as *const _ as *mut _).unwrap(); - - // Allocations are aligned - debug_assert!(ptr.as_ptr() as usize & NUM_FLAG == 0); - - let state = SemState::new(permits, &stub); - - Semaphore { - state: AtomicUsize::new(state.to_usize()), - head: CausalCell::new(ptr), - rx_lock: AtomicUsize::new(0), - stub, + pub fn new(permits: usize) -> Self { + Self { + ll_sem: ll::Semaphore::new(permits), } } /// Returns the current number of available permits - pub(crate) fn available_permits(&self) -> usize { - let curr = SemState::load(&self.state, Acquire); - curr.available_permits() - } - - /// Poll for a permit - fn poll_permit( - &self, - mut permit: Option<(&mut Context<'_>, &mut Permit)>, - ) -> Poll<Result<(), AcquireError>> { - // Load the current state - let mut curr = SemState::load(&self.state, Acquire); - - // Tracks a *mut WaiterNode representing an Arc clone. - // - // This avoids having to bump the ref count unless required. - let mut maybe_strong: Option<NonNull<WaiterNode>> = None; - - macro_rules! undo_strong { - () => { - if let Some(waiter) = maybe_strong { - // The waiter was cloned, but never got queued. - // Before entering `poll_permit`, the waiter was in the - // `Idle` state. We must transition the node back to the - // idle state. - let waiter = unsafe { Arc::from_raw(waiter.as_ptr()) }; - waiter.revert_to_idle(); - } - }; - } - - loop { - let mut next = curr; - - if curr.is_closed() { - undo_strong!(); - return Ready(Err(AcquireError::closed())); - } - - if !next.acquire_permit(&self.stub) { - debug_assert!(curr.waiter().is_some()); - - if maybe_strong.is_none() { - if let Some((ref mut cx, ref mut permit)) = permit { - // Get the Sender's waiter node, or initialize one - let waiter = permit - .waiter - .get_or_insert_with(|| Arc::new(WaiterNode::new())); - - waiter.register(cx); - - if !waiter.to_queued_waiting() { - // The node is alrady queued, there is no further work - // to do. - return Pending; - } - - maybe_strong = Some(WaiterNode::into_non_null(waiter.clone())); - } else { - // If no `waiter`, then the task is not registered and there - // is no further work to do. - return Pending; - } - } - - next.set_waiter(maybe_strong.unwrap()); - } - - debug_assert_ne!(curr.0, 0); - debug_assert_ne!(next.0, 0); - - match next.compare_exchange(&self.state, curr, AcqRel, Acquire) { - Ok(_) => { - match curr.waiter() { - Some(prev_waiter) => { - let waiter = maybe_strong.unwrap(); - - // Finish pushing - unsafe { - prev_waiter.as_ref().next.store(waiter.as_ptr(), Release); - } - - return Pending; - } - None => { - undo_strong!(); - - return Ready(Ok(())); - } - } - } - Err(actual) => { - curr = actual; - } - } - } - } - - /// Close the semaphore. This prevents the semaphore from issuing new - /// permits and notifies all pending waiters. - pub(crate) fn close(&self) { - // Acquire the `rx_lock`, setting the "closed" flag on the lock. - let prev = self.rx_lock.fetch_or(1, AcqRel); - - if prev != 0 { - // Another thread has the lock and will be responsible for notifying - // pending waiters. - return; - } - - self.add_permits_locked(0, true); + pub fn available_permits(&self) -> usize { + self.ll_sem.available_permits() } /// Add `n` new permits to the semaphore. - pub(crate) fn add_permits(&self, n: usize) { - if n == 0 { - return; - } - - // TODO: Handle overflow. A panic is not sufficient, the process must - // abort. - let prev = self.rx_lock.fetch_add(n << 1, AcqRel); - - if prev != 0 { - // Another thread has the lock and will be responsible for notifying - // pending waiters. - return; - } - - self.add_permits_locked(n, false); + pub fn add_permits(&self, n: usize) { + self.ll_sem.add_permits(n); } - fn add_permits_locked(&self, mut rem: usize, mut closed: bool) { - while rem > 0 || closed { - if closed { - SemState::fetch_set_closed(&self.state, AcqRel); - } - - // Release the permits and notify - self.add_permits_locked2(rem, closed); - - let n = rem << 1; - - let actual = if closed { - let actual = self.rx_lock.fetch_sub(n | 1, AcqRel); - closed = false; - actual - } else { - let actual = self.rx_lock.fetch_sub(n, AcqRel); - closed = actual & 1 == 1; - actual - }; - - rem = (actual >> 1) - rem; - } - } - - /// Release a specific amount of permits to the semaphore - /// - /// This function is called by `add_permits` after the add lock has been - /// acquired. - fn add_permits_locked2(&self, mut n: usize, closed: bool) { - while n > 0 || closed { - let waiter = match self.pop(n, closed) { - Some(waiter) => waiter, - None => { - return; - } - }; - - if waiter.notify(closed) { - n = n.saturating_sub(1); - } - } - } - - /// Pop a waiter - /// - /// `rem` represents the remaining number of times the caller will pop. If - /// there are no more waiters to pop, `rem` is used to set the available - /// permits. - fn pop(&self, rem: usize, closed: bool) -> Option<Arc<WaiterNode>> { - 'outer: loop { - unsafe { - let mut head = self.head.with(|head| *head); - let mut next_ptr = head.as_ref().next.load(Acquire); - - let stub = self.stub(); - - if head == stub { - let next = match NonNull::new(next_ptr) { - Some(next) => next, - None => { - // This loop is not part of the standard intrusive mpsc - // channel algorithm. This is where we atomically pop - // the last task and add `rem` to the remaining capacity. - // - // This modification to the pop algorithm works because, - // at this point, we have not done any work (only done - // reading). We have a *pretty* good idea that there is - // no concurrent pusher. - // - // The capacity is then atomically added by doing an - // AcqRel CAS on `state`. The `state` cell is the - // linchpin of the algorithm. - // - // By successfully CASing `head` w/ AcqRel, we ensure - // that, if any thread was racing and entered a push, we - // see that and abort pop, retrying as it is - // "inconsistent". - let mut curr = SemState::load(&self.state, Acquire); - - loop { - if curr.has_waiter(&self.stub) { - // Inconsistent - thread::yield_now(); - continue 'outer; - } - - // When closing the semaphore, nodes are popped - // with `rem == 0`. In this case, we are not - // adding permits, but notifying waiters of the - // semaphore's closed state. - if rem == 0 { - debug_assert!(curr.is_closed(), "state = {:?}", curr); - return None; - } - - let mut next = curr; - next.release_permits(rem, &self.stub); - - match next.compare_exchange(&self.state, curr, AcqRel, Acquire) { - Ok(_) => return None, - Err(actual) => { - curr = actual; - } - } - } - } - }; - - self.head.with_mut(|head| *head = next); - head = next; - next_ptr = next.as_ref().next.load(Acquire); - } - - if let Some(next) = NonNull::new(next_ptr) { - self.head.with_mut(|head| *head = next); - - return Some(Arc::from_raw(head.as_ptr())); - } - - let state = SemState::load(&self.state, Acquire); - - // This must always be a pointer as the wait list is not empty. - let tail = state.waiter().unwrap(); - - if tail != head { - // Inconsistent - thread::yield_now(); - continue 'outer; - } - - self.push_stub(closed); - - next_ptr = head.as_ref().next.load(Acquire); - - if let Some(next) = NonNull::new(next_ptr) { - self.head.with_mut(|head| *head = next); - - return Some(Arc::from_raw(head.as_ptr())); - } - - // Inconsistent state, loop - thread::yield_now(); - } - } - } - - unsafe fn push_stub(&self, closed: bool) { - let stub = self.stub(); - - // Set the next pointer. This does not require an atomic operation as - // this node is not accessible. The write will be flushed with the next - // operation - stub.as_ref().next.store(ptr::null_mut(), Relaxed); - - // Update the tail to point to the new node. We need to see the previous - // node in order to update the next pointer as well as release `task` - // to any other threads calling `push`. - let prev = SemState::new_ptr(stub, closed).swap(&self.state, AcqRel); - - debug_assert_eq!(closed, prev.is_closed()); - - // The stub is only pushed when there are pending tasks. Because of - // this, the state must *always* be in pointer mode. - let prev = prev.waiter().unwrap(); - - // We don't want the *existing* pointer to be a stub. - debug_assert_ne!(prev, stub); - - // Release `task` to the consume end. - prev.as_ref().next.store(stub.as_ptr(), Release); - } - - fn stub(&self) -> NonNull<WaiterNode> { - unsafe { NonNull::new_unchecked(&*self.stub as *const _ as *mut _) } - } -} - -impl fmt::Debug for Semaphore { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Semaphore") - .field("state", &SemState::load(&self.state, Relaxed)) - .field("head", &self.head.with(|ptr| ptr)) - .field("rx_lock", &self.rx_lock.load(Relaxed)) - .field("stub", &self.stub) - .finish() - } -} - -unsafe impl Send for Semaphore {} -unsafe impl Sync for Semaphore {} - -// ===== impl Permit ===== - -impl Permit { - /// Create a new `Permit`. - /// - /// The permit begins in the "unacquired" state. - pub(crate) fn new() -> Permit { - Permit { - waiter: None, - state: PermitState::Idle, - } - } - - /// Returns true if the permit has been acquired - pub(crate) fn is_acquired(&self) -> bool { - self.state == PermitState::Acquired - } - - /// Try to acquire the permit. If no permits are available, the current task - /// is notified once a new permit becomes available. - pub(crate) fn poll_acquire( - &mut self, - cx: &mut Context<'_>, - semaphore: &Semaphore, - ) -> Poll<Result<(), AcquireError>> { - match self.state { - PermitState::Idle => {} - PermitState::Waiting => { - let waiter = self.waiter.as_ref().unwrap(); - - if waiter.acquire(cx)? { - self.state = PermitState::Acquired; - return Ready(Ok(())); - } else { - return Pending; - } - } - PermitState::Acquired => { - return Ready(Ok(())); - } - } - - match semaphore.poll_permit(Some((cx, self)))? { - Ready(()) => { - self.state = PermitState::Acquired; - Ready(Ok(())) - } - Pending => { - self.state = PermitState::Waiting; - Pending - } - } - } - - /// Try to acquire the permit. - pub(crate) fn try_acquire(&mut self, semaphore: &Semaphore) -> Result<(), TryAcquireError> { - match self.state { - PermitState::Idle => {} - PermitState::Waiting => { - let waiter = self.waiter.as_ref().unwrap(); - - if waiter.acquire2().map_err(to_try_acquire)? { - self.state = PermitState::Acquired; - return Ok(()); - } else { - return Err(TryAcquireError::no_permits()); - } - } - PermitState::Acquired => { - return Ok(()); - } - } - - match semaphore.poll_permit(None).map_err(to_try_acquire)? { - Ready(()) => { - self.state = PermitState::Acquired; - Ok(()) - } - Pending => Err(TryAcquireError::no_permits()), - } - } - - /// Release a permit back to the semaphore - pub(crate) fn release(&mut self, semaphore: &Semaphore) { - if self.forget2() { - semaphore.add_permits(1); - } - } - - /// Forget the permit **without** releasing it back to the semaphore. - /// - /// After calling `forget`, `poll_acquire` is able to acquire new permit - /// from the sempahore. - /// - /// Repeatedly calling `forget` without associated calls to `add_permit` - /// will result in the semaphore losing all permits. - pub(crate) fn forget(&mut self) { - self.forget2(); - } - - /// Returns `true` if the permit was acquired - fn forget2(&mut self) -> bool { - match self.state { - PermitState::Idle => false, - PermitState::Waiting => { - let ret = self.waiter.as_ref().unwrap().cancel_interest(); - self.state = PermitState::Idle; - ret - } - PermitState::Acquired => { - self.state = PermitState::Idle; - true - } - } - } -} - -impl Default for Permit { - fn default() -> Self { - Self::new() - } -} - -// ===== impl AcquireError ==== - -impl AcquireError { - fn closed() -> AcquireError { - AcquireError(()) - } -} - -fn to_try_acquire(_: AcquireError) -> TryAcquireError { - TryAcquireError::closed() -} - -impl fmt::Display for AcquireError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "semaphore closed") - } -} - -impl ::std::error::Error for AcquireError {} - -// ===== impl TryAcquireError ===== - -impl TryAcquireError { - fn closed() -> TryAcquireError { - TryAcquireError { - kind: ErrorKind::Closed, - } - } - - fn no_permits() -> TryAcquireError { - TryAcquireError { - kind: ErrorKind::NoPermits, - } - } - - /// Returns true if the error was caused by a closed semaphore. - pub(crate) fn is_closed(&self) -> bool { - match self.kind { - ErrorKind::Closed => true, - _ => false, - } - } - - /// Returns true if the error was caused by calling `try_acquire` on a - /// semaphore with no available permits. - pub(crate) fn is_no_permits(&self) -> bool { - match self.kind { - ErrorKind::NoPermits => true, - _ => false, - } - } -} - -impl fmt::Display for TryAcquireError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - let descr = match self.kind { - ErrorKind::Closed => "semaphore closed", - ErrorKind::NoPermits => "no permits available", + /// Acquire permit from the semaphore + pub async fn acquire(&self) -> SemaphorePermit<'_> { + let mut permit = SemaphorePermit { + sem: &self, + ll_permit: ll::Permit::new(), }; - write!(fmt, "{}", descr) - } -} - -impl ::std::error::Error for TryAcquireError {} - -// ===== impl WaiterNode ===== - -impl WaiterNode { - fn new() -> WaiterNode { - WaiterNode { - state: AtomicUsize::new(NodeState::new().to_usize()), - waker: AtomicWaker::new(), - next: AtomicPtr::new(ptr::null_mut()), - } - } - - fn acquire(&self, cx: &mut Context<'_>) -> Result<bool, AcquireError> { - if self.acquire2()? { - return Ok(true); - } - - self.waker.register_by_ref(cx.waker()); - - self.acquire2() - } - - fn acquire2(&self) -> Result<bool, AcquireError> { - use self::NodeState::*; - - match Idle.compare_exchange(&self.state, Assigned, AcqRel, Acquire) { - Ok(_) => Ok(true), - Err(Closed) => Err(AcquireError::closed()), - Err(_) => Ok(false), - } - } - - fn register(&self, cx: &mut Context<'_>) { - self.waker.register_by_ref(cx.waker()) - } - - /// Returns `true` if the permit has been acquired - fn cancel_interest(&self) -> bool { - use self::NodeState::*; - - match Queued.compare_exchange(&self.state, QueuedWaiting, AcqRel, Acquire) { - // Successfully removed interest from the queued node. The permit - // has not been assigned to the node. - Ok(_) => false, - // The semaphore has been closed, there is no further action to - // take. - Err(Closed) => false, - // The permit has been assigned. It must be acquired in order to - // be released back to the semaphore. - Err(Assigned) => { - match self.acquire2() { - Ok(true) => true, - // Not a reachable state - Ok(false) => panic!(), - // The semaphore has been closed, no further action to take. - Err(_) => false, - } - } - Err(state) => panic!("unexpected state = {:?}", state), - } - } - - /// Transition the state to `QueuedWaiting`. - /// - /// This step can only happen from `Queued` or from `Idle`. - /// - /// Returns `true` if transitioning into a queued state. - fn to_queued_waiting(&self) -> bool { - use self::NodeState::*; - - let mut curr = NodeState::load(&self.state, Acquire); - - loop { - debug_assert!(curr == Idle || curr == Queued, "actual = {:?}", curr); - let next = QueuedWaiting; - - match next.compare_exchange(&self.state, curr, AcqRel, Acquire) { - Ok(_) => { - if curr.is_queued() { - return false; - } else { - // Transitioned to queued, reset next pointer - self.next.store(ptr::null_mut(), Relaxed); - return true; - } - } - Err(actual) => { - curr = actual; - } - } - } - } - - /// Notify the waiter - /// - /// Returns `true` if the waiter accepts the notification - fn notify(&self, closed: bool) -> bool { - use self::NodeState::*; - - // Assume QueuedWaiting state - let mut curr = QueuedWaiting; - - loop { - let next = match curr { - Queued => Idle, - QueuedWaiting => { - if closed { - Closed - } else { - Assigned - } - } - actual => panic!("actual = {:?}", actual), - }; - - match next.compare_exchange(&self.state, curr, AcqRel, Acquire) { - Ok(_) => match curr { - QueuedWaiting => { - self.waker.wake(); - return true; - } - _ => return false, - }, - Err(actual) => curr = actual, - } - } - } - - fn revert_to_idle(&self) { - use self::NodeState::Idle; - - // There are no other handles to the node - NodeState::store(&self.state, Idle, Relaxed); - } - - #[allow(clippy::wrong_self_convention)] // https://github.com/rust-lang/rust-clippy/issues/4293 - fn into_non_null(self: Arc<WaiterNode>) -> NonNull<WaiterNode> { - let ptr = Arc::into_raw(self); - unsafe { NonNull::new_unchecked(ptr as *mut _) } - } -} - -// ===== impl State ===== - -/// Flag differentiating between available permits and waiter pointers. -/// -/// If we assume pointers are properly aligned, then the least significant bit -/// will always be zero. So, we use that bit to track if the value represents a -/// number. -const NUM_FLAG: usize = 0b01; - -const CLOSED_FLAG: usize = 0b10; - -const MAX_PERMITS: usize = usize::MAX >> NUM_SHIFT; - -/// When representing "numbers", the state has to be shifted this much (to get -/// rid of the flag bit). -const NUM_SHIFT: usize = 2; - -impl SemState { - /// Returns a new default `State` value. - fn new(permits: usize, stub: &WaiterNode) -> SemState { - assert!(permits <= MAX_PERMITS); - - if permits > 0 { - |