summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync/semaphore.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-11-18 07:00:55 -0800
committerGitHub <noreply@github.com>2019-11-18 07:00:55 -0800
commit0d38936b35779b604770120da2e98560bbb6241f (patch)
tree843d46e999becdb580cb02655b4290acadd64474 /tokio/src/sync/semaphore.rs
parent13b6e9939e062dc01bcf34abe3d75de4b66e20e1 (diff)
chore: refine feature flags (#1785)
Removes dependencies between Tokio feature flags. For example, `process` should not depend on `sync` simply because it uses the `mpsc` channel. Instead, feature flags represent **public** APIs that become available with the feature enabled. When the feature is not enabled, the functionality is removed. If another Tokio component requires the functionality, it is stays as `pub(crate)`. The threaded scheduler is now exposed under `rt-threaded`. This feature flag only enables the threaded scheduler and does not include I/O, networking, or time. Those features must be explictly enabled. A `full` feature flag is added that enables all features. `stdin`, `stdout`, `stderr` are exposed under `io-std`. Macros are used to scope code by feature flag.
Diffstat (limited to 'tokio/src/sync/semaphore.rs')
-rw-r--r--tokio/src/sync/semaphore.rs108
1 files changed, 19 insertions, 89 deletions
diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs
index b4a093f8..dab73a09 100644
--- a/tokio/src/sync/semaphore.rs
+++ b/tokio/src/sync/semaphore.rs
@@ -1,3 +1,5 @@
+#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
+
//! Thread-safe, asynchronous counting semaphore.
//!
//! A `Semaphore` instance holds a set of permits. Permits are used to
@@ -24,7 +26,7 @@ use std::task::{Context, Poll};
use std::usize;
/// Futures-aware semaphore.
-pub struct Semaphore {
+pub(crate) struct Semaphore {
/// Tracks both the waiter queue tail pointer and the number of remaining
/// permits.
state: AtomicUsize,
@@ -51,18 +53,18 @@ pub struct Semaphore {
/// is the user's responsibility to ensure that `Permit::release` is called
/// before dropping the permit.
#[derive(Debug)]
-pub struct Permit {
+pub(crate) struct Permit {
waiter: Option<Arc<WaiterNode>>,
state: PermitState,
}
/// Error returned by `Permit::poll_acquire`.
#[derive(Debug)]
-pub struct AcquireError(());
+pub(crate) struct AcquireError(());
/// Error returned by `Permit::try_acquire`.
#[derive(Debug)]
-pub struct TryAcquireError {
+pub(crate) struct TryAcquireError {
kind: ErrorKind,
}
@@ -150,7 +152,7 @@ impl Semaphore {
/// # Panics
///
/// Panics if `permits` is zero.
- pub fn new(permits: usize) -> Semaphore {
+ pub(crate) fn new(permits: usize) -> Semaphore {
let stub = Box::new(WaiterNode::new());
let ptr = NonNull::new(&*stub as *const _ as *mut _).unwrap();
@@ -168,7 +170,7 @@ impl Semaphore {
}
/// Returns the current number of available permits
- pub fn available_permits(&self) -> usize {
+ pub(crate) fn available_permits(&self) -> usize {
let curr = SemState::load(&self.state, Acquire);
curr.available_permits()
}
@@ -181,8 +183,6 @@ impl Semaphore {
// Load the current state
let mut curr = SemState::load(&self.state, Acquire);
- debug!(" + poll_permit; sem-state = {:?}", curr);
-
// Tracks a *mut WaiterNode representing an Arc clone.
//
// This avoids having to bump the ref count unless required.
@@ -210,8 +210,6 @@ impl Semaphore {
}
if !next.acquire_permit(&self.stub) {
- debug!(" + poll_permit -- no permits");
-
debug_assert!(curr.waiter().is_some());
if maybe_strong.is_none() {
@@ -223,10 +221,7 @@ impl Semaphore {
waiter.register(cx);
- debug!(" + poll_permit -- to_queued_waiting");
-
if !waiter.to_queued_waiting() {
- debug!(" + poll_permit; waiter already queued");
// The node is alrady queued, there is no further work
// to do.
return Pending;
@@ -243,14 +238,11 @@ impl Semaphore {
next.set_waiter(maybe_strong.unwrap());
}
- debug!(" + poll_permit -- pre-CAS; next = {:?}", next);
-
debug_assert_ne!(curr.0, 0);
debug_assert_ne!(next.0, 0);
match next.compare_exchange(&self.state, curr, AcqRel, Acquire) {
Ok(_) => {
- debug!(" + poll_permit -- CAS ok");
match curr.waiter() {
Some(prev_waiter) => {
let waiter = maybe_strong.unwrap();
@@ -260,13 +252,9 @@ impl Semaphore {
prev_waiter.as_ref().next.store(waiter.as_ptr(), Release);
}
- debug!(" + poll_permit -- waiter pushed");
-
return Pending;
}
None => {
- debug!(" + poll_permit -- permit acquired");
-
undo_strong!();
return Ready(Ok(()));
@@ -282,15 +270,11 @@ impl Semaphore {
/// Close the semaphore. This prevents the semaphore from issuing new
/// permits and notifies all pending waiters.
- pub fn close(&self) {
- debug!("+ Semaphore::close");
-
+ pub(crate) fn close(&self) {
// Acquire the `rx_lock`, setting the "closed" flag on the lock.
let prev = self.rx_lock.fetch_or(1, AcqRel);
- debug!(" + close -- rx_lock.fetch_add(1)");
if prev != 0 {
- debug!("+ close -- locked; prev = {}", prev);
// Another thread has the lock and will be responsible for notifying
// pending waiters.
return;
@@ -300,9 +284,7 @@ impl Semaphore {
}
/// Add `n` new permits to the semaphore.
- pub fn add_permits(&self, n: usize) {
- debug!(" + add_permits; n = {}", n);
-
+ pub(crate) fn add_permits(&self, n: usize) {
if n == 0 {
return;
}
@@ -310,10 +292,8 @@ impl Semaphore {
// TODO: Handle overflow. A panic is not sufficient, the process must
// abort.
let prev = self.rx_lock.fetch_add(n << 1, AcqRel);
- debug!(" + add_permits; rx_lock.fetch_add(n << 1); n = {}", n);
if prev != 0 {
- debug!(" + add_permits -- locked; prev = {}", prev);
// Another thread has the lock and will be responsible for notifying
// pending waiters.
return;
@@ -324,11 +304,6 @@ impl Semaphore {
fn add_permits_locked(&self, mut rem: usize, mut closed: bool) {
while rem > 0 || closed {
- debug!(
- " + add_permits_locked -- iter; rem = {}; closed = {:?}",
- rem, closed
- );
-
if closed {
SemState::fetch_set_closed(&self.state, AcqRel);
}
@@ -340,28 +315,16 @@ impl Semaphore {
let actual = if closed {
let actual = self.rx_lock.fetch_sub(n | 1, AcqRel);
- debug!(
- " + add_permits_locked; rx_lock.fetch_sub(n | 1); n = {}; actual={}",
- n, actual
- );
-
closed = false;
actual
} else {
let actual = self.rx_lock.fetch_sub(n, AcqRel);
- debug!(
- " + add_permits_locked; rx_lock.fetch_sub(n); n = {}; actual={}",
- n, actual
- );
-
closed = actual & 1 == 1;
actual
};
rem = (actual >> 1) - rem;
}
-
- debug!(" + add_permits; done");
}
/// Release a specific amount of permits to the semaphore
@@ -377,11 +340,8 @@ impl Semaphore {
}
};
- debug!(" + release_n -- notify");
-
if waiter.notify(closed) {
n = n.saturating_sub(1);
- debug!(" + release_n -- dec");
}
}
}
@@ -392,8 +352,6 @@ impl Semaphore {
/// there are no more waiters to pop, `rem` is used to set the available
/// permits.
fn pop(&self, rem: usize, closed: bool) -> Option<Arc<WaiterNode>> {
- debug!(" + pop; rem = {}", rem);
-
'outer: loop {
unsafe {
let mut head = self.head.with(|head| *head);
@@ -402,8 +360,6 @@ impl Semaphore {
let stub = self.stub();
if head == stub {
- debug!(" + pop; head == stub");
-
let next = match NonNull::new(next_ptr) {
Some(next) => next,
None => {
@@ -429,7 +385,6 @@ impl Semaphore {
loop {
if curr.has_waiter(&self.stub) {
// Inconsistent
- debug!(" + pop; inconsistent 1");
thread::yield_now();
continue 'outer;
}
@@ -456,8 +411,6 @@ impl Semaphore {
}
};
- debug!(" + pop; got next waiter");
-
self.head.with_mut(|head| *head = next);
head = next;
next_ptr = next.as_ref().next.load(Acquire);
@@ -476,7 +429,6 @@ impl Semaphore {
if tail != head {
// Inconsistent
- debug!(" + pop; inconsistent 2");
thread::yield_now();
continue 'outer;
}
@@ -492,7 +444,6 @@ impl Semaphore {
}
// Inconsistent state, loop
- debug!(" + pop; inconsistent 3");
thread::yield_now();
}
}
@@ -549,16 +500,7 @@ impl Permit {
/// Create a new `Permit`.
///
/// The permit begins in the "unacquired" state.
- ///
- /// # Examples
- ///
- /// ```
- /// use tokio::sync::semaphore::Permit;
- ///
- /// let permit = Permit::new();
- /// assert!(!permit.is_acquired());
- /// ```
- pub fn new() -> Permit {
+ pub(crate) fn new() -> Permit {
Permit {
waiter: None,
state: PermitState::Idle,
@@ -566,13 +508,13 @@ impl Permit {
}
/// Returns true if the permit has been acquired
- pub fn is_acquired(&self) -> bool {
+ pub(crate) fn is_acquired(&self) -> bool {
self.state == PermitState::Acquired
}
/// Try to acquire the permit. If no permits are available, the current task
/// is notified once a new permit becomes available.
- pub fn poll_acquire(
+ pub(crate) fn poll_acquire(
&mut self,
cx: &mut Context<'_>,
semaphore: &Semaphore,
@@ -607,7 +549,7 @@ impl Permit {
}
/// Try to acquire the permit.
- pub fn try_acquire(&mut self, semaphore: &Semaphore) -> Result<(), TryAcquireError> {
+ pub(crate) fn try_acquire(&mut self, semaphore: &Semaphore) -> Result<(), TryAcquireError> {
match self.state {
PermitState::Idle => {}
PermitState::Waiting => {
@@ -635,7 +577,7 @@ impl Permit {
}
/// Release a permit back to the semaphore
- pub fn release(&mut self, semaphore: &Semaphore) {
+ pub(crate) fn release(&mut self, semaphore: &Semaphore) {
if self.forget2() {
semaphore.add_permits(1);
}
@@ -648,7 +590,7 @@ impl Permit {
///
/// Repeatedly calling `forget` without associated calls to `add_permit`
/// will result in the semaphore losing all permits.
- pub fn forget(&mut self) {
+ pub(crate) fn forget(&mut self) {
self.forget2();
}
@@ -711,7 +653,7 @@ impl TryAcquireError {
}
/// Returns true if the error was caused by a closed semaphore.
- pub fn is_closed(&self) -> bool {
+ pub(crate) fn is_closed(&self) -> bool {
match self.kind {
ErrorKind::Closed => true,
_ => false,
@@ -720,7 +662,7 @@ impl TryAcquireError {
/// Returns true if the error was caused by calling `try_acquire` on a
/// semaphore with no available permits.
- pub fn is_no_permits(&self) -> bool {
+ pub(crate) fn is_no_permits(&self) -> bool {
match self.kind {
ErrorKind::NoPermits => true,
_ => false,
@@ -857,14 +799,10 @@ impl WaiterNode {
match next.compare_exchange(&self.state, curr, AcqRel, Acquire) {
Ok(_) => match curr {
QueuedWaiting => {
- debug!(" + notify -- task notified");
self.waker.wake();
return true;
}
- other => {
- debug!(" + notify -- not notified; state = {:?}", other);
- return false;
- }
+ _ => return false,
},
Err(actual) => curr = actual,
}
@@ -1021,7 +959,6 @@ impl SemState {
/// Load the state from an AtomicUsize.
fn load(cell: &AtomicUsize, ordering: Ordering) -> SemState {
let value = cell.load(ordering);
- debug!(" + SemState::load; value = {}", value);
SemState(value)
}
@@ -1044,13 +981,6 @@ impl SemState {
let res = cell.compare_exchange(prev.to_usize(), self.to_usize(), success, failure);
- debug!(
- " + SemState::compare_exchange; prev = {}; next = {}; result = {:?}",
- prev.to_usize(),
- self.to_usize(),
- res
- );
-
res.map(SemState).map_err(SemState)
}