#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
//! Thread-safe, asynchronous counting semaphore.
//!
//! A `Semaphore` instance holds a set of permits. Permits are used to
//! synchronize access to a shared resource.
//!
//! Before accessing the shared resource, callers acquire a permit from the
//! semaphore. Once the permit is acquired, the caller then enters the critical
//! section. If no permits are available, then acquiring the semaphore returns
//! `Pending`. The task is woken once a permit becomes available.
use crate::loom::cell::UnsafeCell;
use crate::loom::future::AtomicWaker;
use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize};
use crate::loom::thread;
use std::cmp;
use std::fmt;
use std::ptr::{self, NonNull};
use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Relaxed, Release};
use std::task::Poll::{Pending, Ready};
use std::task::{Context, Poll};
use std::usize;
/// Futures-aware semaphore.
pub(crate) struct Semaphore {
/// Tracks both the waiter queue tail pointer and the number of remaining
/// permits.
state: AtomicUsize,
/// waiter queue head pointer.
head: UnsafeCell<NonNull<Waiter>>,
/// Coordinates access to the queue head.
rx_lock: AtomicUsize,
/// Stub waiter node used as part of the MPSC channel algorithm.
stub: Box<Waiter>,
}
/// A semaphore permit
///
/// Tracks the lifecycle of a semaphore permit.
///
/// An instance of `Permit` is intended to be used with a **single** instance of
/// `Semaphore`. Using a single instance of `Permit` with multiple semaphore
/// instances will result in unexpected behavior.
///
/// `Permit` does **not** release the permit back to the semaphore on drop. It
/// is the user's responsibility to ensure that `Permit::release` is called
/// before dropping the permit.
#[derive(Debug)]
pub(crate) struct Permit {
waiter: Option<Box<Waiter>>,
state: PermitState,
}
/// Error returned by `Permit::poll_acquire`.
#[derive(Debug)]
pub(crate) struct AcquireError(());
/// Error returned by `Permit::try_acquire`.
#[derive(Debug)]
pub(crate) enum TryAcquireError {
Closed,
NoPermits,
}
/// Node used to notify the semaphore waiter when permit is available.
#[derive(Debug)]
struct Waiter {
/// Stores waiter state.
///
/// See `WaiterState` for more details.
state: AtomicUsize,
/// Task to wake when a permit is made available.
waker: AtomicWaker,
/// Next pointer in the queue of waiting senders.
next: AtomicPtr<Waiter>,
}
/// Semaphore state
///
/// The 2 low bits track the modes.
///
/// - Closed
/// - Full
///
/// When not full, the rest of the `usize` tracks the total number of messages
/// in the channel. When full, the rest of the `usize` is a pointer to the tail
/// of the "waiting senders" queue.
#[derive(Copy, Clone)]
struct SemState(usize);
/// Permit state
#[derive(Debug, Copy, Clone)]
enum PermitState {
/// Currently waiting for permits to be made available and assigned to the
/// waiter.
Waiting(u16),
/// The number of acquired permits
Acquired(u16),
}
/// State for an individual waker node
#[derive(Debug, Copy, Clone)]
struct WaiterState(usize);
/// Waiter node is in the semaphore queue
const QUEUED: usize = 0b001;
/// Semaphore has been closed, no more permits will be issued.
const CLOSED: usize = 0b10;
/// The permit that owns the `Waiter` dropped.
const DROPPED: usize = 0b100;
/// Represents "one requested permit" in the waiter state
const PERMIT_ONE: usize = 0b1000;
/// Masks the waiter state to only contain bits tracking number of requested
/// permits.
const PERMIT_MASK: usize = usize::MAX - (PERMIT_ONE - 1);
/// How much to shift a permit count to pack it into the waker state
const PERMIT_SHIFT: u32 = PERMIT_ONE.trailing_zeros();
/// Flag differentiating between available permits and waiter pointers.
///
/// If we assume pointers are properly aligned, then the least significant bit
/// will always be zero. So, we use that bit to track if the value represents a
/// number.
const NUM_FLAG: usize = 0b01;
/// Signal the semaphore is closed
const CLOSED_FLAG: usize = 0b10;
/// Maximum number of permits a semaphore can manage
const MAX_PERMITS: usize = usize::MAX >> NUM_SHIFT;
/// When representing "numbers", the state has to be shifted this much (to get
/// rid of the flag bit).
const NUM_SHIFT: usize = 2;
// ===== impl Semaphore =====
impl Semaphore {
/// Creates a new semaphore with the initial number of permits
///
/// # Panics
///
/// Panics if `permits` is zero.
pub(crate)