summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--tokio/src/sync/mod.rs6
-rw-r--r--tokio/src/sync/mpsc/bounded.rs2
-rw-r--r--tokio/src/sync/mpsc/chan.rs6
-rw-r--r--tokio/src/sync/mutex.rs6
-rw-r--r--tokio/src/sync/semaphore.rs1089
-rw-r--r--tokio/src/sync/semaphore_ll.rs1070
-rw-r--r--tokio/src/sync/tests/loom_semaphore_ll.rs (renamed from tokio/src/sync/tests/loom_semaphore.rs)2
-rw-r--r--tokio/src/sync/tests/mod.rs4
-rw-r--r--tokio/src/sync/tests/semaphore_ll.rs (renamed from tokio/src/sync/tests/semaphore.rs)2
-rw-r--r--tokio/tests/sync_semaphore.rs81
10 files changed, 1221 insertions, 1047 deletions
diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs
index df31f8df..a05accbf 100644
--- a/tokio/src/sync/mod.rs
+++ b/tokio/src/sync/mod.rs
@@ -26,7 +26,9 @@ cfg_sync! {
pub mod oneshot;
- pub(crate) mod semaphore;
+ pub(crate) mod semaphore_ll;
+ mod semaphore;
+ pub use semaphore::{Semaphore, SemaphorePermit};
mod task;
pub(crate) use task::AtomicWaker;
@@ -48,7 +50,7 @@ cfg_not_sync! {
cfg_signal! {
pub(crate) mod mpsc;
- pub(crate) mod semaphore;
+ pub(crate) mod semaphore_ll;
}
}
diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs
index 5cca1596..7294e4d5 100644
--- a/tokio/src/sync/mpsc/bounded.rs
+++ b/tokio/src/sync/mpsc/bounded.rs
@@ -1,6 +1,6 @@
use crate::sync::mpsc::chan;
use crate::sync::mpsc::error::{ClosedError, SendError, TryRecvError, TrySendError};
-use crate::sync::semaphore;
+use crate::sync::semaphore_ll as semaphore;
use std::fmt;
use std::task::{Context, Poll};
diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs
index b6e94d5a..7a15e8b3 100644
--- a/tokio/src/sync/mpsc/chan.rs
+++ b/tokio/src/sync/mpsc/chan.rs
@@ -382,7 +382,7 @@ impl<T, S> Drop for Chan<T, S> {
}
}
-use crate::sync::semaphore::TryAcquireError;
+use crate::sync::semaphore_ll::TryAcquireError;
impl From<TryAcquireError> for TrySendError {
fn from(src: TryAcquireError) -> TrySendError {
@@ -398,9 +398,9 @@ impl From<TryAcquireError> for TrySendError {
// ===== impl Semaphore for (::Semaphore, capacity) =====
-use crate::sync::semaphore::Permit;
+use crate::sync::semaphore_ll::Permit;
-impl Semaphore for (crate::sync::semaphore::Semaphore, usize) {
+impl Semaphore for (crate::sync::semaphore_ll::Semaphore, usize) {
type Permit = Permit;
fn new_permit() -> Permit {
diff --git a/tokio/src/sync/mutex.rs b/tokio/src/sync/mutex.rs
index bee00df4..ec59d695 100644
--- a/tokio/src/sync/mutex.rs
+++ b/tokio/src/sync/mutex.rs
@@ -35,7 +35,7 @@
//! [`MutexGuard`]: struct.MutexGuard.html
use crate::future::poll_fn;
-use crate::sync::semaphore;
+use crate::sync::semaphore_ll as semaphore;
use std::cell::UnsafeCell;
use std::error::Error;
@@ -74,7 +74,7 @@ unsafe impl<T> Sync for Mutex<T> where T: Send {}
unsafe impl<'a, T> Sync for MutexGuard<'a, T> where T: Send + Sync {}
/// An enumeration of possible errors associated with a `TryLockResult`
-/// which can occur while trying to aquire a lock from the `try_lock`
+/// which can occur while trying to acquire a lock from the `try_lock`
/// method on a `Mutex`.
#[derive(Debug)]
pub enum TryLockError {
@@ -129,7 +129,7 @@ impl<T> Mutex<T> {
guard
}
- /// Try to aquire the lock
+ /// Try to acquire the lock
pub fn try_lock(&self) -> Result<MutexGuard<'_, T>, TryLockError> {
let mut permit = semaphore::Permit::new();
match permit.try_acquire(&self.s) {
diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs
index dab73a09..c1e9ef3e 100644
--- a/tokio/src/sync/semaphore.rs
+++ b/tokio/src/sync/semaphore.rs
@@ -1,1070 +1,91 @@
-#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
+use super::semaphore_ll as ll; // low level implementation
+use crate::future::poll_fn;
-//! Thread-safe, asynchronous counting semaphore.
-//!
-//! A `Semaphore` instance holds a set of permits. Permits are used to
-//! synchronize access to a shared resource.
-//!
-//! Before accessing the shared resource, callers acquire a permit from the
-//! semaphore. Once the permit is acquired, the caller then enters the critical
-//! section. If no permits are available, then acquiring the semaphore returns
-//! `Pending`. The task is woken once a permit becomes available.
-
-use crate::loom::{
- cell::CausalCell,
- future::AtomicWaker,
- sync::atomic::{AtomicPtr, AtomicUsize},
- thread,
-};
-
-use std::fmt;
-use std::ptr::{self, NonNull};
-use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Relaxed, Release};
-use std::sync::Arc;
-use std::task::Poll::{Pending, Ready};
-use std::task::{Context, Poll};
-use std::usize;
-
-/// Futures-aware semaphore.
-pub(crate) struct Semaphore {
- /// Tracks both the waiter queue tail pointer and the number of remaining
- /// permits.
- state: AtomicUsize,
-
- /// waiter queue head pointer.
- head: CausalCell<NonNull<WaiterNode>>,
-
- /// Coordinates access to the queue head.
- rx_lock: AtomicUsize,
-
- /// Stub waiter node used as part of the MPSC channel algorithm.
- stub: Box<WaiterNode>,
-}
-
-/// A semaphore permit
-///
-/// Tracks the lifecycle of a semaphore permit.
+/// Counting semaphore performing asynchronous permit aquisition.
///
-/// An instance of `Permit` is intended to be used with a **single** instance of
-/// `Semaphore`. Using a single instance of `Permit` with multiple semaphore
-/// instances will result in unexpected behavior.
+/// A semaphore maintains a set of permits. Permits are used to synchronize
+/// access to a shared resource. A semaphore differs from a mutex in that it
+/// can allow more than one concurrent caller to access the shared resource at a
+/// time.
///
-/// `Permit` does **not** release the permit back to the semaphore on drop. It
-/// is the user's responsibility to ensure that `Permit::release` is called
-/// before dropping the permit.
+/// When `acquire` is called and the semaphore has remaining permits, the
+/// function immediately returns a permit. However, if no remaining permits are
+/// available, `acquire` (asynchronously) waits until an outstanding permit is
+/// dropped. At this point, the freed permit is assigned to the caller.
#[derive(Debug)]
-pub(crate) struct Permit {
- waiter: Option<Arc<WaiterNode>>,
- state: PermitState,
+pub struct Semaphore {
+ /// The low level semaphore
+ ll_sem: ll::Semaphore,
}
-/// Error returned by `Permit::poll_acquire`.
+/// A permit from the semaphore
+#[must_use]
#[derive(Debug)]
-pub(crate) struct AcquireError(());
-
-/// Error returned by `Permit::try_acquire`.
-#[derive(Debug)]
-pub(crate) struct TryAcquireError {
- kind: ErrorKind,
-}
-
-#[derive(Debug)]
-enum ErrorKind {
- Closed,
- NoPermits,
-}
-
-/// Node used to notify the semaphore waiter when permit is available.
-#[derive(Debug)]
-struct WaiterNode {
- /// Stores waiter state.
- ///
- /// See `NodeState` for more details.
- state: AtomicUsize,
-
- /// Task to wake when a permit is made available.
- waker: AtomicWaker,
-
- /// Next pointer in the queue of waiting senders.
- next: AtomicPtr<WaiterNode>,
+pub struct SemaphorePermit<'a> {
+ sem: &'a Semaphore,
+ // the low level permit
+ ll_permit: ll::Permit,
}
-/// Semaphore state
-///
-/// The 2 low bits track the modes.
+/// Error returned from the [`Semaphore::try_acquire`] function.
///
-/// - Closed
-/// - Full
+/// A `try_acquire` operation can only fail if the semaphore has no available
+/// permits.
///
-/// When not full, the rest of the `usize` tracks the total number of messages
-/// in the channel. When full, the rest of the `usize` is a pointer to the tail
-/// of the "waiting senders" queue.
-#[derive(Copy, Clone)]
-struct SemState(usize);
-
-/// Permit state
-#[derive(Debug, Copy, Clone, Eq, PartialEq)]
-enum PermitState {
- /// The permit has not been requested.
- Idle,
-
- /// Currently waiting for a permit to be made available and assigned to the
- /// waiter.
- Waiting,
-
- /// The permit has been acquired.
- Acquired,
-}
-
-/// Waiter node state
-#[derive(Debug, Copy, Clone, Eq, PartialEq)]
-#[repr(usize)]
-enum NodeState {
- /// Not waiting for a permit and the node is not in the wait queue.
- ///
- /// This is the initial state.
- Idle = 0,
-
- /// Not waiting for a permit but the node is in the wait queue.
- ///
- /// This happens when the waiter has previously requested a permit, but has
- /// since canceled the request. The node cannot be removed by the waiter, so
- /// this state informs the receiver to skip the node when it pops it from
- /// the wait queue.
- Queued = 1,
-
- /// Waiting for a permit and the node is in the wait queue.
- QueuedWaiting = 2,
-
- /// The waiter has been assigned a permit and the node has been removed from
- /// the queue.
- Assigned = 3,
-
- /// The semaphore has been closed. No more permits will be issued.
- Closed = 4,
-}
-
-// ===== impl Semaphore =====
+/// [`Semaphore::try_acquire`]: Semaphore::try_acquire
+#[derive(Debug)]
+pub struct TryAcquireError(());
impl Semaphore {
/// Creates a new semaphore with the initial number of permits
- ///
- /// # Panics
- ///
- /// Panics if `permits` is zero.
- pub(crate) fn new(permits: usize) -> Semaphore {
- let stub = Box::new(WaiterNode::new());
- let ptr = NonNull::new(&*stub as *const _ as *mut _).unwrap();
-
- // Allocations are aligned
- debug_assert!(ptr.as_ptr() as usize & NUM_FLAG == 0);
-
- let state = SemState::new(permits, &stub);
-
- Semaphore {
- state: AtomicUsize::new(state.to_usize()),
- head: CausalCell::new(ptr),
- rx_lock: AtomicUsize::new(0),
- stub,
+ pub fn new(permits: usize) -> Self {
+ Self {
+ ll_sem: ll::Semaphore::new(permits),
}
}
/// Returns the current number of available permits
- pub(crate) fn available_permits(&self) -> usize {
- let curr = SemState::load(&self.state, Acquire);
- curr.available_permits()
- }
-
- /// Poll for a permit
- fn poll_permit(
- &self,
- mut permit: Option<(&mut Context<'_>, &mut Permit)>,
- ) -> Poll<Result<(), AcquireError>> {
- // Load the current state
- let mut curr = SemState::load(&self.state, Acquire);
-
- // Tracks a *mut WaiterNode representing an Arc clone.
- //
- // This avoids having to bump the ref count unless required.
- let mut maybe_strong: Option<NonNull<WaiterNode>> = None;
-
- macro_rules! undo_strong {
- () => {
- if let Some(waiter) = maybe_strong {
- // The waiter was cloned, but never got queued.
- // Before entering `poll_permit`, the waiter was in the
- // `Idle` state. We must transition the node back to the
- // idle state.
- let waiter = unsafe { Arc::from_raw(waiter.as_ptr()) };
- waiter.revert_to_idle();
- }
- };
- }
-
- loop {
- let mut next = curr;
-
- if curr.is_closed() {
- undo_strong!();
- return Ready(Err(AcquireError::closed()));
- }
-
- if !next.acquire_permit(&self.stub) {
- debug_assert!(curr.waiter().is_some());
-
- if maybe_strong.is_none() {
- if let Some((ref mut cx, ref mut permit)) = permit {
- // Get the Sender's waiter node, or initialize one
- let waiter = permit
- .waiter
- .get_or_insert_with(|| Arc::new(WaiterNode::new()));
-
- waiter.register(cx);
-
- if !waiter.to_queued_waiting() {
- // The node is alrady queued, there is no further work
- // to do.
- return Pending;
- }
-
- maybe_strong = Some(WaiterNode::into_non_null(waiter.clone()));
- } else {
- // If no `waiter`, then the task is not registered and there
- // is no further work to do.
- return Pending;
- }
- }
-
- next.set_waiter(maybe_strong.unwrap());
- }
-
- debug_assert_ne!(curr.0, 0);
- debug_assert_ne!(next.0, 0);
-
- match next.compare_exchange(&self.state, curr, AcqRel, Acquire) {
- Ok(_) => {
- match curr.waiter() {
- Some(prev_waiter) => {
- let waiter = maybe_strong.unwrap();
-
- // Finish pushing
- unsafe {
- prev_waiter.as_ref().next.store(waiter.as_ptr(), Release);
- }
-
- return Pending;
- }
- None => {
- undo_strong!();
-
- return Ready(Ok(()));
- }
- }
- }
- Err(actual) => {
- curr = actual;
- }
- }
- }
- }
-
- /// Close the semaphore. This prevents the semaphore from issuing new
- /// permits and notifies all pending waiters.
- pub(crate) fn close(&self) {
- // Acquire the `rx_lock`, setting the "closed" flag on the lock.
- let prev = self.rx_lock.fetch_or(1, AcqRel);
-
- if prev != 0 {
- // Another thread has the lock and will be responsible for notifying
- // pending waiters.
- return;
- }
-
- self.add_permits_locked(0, true);
+ pub fn available_permits(&self) -> usize {
+ self.ll_sem.available_permits()
}
/// Add `n` new permits to the semaphore.
- pub(crate) fn add_permits(&self, n: usize) {
- if n == 0 {
- return;
- }
-
- // TODO: Handle overflow. A panic is not sufficient, the process must
- // abort.
- let prev = self.rx_lock.fetch_add(n << 1, AcqRel);
-
- if prev != 0 {
- // Another thread has the lock and will be responsible for notifying
- // pending waiters.
- return;
- }
-
- self.add_permits_locked(n, false);
+ pub fn add_permits(&self, n: usize) {
+ self.ll_sem.add_permits(n);
}
- fn add_permits_locked(&self, mut rem: usize, mut closed: bool) {
- while rem > 0 || closed {
- if closed {
- SemState::fetch_set_closed(&self.state, AcqRel);
- }
-
- // Release the permits and notify
- self.add_permits_locked2(rem, closed);
-
- let n = rem << 1;
-
- let actual = if closed {
- let actual = self.rx_lock.fetch_sub(n | 1, AcqRel);
- closed = false;
- actual
- } else {
- let actual = self.rx_lock.fetch_sub(n, AcqRel);
- closed = actual & 1 == 1;
- actual
- };
-
- rem = (actual >> 1) - rem;
- }
- }
-
- /// Release a specific amount of permits to the semaphore
- ///
- /// This function is called by `add_permits` after the add lock has been
- /// acquired.
- fn add_permits_locked2(&self, mut n: usize, closed: bool) {
- while n > 0 || closed {
- let waiter = match self.pop(n, closed) {
- Some(waiter) => waiter,
- None => {
- return;
- }
- };
-
- if waiter.notify(closed) {
- n = n.saturating_sub(1);
- }
- }
- }
-
- /// Pop a waiter
- ///
- /// `rem` represents the remaining number of times the caller will pop. If
- /// there are no more waiters to pop, `rem` is used to set the available
- /// permits.
- fn pop(&self, rem: usize, closed: bool) -> Option<Arc<WaiterNode>> {
- 'outer: loop {
- unsafe {
- let mut head = self.head.with(|head| *head);
- let mut next_ptr = head.as_ref().next.load(Acquire);
-
- let stub = self.stub();
-
- if head == stub {
- let next = match NonNull::new(next_ptr) {
- Some(next) => next,
- None => {
- // This loop is not part of the standard intrusive mpsc
- // channel algorithm. This is where we atomically pop
- // the last task and add `rem` to the remaining capacity.
- //
- // This modification to the pop algorithm works because,
- // at this point, we have not done any work (only done
- // reading). We have a *pretty* good idea that there is
- // no concurrent pusher.
- //
- // The capacity is then atomically added by doing an
- // AcqRel CAS on `state`. The `state` cell is the
- // linchpin of the algorithm.
- //
- // By successfully CASing `head` w/ AcqRel, we ensure
- // that, if any thread was racing and entered a push, we
- // see that and abort pop, retrying as it is
- // "inconsistent".
- let mut curr = SemState::load(&self.state, Acquire);
-
- loop {
- if curr.has_waiter(&self.stub) {
- // Inconsistent
- thread::yield_now();
- continue 'outer;
- }
-
- // When closing the semaphore, nodes are popped
- // with `rem == 0`. In this case, we are not
- // adding permits, but notifying waiters of the
- // semaphore's closed state.
- if rem == 0 {
- debug_assert!(curr.is_closed(), "state = {:?}", curr);
- return None;
- }
-
- let mut next = curr;
- next.release_permits(rem, &self.stub);
-
- match next.compare_exchange(&self.state, curr, AcqRel, Acquire) {
- Ok(_) => return None,
- Err(actual) => {
- curr = actual;
- }
- }
- }
- }
- };
-
- self.head.with_mut(|head| *head = next);
- head = next;
- next_ptr = next.as_ref().next.load(Acquire);
- }
-
- if let Some(next) = NonNull::new(next_ptr) {
- self.head.with_mut(|head| *head = next);
-
- return Some(Arc::from_raw(head.as_ptr()));
- }
-
- let state = SemState::load(&self.state, Acquire);
-
- // This must always be a pointer as the wait list is not empty.
- let tail = state.waiter().unwrap();
-
- if tail != head {
- // Inconsistent
- thread::yield_now();
- continue 'outer;
- }
-
- self.push_stub(closed);
-
- next_ptr = head.as_ref().next.load(Acquire);
-
- if let Some(next) = NonNull::new(next_ptr) {
- self.head.with_mut(|head| *head = next);
-
- return Some(Arc::from_raw(head.as_ptr()));
- }
-
- // Inconsistent state, loop
- thread::yield_now();
- }
- }
- }
-
- unsafe fn push_stub(&self, closed: bool) {
- let stub = self.stub();
-
- // Set the next pointer. This does not require an atomic operation as
- // this node is not accessible. The write will be flushed with the next
- // operation
- stub.as_ref().next.store(ptr::null_mut(), Relaxed);
-
- // Update the tail to point to the new node. We need to see the previous
- // node in order to update the next pointer as well as release `task`
- // to any other threads calling `push`.
- let prev = SemState::new_ptr(stub, closed).swap(&self.state, AcqRel);
-
- debug_assert_eq!(closed, prev.is_closed());
-
- // The stub is only pushed when there are pending tasks. Because of
- // this, the state must *always* be in pointer mode.
- let prev = prev.waiter().unwrap();
-
- // We don't want the *existing* pointer to be a stub.
- debug_assert_ne!(prev, stub);
-
- // Release `task` to the consume end.
- prev.as_ref().next.store(stub.as_ptr(), Release);
- }
-
- fn stub(&self) -> NonNull<WaiterNode> {
- unsafe { NonNull::new_unchecked(&*self.stub as *const _ as *mut _) }
- }
-}
-
-impl fmt::Debug for Semaphore {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- fmt.debug_struct("Semaphore")
- .field("state", &SemState::load(&self.state, Relaxed))
- .field("head", &self.head.with(|ptr| ptr))
- .field("rx_lock", &self.rx_lock.load(Relaxed))
- .field("stub", &self.stub)
- .finish()
- }
-}
-
-unsafe impl Send for Semaphore {}
-unsafe impl Sync for Semaphore {}
-
-// ===== impl Permit =====
-
-impl Permit {
- /// Create a new `Permit`.
- ///
- /// The permit begins in the "unacquired" state.
- pub(crate) fn new() -> Permit {
- Permit {
- waiter: None,
- state: PermitState::Idle,
- }
- }
-
- /// Returns true if the permit has been acquired
- pub(crate) fn is_acquired(&self) -> bool {
- self.state == PermitState::Acquired
- }
-
- /// Try to acquire the permit. If no permits are available, the current task
- /// is notified once a new permit becomes available.
- pub(crate) fn poll_acquire(
- &mut self,
- cx: &mut Context<'_>,
- semaphore: &Semaphore,
- ) -> Poll<Result<(), AcquireError>> {
- match self.state {
- PermitState::Idle => {}
- PermitState::Waiting => {
- let waiter = self.waiter.as_ref().unwrap();
-
- if waiter.acquire(cx)? {
- self.state = PermitState::Acquired;
- return Ready(Ok(()));
- } else {
- return Pending;
- }
- }
- PermitState::Acquired => {
- return Ready(Ok(()));
- }
- }
-
- match semaphore.poll_permit(Some((cx, self)))? {
- Ready(()) => {
- self.state = PermitState::Acquired;
- Ready(Ok(()))
- }
- Pending => {
- self.state = PermitState::Waiting;
- Pending
- }
- }
- }
-
- /// Try to acquire the permit.
- pub(crate) fn try_acquire(&mut self, semaphore: &Semaphore) -> Result<(), TryAcquireError> {
- match self.state {
- PermitState::Idle => {}
- PermitState::Waiting => {
- let waiter = self.waiter.as_ref().unwrap();
-
- if waiter.acquire2().map_err(to_try_acquire)? {
- self.state = PermitState::Acquired;
- return Ok(());
- } else {
- return Err(TryAcquireError::no_permits());
- }
- }
- PermitState::Acquired => {
- return Ok(());
- }
- }
-
- match semaphore.poll_permit(None).map_err(to_try_acquire)? {
- Ready(()) => {
- self.state = PermitState::Acquired;
- Ok(())
- }
- Pending => Err(TryAcquireError::no_permits()),
- }
- }
-
- /// Release a permit back to the semaphore
- pub(crate) fn release(&mut self, semaphore: &Semaphore) {
- if self.forget2() {
- semaphore.add_permits(1);
- }
- }
-
- /// Forget the permit **without** releasing it back to the semaphore.
- ///
- /// After calling `forget`, `poll_acquire` is able to acquire new permit
- /// from the sempahore.
- ///
- /// Repeatedly calling `forget` without associated calls to `add_permit`
- /// will result in the semaphore losing all permits.
- pub(crate) fn forget(&mut self) {
- self.forget2();
- }
-
- /// Returns `true` if the permit was acquired
- fn forget2(&mut self) -> bool {
- match self.state {
- PermitState::Idle => false,
- PermitState::Waiting => {
- let ret = self.waiter.as_ref().unwrap().cancel_interest();
- self.state = PermitState::Idle;
- ret
- }
- PermitState::Acquired => {
- self.state = PermitState::Idle;
- true
- }
- }
- }
-}
-
-impl Default for Permit {
- fn default() -> Self {
- Self::new()
- }
-}
-
-// ===== impl AcquireError ====
-
-impl AcquireError {
- fn closed() -> AcquireError {
- AcquireError(())
- }
-}
-
-fn to_try_acquire(_: AcquireError) -> TryAcquireError {
- TryAcquireError::closed()
-}
-
-impl fmt::Display for AcquireError {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(fmt, "semaphore closed")
- }
-}
-
-impl ::std::error::Error for AcquireError {}
-
-// ===== impl TryAcquireError =====
-
-impl TryAcquireError {
- fn closed() -> TryAcquireError {
- TryAcquireError {
- kind: ErrorKind::Closed,
- }
- }
-
- fn no_permits() -> TryAcquireError {
- TryAcquireError {
- kind: ErrorKind::NoPermits,
- }
- }
-
- /// Returns true if the error was caused by a closed semaphore.
- pub(crate) fn is_closed(&self) -> bool {
- match self.kind {
- ErrorKind::Closed => true,
- _ => false,
- }
- }
-
- /// Returns true if the error was caused by calling `try_acquire` on a
- /// semaphore with no available permits.
- pub(crate) fn is_no_permits(&self) -> bool {
- match self.kind {
- ErrorKind::NoPermits => true,
- _ => false,
- }
- }
-}
-
-impl fmt::Display for TryAcquireError {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- let descr = match self.kind {
- ErrorKind::Closed => "semaphore closed",
- ErrorKind::NoPermits => "no permits available",
+ /// Acquire permit from the semaphore
+ pub async fn acquire(&self) -> SemaphorePermit<'_> {
+ let mut permit = SemaphorePermit {
+ sem: &self,
+ ll_permit: ll::Permit::new(),
};
- write!(fmt, "{}", descr)
- }
-}
-
-impl ::std::error::Error for TryAcquireError {}
-
-// ===== impl WaiterNode =====
-
-impl WaiterNode {
- fn new() -> WaiterNode {
- WaiterNode {
- state: AtomicUsize::new(NodeState::new().to_usize()),
- waker: AtomicWaker::new(),
- next: AtomicPtr::new(ptr::null_mut()),
- }
- }
-
- fn acquire(&self, cx: &mut Context<'_>) -> Result<bool, AcquireError> {
- if self.acquire2()? {
- return Ok(true);
- }
-
- self.waker.register_by_ref(cx.waker());
-
- self.acquire2()
- }
-
- fn acquire2(&self) -> Result<bool, AcquireError> {
- use self::NodeState::*;
-
- match Idle.compare_exchange(&self.state, Assigned, AcqRel, Acquire) {
- Ok(_) => Ok(true),
- Err(Closed) => Err(AcquireError::closed()),
- Err(_) => Ok(false),
- }
- }
-
- fn register(&self, cx: &mut Context<'_>) {
- self.waker.register_by_ref(cx.waker())
- }
-
- /// Returns `true` if the permit has been acquired
- fn cancel_interest(&self) -> bool {
- use self::NodeState::*;
-
- match Queued.compare_exchange(&self.state, QueuedWaiting, AcqRel, Acquire) {
- // Successfully removed interest from the queued node. The permit
- // has not been assigned to the node.
- Ok(_) => false,
- // The semaphore has been closed, there is no further action to
- // take.
- Err(Closed) => false,
- // The permit has been assigned. It must be acquired in order to
- // be released back to the semaphore.
- Err(Assigned) => {
- match self.acquire2() {
- Ok(true) => true,
- // Not a reachable state
- Ok(false) => panic!(),
- // The semaphore has been closed, no further action to take.
- Err(_) => false,
- }
- }
- Err(state) => panic!("unexpected state = {:?}", state),
- }
- }
-
- /// Transition the state to `QueuedWaiting`.
- ///
- /// This step can only happen from `Queued` or from `Idle`.
- ///
- /// Returns `true` if transitioning into a queued state.
- fn to_queued_waiting(&self) -> bool {
- use self::NodeState::*;
-
- let mut curr = NodeState::load(&self.state, Acquire);
-
- loop {
- debug_assert!(curr == Idle || curr == Queued, "actual = {:?}", curr);
- let next = QueuedWaiting;
-
- match next.compare_exchange(&self.state, curr, AcqRel, Acquire) {
- Ok(_) => {
- if curr.is_queued() {
- return false;
- } else {
- // Transitioned to queued, reset next pointer
- self.next.store(ptr::null_mut(), Relaxed);
- return true;
- }
- }
- Err(actual) => {
- curr = actual;
- }
- }
- }
- }
-
- /// Notify the waiter
- ///
- /// Returns `true` if the waiter accepts the notification
- fn notify(&self, closed: bool) -> bool {
- use self::NodeState::*;
-
- // Assume QueuedWaiting state
- let mut curr = QueuedWaiting;
-
- loop {
- let next = match curr {
- Queued => Idle,
- QueuedWaiting => {
- if closed {
- Closed
- } else {
- Assigned
- }
- }
- actual => panic!("actual = {:?}", actual),
- };
-
- match next.compare_exchange(&self.state, curr, AcqRel, Acquire) {
- Ok(_) => match curr {
- QueuedWaiting => {
- self.waker.wake();
- return true;
- }
- _ => return false,
- },
- Err(actual) => curr = actual,
- }
- }
- }
-
- fn revert_to_idle(&self) {
- use self::NodeState::Idle;
-
- // There are no other handles to the node
- NodeState::store(&self.state, Idle, Relaxed);
- }
-
- #[allow(clippy::wrong_self_convention)] // https://github.com/rust-lang/rust-clippy/issues/4293
- fn into_non_null(self: Arc<WaiterNode>) -> NonNull<WaiterNode> {
- let ptr = Arc::into_raw(self);
- unsafe { NonNull::new_unchecked(ptr as *mut _) }
- }
-}
-
-// ===== impl State =====
-
-/// Flag differentiating between available permits and waiter pointers.
-///
-/// If we assume pointers are properly aligned, then the least significant bit
-/// will always be zero. So, we use that bit to track if the value represents a
-/// number.
-const NUM_FLAG: usize = 0b01;
-
-const CLOSED_FLAG: usize = 0b10;
-
-const MAX_PERMITS: usize = usize::MAX >> NUM_SHIFT;
-
-/// When representing "numbers", the state has to be shifted this much (to get
-/// rid of the flag bit).
-const NUM_SHIFT: usize = 2;
-
-impl SemState {
- /// Returns a new default `State` value.
- fn new(permits: usize, stub: &WaiterNode) -> SemState {
- assert!(permits <= MAX_PERMITS);
-
- if permits > 0 {
- SemState((permits << NUM_SHIFT) | NUM_FLAG)
- } else {
- SemState(stub as *const _ as usize)
- }
+ poll_fn(|cx| permit.ll_permit.poll_acquire(cx, &self.ll_sem)).await.unwrap();
+ permit
}
- /// Returns a `State` tracking `ptr` as the tail of the queue.
- fn new_ptr(tail: NonNull<WaiterNode>, closed: bool) -> SemState {
- let mut val = tail.as_ptr() as usize;
-
- if closed {
- val |= CLOSED_FLAG;
- }
-
- SemState(val)
- }
-
- /// Returns the amount of remaining capacity
- fn available_permits(self) -> usize {
- if !self.has_available_permits() {
- return 0;
- }