summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync/semaphore_ll.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/sync/semaphore_ll.rs')
-rw-r--r--tokio/src/sync/semaphore_ll.rs1221
1 files changed, 0 insertions, 1221 deletions
diff --git a/tokio/src/sync/semaphore_ll.rs b/tokio/src/sync/semaphore_ll.rs
deleted file mode 100644
index f044095f..00000000
--- a/tokio/src/sync/semaphore_ll.rs
+++ /dev/null
@@ -1,1221 +0,0 @@
-#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
-
-//! Thread-safe, asynchronous counting semaphore.
-//!
-//! A `Semaphore` instance holds a set of permits. Permits are used to
-//! synchronize access to a shared resource.
-//!
-//! Before accessing the shared resource, callers acquire a permit from the
-//! semaphore. Once the permit is acquired, the caller then enters the critical
-//! section. If no permits are available, then acquiring the semaphore returns
-//! `Pending`. The task is woken once a permit becomes available.
-
-use crate::loom::cell::UnsafeCell;
-use crate::loom::future::AtomicWaker;
-use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize};
-use crate::loom::thread;
-
-use std::cmp;
-use std::fmt;
-use std::ptr::{self, NonNull};
-use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Relaxed, Release};
-use std::task::Poll::{Pending, Ready};
-use std::task::{Context, Poll};
-use std::usize;
-
-/// Futures-aware semaphore.
-pub(crate) struct Semaphore {
- /// Tracks both the waiter queue tail pointer and the number of remaining
- /// permits.
- state: AtomicUsize,
-
- /// waiter queue head pointer.
- head: UnsafeCell<NonNull<Waiter>>,
-
- /// Coordinates access to the queue head.
- rx_lock: AtomicUsize,
-
- /// Stub waiter node used as part of the MPSC channel algorithm.
- stub: Box<Waiter>,
-}
-
-/// A semaphore permit
-///
-/// Tracks the lifecycle of a semaphore permit.
-///
-/// An instance of `Permit` is intended to be used with a **single** instance of
-/// `Semaphore`. Using a single instance of `Permit` with multiple semaphore
-/// instances will result in unexpected behavior.
-///
-/// `Permit` does **not** release the permit back to the semaphore on drop. It
-/// is the user's responsibility to ensure that `Permit::release` is called
-/// before dropping the permit.
-#[derive(Debug)]
-pub(crate) struct Permit {
- waiter: Option<Box<Waiter>>,
- state: PermitState,
-}
-
-/// Error returned by `Permit::poll_acquire`.
-#[derive(Debug)]
-pub(crate) struct AcquireError(());
-
-/// Error returned by `Permit::try_acquire`.
-#[derive(Debug)]
-pub(crate) enum TryAcquireError {
- Closed,
- NoPermits,
-}
-
-/// Node used to notify the semaphore waiter when permit is available.
-#[derive(Debug)]
-struct Waiter {
- /// Stores waiter state.
- ///
- /// See `WaiterState` for more details.
- state: AtomicUsize,
-
- /// Task to wake when a permit is made available.
- waker: AtomicWaker,
-
- /// Next pointer in the queue of waiting senders.
- next: AtomicPtr<Waiter>,
-}
-
-/// Semaphore state
-///
-/// The 2 low bits track the modes.
-///
-/// - Closed
-/// - Full
-///
-/// When not full, the rest of the `usize` tracks the total number of messages
-/// in the channel. When full, the rest of the `usize` is a pointer to the tail
-/// of the "waiting senders" queue.
-#[derive(Copy, Clone)]
-struct SemState(usize);
-
-/// Permit state
-#[derive(Debug, Copy, Clone)]
-enum PermitState {
- /// Currently waiting for permits to be made available and assigned to the
- /// waiter.
- Waiting(u16),
-
- /// The number of acquired permits
- Acquired(u16),
-}
-
-/// State for an individual waker node
-#[derive(Debug, Copy, Clone)]
-struct WaiterState(usize);
-
-/// Waiter node is in the semaphore queue
-const QUEUED: usize = 0b001;
-
-/// Semaphore has been closed, no more permits will be issued.
-const CLOSED: usize = 0b10;
-
-/// The permit that owns the `Waiter` dropped.
-const DROPPED: usize = 0b100;
-
-/// Represents "one requested permit" in the waiter state
-const PERMIT_ONE: usize = 0b1000;
-
-/// Masks the waiter state to only contain bits tracking number of requested
-/// permits.
-const PERMIT_MASK: usize = usize::MAX - (PERMIT_ONE - 1);
-
-/// How much to shift a permit count to pack it into the waker state
-const PERMIT_SHIFT: u32 = PERMIT_ONE.trailing_zeros();
-
-/// Flag differentiating between available permits and waiter pointers.
-///
-/// If we assume pointers are properly aligned, then the least significant bit
-/// will always be zero. So, we use that bit to track if the value represents a
-/// number.
-const NUM_FLAG: usize = 0b01;
-
-/// Signal the semaphore is closed
-const CLOSED_FLAG: usize = 0b10;
-
-/// Maximum number of permits a semaphore can manage
-const MAX_PERMITS: usize = usize::MAX >> NUM_SHIFT;
-
-/// When representing "numbers", the state has to be shifted this much (to get
-/// rid of the flag bit).
-const NUM_SHIFT: usize = 2;
-
-// ===== impl Semaphore =====
-
-impl Semaphore {
- /// Creates a new semaphore with the initial number of permits
- ///
- /// # Panics
- ///
- /// Panics if `permits` is zero.
- pub(crate) fn new(permits: usize) -> Semaphore {
- let stub = Box::new(Waiter::new());
- let ptr = NonNull::from(&*stub);
-
- // Allocations are aligned
- debug_assert!(ptr.as_ptr() as usize & NUM_FLAG == 0);
-
- let state = SemState::new(permits, &stub);
-
- Semaphore {
- state: AtomicUsize::new(state.to_usize()),
- head: UnsafeCell::new(ptr),
- rx_lock: AtomicUsize::new(0),
- stub,
- }
- }
-
- /// Returns the current number of available permits
- pub(crate) fn available_permits(&self) -> usize {
- let curr = SemState(self.state.load(Acquire));
- curr.available_permits()
- }
-
- /// Tries to acquire the requested number of permits, registering the waiter
- /// if not enough permits are available.
- fn poll_acquire(
- &self,
- cx: &mut Context<'_>,
- num_permits: u16,
- permit: &mut Permit,
- ) -> Poll<Result<(), AcquireError>> {
- self.poll_acquire2(num_permits, || {
- let waiter = permit.waiter.get_or_insert_with(|| Box::new(Waiter::new()));
-
- waiter.waker.register_by_ref(cx.waker());
-
- Some(NonNull::from(&**waiter))
- })
- }
-
- fn try_acquire(&self, num_permits: u16) -> Result<(), TryAcquireError> {
- match self.poll_acquire2(num_permits, || None) {
- Poll::Ready(res) => res.map_err(to_try_acquire),
- Poll::Pending => Err(TryAcquireError::NoPermits),
- }
- }
-
- /// Polls for a permit
- ///
- /// Tries to acquire available permits first. If unable to acquire a
- /// sufficient number of permits, the caller's waiter is pushed onto the
- /// semaphore's wait queue.
- fn poll_acquire2<F>(
- &self,
- num_permits: u16,
- mut get_waiter: F,
- ) -> Poll<Result<(), AcquireError>>
- where
- F: FnMut() -> Option<NonNull<Waiter>>,
- {
- let num_permits = num_permits as usize;
-
- // Load the current state
- let mut curr = SemState(self.state.load(Acquire));
-
- // Saves a ref to the waiter node
- let mut maybe_waiter: Option<NonNull<Waiter>> = None;
-
- /// Used in branches where we attempt to push the waiter into the wait
- /// queue but fail due to permits becoming available or the wait queue
- /// transitioning to "closed". In this case, the waiter must be
- /// transitioned back to the "idle" state.
- macro_rules! revert_to_idle {
- () => {
- if let Some(waiter) = maybe_waiter {
- unsafe { waiter.as_ref() }.revert_to_idle();
- }
- };
- }
-
- loop {
- let mut next = curr;
-
- if curr.is_closed() {
- revert_to_idle!();
- return Ready(Err(AcquireError::closed()));
- }
-
- let acquired = next.acquire_permits(num_permits, &self.stub);
-
- if !acquired {
- // There are not enough available permits to satisfy the
- // request. The permit transitions to a waiting state.
- debug_assert!(curr.waiter().is_some() || curr.available_permits() < num_permits);
-
- if let Some(waiter) = maybe_waiter.as_ref() {
- // Safety: the caller owns the waiter.
- let w = unsafe { waiter.as_ref() };
- w.set_permits_to_acquire(num_permits - curr.available_permits());
- } else {
- // Get the waiter for the permit.
- if let Some(waiter) = get_waiter() {
- // Safety: the caller owns the waiter.
- let w = unsafe { waiter.as_ref() };
-
- // If there are any currently available permits, the
- // waiter acquires those immediately and waits for the
- // remaining permits to become available.
- if !w.to_queued(num_permits - curr.available_permits()) {
- // The node is alrady queued, there is no further work
- // to do.
- return Pending;
- }
-
- maybe_waiter = Some(waiter);
- } else {
- // No waiter, this indicates the caller does not wish to
- // "wait", so there is nothing left to do.
- return Pending;
- }
- }
-
- next.set_waiter(maybe_waiter.unwrap());
- }
-
- debug_assert_ne!(curr.0, 0);
- debug_assert_ne!(next.0, 0);
-
- match self.state.compare_exchange(curr.0, next.0, AcqRel, Acquire) {
- Ok(_) => {
- if acquired {
- // Successfully acquire permits **without** queuing the
- // waiter node. The waiter node is not currently in the
- // queue.
- revert_to_idle!();
- return Ready(Ok(()));
- } else {
- // The node is pushed into the queue, the final step is
- // to set the node's "next" pointer to return the wait
- // queue into a consistent state.
-
- let prev_waiter =
- curr.waiter().unwrap_or_else(|| NonNull::from(&*self.stub));
-
- let waiter = maybe_waiter.unwrap();
-
- // Link the nodes.
- //
- // Safety: the mpsc algorithm guarantees the old tail of
- // the queue is not removed from the queue during the
- // push process.
- unsafe {
- prev_waiter.as_ref().store_next(waiter);
- }
-
- return Pending;
- }
- }
- Err(actual) => {
- curr = SemState(actual);
- }
- }
- }
- }
-
- /// Closes the semaphore. This prevents the semaphore from issuing new
- /// permits and notifies all pending waiters.
- pub(crate) fn close(&self) {
- // Acquire the `rx_lock`, setting the "closed" flag on the lock.
- let prev = self.rx_lock.fetch_or(1, AcqRel);
-
- if prev != 0 {
- // Another thread has the lock and will be responsible for notifying
- // pending waiters.
- return;
- }
-
- self.add_permits_locked(0, true);
- }
- /// Adds `n` new permits to the semaphore.
- ///
- /// The maximum number of permits is `usize::MAX >> 3`, and this function will panic if the limit is exceeded.
- pub(crate) fn add_permits(&self, n: usize) {
- if n == 0 {
- return;
- }
-
- // TODO: Handle overflow. A panic is not sufficient, the process must
- // abort.
- let prev = self.rx_lock.fetch_add(n << 1, AcqRel);
-
- if prev != 0 {
- // Another thread has the lock and will be responsible for notifying
- // pending waiters.
- return;
- }
-
- self.add_permits_locked(n, false);
- }
-
- fn add_permits_locked(&self, mut rem: usize, mut closed: bool) {
- while rem > 0 || closed {
- if closed {
- SemState::fetch_set_closed(&self.state, AcqRel);
- }
-
- // Release the permits and notify
- self.add_permits_locked2(rem, closed);
-
- let n = rem << 1;
-
- let actual = if closed {
- let actual = self.rx_lock.fetch_sub(n | 1, AcqRel);
- closed = false;
- actual
- } else {
- let actual = self.rx_lock.fetch_sub(n, AcqRel);
- closed = actual & 1 == 1;
- actual
- };
-
- rem = (actual >> 1) - rem;
- }
- }
-
- /// Releases a specific amount of permits to the semaphore
- ///
- /// This function is called by `add_permits` after the add lock has been
- /// acquired.
- fn add_permits_locked2(&self, mut n: usize, closed: bool) {
- // If closing the semaphore, we want to drain the entire queue. The
- // number of permits being assigned doesn't matter.
- if closed {
- n = usize::MAX;
- }
-
- 'outer: while n > 0 {
- unsafe {
- let mut head = self.head.with(|head| *head);
- let mut next_ptr = head.as_ref().next.load(Acquire);
-
- let stub = self.stub();
-
- if head == stub {
- // The stub node indicates an empty queue. Any remaining
- // permits get assigned back to the semaphore.
- let next = match NonNull::new(next_ptr) {
- Some(next) => next,
- None => {
- // This loop is not part of the standard intrusive mpsc
- // channel algorithm. This is where we atomically pop
- // the last task and add `n` to the remaining capacity.
- //
- // This modification to the pop algorithm works because,
- // at this point, we have not done any work (only done
- // reading). We have a *pretty* good idea that there is
- // no concurrent pusher.
- //
- // The capacity is then atomically added by doing an
- // AcqRel CAS on `state`. The `state` cell is the
- // linchpin of the algorithm.
- //
- // By successfully CASing `head` w/ AcqRel, we ensure
- // that, if any thread was racing and entered a push, we
- // see that and abort pop, retrying as it is
- // "inconsistent".
- let mut curr = SemState::load(&self.state, Acquire);
-
- loop {
- if curr.has_waiter(&self.stub) {
- // A waiter is being added concurrently.
- // This is the MPSC queue's "inconsistent"
- // state and we must loop and try again.
- thread::yield_now();
- continue 'outer;
- }
-
- // If closing, nothing more to do.
- if closed {
- debug_assert!(curr.is_closed(), "state = {:?}", curr);
- return;
- }
-
- let mut next = curr;
- next.release_permits(n, &self.stub);
-
- match self.state.compare_exchange(curr.0, next.0, AcqRel, Acquire) {
- Ok(_) => return,
- Err(actual) => {
- curr = SemState(actual);
- }
- }
- }
- }
- };
-
- self.head.with_mut(|head| *head = next);
- head = next;
- next_ptr = next.as_ref().next.load(Acquire);
- }
-
- // `head` points to a waiter assign permits to the waiter. If
- // all requested permits are satisfied, then we can continue,
- // otherwise the node stays in the wait queue.
- if !head.as_ref().assign_permits(&mut n, closed) {
- assert_eq!(n, 0);
- return;
- }
-
- if let Some(next) = NonNull::new(next_ptr) {
- self.head.with_mut(|head| *head = next);
-
- self.remove_queued(head, closed);
- continue 'outer;
- }
-
- let state = SemState::load(&self.state, Acquire);
-
- // This must always be a pointer as the wait list is not empty.
- let tail = state.waiter().unwrap();
-
- if tail != head {
- // Inconsistent
- thread::yield_now();
- continue 'outer;
- }
-
- self.push_stub(closed);
-
- next_ptr = head.as_ref().next.load(Acquire);
-
- if let Some(next) = NonNull::new(next_ptr) {
- self.head.with_mut(|head| *head = next);
-
- self.remove_queued(head, closed);
- continue 'outer;
- }
-
- // Inconsistent state, loop
- thread::yield_now();
- }
- }
- }
-
- /// The wait node has had all of its permits assigned and has been removed
- /// from the wait queue.
- ///
- /// Attempt to remove the QUEUED bit from the node. If additional permits
- /// are concurrently requested, the node must be pushed back into the wait
- /// queued.
- fn remove_queued(&self, waiter: NonNull<Waiter>, closed: bool) {
- let mut curr = WaiterState(unsafe { waiter.as_ref() }.state.load(Acquire));
-
- loop {
- if curr.is_dropped() {
- // The Permit dropped, it is on us to release the memory
- let _ = unsafe { Box::from_raw(waiter.as_ptr()) };
- return;
- }
-
- // The node is removed from the queue. We attempt to unset the
- // queued bit, but concurrently the waiter has requested more
- // permits. When the waiter requested more permits, it saw the
- // queued bit set so took no further action. This requires us to
- // push the node back into the queue.
- if curr.permits_to_acquire() > 0 {
- // More permits are requested. The waiter must be re-queued
- unsafe {
- self.push_waiter(waiter, closed);
- }
- return;
- }
-
- let mut next = curr;
- next.unset_queued();
-
- let w = unsafe { waiter.as_ref() };
-
- match w.state.compare_exchange(curr.0, next.0, AcqRel, Acquire) {
- Ok(_) => return,
- Err(actual) => {
- curr = WaiterState(actual);
- }
- }
- }
- }
-
- unsafe fn push_stub(&self, closed: bool) {
- self.push_waiter(self.stub(), closed);
- }
-
- unsafe fn push_waiter(&self, waiter: NonNull<Waiter>, closed: bool) {
- // Set the next pointer. This does not require an atomic operation as
- // this node is not accessible. The write will be flushed with the next
- // operation
- waiter.as_ref().next.store(ptr::null_mut(), Relaxed);
-
- // Update the tail to point to the new node. We need to see the previous
- // node in order to update the next pointer as well as release `task`
- // to any other threads calling `push`.
- let next = SemState::new_ptr(waiter, closed);
- let prev = SemState(self.state.swap(next.0, AcqRel));
-
- debug_assert_eq!(closed, prev.is_closed());
-
- // This function is only called when there are pending tasks. Because of
- // this, the state must *always* be in pointer mode.
- let prev = prev.waiter().unwrap();
-
- // No cycles plz
- debug_assert_ne!(prev, waiter);
-
- // Release `task` to the consume end.
- prev.as_ref().next.store(waiter.as_ptr(), Release);
- }
-
- fn stub(&self) -> NonNull<Waiter> {
- unsafe { NonNull::new_unchecked(&*self.stub as *const _ as *mut _) }
- }
-}
-
-impl Drop for Semaphore {
- fn drop(&mut self) {
- self.close();
- }
-}
-
-impl fmt::Debug for Semaphore {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- fmt.debug_struct("Semaphore")
- .field("state", &SemState::load(&self.state, Relaxed))
- .field("head", &self.head.with(|ptr| ptr))
- .field("rx_lock", &self.rx_lock.load(Relaxed))
- .field("stub", &self.stub)
- .finish()
- }
-}
-
-unsafe impl Send for Semaphore {}
-unsafe impl Sync for Semaphore {}
-
-// ===== impl Permit =====
-
-impl Permit {
- /// Creates a new `Permit`.
- ///
- /// The permit begins in the "unacquired" state.
- pub(crate) fn new() -> Permit {
- use PermitState::Acquired;
-
- Permit {
- waiter: None,
- state: Acquired(0),
- }
- }
-
- /// Returns `true` if the permit has been acquired
- #[allow(dead_code)] // may be used later
- pub(crate) fn is_acquired(&self) -> bool {
- match self.state {
- PermitState::Acquired(num) if num > 0 => true,
- _ => false,
- }
- }
-
- /// Tries to acquire the permit. If no permits are available, the current task
- /// is notified once a new permit becomes available.
- pub(crate) fn poll_acquire(
- &mut self,
- cx: &mut Context<'_>,
- num_permits: u16,
- semaphore: &Semaphore,
- ) -> Poll<Result<(), AcquireError>> {
- use std::cmp::Ordering::*;
- use PermitState::*;
-
- match self.state {
- Waiting(requested) => {
- // There must be a waiter
- let waiter = self.waiter.as_ref().unwrap();
-
- match requested.cmp(&num_permits) {
- Less => {
- let delta = num_permits - requested;
-
- // Request additional permits. If the waiter has been
- // dequeued, it must be re-queued.
- if !waiter.try_inc_permits_to_acquire(delta as usize) {
- let waiter = NonNull::from(&**waiter);
-
- // Ignore the result. The check for
- // `permits_to_acquire()` will converge the state as
- // needed
- let _ = semaphore.poll_acquire2(delta, || Some(waiter))?;
- }
-
- self.state = Waiting(num_permits);
- }
- Greater => {
- let delta = requested - num_permits;
- let to_release = waiter.try_dec_permits_to_acquire(delta as usize);
-
- semaphore.add_permits(to_release);
- self.state = Waiting(num_permits);
- }
- Equal => {}
- }
-
- if waiter.permits_to_acquire()? == 0 {
- self.state = Acquired(requested);
- return Ready(Ok(()));
- }
-
- waiter.waker.register_by_ref(cx.waker());
-
- if waiter.permits_to_acquire()? == 0 {
- self.state = Acquired(requested);
- return Ready(Ok(()));
- }
-
- Pending
- }
- Acquired(acquired) => {
- if acquired >= num_permits {
- Ready(Ok(()))
- } else {
- match semaphore.poll_acquire(cx, num_permits - acquired, self)? {
- Ready(()) => {
- self.state = Acquired(num_permits);
- Ready(Ok(()))
- }
- Pending => {
- self.state = Waiting(num_permits);
- Pending
- }
- }
- }
- }
- }
- }
-
- /// Tries to acquire the permit.
- pub(crate) fn try_acquire(
- &mut self,
- num_permits: u16,
- semaphore: &Semaphore,
- ) -> Result<(), TryAcquireError> {
- use PermitState::*;
-
- match self.state {
- Waiting(requested) => {
- // There must be a waiter
- let waiter = self.waiter.as_ref().unwrap();
-
- if requested > num_permits {
- let delta = requested - num_permits;
- let to_release = waiter.try_dec_permits_to_acquire(delta as usize);
-
- semaphore.add_permits(to_release);
- self.state = Waiting(num_permits);
- }
-
- let res = waiter.permits_to_acquire().map_err(to_try_acquire)?;
-
- if res == 0 {
- if requested < num_permits {
- // Try to acquire the additional permits
- semaphore.try_acquire(num_permits - requested)?;
- }
-
- self.state = Acquired(num_permits);
- Ok(())
- } else {
- Err(TryAcquireError::NoPermits)
- }
- }
- Acquired(acquired) => {
- if acquired < num_permits {
- semaphore.try_acquire(num_permits - acquired)?;
- self.state = Acquired(num_permits);
- }
-
- Ok(())
- }
- }
- }
-
- /// Releases a permit back to the semaphore
- pub(crate) fn release(&mut self, n: u16, semaphore: &Semaphore) {
- let n = self.forget(n);
- semaphore.add_permits(n as usize);
- }
-
- /// Forgets the permit **without** releasing it back to the semaphore.
- ///
- /// After calling `forget`, `poll_acquire` is able to acquire new permit
- /// from the semaphore.
- ///
- /// Repeatedly calling `forget` without associated calls to `add_permit`
- /// will result in the semaphore losing all permits.
- ///
- /// Will forget **at most** the number of acquired permits. This number is
- /// returned.
- pub(crate) fn forget(&mut self, n: u16) -> u16 {
- use PermitState::*;
-
- match self.state {
- Waiting(requested) => {
- let n = cmp::min(n, requested);
-
- // Decrement
- let acquired = self
- .waiter
- .as_ref()
- .unwrap()
- .try_dec_permits_to_acquire(n as usize) as u16;
-
- if n == requested {
- self.state = Acquired(0);
- } else if acquired == requested - n {
- self.state = Waiting(acquired);
- } else {
- self.state = Waiting(requested - n);
- }
-
- acquired
- }
- Acquired(acquired) => {
- let n = cmp::min(n, acquired);
- self.state = Acquired(acquired - n);
- n
- }
- }
- }
-}
-
-impl Default for Permit {
- fn default() -> Self {
- Self::new()
- }
-}
-
-impl Drop for Permit {
- fn drop(&mut self) {
- if let Some(waiter) = self.waiter.take() {
- // Set the dropped flag
- let state = WaiterState(waiter.state.fetch_or(DROPPED, AcqRel));
-
- if state.is_queued() {
- // The waiter is stored in the queue. The semaphore will drop it
- std::mem::forget(waiter);
- }
- }
- }
-}
-
-// ===== impl AcquireError ====
-
-impl AcquireError {
- fn closed() -> AcquireError {
- AcquireError(())
- }
-}
-
-fn to_try_acquire(_: AcquireError) -> TryAcquireError {
- TryAcquireError::Closed
-}
-
-impl fmt::Display for AcquireError {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(fmt, "semaphore closed")
- }
-}
-
-impl std::error::Error for AcquireError {}
-
-// ===== impl TryAcquireError =====
-
-impl TryAcquireError {
- /// Returns `true` if the error was caused by a closed semaphore.
- pub(crate) fn is_closed(&self) -> bool {
- match self {
- TryAcquireError::Closed => true,
- _ => false,
- }
- }
-
- /// Returns `true` if the error was caused by calling `try_acquire` on a
- /// semaphore with no available permits.
- pub(crate) fn is_no_permits(&self) -> bool {
- match self {
- TryAcquireError::NoPermits => true,
- _ => false,
- }
- }
-}
-
-impl fmt::Display for TryAcquireError {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- match self {
- TryAcquireError::Closed => write!(fmt, "semaphore closed"),
- TryAcquireError::NoPermits => write!(fmt, "no permits available"),
- }
- }
-}
-
-impl std::error::Error for TryAcquireError {}
-
-// ===== impl Waiter =====
-
-impl Waiter {
- fn new() -> Waiter {
- Waiter {
- state: AtomicUsize::new(0),
- waker: AtomicWaker::new(),
- next: AtomicPtr::new(ptr::null_mut()),
- }
- }
-
- fn permits_to_acquire(&self) -> Result<usize, AcquireError> {
- let state = WaiterState(self.state.load(Acquire));
-
- if state.is_closed() {
- Err(AcquireError(()))
- } else {
- Ok(state.permits_to_acquire())
- }
- }
-
- /// Only increments the number of permits *if* the waiter is currently
- /// queued.
- ///
- /// # Returns
- ///
- /// `true` if the number of permits to acquire has been incremented. `false`
- /// otherwise. On `false`, the caller should use `Semaphore::poll_acquire`.
- fn try_inc_permits_to_acquire(&self, n: usize) -> bool {
- let mut curr = WaiterState(self.state.load(Acquire));
-
- loop {
- if !curr.is_queued() {
- assert_eq!(0, curr.permits_to_acquire());
- return false;
- }
-
- let mut next = curr;
- next.set_permits_to_acquire(n + curr.permits_to_acquire());
-
- match self.state.compare_exchange(curr.0, next.0, AcqRel, Acquire) {
- Ok(_) => return true,
- Err(actual) => curr = WaiterState(actual),
- }
- }
- }
-
- /// Try to decrement the number of permits to acquire. This returns the
- /// actual number of permits that were decremented. The delta between `n`
- /// and the return has been assigned to the permit and the caller must
- /// assign these back to the semaphore.
- fn try_dec_permits_to_acquire(&self, n: usize) -> usize {
- let mut curr = WaiterState(self.state.load(Acquire));
-
- loop {
- if !curr.is_queued() {
- assert_eq!(0, curr.permits_to_acquire());
- }
-
- let delta = cmp::min(n, curr.permits_to_acquire());
- let rem = curr.permits_to_acquire() - delta;
-
- let mut next = curr;
- next.set_permits_to_acquire(rem);
-
- match self.state.compare_exchange(curr.0, next.0, AcqRel, Acquire) {
- Ok(_) => return n - delta,
- Err(actual) => curr = WaiterState(actual),
- }
- }
- }
-
- /// Store the number of remaining permits needed to satisfy the waiter and
- /// transition to the "QUEUED" state.
- ///
- /// # Returns
- ///
- /// `true` if the `QUEUED` bit was set as part of the transition.
- fn to_queued(&self, num_permits: usize) -> bool {
- let mut curr = WaiterState(self.state.load(Acquire));
-
- // The waiter should **not** be waiting for any permits.
- debug_assert_eq!(curr.permits_to_acquire(), 0);
-
- loop {
- let mut next = curr;
- next.set_permits_to_acquire(num_permits);
- next.set_queued();
-
- match self.state.compare_exchange(curr.0, next.0, AcqRel, Acquire) {
- Ok(_) => {
- if curr.is_queued() {
- return false;
- } else {
- // Make sure the next pointer is null
- self.next.store(ptr::null_mut(), Relaxed);
- return true;
- }
- }
- Err(actual) => curr = WaiterState(actual),
- }
- }
- }
-
- /// Set the number of permits to acquire.
- ///
- /// This function is only called when the waiter is being inserted into the
- /// wait queue. Because of this, there are no concurrent threads that can
- /// modify the state and using `store` is safe.
- fn set_permits_to_acquire(&self, num_permits: usize) {
- debug_assert!(WaiterState(self.state.load(Acquire)).is_queued());
-
- let mut state = WaiterState(QUEUED);
- state.set_permits_to_acquire(num_permits);
-
- self.state.store(state.0, Release);
- }
-
- /// Assign permits to the waiter.
- ///
- /// Returns `true` if the waiter should be removed from the queue
- fn assign_permits(&self, n: &mut usize, closed: bool) -> bool {
- let mut curr = WaiterState(self.state.load(Acquire));
-
- loop {
- let mut next = curr;
-
- // Number of permits to assign to this waiter
- let assign = cmp::min(curr.permits_to_acquire(), *n);
-
- // Assign the permits
- next.set_permits_to_acquire(curr.permits_to_acquire() - assign);
-
- if closed {
- next.set_closed();
- }
-
- match self.state.compare_exchange(curr.0, next.0, AcqRel, Acquire) {
- Ok(_) => {
- // Update `n`
- *n -= assign;
-
- if next.permits_to_acquire() == 0 {
- if curr.permits_to_acquire() > 0 {
- self.waker.wake();
- }
-
- return true;
- } else {
- return false;
- }
- }
- Err(actual) => curr = WaiterState(actual),
- }
- }
- }
-
- fn revert_to_idle(&self) {
- // An idle node is not waiting on any permits
- self.state.store(0, Relaxed);
- }
-
- fn store_next(&self, next: NonNull<Waiter>) {
- self.next.store(next.as_ptr(), Release);
- }
-}
-
-// ===== impl SemState =====
-
-impl SemState {
- /// Returns a new default `State` value.
- fn new(permits: usize, stub: &Waiter) -> SemState {
- assert!(permits <= MAX_PERMITS);
-
- if permits > 0 {
- SemState((permits << NUM_SHIFT) | NUM_FLAG)
- } else {
- SemState(stub as *const _ as usi