//! Thread-safe, asynchronous counting semaphore.
//!
//! A `Semaphore` instance holds a set of permits. Permits are used to
//! synchronize access to a shared resource.
//!
//! Before accessing the shared resource, callers acquire a permit from the
//! semaphore. Once the permit is acquired, the caller then enters the critical
//! section. If no permits are available, then acquiring the semaphore returns
//! `Pending`. The task is woken once a permit becomes available.
use crate::loom::{
futures::AtomicWaker,
sync::{
atomic::{AtomicPtr, AtomicUsize},
CausalCell,
},
yield_now,
};
use std::fmt;
use std::ptr::{self, NonNull};
use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Relaxed, Release};
use std::sync::Arc;
use std::task::Poll::{Pending, Ready};
use std::task::{Context, Poll};
use std::usize;
/// Futures-aware semaphore.
pub struct Semaphore {
/// Tracks both the waiter queue tail pointer and the number of remaining
/// permits.
state: AtomicUsize,
/// waiter queue head pointer.
head: CausalCell<NonNull<WaiterNode>>,
/// Coordinates access to the queue head.
rx_lock: AtomicUsize,
/// Stub waiter node used as part of the MPSC channel algorithm.
stub: Box<WaiterNode>,
}
/// A semaphore permit
///
/// Tracks the lifecycle of a semaphore permit.
///
/// An instance of `Permit` is intended to be used with a **single** instance of
/// `Semaphore`. Using a single instance of `Permit` with multiple semaphore
/// instances will result in unexpected behavior.
///
/// `Permit` does **not** release the permit back to the semaphore on drop. It
/// is the user's responsibility to ensure that `Permit::release` is called
/// before dropping the permit.
#[derive(Debug)]
pub struct Permit {
waiter: Option<Arc<WaiterNode>>,
state: PermitState,
}
/// Error returned by `Permit::poll_acquire`.
#[derive(Debug)]
pub struct AcquireError(());
/// Error returned by `Permit::try_acquire`.
#[derive(Debug)]
pub struct TryAcquireError {
kind: ErrorKind,
}
#[derive(Debug)]
enum ErrorKind {
Closed,
NoPermits,
}
/// Node used to notify the semaphore waiter when permit is available.
#[derive(Debug)]
struct WaiterNode {
/// Stores waiter state.
///
/// See `NodeState` for more details.
state: AtomicUsize,
/// Task to wake when a permit is made available.
waker: AtomicWaker,
/// Next pointer in the queue of waiting senders.
next: AtomicPtr<WaiterNode>,
}
/// Semaphore state
///
/// The 2 low bits track the modes.
///
/// - Closed
/// - Full
///
/// When not full, the rest of the `usize` tracks the total number of messages
/// in the channel. When full, the rest of the `usize` is a pointer to the tail
/// of the "waiting senders" queue.
#[derive(Copy, Clone)]
struct SemState(usize);
/// Permit state
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum PermitState {
/// The permit has not been requested.
Idle,
/// Currently waiting for a permit to be made available and assigned to the
/// waiter.
Waiting,
/// The permit has been acquired.
Acquired,
}
/// Waiter node state
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
#[repr(usize)]
enum NodeState {
/// Not waiting for a permit and the node is not in the wait queue.
///
/// This is the initial state.
Idle = 0,
/// Not waiting for a permit but the node is in the wait queue.
///
/// This happens when the waiter has previously requested a permit, but has
/// since canceled the request. The node cannot be removed by the waiter, so
/// this state informs the receiver to skip the node when it pops it from
/// the wait queue.
Queued = 1,
/// Waiting for a permit and the node is in the wait queue.
QueuedWaiting = 2,
/// The waiter has been assigned a permit and the node has been removed from
/// the queue.
Assigned = 3,
/// The semaphore has been closed. No more permits will be issued.
Closed = 4,
}
// ===== impl Semaphore =====
impl Semaphore {
/// Creates a new semaphore with the initial number of permits
///
/// # Panics
///
/// Panics if `permits` is zero.
pub fn new(permits: usize) -> Semaphore {
let stub = Box::new(WaiterNode::new());
let ptr = NonNull::new(&*stub as *const _ as *mut _).unwrap();
// Allocations are aligned
debug_assert!(ptr.as_ptr() as usize & NUM_FLAG == 0);
let state = SemState::new(permits, &stub);
Semaphore {
state: AtomicUsize::new(state.to_usize()),
head: CausalCell::new(ptr),
rx_lock: AtomicUsize::new(0),
stub,
}
}
/// Returns the current number of available permits
pub fn available_permits(&self) -> usize {
let curr = SemState::load(&self.state, Acquire);
curr.available_permit