summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync/semaphore.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-10-29 15:11:31 -0700
committerGitHub <noreply@github.com>2019-10-29 15:11:31 -0700
commit2b909d6805990abf0bc2a5dea9e7267ff87df704 (patch)
treede255969c720c294af754b3840efabff3e6d69a0 /tokio/src/sync/semaphore.rs
parentc62ef2d232dea1535a8e22484fa2ca083f03e903 (diff)
sync: move into `tokio` crate (#1705)
A step towards collapsing Tokio sub crates into a single `tokio` crate (#1318). The sync implementation is now provided by the main `tokio` crate. Functionality can be opted out of by using the various net related feature flags.
Diffstat (limited to 'tokio/src/sync/semaphore.rs')
-rw-r--r--tokio/src/sync/semaphore.rs1142
1 files changed, 1142 insertions, 0 deletions
diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs
new file mode 100644
index 00000000..1120be07
--- /dev/null
+++ b/tokio/src/sync/semaphore.rs
@@ -0,0 +1,1142 @@
+//! Thread-safe, asynchronous counting semaphore.
+//!
+//! A `Semaphore` instance holds a set of permits. Permits are used to
+//! synchronize access to a shared resource.
+//!
+//! Before accessing the shared resource, callers acquire a permit from the
+//! semaphore. Once the permit is acquired, the caller then enters the critical
+//! section. If no permits are available, then acquiring the semaphore returns
+//! `Pending`. The task is woken once a permit becomes available.
+
+use crate::sync::loom::{
+ future::AtomicWaker,
+ sync::{
+ atomic::{AtomicPtr, AtomicUsize},
+ CausalCell,
+ },
+ thread,
+};
+
+use std::fmt;
+use std::ptr::{self, NonNull};
+use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Relaxed, Release};
+use std::sync::Arc;
+use std::task::Poll::{Pending, Ready};
+use std::task::{Context, Poll};
+use std::usize;
+
+/// Futures-aware semaphore.
+pub struct Semaphore {
+ /// Tracks both the waiter queue tail pointer and the number of remaining
+ /// permits.
+ state: AtomicUsize,
+
+ /// waiter queue head pointer.
+ head: CausalCell<NonNull<WaiterNode>>,
+
+ /// Coordinates access to the queue head.
+ rx_lock: AtomicUsize,
+
+ /// Stub waiter node used as part of the MPSC channel algorithm.
+ stub: Box<WaiterNode>,
+}
+
+/// A semaphore permit
+///
+/// Tracks the lifecycle of a semaphore permit.
+///
+/// An instance of `Permit` is intended to be used with a **single** instance of
+/// `Semaphore`. Using a single instance of `Permit` with multiple semaphore
+/// instances will result in unexpected behavior.
+///
+/// `Permit` does **not** release the permit back to the semaphore on drop. It
+/// is the user's responsibility to ensure that `Permit::release` is called
+/// before dropping the permit.
+#[derive(Debug)]
+pub struct Permit {
+ waiter: Option<Arc<WaiterNode>>,
+ state: PermitState,
+}
+
+/// Error returned by `Permit::poll_acquire`.
+#[derive(Debug)]
+pub struct AcquireError(());
+
+/// Error returned by `Permit::try_acquire`.
+#[derive(Debug)]
+pub struct TryAcquireError {
+ kind: ErrorKind,
+}
+
+#[derive(Debug)]
+enum ErrorKind {
+ Closed,
+ NoPermits,
+}
+
+/// Node used to notify the semaphore waiter when permit is available.
+#[derive(Debug)]
+struct WaiterNode {
+ /// Stores waiter state.
+ ///
+ /// See `NodeState` for more details.
+ state: AtomicUsize,
+
+ /// Task to wake when a permit is made available.
+ waker: AtomicWaker,
+
+ /// Next pointer in the queue of waiting senders.
+ next: AtomicPtr<WaiterNode>,
+}
+
+/// Semaphore state
+///
+/// The 2 low bits track the modes.
+///
+/// - Closed
+/// - Full
+///
+/// When not full, the rest of the `usize` tracks the total number of messages
+/// in the channel. When full, the rest of the `usize` is a pointer to the tail
+/// of the "waiting senders" queue.
+#[derive(Copy, Clone)]
+struct SemState(usize);
+
+/// Permit state
+#[derive(Debug, Copy, Clone, Eq, PartialEq)]
+enum PermitState {
+ /// The permit has not been requested.
+ Idle,
+
+ /// Currently waiting for a permit to be made available and assigned to the
+ /// waiter.
+ Waiting,
+
+ /// The permit has been acquired.
+ Acquired,
+}
+
+/// Waiter node state
+#[derive(Debug, Copy, Clone, Eq, PartialEq)]
+#[repr(usize)]
+enum NodeState {
+ /// Not waiting for a permit and the node is not in the wait queue.
+ ///
+ /// This is the initial state.
+ Idle = 0,
+
+ /// Not waiting for a permit but the node is in the wait queue.
+ ///
+ /// This happens when the waiter has previously requested a permit, but has
+ /// since canceled the request. The node cannot be removed by the waiter, so
+ /// this state informs the receiver to skip the node when it pops it from
+ /// the wait queue.
+ Queued = 1,
+
+ /// Waiting for a permit and the node is in the wait queue.
+ QueuedWaiting = 2,
+
+ /// The waiter has been assigned a permit and the node has been removed from
+ /// the queue.
+ Assigned = 3,
+
+ /// The semaphore has been closed. No more permits will be issued.
+ Closed = 4,
+}
+
+// ===== impl Semaphore =====
+
+impl Semaphore {
+ /// Creates a new semaphore with the initial number of permits
+ ///
+ /// # Panics
+ ///
+ /// Panics if `permits` is zero.
+ pub fn new(permits: usize) -> Semaphore {
+ let stub = Box::new(WaiterNode::new());
+ let ptr = NonNull::new(&*stub as *const _ as *mut _).unwrap();
+
+ // Allocations are aligned
+ debug_assert!(ptr.as_ptr() as usize & NUM_FLAG == 0);
+
+ let state = SemState::new(permits, &stub);
+
+ Semaphore {
+ state: AtomicUsize::new(state.to_usize()),
+ head: CausalCell::new(ptr),
+ rx_lock: AtomicUsize::new(0),
+ stub,
+ }
+ }
+
+ /// Returns the current number of available permits
+ pub fn available_permits(&self) -> usize {
+ let curr = SemState::load(&self.state, Acquire);
+ curr.available_permits()
+ }
+
+ /// Poll for a permit
+ fn poll_permit(
+ &self,
+ mut permit: Option<(&mut Context<'_>, &mut Permit)>,
+ ) -> Poll<Result<(), AcquireError>> {
+ // Load the current state
+ let mut curr = SemState::load(&self.state, Acquire);
+
+ debug!(" + poll_permit; sem-state = {:?}", curr);
+
+ // Tracks a *mut WaiterNode representing an Arc clone.
+ //
+ // This avoids having to bump the ref count unless required.
+ let mut maybe_strong: Option<NonNull<WaiterNode>> = None;
+
+ macro_rules! undo_strong {
+ () => {
+ if let Some(waiter) = maybe_strong {
+ // The waiter was cloned, but never got queued.
+ // Before entering `poll_permit`, the waiter was in the
+ // `Idle` state. We must transition the node back to the
+ // idle state.
+ let waiter = unsafe { Arc::from_raw(waiter.as_ptr()) };
+ waiter.revert_to_idle();
+ }
+ };
+ }
+
+ loop {
+ let mut next = curr;
+
+ if curr.is_closed() {
+ undo_strong!();
+ return Ready(Err(AcquireError::closed()));
+ }
+
+ if !next.acquire_permit(&self.stub) {
+ debug!(" + poll_permit -- no permits");
+
+ debug_assert!(curr.waiter().is_some());
+
+ if maybe_strong.is_none() {
+ if let Some((ref mut cx, ref mut permit)) = permit {
+ // Get the Sender's waiter node, or initialize one
+ let waiter = permit
+ .waiter
+ .get_or_insert_with(|| Arc::new(WaiterNode::new()));
+
+ waiter.register(cx);
+
+ debug!(" + poll_permit -- to_queued_waiting");
+
+ if !waiter.to_queued_waiting() {
+ debug!(" + poll_permit; waiter already queued");
+ // The node is alrady queued, there is no further work
+ // to do.
+ return Pending;
+ }
+
+ maybe_strong = Some(WaiterNode::into_non_null(waiter.clone()));
+ } else {
+ // If no `waiter`, then the task is not registered and there
+ // is no further work to do.
+ return Pending;
+ }
+ }
+
+ next.set_waiter(maybe_strong.unwrap());
+ }
+
+ debug!(" + poll_permit -- pre-CAS; next = {:?}", next);
+
+ debug_assert_ne!(curr.0, 0);
+ debug_assert_ne!(next.0, 0);
+
+ match next.compare_exchange(&self.state, curr, AcqRel, Acquire) {
+ Ok(_) => {
+ debug!(" + poll_permit -- CAS ok");
+ match curr.waiter() {
+ Some(prev_waiter) => {
+ let waiter = maybe_strong.unwrap();
+
+ // Finish pushing
+ unsafe {
+ prev_waiter.as_ref().next.store(waiter.as_ptr(), Release);
+ }
+
+ debug!(" + poll_permit -- waiter pushed");
+
+ return Pending;
+ }
+ None => {
+ debug!(" + poll_permit -- permit acquired");
+
+ undo_strong!();
+
+ return Ready(Ok(()));
+ }
+ }
+ }
+ Err(actual) => {
+ curr = actual;
+ }
+ }
+ }
+ }
+
+ /// Close the semaphore. This prevents the semaphore from issuing new
+ /// permits and notifies all pending waiters.
+ pub fn close(&self) {
+ debug!("+ Semaphore::close");
+
+ // Acquire the `rx_lock`, setting the "closed" flag on the lock.
+ let prev = self.rx_lock.fetch_or(1, AcqRel);
+ debug!(" + close -- rx_lock.fetch_add(1)");
+
+ if prev != 0 {
+ debug!("+ close -- locked; prev = {}", prev);
+ // Another thread has the lock and will be responsible for notifying
+ // pending waiters.
+ return;
+ }
+
+ self.add_permits_locked(0, true);
+ }
+
+ /// Add `n` new permits to the semaphore.
+ pub fn add_permits(&self, n: usize) {
+ debug!(" + add_permits; n = {}", n);
+
+ if n == 0 {
+ return;
+ }
+
+ // TODO: Handle overflow. A panic is not sufficient, the process must
+ // abort.
+ let prev = self.rx_lock.fetch_add(n << 1, AcqRel);
+ debug!(" + add_permits; rx_lock.fetch_add(n << 1); n = {}", n);
+
+ if prev != 0 {
+ debug!(" + add_permits -- locked; prev = {}", prev);
+ // Another thread has the lock and will be responsible for notifying
+ // pending waiters.
+ return;
+ }
+
+ self.add_permits_locked(n, false);
+ }
+
+ fn add_permits_locked(&self, mut rem: usize, mut closed: bool) {
+ while rem > 0 || closed {
+ debug!(
+ " + add_permits_locked -- iter; rem = {}; closed = {:?}",
+ rem, closed
+ );
+
+ if closed {
+ SemState::fetch_set_closed(&self.state, AcqRel);
+ }
+
+ // Release the permits and notify
+ self.add_permits_locked2(rem, closed);
+
+ let n = rem << 1;
+
+ let actual = if closed {
+ let actual = self.rx_lock.fetch_sub(n | 1, AcqRel);
+ debug!(
+ " + add_permits_locked; rx_lock.fetch_sub(n | 1); n = {}; actual={}",
+ n, actual
+ );
+
+ closed = false;
+ actual
+ } else {
+ let actual = self.rx_lock.fetch_sub(n, AcqRel);
+ debug!(
+ " + add_permits_locked; rx_lock.fetch_sub(n); n = {}; actual={}",
+ n, actual
+ );
+
+ closed = actual & 1 == 1;
+ actual
+ };
+
+ rem = (actual >> 1) - rem;
+ }
+
+ debug!(" + add_permits; done");
+ }
+
+ /// Release a specific amount of permits to the semaphore
+ ///
+ /// This function is called by `add_permits` after the add lock has been
+ /// acquired.
+ fn add_permits_locked2(&self, mut n: usize, closed: bool) {
+ while n > 0 || closed {
+ let waiter = match self.pop(n, closed) {
+ Some(waiter) => waiter,
+ None => {
+ return;
+ }
+ };
+
+ debug!(" + release_n -- notify");
+
+ if waiter.notify(closed) {
+ n = n.saturating_sub(1);
+ debug!(" + release_n -- dec");
+ }
+ }
+ }
+
+ /// Pop a waiter
+ ///
+ /// `rem` represents the remaining number of times the caller will pop. If
+ /// there are no more waiters to pop, `rem` is used to set the available
+ /// permits.
+ fn pop(&self, rem: usize, closed: bool) -> Option<Arc<WaiterNode>> {
+ debug!(" + pop; rem = {}", rem);
+
+ 'outer: loop {
+ unsafe {
+ let mut head = self.head.with(|head| *head);
+ let mut next_ptr = head.as_ref().next.load(Acquire);
+
+ let stub = self.stub();
+
+ if head == stub {
+ debug!(" + pop; head == stub");
+
+ let next = match NonNull::new(next_ptr) {
+ Some(next) => next,
+ None => {
+ // This loop is not part of the standard intrusive mpsc
+ // channel algorithm. This is where we atomically pop
+ // the last task and add `rem` to the remaining capacity.
+ //
+ // This modification to the pop algorithm works because,
+ // at this point, we have not done any work (only done
+ // reading). We have a *pretty* good idea that there is
+ // no concurrent pusher.
+ //
+ // The capacity is then atomically added by doing an
+ // AcqRel CAS on `state`. The `state` cell is the
+ // linchpin of the algorithm.
+ //
+ // By successfully CASing `head` w/ AcqRel, we ensure
+ // that, if any thread was racing and entered a push, we
+ // see that and abort pop, retrying as it is
+ // "inconsistent".
+ let mut curr = SemState::load(&self.state, Acquire);
+
+ loop {
+ if curr.has_waiter(&self.stub) {
+ // Inconsistent
+ debug!(" + pop; inconsistent 1");
+ thread::yield_now();
+ continue 'outer;
+ }
+
+ // When closing the semaphore, nodes are popped
+ // with `rem == 0`. In this case, we are not
+ // adding permits, but notifying waiters of the
+ // semaphore's closed state.
+ if rem == 0 {
+ debug_assert!(curr.is_closed(), "state = {:?}", curr);
+ return None;
+ }
+
+ let mut next = curr;
+ next.release_permits(rem, &self.stub);
+
+ match next.compare_exchange(&self.state, curr, AcqRel, Acquire) {
+ Ok(_) => return None,
+ Err(actual) => {
+ curr = actual;
+ }
+ }
+ }
+ }
+ };
+
+ debug!(" + pop; got next waiter");
+
+ self.head.with_mut(|head| *head = next);
+ head = next;
+ next_ptr = next.as_ref().next.load(Acquire);
+ }
+
+ if let Some(next) = NonNull::new(next_ptr) {
+ self.head.with_mut(|head| *head = next);
+
+ return Some(Arc::from_raw(head.as_ptr()));
+ }
+
+ let state = SemState::load(&self.state, Acquire);
+
+ // This must always be a pointer as the wait list is not empty.
+ let tail = state.waiter().unwrap();
+
+ if tail != head {
+ // Inconsistent
+ debug!(" + pop; inconsistent 2");
+ thread::yield_now();
+ continue 'outer;
+ }
+
+ self.push_stub(closed);
+
+ next_ptr = head.as_ref().next.load(Acquire);
+
+ if let Some(next) = NonNull::new(next_ptr) {
+ self.head.with_mut(|head| *head = next);
+
+ return Some(Arc::from_raw(head.as_ptr()));
+ }
+
+ // Inconsistent state, loop
+ debug!(" + pop; inconsistent 3");
+ thread::yield_now();
+ }
+ }
+ }
+
+ unsafe fn push_stub(&self, closed: bool) {
+ let stub = self.stub();
+
+ // Set the next pointer. This does not require an atomic operation as
+ // this node is not accessible. The write will be flushed with the next
+ // operation
+ stub.as_ref().next.store(ptr::null_mut(), Relaxed);
+
+ // Update the tail to point to the new node. We need to see the previous
+ // node in order to update the next pointer as well as release `task`
+ // to any other threads calling `push`.
+ let prev = SemState::new_ptr(stub, closed).swap(&self.state, AcqRel);
+
+ debug_assert_eq!(closed, prev.is_closed());
+
+ // The stub is only pushed when there are pending tasks. Because of
+ // this, the state must *always* be in pointer mode.
+ let prev = prev.waiter().unwrap();
+
+ // We don't want the *existing* pointer to be a stub.
+ debug_assert_ne!(prev, stub);
+
+ // Release `task` to the consume end.
+ prev.as_ref().next.store(stub.as_ptr(), Release);
+ }
+
+ fn stub(&self) -> NonNull<WaiterNode> {
+ unsafe { NonNull::new_unchecked(&*self.stub as *const _ as *mut _) }
+ }
+}
+
+impl fmt::Debug for Semaphore {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Semaphore")
+ .field("state", &SemState::load(&self.state, Relaxed))
+ .field("head", &self.head.with(|ptr| ptr))
+ .field("rx_lock", &self.rx_lock.load(Relaxed))
+ .field("stub", &self.stub)
+ .finish()
+ }
+}
+
+unsafe impl Send for Semaphore {}
+unsafe impl Sync for Semaphore {}
+
+// ===== impl Permit =====
+
+impl Permit {
+ /// Create a new `Permit`.
+ ///
+ /// The permit begins in the "unacquired" state.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::semaphore::Permit;
+ ///
+ /// let permit = Permit::new();
+ /// assert!(!permit.is_acquired());
+ /// ```
+ pub fn new() -> Permit {
+ Permit {
+ waiter: None,
+ state: PermitState::Idle,
+ }
+ }
+
+ /// Returns true if the permit has been acquired
+ pub fn is_acquired(&self) -> bool {
+ self.state == PermitState::Acquired
+ }
+
+ /// Try to acquire the permit. If no permits are available, the current task
+ /// is notified once a new permit becomes available.
+ pub fn poll_acquire(
+ &mut self,
+ cx: &mut Context<'_>,
+ semaphore: &Semaphore,
+ ) -> Poll<Result<(), AcquireError>> {
+ match self.state {
+ PermitState::Idle => {}
+ PermitState::Waiting => {
+ let waiter = self.waiter.as_ref().unwrap();
+
+ if waiter.acquire(cx)? {
+ self.state = PermitState::Acquired;
+ return Ready(Ok(()));
+ } else {
+ return Pending;
+ }
+ }
+ PermitState::Acquired => {
+ return Ready(Ok(()));
+ }
+ }
+
+ match semaphore.poll_permit(Some((cx, self)))? {
+ Ready(()) => {
+ self.state = PermitState::Acquired;
+ Ready(Ok(()))
+ }
+ Pending => {
+ self.state = PermitState::Waiting;
+ Pending
+ }
+ }
+ }
+
+ /// Try to acquire the permit.
+ pub fn try_acquire(&mut self, semaphore: &Semaphore) -> Result<(), TryAcquireError> {
+ match self.state {
+ PermitState::Idle => {}
+ PermitState::Waiting => {
+ let waiter = self.waiter.as_ref().unwrap();
+
+ if waiter.acquire2().map_err(to_try_acquire)? {
+ self.state = PermitState::Acquired;
+ return Ok(());
+ } else {
+ return Err(TryAcquireError::no_permits());
+ }
+ }
+ PermitState::Acquired => {
+ return Ok(());
+ }
+ }
+
+ match semaphore.poll_permit(None).map_err(to_try_acquire)? {
+ Ready(()) => {
+ self.state = PermitState::Acquired;
+ Ok(())
+ }
+ Pending => Err(TryAcquireError::no_permits()),
+ }
+ }
+
+ /// Release a permit back to the semaphore
+ pub fn release(&mut self, semaphore: &Semaphore) {
+ if self.forget2() {
+ semaphore.add_permits(1);
+ }
+ }
+
+ /// Forget the permit **without** releasing it back to the semaphore.
+ ///
+ /// After calling `forget`, `poll_acquire` is able to acquire new permit
+ /// from the sempahore.
+ ///
+ /// Repeatedly calling `forget` without associated calls to `add_permit`
+ /// will result in the semaphore losing all permits.
+ pub fn forget(&mut self) {
+ self.forget2();
+ }
+
+ /// Returns `true` if the permit was acquired
+ fn forget2(&mut self) -> bool {
+ match self.state {
+ PermitState::Idle => false,
+ PermitState::Waiting => {
+ let ret = self.waiter.as_ref().unwrap().cancel_interest();
+ self.state = PermitState::Idle;
+ ret
+ }
+ PermitState::Acquired => {
+ self.state = PermitState::Idle;
+ true
+ }
+ }
+ }
+}
+
+impl Default for Permit {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+// ===== impl AcquireError ====
+
+impl AcquireError {
+ fn closed() -> AcquireError {
+ AcquireError(())
+ }
+}
+
+fn to_try_acquire(_: AcquireError) -> TryAcquireError {
+ TryAcquireError::closed()
+}
+
+impl fmt::Display for AcquireError {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(fmt, "semaphore closed")
+ }
+}
+
+impl ::std::error::Error for AcquireError {}
+
+// ===== impl TryAcquireError =====
+
+impl TryAcquireError {
+ fn closed() -> TryAcquireError {
+ TryAcquireError {
+ kind: ErrorKind::Closed,
+ }
+ }
+
+ fn no_permits() -> TryAcquireError {
+ TryAcquireError {
+ kind: ErrorKind::NoPermits,
+ }
+ }
+
+ /// Returns true if the error was caused by a closed semaphore.
+ pub fn is_closed(&self) -> bool {
+ match self.kind {
+ ErrorKind::Closed => true,
+ _ => false,
+ }
+ }
+
+ /// Returns true if the error was caused by calling `try_acquire` on a
+ /// semaphore with no available permits.
+ pub fn is_no_permits(&self) -> bool {
+ match self.kind {
+ ErrorKind::NoPermits => true,
+ _ => false,
+ }
+ }
+}
+
+impl fmt::Display for TryAcquireError {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ let descr = match self.kind {
+ ErrorKind::Closed => "semaphore closed",
+ ErrorKind::NoPermits => "no permits available",
+ };
+ write!(fmt, "{}", descr)
+ }
+}
+
+impl ::std::error::Error for TryAcquireError {}
+
+// ===== impl WaiterNode =====
+
+impl WaiterNode {
+ fn new() -> WaiterNode {
+ WaiterNode {
+ state: AtomicUsize::new(NodeState::new().to_usize()),
+ waker: AtomicWaker::new(),
+ next: AtomicPtr::new(ptr::null_mut()),
+ }
+ }
+
+ fn acquire(&self, cx: &mut Context<'_>) -> Result<bool, AcquireError> {
+ if self.acquire2()? {
+ return Ok(true);
+ }
+
+ self.waker.register_by_ref(cx.waker());
+
+ self.acquire2()
+ }
+
+ fn acquire2(&self) -> Result<bool, AcquireError> {
+ use self::NodeState::*;
+
+ match Idle.compare_exchange(&self.state, Assigned, AcqRel, Acquire) {
+ Ok(_) => Ok(true),
+ Err(Closed) => Err(AcquireError::closed()),
+ Err(_) => Ok(false),
+ }
+ }
+
+ fn register(&self, cx: &mut Context<'_>) {
+ self.waker.register_by_ref(cx.waker())
+ }
+
+ /// Returns `true` if the permit has been acquired
+ fn cancel_interest(&self) -> bool {
+ use self::NodeState::*;
+
+ match Queued.compare_exchange(&self.state, QueuedWaiting, AcqRel, Acquire) {
+ // Successfully removed interest from the queued node. The permit
+ // has not been assigned to the node.
+ Ok(_) => false,
+ // The semaphore has been closed, there is no further action to
+ // take.
+ Err(Closed) => false,
+ // The permit has been assigned. It must be acquired in order to
+ // be released back to the semaphore.
+ Err(Assigned) => {
+ match self.acquire2() {
+ Ok(true) => true,
+ // Not a reachable state
+ Ok(false) => panic!(),
+ // The semaphore has been closed, no further action to take.
+ Err(_) => false,
+ }
+ }
+ Err(state) => panic!("unexpected state = {:?}", state),
+ }
+ }
+
+ /// Transition the state to `QueuedWaiting`.
+ ///
+ /// This step can only happen from `Queued` or from `Idle`.
+ ///
+ /// Returns `true` if transitioning into a queued state.
+ fn to_queued_waiting(&self) -> bool {
+ use self::NodeState::*;
+
+ let mut curr = NodeState::load(&self.state, Acquire);
+
+ loop {
+ debug_assert!(curr == Idle || curr == Queued, "actual = {:?}", curr);
+ let next = QueuedWaiting;
+
+ match next.compare_exchange(&self.state, curr, AcqRel, Acquire) {
+ Ok(_) => {
+ if curr.is_queued() {
+ return false;
+ } else {
+ // Transitioned to queued, reset next pointer
+ self.next.store(ptr::null_mut(), Relaxed);
+ return true;
+ }
+ }
+ Err(actual) => {
+ curr = actual;
+ }
+ }
+ }
+ }
+
+ /// Notify the waiter
+ ///
+ /// Returns `true` if the waiter accepts the notification
+ fn notify(&self, closed: bool) -> bool {
+ use self::NodeState::*;
+
+ // Assume QueuedWaiting state
+ let mut curr = QueuedWaiting;
+
+ loop {
+ let next = match curr {
+ Queued => Idle,
+ QueuedWaiting => {
+ if closed {
+ Closed
+ } else {
+ Assigned
+ }
+ }
+ actual => panic!("actual = {:?}", actual),
+ };
+
+ match next.compare_exchange(&self.state, curr, AcqRel, Acquire) {
+ Ok(_) => match curr {
+ QueuedWaiting => {
+ debug!(" + notify -- task notified");
+ self.waker.wake();
+ return true;
+ }
+ other => {
+ debug!(" + notify -- not notified; state = {:?}", other);
+ return false;
+ }
+ },
+ Err(actual) => curr = actual,
+ }
+ }
+ }
+
+ fn revert_to_idle(&self) {
+ use self::NodeState::Idle;
+
+ // There are no other handles to the node
+ NodeState::store(&self.state, Idle, Relaxed);
+ }
+
+ #[allow(clippy::wrong_self_convention)] // https://github.com/rust-lang/rust-clippy/issues/4293
+ fn into_non_null(self: Arc<WaiterNode>) -> NonNull<WaiterNode> {
+ let ptr = Arc::into_raw(self);
+ unsafe { NonNull::new_unchecked(ptr as *mut _) }
+ }
+}
+
+// ===== impl State =====
+
+/// Flag differentiating between available permits and waiter pointers.
+///
+/// If we assume pointers are properly aligned, then the least significant bit
+/// will always be zero. So, we use that bit to track if the value represents a
+/// number.
+const NUM_FLAG: usize = 0b01;
+
+const CLOSED_FLAG: usize = 0b10;
+
+const MAX_PERMITS: usize = usize::MAX >> NUM_SHIFT;
+
+/// When representing "numbers", the state has to be shifted this much (to get
+/// rid of the flag bit).
+const NUM_SHIFT: usize = 2;
+
+impl SemState {
+ /// Returns a new default `State` value.
+ fn new(permits: usize, stub: &WaiterNode) -> SemState {
+ assert!(permits <= MAX_PERMITS);
+
+ if permits > 0 {
+ SemState((permits << NUM_SHIFT) | NUM_FLAG)
+ } else {
+ SemState(stub as *const _ as usize)
+ }
+ }
+
+ /// Returns a `State` tracking `ptr` as the tail of the queue.
+ fn new_ptr(tail: NonNull<WaiterNode>, closed: bool) -> SemState {
+ let mut val = tail.as_ptr() as usize;
+
+ if closed {
+ val |= CLOSED_FLAG;
+ }
+
+ SemState(val)
+ }
+
+ /// Returns the amount of remaining capacity
+ fn available_permits(self) -> usize {
+ if !self.has_available_permits() {
+ return 0;
+ }
+
+ self.0 >> NUM_SHIFT
+ }
+
+ /// Returns true if the state has permits that can be claimed by a waiter.
+ fn has_available_permits(self) -> bool {
+ self.0 & NUM_FLAG == NUM_FLAG
+ }
+
+ fn has_waiter(self, stub: &WaiterNode) -> bool {
+ !self.has_available_permits() && !self.is_stub(stub)
+ }
+
+ /// Try to acquire a permit
+ ///
+ /// # Return
+ ///
+ /// Returns `true` if the permit was acquired, `false` otherwise. If `false`
+ /// is returned, it can be assumed that `State` represents the head pointer
+ /// in the mpsc channel.
+ fn acquire_permit(&mut self, stub: &WaiterNode) -> bool {
+ if !self.has_available_permits() {
+ return false;
+ }
+
+ debug_assert!(self.waiter().is_none());
+
+ self.0 -= 1 << NUM_SHIFT;
+
+ if self.0 == NUM_FLAG {
+ // Set the state to the stub pointer.
+ self.0 = stub as *const _ as usize;
+ }
+
+ true
+ }
+
+ /// Release permits
+ ///
+ /// Returns `true` if the permits were accepted.
+ fn release_permits(&mut self, permits: usize, stub: &WaiterNode) {
+ debug_assert!(permits > 0);
+
+ if self.is_stub(stub) {
+ self.0 = (permits << NUM_SHIFT) | NUM_FLAG | (self.0 & CLOSED_FLAG);
+ return;
+ }
+
+ debug_assert!(self.has_available_permits());
+
+ self.0 += permits << NUM_SHIFT;
+ }
+
+ fn is_waiter(self) -> bool {
+ self.0 & NUM_FLAG == 0
+ }
+
+ /// Returns the waiter, if one is set.
+ fn waiter(self) -> Option<NonNull<WaiterNode>> {
+ if self.is_waiter() {
+ let waiter = NonNull::new(self.as_ptr()).expect("null pointer stored");
+
+ Some(waiter)
+ } else {
+ None
+ }
+ }
+
+ /// Assumes `self` represents a pointer
+ fn as_ptr(self) -> *mut WaiterNode {
+ (self.0 & !CLOSED_FLAG) as *mut WaiterNode
+ }
+
+ /// Set to a pointer to a waiter.
+ ///
+ /// This can only be done from the full state.
+ fn set_waiter(&mut self, waiter: NonNull<WaiterNode>) {
+ let waiter = waiter.as_ptr() as usize;
+ debug_assert!(waiter & NUM_FLAG == 0);
+ debug_assert!(!self.is_closed());
+
+ self.0 = waiter;
+ }
+
+ fn is_stub(self, stub: &WaiterNode) -> bool {
+ self.as_ptr() as usize == stub as *const _ as usize
+ }
+
+ /// Load the state from an AtomicUsize.
+ fn load(cell: &AtomicUsize, ordering: Ordering) -> SemState {
+ let value = cell.load(ordering);
+ debug!(" + SemState::load; value = {}", value);
+ SemState(value)
+ }
+
+ /// Swap the values
+ fn swap(self, cell: &AtomicUsize, ordering: Ordering) -> SemState {
+ let prev = SemState(cell.swap(self.to_usize(), ordering));
+ debug_assert_eq!(prev.is_closed(), self.is_closed());
+ prev
+ }
+
+ /// Compare and exchange the current value into the provided cell
+ fn compare_exchange(
+ self,
+ cell: &AtomicUsize,
+ prev: SemState,
+ success: Ordering,
+ failure: Ordering,
+ ) -> Result<SemState, SemState> {
+ debug_assert_eq!(prev.is_closed(), self.is_closed());
+
+ let res = cell.compare_exchange(prev.to_usize(), self.to_usize(), success, failure);
+
+ debug!(
+ " + SemState::compare_exchange; prev = {}; next = {}; result = {:?}",
+ prev.to_usize(),
+ self.to_usize(),
+ res
+ );
+
+ res.map(SemState).map_err(SemState)
+ }
+
+ fn f