summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-11-18 07:00:55 -0800
committerGitHub <noreply@github.com>2019-11-18 07:00:55 -0800
commit0d38936b35779b604770120da2e98560bbb6241f (patch)
tree843d46e999becdb580cb02655b4290acadd64474 /tokio/src/sync
parent13b6e9939e062dc01bcf34abe3d75de4b66e20e1 (diff)
chore: refine feature flags (#1785)
Removes dependencies between Tokio feature flags. For example, `process` should not depend on `sync` simply because it uses the `mpsc` channel. Instead, feature flags represent **public** APIs that become available with the feature enabled. When the feature is not enabled, the functionality is removed. If another Tokio component requires the functionality, it is stays as `pub(crate)`. The threaded scheduler is now exposed under `rt-threaded`. This feature flag only enables the threaded scheduler and does not include I/O, networking, or time. Those features must be explictly enabled. A `full` feature flag is added that enables all features. `stdin`, `stdout`, `stderr` are exposed under `io-std`. Macros are used to scope code by feature flag.
Diffstat (limited to 'tokio/src/sync')
-rw-r--r--tokio/src/sync/mod.rs55
-rw-r--r--tokio/src/sync/mpsc/bounded.rs11
-rw-r--r--tokio/src/sync/mpsc/chan.rs6
-rw-r--r--tokio/src/sync/mpsc/list.rs7
-rw-r--r--tokio/src/sync/mpsc/mod.rs2
-rw-r--r--tokio/src/sync/oneshot.rs2
-rw-r--r--tokio/src/sync/semaphore.rs108
-rw-r--r--tokio/src/sync/task/atomic_waker.rs8
-rw-r--r--tokio/src/sync/tests/loom_list.rs4
-rw-r--r--tokio/src/sync/tests/mod.rs29
-rw-r--r--tokio/src/sync/tests/semaphore.rs136
11 files changed, 203 insertions, 165 deletions
diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs
index cf003816..40566c0c 100644
--- a/tokio/src/sync/mod.rs
+++ b/tokio/src/sync/mod.rs
@@ -13,43 +13,40 @@
//! - [watch](watch/index.html), a single-producer, multi-consumer channel that
//! only stores the **most recently** sent value.
-macro_rules! debug {
- ($($t:tt)*) => {
- if false {
- println!($($t)*);
- }
- }
-}
+cfg_sync! {
+ mod barrier;
+ pub use barrier::{Barrier, BarrierWaitResult};
-macro_rules! if_loom {
- ($($t:tt)*) => {{
- #[cfg(loom)]
- const LOOM: bool = true;
- #[cfg(not(loom))]
- const LOOM: bool = false;
-
- if LOOM {
- $($t)*
- }
- }}
-}
+ pub mod mpsc;
-mod barrier;
-pub use barrier::{Barrier, BarrierWaitResult};
+ mod mutex;
+ pub use mutex::{Mutex, MutexGuard};
-pub mod mpsc;
+ pub mod oneshot;
-mod mutex;
-pub use mutex::{Mutex, MutexGuard};
+ pub(crate) mod semaphore;
-pub mod oneshot;
+ mod task;
+ pub(crate) use task::AtomicWaker;
-pub mod semaphore;
+ pub mod watch;
+}
-mod task;
-pub(crate) use task::AtomicWaker;
+cfg_not_sync! {
+ cfg_atomic_waker! {
+ mod task;
+ pub(crate) use task::AtomicWaker;
+ }
-pub mod watch;
+ cfg_rt_threaded! {
+ pub(crate) mod oneshot;
+ }
+
+ cfg_signal! {
+ pub(crate) mod mpsc;
+ pub(crate) mod semaphore;
+ }
+}
/// Unit tests
#[cfg(test)]
diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs
index 523dde75..d635e138 100644
--- a/tokio/src/sync/mpsc/bounded.rs
+++ b/tokio/src/sync/mpsc/bounded.rs
@@ -161,12 +161,13 @@ impl<T> Receiver<T> {
impl<T> Unpin for Receiver<T> {}
-#[cfg(feature = "stream")]
-impl<T> futures_core::Stream for Receiver<T> {
- type Item = T;
+cfg_stream! {
+ impl<T> futures_core::Stream for Receiver<T> {
+ type Item = T;
- fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
- self.poll_recv(cx)
+ fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ self.poll_recv(cx)
+ }
}
}
diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs
index 03f35339..4030e380 100644
--- a/tokio/src/sync/mpsc/chan.rs
+++ b/tokio/src/sync/mpsc/chan.rs
@@ -299,12 +299,6 @@ where
// second time here.
try_recv!();
- debug!(
- "recv; rx_closed = {:?}; is_idle = {:?}",
- rx_fields.rx_closed,
- self.inner.semaphore.is_idle()
- );
-
if rx_fields.rx_closed && self.inner.semaphore.is_idle() {
Ready(None)
} else {
diff --git a/tokio/src/sync/mpsc/list.rs b/tokio/src/sync/mpsc/list.rs
index eecc4da3..dc956403 100644
--- a/tokio/src/sync/mpsc/list.rs
+++ b/tokio/src/sync/mpsc/list.rs
@@ -169,7 +169,6 @@ impl<T> Tx<T> {
}
pub(crate) unsafe fn reclaim_block(&self, mut block: NonNull<Block<T>>) {
- debug!("+ reclaim_block({:p})", block);
// The block has been removed from the linked list and ownership
// is reclaimed.
//
@@ -206,7 +205,6 @@ impl<T> Tx<T> {
}
if !reused {
- debug!(" + block freed {:p}", block);
let _ = Box::from_raw(block.as_ptr());
}
}
@@ -226,7 +224,6 @@ impl<T> Rx<T> {
pub(crate) fn pop(&mut self, tx: &Tx<T>) -> Option<block::Read<T>> {
// Advance `head`, if needed
if !self.try_advancing_head() {
- debug!("+ !self.try_advancing_head() -> false");
return None;
}
@@ -276,8 +273,6 @@ impl<T> Rx<T> {
}
fn reclaim_blocks(&mut self, tx: &Tx<T>) {
- debug!("+ reclaim_blocks()");
-
while self.free_head != self.head {
unsafe {
// Get a handle to the block that will be freed and update
@@ -316,7 +311,6 @@ impl<T> Rx<T> {
/// Effectively `Drop` all the blocks. Should only be called once, when
/// the list is dropping.
pub(super) unsafe fn free_blocks(&mut self) {
- debug!("+ free_blocks()");
debug_assert_ne!(self.free_head, NonNull::dangling());
let mut cur = Some(self.free_head);
@@ -331,7 +325,6 @@ impl<T> Rx<T> {
while let Some(block) = cur {
cur = block.as_ref().load_next(Relaxed);
- debug!(" + free: block = {:p}", block);
drop(Box::from_raw(block.as_ptr()));
}
}
diff --git a/tokio/src/sync/mpsc/mod.rs b/tokio/src/sync/mpsc/mod.rs
index 7927dde6..60ae60cd 100644
--- a/tokio/src/sync/mpsc/mod.rs
+++ b/tokio/src/sync/mpsc/mod.rs
@@ -1,3 +1,5 @@
+#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
+
//! A multi-producer, single-consumer queue for sending values across
//! asynchronous tasks.
//!
diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs
index 7b84f319..ed3801c8 100644
--- a/tokio/src/sync/oneshot.rs
+++ b/tokio/src/sync/oneshot.rs
@@ -1,3 +1,5 @@
+#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
+
//! A channel for sending a single message between asynchronous tasks.
use crate::loom::cell::CausalCell;
diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs
index b4a093f8..dab73a09 100644
--- a/tokio/src/sync/semaphore.rs
+++ b/tokio/src/sync/semaphore.rs
@@ -1,3 +1,5 @@
+#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
+
//! Thread-safe, asynchronous counting semaphore.
//!
//! A `Semaphore` instance holds a set of permits. Permits are used to
@@ -24,7 +26,7 @@ use std::task::{Context, Poll};
use std::usize;
/// Futures-aware semaphore.
-pub struct Semaphore {
+pub(crate) struct Semaphore {
/// Tracks both the waiter queue tail pointer and the number of remaining
/// permits.
state: AtomicUsize,
@@ -51,18 +53,18 @@ pub struct Semaphore {
/// is the user's responsibility to ensure that `Permit::release` is called
/// before dropping the permit.
#[derive(Debug)]
-pub struct Permit {
+pub(crate) struct Permit {
waiter: Option<Arc<WaiterNode>>,
state: PermitState,
}
/// Error returned by `Permit::poll_acquire`.
#[derive(Debug)]
-pub struct AcquireError(());
+pub(crate) struct AcquireError(());
/// Error returned by `Permit::try_acquire`.
#[derive(Debug)]
-pub struct TryAcquireError {
+pub(crate) struct TryAcquireError {
kind: ErrorKind,
}
@@ -150,7 +152,7 @@ impl Semaphore {
/// # Panics
///
/// Panics if `permits` is zero.
- pub fn new(permits: usize) -> Semaphore {
+ pub(crate) fn new(permits: usize) -> Semaphore {
let stub = Box::new(WaiterNode::new());
let ptr = NonNull::new(&*stub as *const _ as *mut _).unwrap();
@@ -168,7 +170,7 @@ impl Semaphore {
}
/// Returns the current number of available permits
- pub fn available_permits(&self) -> usize {
+ pub(crate) fn available_permits(&self) -> usize {
let curr = SemState::load(&self.state, Acquire);
curr.available_permits()
}
@@ -181,8 +183,6 @@ impl Semaphore {
// Load the current state
let mut curr = SemState::load(&self.state, Acquire);
- debug!(" + poll_permit; sem-state = {:?}", curr);
-
// Tracks a *mut WaiterNode representing an Arc clone.
//
// This avoids having to bump the ref count unless required.
@@ -210,8 +210,6 @@ impl Semaphore {
}
if !next.acquire_permit(&self.stub) {
- debug!(" + poll_permit -- no permits");
-
debug_assert!(curr.waiter().is_some());
if maybe_strong.is_none() {
@@ -223,10 +221,7 @@ impl Semaphore {
waiter.register(cx);
- debug!(" + poll_permit -- to_queued_waiting");
-
if !waiter.to_queued_waiting() {
- debug!(" + poll_permit; waiter already queued");
// The node is alrady queued, there is no further work
// to do.
return Pending;
@@ -243,14 +238,11 @@ impl Semaphore {
next.set_waiter(maybe_strong.unwrap());
}
- debug!(" + poll_permit -- pre-CAS; next = {:?}", next);
-
debug_assert_ne!(curr.0, 0);
debug_assert_ne!(next.0, 0);
match next.compare_exchange(&self.state, curr, AcqRel, Acquire) {
Ok(_) => {
- debug!(" + poll_permit -- CAS ok");
match curr.waiter() {
Some(prev_waiter) => {
let waiter = maybe_strong.unwrap();
@@ -260,13 +252,9 @@ impl Semaphore {
prev_waiter.as_ref().next.store(waiter.as_ptr(), Release);
}
- debug!(" + poll_permit -- waiter pushed");
-
return Pending;
}
None => {
- debug!(" + poll_permit -- permit acquired");
-
undo_strong!();
return Ready(Ok(()));
@@ -282,15 +270,11 @@ impl Semaphore {
/// Close the semaphore. This prevents the semaphore from issuing new
/// permits and notifies all pending waiters.
- pub fn close(&self) {
- debug!("+ Semaphore::close");
-
+ pub(crate) fn close(&self) {
// Acquire the `rx_lock`, setting the "closed" flag on the lock.
let prev = self.rx_lock.fetch_or(1, AcqRel);
- debug!(" + close -- rx_lock.fetch_add(1)");
if prev != 0 {
- debug!("+ close -- locked; prev = {}", prev);
// Another thread has the lock and will be responsible for notifying
// pending waiters.
return;
@@ -300,9 +284,7 @@ impl Semaphore {
}
/// Add `n` new permits to the semaphore.
- pub fn add_permits(&self, n: usize) {
- debug!(" + add_permits; n = {}", n);
-
+ pub(crate) fn add_permits(&self, n: usize) {
if n == 0 {
return;
}
@@ -310,10 +292,8 @@ impl Semaphore {
// TODO: Handle overflow. A panic is not sufficient, the process must
// abort.
let prev = self.rx_lock.fetch_add(n << 1, AcqRel);
- debug!(" + add_permits; rx_lock.fetch_add(n << 1); n = {}", n);
if prev != 0 {
- debug!(" + add_permits -- locked; prev = {}", prev);
// Another thread has the lock and will be responsible for notifying
// pending waiters.
return;
@@ -324,11 +304,6 @@ impl Semaphore {
fn add_permits_locked(&self, mut rem: usize, mut closed: bool) {
while rem > 0 || closed {
- debug!(
- " + add_permits_locked -- iter; rem = {}; closed = {:?}",
- rem, closed
- );
-
if closed {
SemState::fetch_set_closed(&self.state, AcqRel);
}
@@ -340,28 +315,16 @@ impl Semaphore {
let actual = if closed {
let actual = self.rx_lock.fetch_sub(n | 1, AcqRel);
- debug!(
- " + add_permits_locked; rx_lock.fetch_sub(n | 1); n = {}; actual={}",
- n, actual
- );
-
closed = false;
actual
} else {
let actual = self.rx_lock.fetch_sub(n, AcqRel);
- debug!(
- " + add_permits_locked; rx_lock.fetch_sub(n); n = {}; actual={}",
- n, actual
- );
-
closed = actual & 1 == 1;
actual
};
rem = (actual >> 1) - rem;
}
-
- debug!(" + add_permits; done");
}
/// Release a specific amount of permits to the semaphore
@@ -377,11 +340,8 @@ impl Semaphore {
}
};
- debug!(" + release_n -- notify");
-
if waiter.notify(closed) {
n = n.saturating_sub(1);
- debug!(" + release_n -- dec");
}
}
}
@@ -392,8 +352,6 @@ impl Semaphore {
/// there are no more waiters to pop, `rem` is used to set the available
/// permits.
fn pop(&self, rem: usize, closed: bool) -> Option<Arc<WaiterNode>> {
- debug!(" + pop; rem = {}", rem);
-
'outer: loop {
unsafe {
let mut head = self.head.with(|head| *head);
@@ -402,8 +360,6 @@ impl Semaphore {
let stub = self.stub();
if head == stub {
- debug!(" + pop; head == stub");
-
let next = match NonNull::new(next_ptr) {
Some(next) => next,
None => {
@@ -429,7 +385,6 @@ impl Semaphore {
loop {
if curr.has_waiter(&self.stub) {
// Inconsistent
- debug!(" + pop; inconsistent 1");
thread::yield_now();
continue 'outer;
}
@@ -456,8 +411,6 @@ impl Semaphore {
}
};
- debug!(" + pop; got next waiter");
-
self.head.with_mut(|head| *head = next);
head = next;
next_ptr = next.as_ref().next.load(Acquire);
@@ -476,7 +429,6 @@ impl Semaphore {
if tail != head {
// Inconsistent
- debug!(" + pop; inconsistent 2");
thread::yield_now();
continue 'outer;
}
@@ -492,7 +444,6 @@ impl Semaphore {
}
// Inconsistent state, loop
- debug!(" + pop; inconsistent 3");
thread::yield_now();
}
}
@@ -549,16 +500,7 @@ impl Permit {
/// Create a new `Permit`.
///
/// The permit begins in the "unacquired" state.
- ///
- /// # Examples
- ///
- /// ```
- /// use tokio::sync::semaphore::Permit;
- ///
- /// let permit = Permit::new();
- /// assert!(!permit.is_acquired());
- /// ```
- pub fn new() -> Permit {
+ pub(crate) fn new() -> Permit {
Permit {
waiter: None,
state: PermitState::Idle,
@@ -566,13 +508,13 @@ impl Permit {
}
/// Returns true if the permit has been acquired
- pub fn is_acquired(&self) -> bool {
+ pub(crate) fn is_acquired(&self) -> bool {
self.state == PermitState::Acquired
}
/// Try to acquire the permit. If no permits are available, the current task
/// is notified once a new permit becomes available.
- pub fn poll_acquire(
+ pub(crate) fn poll_acquire(
&mut self,
cx: &mut Context<'_>,
semaphore: &Semaphore,
@@ -607,7 +549,7 @@ impl Permit {
}
/// Try to acquire the permit.
- pub fn try_acquire(&mut self, semaphore: &Semaphore) -> Result<(), TryAcquireError> {
+ pub(crate) fn try_acquire(&mut self, semaphore: &Semaphore) -> Result<(), TryAcquireError> {
match self.state {
PermitState::Idle => {}
PermitState::Waiting => {
@@ -635,7 +577,7 @@ impl Permit {
}
/// Release a permit back to the semaphore
- pub fn release(&mut self, semaphore: &Semaphore) {
+ pub(crate) fn release(&mut self, semaphore: &Semaphore) {
if self.forget2() {
semaphore.add_permits(1);
}
@@ -648,7 +590,7 @@ impl Permit {
///
/// Repeatedly calling `forget` without associated calls to `add_permit`
/// will result in the semaphore losing all permits.
- pub fn forget(&mut self) {
+ pub(crate) fn forget(&mut self) {
self.forget2();
}
@@ -711,7 +653,7 @@ impl TryAcquireError {
}
/// Returns true if the error was caused by a closed semaphore.
- pub fn is_closed(&self) -> bool {
+ pub(crate) fn is_closed(&self) -> bool {
match self.kind {
ErrorKind::Closed => true,
_ => false,
@@ -720,7 +662,7 @@ impl TryAcquireError {
/// Returns true if the error was caused by calling `try_acquire` on a
/// semaphore with no available permits.
- pub fn is_no_permits(&self) -> bool {
+ pub(crate) fn is_no_permits(&self) -> bool {
match self.kind {
ErrorKind::NoPermits => true,
_ => false,
@@ -857,14 +799,10 @@ impl WaiterNode {
match next.compare_exchange(&self.state, curr, AcqRel, Acquire) {
Ok(_) => match curr {
QueuedWaiting => {
- debug!(" + notify -- task notified");
self.waker.wake();
return true;
}
- other => {
- debug!(" + notify -- not notified; state = {:?}", other);
- return false;
- }
+ _ => return false,
},
Err(actual) => curr = actual,
}
@@ -1021,7 +959,6 @@ impl SemState {
/// Load the state from an AtomicUsize.
fn load(cell: &AtomicUsize, ordering: Ordering) -> SemState {
let value = cell.load(ordering);
- debug!(" + SemState::load; value = {}", value);
SemState(value)
}
@@ -1044,13 +981,6 @@ impl SemState {
let res = cell.compare_exchange(prev.to_usize(), self.to_usize(), success, failure);
- debug!(
- " + SemState::compare_exchange; prev = {}; next = {}; result = {:?}",
- prev.to_usize(),
- self.to_usize(),
- res
- );
-
res.map(SemState).map_err(SemState)
}
diff --git a/tokio/src/sync/task/atomic_waker.rs b/tokio/src/sync/task/atomic_waker.rs
index eaad17c1..49a0ac04 100644
--- a/tokio/src/sync/task/atomic_waker.rs
+++ b/tokio/src/sync/task/atomic_waker.rs
@@ -170,10 +170,8 @@ impl AtomicWaker {
where
W: WakerRef,
{
- debug!(" + register_task");
match self.state.compare_and_swap(WAITING, REGISTERING, Acquire) {
WAITING => {
- debug!(" + WAITING");
unsafe {
// Locked acquired, update the waker cell
self.waker.with_mut(|t| *t = Some(waker.into_waker()));
@@ -214,7 +212,6 @@ impl AtomicWaker {
}
}
WAKING => {
- debug!(" + WAKING");
// Currently in the process of waking the task, i.e.,
// `wake` is currently being called on the old waker.
// So, we call wake on the new waker.
@@ -240,7 +237,6 @@ impl AtomicWaker {
///
/// If `register` has not been called yet, then this does nothing.
pub(crate) fn wake(&self) {
- debug!(" + wake");
if let Some(waker) = self.take_waker() {
waker.wake();
}
@@ -249,24 +245,20 @@ impl AtomicWaker {
/// Attempts to take the `Waker` value out of the `AtomicWaker` with the
/// intention that the caller will wake the task later.
pub(crate) fn take_waker(&self) -> Option<Waker> {
- debug!(" + take_waker");
// AcqRel ordering is used in order to acquire the value of the `waker`
// cell as well as to establish a `release` ordering with whatever
// memory the `AtomicWaker` is associated with.
match self.state.fetch_or(WAKING, AcqRel) {
WAITING => {
- debug!(" + WAITING");
// The waking lock has been acquired.
let waker = unsafe { self.waker.with_mut(|t| (*t).take()) };
// Release the lock
self.state.fetch_and(!WAKING, Release);
- debug!(" + Done taking");
waker
}
state => {
- debug!(" + state = {:?}", state);
// There is a concurrent thread currently updating the
// associated waker.
//
diff --git a/tokio/src/sync/tests/loom_list.rs b/tokio/src/sync/tests/loom_list.rs
index 4f7746d5..4067f865 100644
--- a/tokio/src/sync/tests/loom_list.rs
+++ b/tokio/src/sync/tests/loom_list.rs
@@ -21,17 +21,14 @@ fn smoke() {
for i in 0..NUM_MSG {
tx.push((th, i));
}
- debug!(" + tx thread done");
});
}
let mut next = vec![0; NUM_TX];
loop {
- debug!(" + rx.pop()");
match rx.pop(&tx) {
Some(Value((th, v))) => {
- debug!(" + pop() -> Some(Value({}))", v);
assert_eq!(v, next[th]);
next[th] += 1;
@@ -43,7 +40,6 @@ fn smoke() {
panic!();
}
None => {
- debug!(" + pop() -> None");
thread::yield_now();
}
}
diff --git a/tokio/src/sync/tests/mod.rs b/tokio/src/sync/tests/mod.rs
index 8da739f9..06d18e9a 100644
--- a/tokio/src/sync/tests/mod.rs
+++ b/tokio/src/sync/tests/mod.rs
@@ -1,17 +1,12 @@
-#[cfg(not(loom))]
-mod atomic_waker;
-
-#[cfg(loom)]
-mod loom_atomic_waker;
-
-#[cfg(loom)]
-mod loom_list;
-
-#[cfg(loom)]
-mod loom_mpsc;
-
-#[cfg(loom)]
-mod loom_oneshot;
-
-#[cfg(loom)]
-mod loom_semaphore;
+cfg_not_loom! {
+ mod atomic_waker;
+ mod semaphore;
+}
+
+cfg_loom! {
+ mod loom_atomic_waker;
+ mod loom_list;
+ mod loom_mpsc;
+ mod loom_oneshot;
+ mod loom_semaphore;
+}
diff --git a/tokio/src/sync/tests/semaphore.rs b/tokio/src/sync/tests/semaphore.rs
new file mode 100644
index 00000000..86dd7da5
--- /dev/null
+++ b/tokio/src/sync/tests/semaphore.rs
@@ -0,0 +1,136 @@
+use crate::sync::semaphore::{Permit, Semaphore};
+use tokio_test::task;
+use tokio_test::{assert_pending, assert_ready_err, assert_ready_ok};
+
+#[test]
+fn available_permits() {
+ let s = Semaphore::new(100);
+ assert_eq!(s.available_permits(), 100);
+
+ // Polling for a permit succeeds immediately
+ let mut permit = task::spawn(Permit::new());
+ assert!(!permit.is_acquired());
+
+ assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, &s)));
+ assert_eq!(s.available_permits(), 99);
+ assert!(permit.is_acquired());
+
+ // Polling again on the same waiter does not claim a new permit
+ assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, &s)));
+ assert_eq!(s.available_permits(), 99);
+ assert!(permit.is_acquired());
+}
+
+#[test]
+fn unavailable_permits() {
+ let s = Semaphore::new(1);
+
+ let mut permit_1 = task::spawn(Permit::new());
+ let mut permit_2 = task::spawn(Permit::new());
+
+ // Acquire the first permit
+ assert_ready_ok!(permit_1.enter(|cx, mut p| p.poll_acquire(cx, &s)));
+ assert_eq!(s.available_permits(), 0);
+
+ permit_2.enter(|cx, mut p| {
+ // Try to acquire the second permit
+ assert_pending!(p.poll_acquire(cx, &s));
+ });
+
+ permit_1.release(&s);
+
+ assert_eq!(s.available_permits(), 0);
+ assert!(permit_2.is_woken());
+ assert_ready_ok!(permit_2.enter(|cx, mut p| p.poll_acquire(cx, &s)));
+
+ permit_2.release(&s);
+ assert_eq!(s.available_permits(), 1);
+}
+
+#[test]
+fn zero_permits() {
+ let s = Semaphore::new(0);
+ assert_eq!(s.available_permits(), 0);
+
+ let mut permit = task::spawn(Permit::new());
+
+ // Try to acquire the permit
+ permit.enter(|cx, mut p| {
+ assert_pending!(p.poll_acquire(cx, &s));
+ });
+
+ s.add_permits(1);
+
+ assert!(permit.is_woken());
+ assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, &s)));
+}
+
+#[test]
+#[should_panic]
+fn validates_max_permits() {
+ use std::usize;
+ Semaphore::new((usize::MAX >> 2) + 1);
+}
+
+#[test]
+fn close_semaphore_prevents_acquire() {
+ let s = Semaphore::new(1);
+ s.close();
+
+ assert_eq!(1, s.available_permits());
+
+ let mut permit = task::spawn(Permit::new());
+
+ assert_ready_err!(permit.enter(|cx, mut p| p.poll_acquire(cx, &s)));
+ assert_eq!(1, s.available_permits());
+}
+
+#[test]
+fn close_semaphore_notifies_permit1() {
+ let s = Semaphore::new(0);
+ let mut permit = task::spawn(Permit::new());
+
+ assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, &s)));
+
+ s.close();
+
+ assert!(permit.is_woken());
+ assert_ready_err!(permit.enter(|cx, mut p| p.poll_acquire(cx, &s)));
+}
+
+#[test]
+fn close_semaphore_notifies_permit2() {
+ let s = Semaphore::new(2);
+
+ let mut permit1 = task::spawn(Permit::new());
+ let mut permit2 = task::spawn(Permit::new());
+ let mut permit3 = task::spawn(Permit::new());
+ let mut permit4 = task::spawn(Permit::new());
+
+ // Acquire a couple of permits
+ assert_ready_ok!(permit1.enter(|cx, mut p| p.poll_acquire(cx, &s)));
+ assert_ready_ok!(permit2.enter(|cx, mut p| p.poll_acquire(cx, &s)));
+
+ assert_pending!(permit3.enter(|cx, mut p| p.poll_acquire(cx, &s)));
+ assert_pending!(permit4.enter(|cx, mut p| p.poll_acquire(cx, &s)));
+
+ s.close();
+
+ assert!(permit3.is_woken());
+ assert!(permit4.is_woken());
+
+ assert_ready_err!(permit3.enter(|cx, mut p| p.poll_acquire(cx, &s)));
+ assert_ready_err!(permit4.enter(|cx, mut p| p.poll_acquire(cx, &s)));
+
+ assert_eq!(0, s.available_permits());
+
+ permit1.release(&s);
+
+ assert_eq!(1, s.available_permits());
+
+ assert_ready_err!(permit1.enter(|cx, mut p| p.poll_acquire(cx, &s)));
+
+ permit2.release(&s);
+
+ assert_eq!(2, s.available_permits());
+}