From 2b909d6805990abf0bc2a5dea9e7267ff87df704 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Tue, 29 Oct 2019 15:11:31 -0700 Subject: sync: move into `tokio` crate (#1705) A step towards collapsing Tokio sub crates into a single `tokio` crate (#1318). The sync implementation is now provided by the main `tokio` crate. Functionality can be opted out of by using the various net related feature flags. --- tokio/src/sync/semaphore.rs | 1142 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1142 insertions(+) create mode 100644 tokio/src/sync/semaphore.rs (limited to 'tokio/src/sync/semaphore.rs') diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs new file mode 100644 index 00000000..1120be07 --- /dev/null +++ b/tokio/src/sync/semaphore.rs @@ -0,0 +1,1142 @@ +//! Thread-safe, asynchronous counting semaphore. +//! +//! A `Semaphore` instance holds a set of permits. Permits are used to +//! synchronize access to a shared resource. +//! +//! Before accessing the shared resource, callers acquire a permit from the +//! semaphore. Once the permit is acquired, the caller then enters the critical +//! section. If no permits are available, then acquiring the semaphore returns +//! `Pending`. The task is woken once a permit becomes available. + +use crate::sync::loom::{ + future::AtomicWaker, + sync::{ + atomic::{AtomicPtr, AtomicUsize}, + CausalCell, + }, + thread, +}; + +use std::fmt; +use std::ptr::{self, NonNull}; +use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Relaxed, Release}; +use std::sync::Arc; +use std::task::Poll::{Pending, Ready}; +use std::task::{Context, Poll}; +use std::usize; + +/// Futures-aware semaphore. +pub struct Semaphore { + /// Tracks both the waiter queue tail pointer and the number of remaining + /// permits. + state: AtomicUsize, + + /// waiter queue head pointer. + head: CausalCell>, + + /// Coordinates access to the queue head. + rx_lock: AtomicUsize, + + /// Stub waiter node used as part of the MPSC channel algorithm. + stub: Box, +} + +/// A semaphore permit +/// +/// Tracks the lifecycle of a semaphore permit. +/// +/// An instance of `Permit` is intended to be used with a **single** instance of +/// `Semaphore`. Using a single instance of `Permit` with multiple semaphore +/// instances will result in unexpected behavior. +/// +/// `Permit` does **not** release the permit back to the semaphore on drop. It +/// is the user's responsibility to ensure that `Permit::release` is called +/// before dropping the permit. +#[derive(Debug)] +pub struct Permit { + waiter: Option>, + state: PermitState, +} + +/// Error returned by `Permit::poll_acquire`. +#[derive(Debug)] +pub struct AcquireError(()); + +/// Error returned by `Permit::try_acquire`. +#[derive(Debug)] +pub struct TryAcquireError { + kind: ErrorKind, +} + +#[derive(Debug)] +enum ErrorKind { + Closed, + NoPermits, +} + +/// Node used to notify the semaphore waiter when permit is available. +#[derive(Debug)] +struct WaiterNode { + /// Stores waiter state. + /// + /// See `NodeState` for more details. + state: AtomicUsize, + + /// Task to wake when a permit is made available. + waker: AtomicWaker, + + /// Next pointer in the queue of waiting senders. + next: AtomicPtr, +} + +/// Semaphore state +/// +/// The 2 low bits track the modes. +/// +/// - Closed +/// - Full +/// +/// When not full, the rest of the `usize` tracks the total number of messages +/// in the channel. When full, the rest of the `usize` is a pointer to the tail +/// of the "waiting senders" queue. 
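+///
+/// Illustrative example (using the `NUM_FLAG`, `NUM_SHIFT`, and `CLOSED_FLAG`
+/// constants defined further down): three available permits are encoded as
+/// `(3 << NUM_SHIFT) | NUM_FLAG = 0b1101`, while a state with no available
+/// permits holds an aligned pointer to the tail `WaiterNode`, possibly OR-ed
+/// with `CLOSED_FLAG`.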
+#[derive(Copy, Clone)] +struct SemState(usize); + +/// Permit state +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +enum PermitState { + /// The permit has not been requested. + Idle, + + /// Currently waiting for a permit to be made available and assigned to the + /// waiter. + Waiting, + + /// The permit has been acquired. + Acquired, +} + +/// Waiter node state +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +#[repr(usize)] +enum NodeState { + /// Not waiting for a permit and the node is not in the wait queue. + /// + /// This is the initial state. + Idle = 0, + + /// Not waiting for a permit but the node is in the wait queue. + /// + /// This happens when the waiter has previously requested a permit, but has + /// since canceled the request. The node cannot be removed by the waiter, so + /// this state informs the receiver to skip the node when it pops it from + /// the wait queue. + Queued = 1, + + /// Waiting for a permit and the node is in the wait queue. + QueuedWaiting = 2, + + /// The waiter has been assigned a permit and the node has been removed from + /// the queue. + Assigned = 3, + + /// The semaphore has been closed. No more permits will be issued. + Closed = 4, +} + +// ===== impl Semaphore ===== + +impl Semaphore { + /// Creates a new semaphore with the initial number of permits + /// + /// # Panics + /// + /// Panics if `permits` is zero. + pub fn new(permits: usize) -> Semaphore { + let stub = Box::new(WaiterNode::new()); + let ptr = NonNull::new(&*stub as *const _ as *mut _).unwrap(); + + // Allocations are aligned + debug_assert!(ptr.as_ptr() as usize & NUM_FLAG == 0); + + let state = SemState::new(permits, &stub); + + Semaphore { + state: AtomicUsize::new(state.to_usize()), + head: CausalCell::new(ptr), + rx_lock: AtomicUsize::new(0), + stub, + } + } + + /// Returns the current number of available permits + pub fn available_permits(&self) -> usize { + let curr = SemState::load(&self.state, Acquire); + curr.available_permits() + } + + /// Poll for a permit + fn poll_permit( + &self, + mut permit: Option<(&mut Context<'_>, &mut Permit)>, + ) -> Poll> { + // Load the current state + let mut curr = SemState::load(&self.state, Acquire); + + debug!(" + poll_permit; sem-state = {:?}", curr); + + // Tracks a *mut WaiterNode representing an Arc clone. + // + // This avoids having to bump the ref count unless required. + let mut maybe_strong: Option> = None; + + macro_rules! undo_strong { + () => { + if let Some(waiter) = maybe_strong { + // The waiter was cloned, but never got queued. + // Before entering `poll_permit`, the waiter was in the + // `Idle` state. We must transition the node back to the + // idle state. + let waiter = unsafe { Arc::from_raw(waiter.as_ptr()) }; + waiter.revert_to_idle(); + } + }; + } + + loop { + let mut next = curr; + + if curr.is_closed() { + undo_strong!(); + return Ready(Err(AcquireError::closed())); + } + + if !next.acquire_permit(&self.stub) { + debug!(" + poll_permit -- no permits"); + + debug_assert!(curr.waiter().is_some()); + + if maybe_strong.is_none() { + if let Some((ref mut cx, ref mut permit)) = permit { + // Get the Sender's waiter node, or initialize one + let waiter = permit + .waiter + .get_or_insert_with(|| Arc::new(WaiterNode::new())); + + waiter.register(cx); + + debug!(" + poll_permit -- to_queued_waiting"); + + if !waiter.to_queued_waiting() { + debug!(" + poll_permit; waiter already queued"); + // The node is alrady queued, there is no further work + // to do. 
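+                        // The permit (or the closed notification) will be
+                        // handed to this node via `WaiterNode::notify` once
+                        // one is released.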
+ return Pending; + } + + maybe_strong = Some(WaiterNode::into_non_null(waiter.clone())); + } else { + // If no `waiter`, then the task is not registered and there + // is no further work to do. + return Pending; + } + } + + next.set_waiter(maybe_strong.unwrap()); + } + + debug!(" + poll_permit -- pre-CAS; next = {:?}", next); + + debug_assert_ne!(curr.0, 0); + debug_assert_ne!(next.0, 0); + + match next.compare_exchange(&self.state, curr, AcqRel, Acquire) { + Ok(_) => { + debug!(" + poll_permit -- CAS ok"); + match curr.waiter() { + Some(prev_waiter) => { + let waiter = maybe_strong.unwrap(); + + // Finish pushing + unsafe { + prev_waiter.as_ref().next.store(waiter.as_ptr(), Release); + } + + debug!(" + poll_permit -- waiter pushed"); + + return Pending; + } + None => { + debug!(" + poll_permit -- permit acquired"); + + undo_strong!(); + + return Ready(Ok(())); + } + } + } + Err(actual) => { + curr = actual; + } + } + } + } + + /// Close the semaphore. This prevents the semaphore from issuing new + /// permits and notifies all pending waiters. + pub fn close(&self) { + debug!("+ Semaphore::close"); + + // Acquire the `rx_lock`, setting the "closed" flag on the lock. + let prev = self.rx_lock.fetch_or(1, AcqRel); + debug!(" + close -- rx_lock.fetch_add(1)"); + + if prev != 0 { + debug!("+ close -- locked; prev = {}", prev); + // Another thread has the lock and will be responsible for notifying + // pending waiters. + return; + } + + self.add_permits_locked(0, true); + } + + /// Add `n` new permits to the semaphore. + pub fn add_permits(&self, n: usize) { + debug!(" + add_permits; n = {}", n); + + if n == 0 { + return; + } + + // TODO: Handle overflow. A panic is not sufficient, the process must + // abort. + let prev = self.rx_lock.fetch_add(n << 1, AcqRel); + debug!(" + add_permits; rx_lock.fetch_add(n << 1); n = {}", n); + + if prev != 0 { + debug!(" + add_permits -- locked; prev = {}", prev); + // Another thread has the lock and will be responsible for notifying + // pending waiters. + return; + } + + self.add_permits_locked(n, false); + } + + fn add_permits_locked(&self, mut rem: usize, mut closed: bool) { + while rem > 0 || closed { + debug!( + " + add_permits_locked -- iter; rem = {}; closed = {:?}", + rem, closed + ); + + if closed { + SemState::fetch_set_closed(&self.state, AcqRel); + } + + // Release the permits and notify + self.add_permits_locked2(rem, closed); + + let n = rem << 1; + + let actual = if closed { + let actual = self.rx_lock.fetch_sub(n | 1, AcqRel); + debug!( + " + add_permits_locked; rx_lock.fetch_sub(n | 1); n = {}; actual={}", + n, actual + ); + + closed = false; + actual + } else { + let actual = self.rx_lock.fetch_sub(n, AcqRel); + debug!( + " + add_permits_locked; rx_lock.fetch_sub(n); n = {}; actual={}", + n, actual + ); + + closed = actual & 1 == 1; + actual + }; + + rem = (actual >> 1) - rem; + } + + debug!(" + add_permits; done"); + } + + /// Release a specific amount of permits to the semaphore + /// + /// This function is called by `add_permits` after the add lock has been + /// acquired. + fn add_permits_locked2(&self, mut n: usize, closed: bool) { + while n > 0 || closed { + let waiter = match self.pop(n, closed) { + Some(waiter) => waiter, + None => { + return; + } + }; + + debug!(" + release_n -- notify"); + + if waiter.notify(closed) { + n = n.saturating_sub(1); + debug!(" + release_n -- dec"); + } + } + } + + /// Pop a waiter + /// + /// `rem` represents the remaining number of times the caller will pop. 
If + /// there are no more waiters to pop, `rem` is used to set the available + /// permits. + fn pop(&self, rem: usize, closed: bool) -> Option> { + debug!(" + pop; rem = {}", rem); + + 'outer: loop { + unsafe { + let mut head = self.head.with(|head| *head); + let mut next_ptr = head.as_ref().next.load(Acquire); + + let stub = self.stub(); + + if head == stub { + debug!(" + pop; head == stub"); + + let next = match NonNull::new(next_ptr) { + Some(next) => next, + None => { + // This loop is not part of the standard intrusive mpsc + // channel algorithm. This is where we atomically pop + // the last task and add `rem` to the remaining capacity. + // + // This modification to the pop algorithm works because, + // at this point, we have not done any work (only done + // reading). We have a *pretty* good idea that there is + // no concurrent pusher. + // + // The capacity is then atomically added by doing an + // AcqRel CAS on `state`. The `state` cell is the + // linchpin of the algorithm. + // + // By successfully CASing `head` w/ AcqRel, we ensure + // that, if any thread was racing and entered a push, we + // see that and abort pop, retrying as it is + // "inconsistent". + let mut curr = SemState::load(&self.state, Acquire); + + loop { + if curr.has_waiter(&self.stub) { + // Inconsistent + debug!(" + pop; inconsistent 1"); + thread::yield_now(); + continue 'outer; + } + + // When closing the semaphore, nodes are popped + // with `rem == 0`. In this case, we are not + // adding permits, but notifying waiters of the + // semaphore's closed state. + if rem == 0 { + debug_assert!(curr.is_closed(), "state = {:?}", curr); + return None; + } + + let mut next = curr; + next.release_permits(rem, &self.stub); + + match next.compare_exchange(&self.state, curr, AcqRel, Acquire) { + Ok(_) => return None, + Err(actual) => { + curr = actual; + } + } + } + } + }; + + debug!(" + pop; got next waiter"); + + self.head.with_mut(|head| *head = next); + head = next; + next_ptr = next.as_ref().next.load(Acquire); + } + + if let Some(next) = NonNull::new(next_ptr) { + self.head.with_mut(|head| *head = next); + + return Some(Arc::from_raw(head.as_ptr())); + } + + let state = SemState::load(&self.state, Acquire); + + // This must always be a pointer as the wait list is not empty. + let tail = state.waiter().unwrap(); + + if tail != head { + // Inconsistent + debug!(" + pop; inconsistent 2"); + thread::yield_now(); + continue 'outer; + } + + self.push_stub(closed); + + next_ptr = head.as_ref().next.load(Acquire); + + if let Some(next) = NonNull::new(next_ptr) { + self.head.with_mut(|head| *head = next); + + return Some(Arc::from_raw(head.as_ptr())); + } + + // Inconsistent state, loop + debug!(" + pop; inconsistent 3"); + thread::yield_now(); + } + } + } + + unsafe fn push_stub(&self, closed: bool) { + let stub = self.stub(); + + // Set the next pointer. This does not require an atomic operation as + // this node is not accessible. The write will be flushed with the next + // operation + stub.as_ref().next.store(ptr::null_mut(), Relaxed); + + // Update the tail to point to the new node. We need to see the previous + // node in order to update the next pointer as well as release `task` + // to any other threads calling `push`. + let prev = SemState::new_ptr(stub, closed).swap(&self.state, AcqRel); + + debug_assert_eq!(closed, prev.is_closed()); + + // The stub is only pushed when there are pending tasks. Because of + // this, the state must *always* be in pointer mode. 
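+        // In pointer mode `waiter()` returns `Some`, so the `unwrap` below can
+        // only fail if that invariant is broken.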
+ let prev = prev.waiter().unwrap(); + + // We don't want the *existing* pointer to be a stub. + debug_assert_ne!(prev, stub); + + // Release `task` to the consume end. + prev.as_ref().next.store(stub.as_ptr(), Release); + } + + fn stub(&self) -> NonNull { + unsafe { NonNull::new_unchecked(&*self.stub as *const _ as *mut _) } + } +} + +impl fmt::Debug for Semaphore { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Semaphore") + .field("state", &SemState::load(&self.state, Relaxed)) + .field("head", &self.head.with(|ptr| ptr)) + .field("rx_lock", &self.rx_lock.load(Relaxed)) + .field("stub", &self.stub) + .finish() + } +} + +unsafe impl Send for Semaphore {} +unsafe impl Sync for Semaphore {} + +// ===== impl Permit ===== + +impl Permit { + /// Create a new `Permit`. + /// + /// The permit begins in the "unacquired" state. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::semaphore::Permit; + /// + /// let permit = Permit::new(); + /// assert!(!permit.is_acquired()); + /// ``` + pub fn new() -> Permit { + Permit { + waiter: None, + state: PermitState::Idle, + } + } + + /// Returns true if the permit has been acquired + pub fn is_acquired(&self) -> bool { + self.state == PermitState::Acquired + } + + /// Try to acquire the permit. If no permits are available, the current task + /// is notified once a new permit becomes available. + pub fn poll_acquire( + &mut self, + cx: &mut Context<'_>, + semaphore: &Semaphore, + ) -> Poll> { + match self.state { + PermitState::Idle => {} + PermitState::Waiting => { + let waiter = self.waiter.as_ref().unwrap(); + + if waiter.acquire(cx)? { + self.state = PermitState::Acquired; + return Ready(Ok(())); + } else { + return Pending; + } + } + PermitState::Acquired => { + return Ready(Ok(())); + } + } + + match semaphore.poll_permit(Some((cx, self)))? { + Ready(()) => { + self.state = PermitState::Acquired; + Ready(Ok(())) + } + Pending => { + self.state = PermitState::Waiting; + Pending + } + } + } + + /// Try to acquire the permit. + pub fn try_acquire(&mut self, semaphore: &Semaphore) -> Result<(), TryAcquireError> { + match self.state { + PermitState::Idle => {} + PermitState::Waiting => { + let waiter = self.waiter.as_ref().unwrap(); + + if waiter.acquire2().map_err(to_try_acquire)? { + self.state = PermitState::Acquired; + return Ok(()); + } else { + return Err(TryAcquireError::no_permits()); + } + } + PermitState::Acquired => { + return Ok(()); + } + } + + match semaphore.poll_permit(None).map_err(to_try_acquire)? { + Ready(()) => { + self.state = PermitState::Acquired; + Ok(()) + } + Pending => Err(TryAcquireError::no_permits()), + } + } + + /// Release a permit back to the semaphore + pub fn release(&mut self, semaphore: &Semaphore) { + if self.forget2() { + semaphore.add_permits(1); + } + } + + /// Forget the permit **without** releasing it back to the semaphore. + /// + /// After calling `forget`, `poll_acquire` is able to acquire new permit + /// from the sempahore. + /// + /// Repeatedly calling `forget` without associated calls to `add_permit` + /// will result in the semaphore losing all permits. 
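+    ///
+    /// # Examples
+    ///
+    /// A minimal sketch (assuming `Semaphore` is exported from
+    /// `tokio::sync::semaphore` alongside `Permit`):
+    ///
+    /// ```
+    /// use tokio::sync::semaphore::{Permit, Semaphore};
+    ///
+    /// let semaphore = Semaphore::new(1);
+    /// let mut permit = Permit::new();
+    ///
+    /// permit.try_acquire(&semaphore).unwrap();
+    ///
+    /// // Dropping the claim without releasing it leaves the semaphore one
+    /// // permit short until `add_permits` is called.
+    /// permit.forget();
+    /// assert_eq!(semaphore.available_permits(), 0);
+    /// ```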
+ pub fn forget(&mut self) { + self.forget2(); + } + + /// Returns `true` if the permit was acquired + fn forget2(&mut self) -> bool { + match self.state { + PermitState::Idle => false, + PermitState::Waiting => { + let ret = self.waiter.as_ref().unwrap().cancel_interest(); + self.state = PermitState::Idle; + ret + } + PermitState::Acquired => { + self.state = PermitState::Idle; + true + } + } + } +} + +impl Default for Permit { + fn default() -> Self { + Self::new() + } +} + +// ===== impl AcquireError ==== + +impl AcquireError { + fn closed() -> AcquireError { + AcquireError(()) + } +} + +fn to_try_acquire(_: AcquireError) -> TryAcquireError { + TryAcquireError::closed() +} + +impl fmt::Display for AcquireError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "semaphore closed") + } +} + +impl ::std::error::Error for AcquireError {} + +// ===== impl TryAcquireError ===== + +impl TryAcquireError { + fn closed() -> TryAcquireError { + TryAcquireError { + kind: ErrorKind::Closed, + } + } + + fn no_permits() -> TryAcquireError { + TryAcquireError { + kind: ErrorKind::NoPermits, + } + } + + /// Returns true if the error was caused by a closed semaphore. + pub fn is_closed(&self) -> bool { + match self.kind { + ErrorKind::Closed => true, + _ => false, + } + } + + /// Returns true if the error was caused by calling `try_acquire` on a + /// semaphore with no available permits. + pub fn is_no_permits(&self) -> bool { + match self.kind { + ErrorKind::NoPermits => true, + _ => false, + } + } +} + +impl fmt::Display for TryAcquireError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + let descr = match self.kind { + ErrorKind::Closed => "semaphore closed", + ErrorKind::NoPermits => "no permits available", + }; + write!(fmt, "{}", descr) + } +} + +impl ::std::error::Error for TryAcquireError {} + +// ===== impl WaiterNode ===== + +impl WaiterNode { + fn new() -> WaiterNode { + WaiterNode { + state: AtomicUsize::new(NodeState::new().to_usize()), + waker: AtomicWaker::new(), + next: AtomicPtr::new(ptr::null_mut()), + } + } + + fn acquire(&self, cx: &mut Context<'_>) -> Result { + if self.acquire2()? { + return Ok(true); + } + + self.waker.register_by_ref(cx.waker()); + + self.acquire2() + } + + fn acquire2(&self) -> Result { + use self::NodeState::*; + + match Idle.compare_exchange(&self.state, Assigned, AcqRel, Acquire) { + Ok(_) => Ok(true), + Err(Closed) => Err(AcquireError::closed()), + Err(_) => Ok(false), + } + } + + fn register(&self, cx: &mut Context<'_>) { + self.waker.register_by_ref(cx.waker()) + } + + /// Returns `true` if the permit has been acquired + fn cancel_interest(&self) -> bool { + use self::NodeState::*; + + match Queued.compare_exchange(&self.state, QueuedWaiting, AcqRel, Acquire) { + // Successfully removed interest from the queued node. The permit + // has not been assigned to the node. + Ok(_) => false, + // The semaphore has been closed, there is no further action to + // take. + Err(Closed) => false, + // The permit has been assigned. It must be acquired in order to + // be released back to the semaphore. + Err(Assigned) => { + match self.acquire2() { + Ok(true) => true, + // Not a reachable state + Ok(false) => panic!(), + // The semaphore has been closed, no further action to take. + Err(_) => false, + } + } + Err(state) => panic!("unexpected state = {:?}", state), + } + } + + /// Transition the state to `QueuedWaiting`. + /// + /// This step can only happen from `Queued` or from `Idle`. 
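+    /// Transitioning from `Idle` also resets the node's `next` pointer before
+    /// the node is pushed into the wait queue.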
+ /// + /// Returns `true` if transitioning into a queued state. + fn to_queued_waiting(&self) -> bool { + use self::NodeState::*; + + let mut curr = NodeState::load(&self.state, Acquire); + + loop { + debug_assert!(curr == Idle || curr == Queued, "actual = {:?}", curr); + let next = QueuedWaiting; + + match next.compare_exchange(&self.state, curr, AcqRel, Acquire) { + Ok(_) => { + if curr.is_queued() { + return false; + } else { + // Transitioned to queued, reset next pointer + self.next.store(ptr::null_mut(), Relaxed); + return true; + } + } + Err(actual) => { + curr = actual; + } + } + } + } + + /// Notify the waiter + /// + /// Returns `true` if the waiter accepts the notification + fn notify(&self, closed: bool) -> bool { + use self::NodeState::*; + + // Assume QueuedWaiting state + let mut curr = QueuedWaiting; + + loop { + let next = match curr { + Queued => Idle, + QueuedWaiting => { + if closed { + Closed + } else { + Assigned + } + } + actual => panic!("actual = {:?}", actual), + }; + + match next.compare_exchange(&self.state, curr, AcqRel, Acquire) { + Ok(_) => match curr { + QueuedWaiting => { + debug!(" + notify -- task notified"); + self.waker.wake(); + return true; + } + other => { + debug!(" + notify -- not notified; state = {:?}", other); + return false; + } + }, + Err(actual) => curr = actual, + } + } + } + + fn revert_to_idle(&self) { + use self::NodeState::Idle; + + // There are no other handles to the node + NodeState::store(&self.state, Idle, Relaxed); + } + + #[allow(clippy::wrong_self_convention)] // https://github.com/rust-lang/rust-clippy/issues/4293 + fn into_non_null(self: Arc) -> NonNull { + let ptr = Arc::into_raw(self); + unsafe { NonNull::new_unchecked(ptr as *mut _) } + } +} + +// ===== impl State ===== + +/// Flag differentiating between available permits and waiter pointers. +/// +/// If we assume pointers are properly aligned, then the least significant bit +/// will always be zero. So, we use that bit to track if the value represents a +/// number. +const NUM_FLAG: usize = 0b01; + +const CLOSED_FLAG: usize = 0b10; + +const MAX_PERMITS: usize = usize::MAX >> NUM_SHIFT; + +/// When representing "numbers", the state has to be shifted this much (to get +/// rid of the flag bit). +const NUM_SHIFT: usize = 2; + +impl SemState { + /// Returns a new default `State` value. + fn new(permits: usize, stub: &WaiterNode) -> SemState { + assert!(permits <= MAX_PERMITS); + + if permits > 0 { + SemState((permits << NUM_SHIFT) | NUM_FLAG) + } else { + SemState(stub as *const _ as usize) + } + } + + /// Returns a `State` tracking `ptr` as the tail of the queue. + fn new_ptr(tail: NonNull, closed: bool) -> SemState { + let mut val = tail.as_ptr() as usize; + + if closed { + val |= CLOSED_FLAG; + } + + SemState(val) + } + + /// Returns the amount of remaining capacity + fn available_permits(self) -> usize { + if !self.has_available_permits() { + return 0; + } + + self.0 >> NUM_SHIFT + } + + /// Returns true if the state has permits that can be claimed by a waiter. + fn has_available_permits(self) -> bool { + self.0 & NUM_FLAG == NUM_FLAG + } + + fn has_waiter(self, stub: &WaiterNode) -> bool { + !self.has_available_permits() && !self.is_stub(stub) + } + + /// Try to acquire a permit + /// + /// # Return + /// + /// Returns `true` if the permit was acquired, `false` otherwise. If `false` + /// is returned, it can be assumed that `State` represents the head pointer + /// in the mpsc channel. 
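+    ///
+    /// Illustrative example: with two permits available the state is
+    /// `(2 << NUM_SHIFT) | NUM_FLAG = 0b1001`; acquiring one leaves `0b0101`,
+    /// and acquiring the last one leaves bare `NUM_FLAG`, at which point the
+    /// state is reset to the stub pointer.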
+ fn acquire_permit(&mut self, stub: &WaiterNode) -> bool { + if !self.has_available_permits() { + return false; + } + + debug_assert!(self.waiter().is_none()); + + self.0 -= 1 << NUM_SHIFT; + + if self.0 == NUM_FLAG { + // Set the state to the stub pointer. + self.0 = stub as *const _ as usize; + } + + true + } + + /// Release permits + /// + /// Returns `true` if the permits were accepted. + fn release_permits(&mut self, permits: usize, stub: &WaiterNode) { + debug_assert!(permits > 0); + + if self.is_stub(stub) { + self.0 = (permits << NUM_SHIFT) | NUM_FLAG | (self.0 & CLOSED_FLAG); + return; + } + + debug_assert!(self.has_available_permits()); + + self.0 += permits << NUM_SHIFT; + } + + fn is_waiter(self) -> bool { + self.0 & NUM_FLAG == 0 + } + + /// Returns the waiter, if one is set. + fn waiter(self) -> Option> { + if self.is_waiter() { + let waiter = NonNull::new(self.as_ptr()).expect("null pointer stored"); + + Some(waiter) + } else { + None + } + } + + /// Assumes `self` represents a pointer + fn as_ptr(self) -> *mut WaiterNode { + (self.0 & !CLOSED_FLAG) as *mut WaiterNode + } + + /// Set to a pointer to a waiter. + /// + /// This can only be done from the full state. + fn set_waiter(&mut self, waiter: NonNull) { + let waiter = waiter.as_ptr() as usize; + debug_assert!(waiter & NUM_FLAG == 0); + debug_assert!(!self.is_closed()); + + self.0 = waiter; + } + + fn is_stub(self, stub: &WaiterNode) -> bool { + self.as_ptr() as usize == stub as *const _ as usize + } + + /// Load the state from an AtomicUsize. + fn load(cell: &AtomicUsize, ordering: Ordering) -> SemState { + let value = cell.load(ordering); + debug!(" + SemState::load; value = {}", value); + SemState(value) + } + + /// Swap the values + fn swap(self, cell: &AtomicUsize, ordering: Ordering) -> SemState { + let prev = SemState(cell.swap(self.to_usize(), ordering)); + debug_assert_eq!(prev.is_closed(), self.is_closed()); + prev + } + + /// Compare and exchange the current value into the provided cell + fn compare_exchange( + self, + cell: &AtomicUsize, + prev: SemState, + success: Ordering, + failure: Ordering, + ) -> Result { + debug_assert_eq!(prev.is_closed(), self.is_closed()); + + let res = cell.compare_exchange(prev.to_usize(), self.to_usize(), success, failure); + + debug!( + " + SemState::compare_exchange; prev = {}; next = {}; result = {:?}", + prev.to_usize(), + self.to_usize(), + res + ); + + res.map(SemState).map_err(SemState) + } + + fn fetch_set_closed(cell: &AtomicUsize, ordering: Ordering) -> SemState { + let value = cell.fetch_or(CLOSED_FLAG, ordering); + SemState(value) + } + + fn is_closed(self) -> bool { + self.0 & CLOSED_FLAG == CLOSED_FLAG + } + + /// Converts the state into a `usize` representation. 
+ fn to_usize(self) -> usize { + self.0 + } +} + +impl fmt::Debug for SemState { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut fmt = fmt.debug_struct("SemState"); + + if self.is_waiter() { + fmt.field("state", &""); + } else { + fmt.field("permits", &self.available_permits()); + } + + fmt.finish() + } +} + +// ===== impl NodeState ===== + +impl NodeState { + fn new() -> NodeState { + NodeState::Idle + } + + fn from_usize(value: usize) -> NodeState { + use self::NodeState::*; + + match value { + 0 => Idle, + 1 => Queued, + 2 => QueuedWaiting, + 3 => Assigned, + 4 => Closed, + _ => panic!(), + } + } + + fn load(cell: &AtomicUsize, ordering: Ordering) -> NodeState { + NodeState::from_usize(cell.load(ordering)) + } + + /// Store a value + fn store(cell: &AtomicUsize, value: NodeState, ordering: Ordering) { + cell.store(value.to_usize(), ordering); + } + + fn compare_exchange( + self, + cell: &AtomicUsize, + prev: NodeState, + success: Ordering, + failure: Ordering, + ) -> Result { + cell.compare_exchange(prev.to_usize(), self.to_usize(), success, failure) + .map(NodeState::from_usize) + .map_err(NodeState::from_usize) + } + + /// Returns `true` if `self` represents a queued state. + fn is_queued(self) -> bool { + use self::NodeState::*; + + match self { + Queued | QueuedWaiting => true, + _ => false, + } + } + + fn to_usize(self) -> usize { + self as usize + } +} -- cgit v1.2.3