diff options
author | Carl Lerche <me@carllerche.com> | 2019-11-18 07:00:55 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-11-18 07:00:55 -0800 |
commit | 0d38936b35779b604770120da2e98560bbb6241f (patch) | |
tree | 843d46e999becdb580cb02655b4290acadd64474 /tokio/src/sync/semaphore.rs | |
parent | 13b6e9939e062dc01bcf34abe3d75de4b66e20e1 (diff) |
chore: refine feature flags (#1785)
Removes dependencies between Tokio feature flags. For example, `process`
should not depend on `sync` simply because it uses the `mpsc` channel.
Instead, feature flags represent **public** APIs that become available
with the feature enabled. When the feature is not enabled, the
functionality is removed. If another Tokio component requires the
functionality, it is stays as `pub(crate)`.
The threaded scheduler is now exposed under `rt-threaded`. This feature
flag only enables the threaded scheduler and does not include I/O,
networking, or time. Those features must be explictly enabled.
A `full` feature flag is added that enables all features.
`stdin`, `stdout`, `stderr` are exposed under `io-std`.
Macros are used to scope code by feature flag.
Diffstat (limited to 'tokio/src/sync/semaphore.rs')
-rw-r--r-- | tokio/src/sync/semaphore.rs | 108 |
1 files changed, 19 insertions, 89 deletions
diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index b4a093f8..dab73a09 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -1,3 +1,5 @@ +#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))] + //! Thread-safe, asynchronous counting semaphore. //! //! A `Semaphore` instance holds a set of permits. Permits are used to @@ -24,7 +26,7 @@ use std::task::{Context, Poll}; use std::usize; /// Futures-aware semaphore. -pub struct Semaphore { +pub(crate) struct Semaphore { /// Tracks both the waiter queue tail pointer and the number of remaining /// permits. state: AtomicUsize, @@ -51,18 +53,18 @@ pub struct Semaphore { /// is the user's responsibility to ensure that `Permit::release` is called /// before dropping the permit. #[derive(Debug)] -pub struct Permit { +pub(crate) struct Permit { waiter: Option<Arc<WaiterNode>>, state: PermitState, } /// Error returned by `Permit::poll_acquire`. #[derive(Debug)] -pub struct AcquireError(()); +pub(crate) struct AcquireError(()); /// Error returned by `Permit::try_acquire`. #[derive(Debug)] -pub struct TryAcquireError { +pub(crate) struct TryAcquireError { kind: ErrorKind, } @@ -150,7 +152,7 @@ impl Semaphore { /// # Panics /// /// Panics if `permits` is zero. - pub fn new(permits: usize) -> Semaphore { + pub(crate) fn new(permits: usize) -> Semaphore { let stub = Box::new(WaiterNode::new()); let ptr = NonNull::new(&*stub as *const _ as *mut _).unwrap(); @@ -168,7 +170,7 @@ impl Semaphore { } /// Returns the current number of available permits - pub fn available_permits(&self) -> usize { + pub(crate) fn available_permits(&self) -> usize { let curr = SemState::load(&self.state, Acquire); curr.available_permits() } @@ -181,8 +183,6 @@ impl Semaphore { // Load the current state let mut curr = SemState::load(&self.state, Acquire); - debug!(" + poll_permit; sem-state = {:?}", curr); - // Tracks a *mut WaiterNode representing an Arc clone. // // This avoids having to bump the ref count unless required. @@ -210,8 +210,6 @@ impl Semaphore { } if !next.acquire_permit(&self.stub) { - debug!(" + poll_permit -- no permits"); - debug_assert!(curr.waiter().is_some()); if maybe_strong.is_none() { @@ -223,10 +221,7 @@ impl Semaphore { waiter.register(cx); - debug!(" + poll_permit -- to_queued_waiting"); - if !waiter.to_queued_waiting() { - debug!(" + poll_permit; waiter already queued"); // The node is alrady queued, there is no further work // to do. return Pending; @@ -243,14 +238,11 @@ impl Semaphore { next.set_waiter(maybe_strong.unwrap()); } - debug!(" + poll_permit -- pre-CAS; next = {:?}", next); - debug_assert_ne!(curr.0, 0); debug_assert_ne!(next.0, 0); match next.compare_exchange(&self.state, curr, AcqRel, Acquire) { Ok(_) => { - debug!(" + poll_permit -- CAS ok"); match curr.waiter() { Some(prev_waiter) => { let waiter = maybe_strong.unwrap(); @@ -260,13 +252,9 @@ impl Semaphore { prev_waiter.as_ref().next.store(waiter.as_ptr(), Release); } - debug!(" + poll_permit -- waiter pushed"); - return Pending; } None => { - debug!(" + poll_permit -- permit acquired"); - undo_strong!(); return Ready(Ok(())); @@ -282,15 +270,11 @@ impl Semaphore { /// Close the semaphore. This prevents the semaphore from issuing new /// permits and notifies all pending waiters. - pub fn close(&self) { - debug!("+ Semaphore::close"); - + pub(crate) fn close(&self) { // Acquire the `rx_lock`, setting the "closed" flag on the lock. let prev = self.rx_lock.fetch_or(1, AcqRel); - debug!(" + close -- rx_lock.fetch_add(1)"); if prev != 0 { - debug!("+ close -- locked; prev = {}", prev); // Another thread has the lock and will be responsible for notifying // pending waiters. return; @@ -300,9 +284,7 @@ impl Semaphore { } /// Add `n` new permits to the semaphore. - pub fn add_permits(&self, n: usize) { - debug!(" + add_permits; n = {}", n); - + pub(crate) fn add_permits(&self, n: usize) { if n == 0 { return; } @@ -310,10 +292,8 @@ impl Semaphore { // TODO: Handle overflow. A panic is not sufficient, the process must // abort. let prev = self.rx_lock.fetch_add(n << 1, AcqRel); - debug!(" + add_permits; rx_lock.fetch_add(n << 1); n = {}", n); if prev != 0 { - debug!(" + add_permits -- locked; prev = {}", prev); // Another thread has the lock and will be responsible for notifying // pending waiters. return; @@ -324,11 +304,6 @@ impl Semaphore { fn add_permits_locked(&self, mut rem: usize, mut closed: bool) { while rem > 0 || closed { - debug!( - " + add_permits_locked -- iter; rem = {}; closed = {:?}", - rem, closed - ); - if closed { SemState::fetch_set_closed(&self.state, AcqRel); } @@ -340,28 +315,16 @@ impl Semaphore { let actual = if closed { let actual = self.rx_lock.fetch_sub(n | 1, AcqRel); - debug!( - " + add_permits_locked; rx_lock.fetch_sub(n | 1); n = {}; actual={}", - n, actual - ); - closed = false; actual } else { let actual = self.rx_lock.fetch_sub(n, AcqRel); - debug!( - " + add_permits_locked; rx_lock.fetch_sub(n); n = {}; actual={}", - n, actual - ); - closed = actual & 1 == 1; actual }; rem = (actual >> 1) - rem; } - - debug!(" + add_permits; done"); } /// Release a specific amount of permits to the semaphore @@ -377,11 +340,8 @@ impl Semaphore { } }; - debug!(" + release_n -- notify"); - if waiter.notify(closed) { n = n.saturating_sub(1); - debug!(" + release_n -- dec"); } } } @@ -392,8 +352,6 @@ impl Semaphore { /// there are no more waiters to pop, `rem` is used to set the available /// permits. fn pop(&self, rem: usize, closed: bool) -> Option<Arc<WaiterNode>> { - debug!(" + pop; rem = {}", rem); - 'outer: loop { unsafe { let mut head = self.head.with(|head| *head); @@ -402,8 +360,6 @@ impl Semaphore { let stub = self.stub(); if head == stub { - debug!(" + pop; head == stub"); - let next = match NonNull::new(next_ptr) { Some(next) => next, None => { @@ -429,7 +385,6 @@ impl Semaphore { loop { if curr.has_waiter(&self.stub) { // Inconsistent - debug!(" + pop; inconsistent 1"); thread::yield_now(); continue 'outer; } @@ -456,8 +411,6 @@ impl Semaphore { } }; - debug!(" + pop; got next waiter"); - self.head.with_mut(|head| *head = next); head = next; next_ptr = next.as_ref().next.load(Acquire); @@ -476,7 +429,6 @@ impl Semaphore { if tail != head { // Inconsistent - debug!(" + pop; inconsistent 2"); thread::yield_now(); continue 'outer; } @@ -492,7 +444,6 @@ impl Semaphore { } // Inconsistent state, loop - debug!(" + pop; inconsistent 3"); thread::yield_now(); } } @@ -549,16 +500,7 @@ impl Permit { /// Create a new `Permit`. /// /// The permit begins in the "unacquired" state. - /// - /// # Examples - /// - /// ``` - /// use tokio::sync::semaphore::Permit; - /// - /// let permit = Permit::new(); - /// assert!(!permit.is_acquired()); - /// ``` - pub fn new() -> Permit { + pub(crate) fn new() -> Permit { Permit { waiter: None, state: PermitState::Idle, @@ -566,13 +508,13 @@ impl Permit { } /// Returns true if the permit has been acquired - pub fn is_acquired(&self) -> bool { + pub(crate) fn is_acquired(&self) -> bool { self.state == PermitState::Acquired } /// Try to acquire the permit. If no permits are available, the current task /// is notified once a new permit becomes available. - pub fn poll_acquire( + pub(crate) fn poll_acquire( &mut self, cx: &mut Context<'_>, semaphore: &Semaphore, @@ -607,7 +549,7 @@ impl Permit { } /// Try to acquire the permit. - pub fn try_acquire(&mut self, semaphore: &Semaphore) -> Result<(), TryAcquireError> { + pub(crate) fn try_acquire(&mut self, semaphore: &Semaphore) -> Result<(), TryAcquireError> { match self.state { PermitState::Idle => {} PermitState::Waiting => { @@ -635,7 +577,7 @@ impl Permit { } /// Release a permit back to the semaphore - pub fn release(&mut self, semaphore: &Semaphore) { + pub(crate) fn release(&mut self, semaphore: &Semaphore) { if self.forget2() { semaphore.add_permits(1); } @@ -648,7 +590,7 @@ impl Permit { /// /// Repeatedly calling `forget` without associated calls to `add_permit` /// will result in the semaphore losing all permits. - pub fn forget(&mut self) { + pub(crate) fn forget(&mut self) { self.forget2(); } @@ -711,7 +653,7 @@ impl TryAcquireError { } /// Returns true if the error was caused by a closed semaphore. - pub fn is_closed(&self) -> bool { + pub(crate) fn is_closed(&self) -> bool { match self.kind { ErrorKind::Closed => true, _ => false, @@ -720,7 +662,7 @@ impl TryAcquireError { /// Returns true if the error was caused by calling `try_acquire` on a /// semaphore with no available permits. - pub fn is_no_permits(&self) -> bool { + pub(crate) fn is_no_permits(&self) -> bool { match self.kind { ErrorKind::NoPermits => true, _ => false, @@ -857,14 +799,10 @@ impl WaiterNode { match next.compare_exchange(&self.state, curr, AcqRel, Acquire) { Ok(_) => match curr { QueuedWaiting => { - debug!(" + notify -- task notified"); self.waker.wake(); return true; } - other => { - debug!(" + notify -- not notified; state = {:?}", other); - return false; - } + _ => return false, }, Err(actual) => curr = actual, } @@ -1021,7 +959,6 @@ impl SemState { /// Load the state from an AtomicUsize. fn load(cell: &AtomicUsize, ordering: Ordering) -> SemState { let value = cell.load(ordering); - debug!(" + SemState::load; value = {}", value); SemState(value) } @@ -1044,13 +981,6 @@ impl SemState { let res = cell.compare_exchange(prev.to_usize(), self.to_usize(), success, failure); - debug!( - " + SemState::compare_exchange; prev = {}; next = {}; result = {:?}", - prev.to_usize(), - self.to_usize(), - res - ); - res.map(SemState).map_err(SemState) } |