diff options
author | Carl Lerche <me@carllerche.com> | 2019-11-18 07:00:55 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-11-18 07:00:55 -0800 |
commit | 0d38936b35779b604770120da2e98560bbb6241f (patch) | |
tree | 843d46e999becdb580cb02655b4290acadd64474 /tokio/src/sync | |
parent | 13b6e9939e062dc01bcf34abe3d75de4b66e20e1 (diff) |
chore: refine feature flags (#1785)
Removes dependencies between Tokio feature flags. For example, `process`
should not depend on `sync` simply because it uses the `mpsc` channel.
Instead, feature flags represent **public** APIs that become available
with the feature enabled. When the feature is not enabled, the
functionality is removed. If another Tokio component requires the
functionality, it is stays as `pub(crate)`.
The threaded scheduler is now exposed under `rt-threaded`. This feature
flag only enables the threaded scheduler and does not include I/O,
networking, or time. Those features must be explictly enabled.
A `full` feature flag is added that enables all features.
`stdin`, `stdout`, `stderr` are exposed under `io-std`.
Macros are used to scope code by feature flag.
Diffstat (limited to 'tokio/src/sync')
-rw-r--r-- | tokio/src/sync/mod.rs | 55 | ||||
-rw-r--r-- | tokio/src/sync/mpsc/bounded.rs | 11 | ||||
-rw-r--r-- | tokio/src/sync/mpsc/chan.rs | 6 | ||||
-rw-r--r-- | tokio/src/sync/mpsc/list.rs | 7 | ||||
-rw-r--r-- | tokio/src/sync/mpsc/mod.rs | 2 | ||||
-rw-r--r-- | tokio/src/sync/oneshot.rs | 2 | ||||
-rw-r--r-- | tokio/src/sync/semaphore.rs | 108 | ||||
-rw-r--r-- | tokio/src/sync/task/atomic_waker.rs | 8 | ||||
-rw-r--r-- | tokio/src/sync/tests/loom_list.rs | 4 | ||||
-rw-r--r-- | tokio/src/sync/tests/mod.rs | 29 | ||||
-rw-r--r-- | tokio/src/sync/tests/semaphore.rs | 136 |
11 files changed, 203 insertions, 165 deletions
diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs index cf003816..40566c0c 100644 --- a/tokio/src/sync/mod.rs +++ b/tokio/src/sync/mod.rs @@ -13,43 +13,40 @@ //! - [watch](watch/index.html), a single-producer, multi-consumer channel that //! only stores the **most recently** sent value. -macro_rules! debug { - ($($t:tt)*) => { - if false { - println!($($t)*); - } - } -} +cfg_sync! { + mod barrier; + pub use barrier::{Barrier, BarrierWaitResult}; -macro_rules! if_loom { - ($($t:tt)*) => {{ - #[cfg(loom)] - const LOOM: bool = true; - #[cfg(not(loom))] - const LOOM: bool = false; - - if LOOM { - $($t)* - } - }} -} + pub mod mpsc; -mod barrier; -pub use barrier::{Barrier, BarrierWaitResult}; + mod mutex; + pub use mutex::{Mutex, MutexGuard}; -pub mod mpsc; + pub mod oneshot; -mod mutex; -pub use mutex::{Mutex, MutexGuard}; + pub(crate) mod semaphore; -pub mod oneshot; + mod task; + pub(crate) use task::AtomicWaker; -pub mod semaphore; + pub mod watch; +} -mod task; -pub(crate) use task::AtomicWaker; +cfg_not_sync! { + cfg_atomic_waker! { + mod task; + pub(crate) use task::AtomicWaker; + } -pub mod watch; + cfg_rt_threaded! { + pub(crate) mod oneshot; + } + + cfg_signal! { + pub(crate) mod mpsc; + pub(crate) mod semaphore; + } +} /// Unit tests #[cfg(test)] diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 523dde75..d635e138 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -161,12 +161,13 @@ impl<T> Receiver<T> { impl<T> Unpin for Receiver<T> {} -#[cfg(feature = "stream")] -impl<T> futures_core::Stream for Receiver<T> { - type Item = T; +cfg_stream! { + impl<T> futures_core::Stream for Receiver<T> { + type Item = T; - fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { - self.poll_recv(cx) + fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { + self.poll_recv(cx) + } } } diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index 03f35339..4030e380 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -299,12 +299,6 @@ where // second time here. try_recv!(); - debug!( - "recv; rx_closed = {:?}; is_idle = {:?}", - rx_fields.rx_closed, - self.inner.semaphore.is_idle() - ); - if rx_fields.rx_closed && self.inner.semaphore.is_idle() { Ready(None) } else { diff --git a/tokio/src/sync/mpsc/list.rs b/tokio/src/sync/mpsc/list.rs index eecc4da3..dc956403 100644 --- a/tokio/src/sync/mpsc/list.rs +++ b/tokio/src/sync/mpsc/list.rs @@ -169,7 +169,6 @@ impl<T> Tx<T> { } pub(crate) unsafe fn reclaim_block(&self, mut block: NonNull<Block<T>>) { - debug!("+ reclaim_block({:p})", block); // The block has been removed from the linked list and ownership // is reclaimed. // @@ -206,7 +205,6 @@ impl<T> Tx<T> { } if !reused { - debug!(" + block freed {:p}", block); let _ = Box::from_raw(block.as_ptr()); } } @@ -226,7 +224,6 @@ impl<T> Rx<T> { pub(crate) fn pop(&mut self, tx: &Tx<T>) -> Option<block::Read<T>> { // Advance `head`, if needed if !self.try_advancing_head() { - debug!("+ !self.try_advancing_head() -> false"); return None; } @@ -276,8 +273,6 @@ impl<T> Rx<T> { } fn reclaim_blocks(&mut self, tx: &Tx<T>) { - debug!("+ reclaim_blocks()"); - while self.free_head != self.head { unsafe { // Get a handle to the block that will be freed and update @@ -316,7 +311,6 @@ impl<T> Rx<T> { /// Effectively `Drop` all the blocks. Should only be called once, when /// the list is dropping. pub(super) unsafe fn free_blocks(&mut self) { - debug!("+ free_blocks()"); debug_assert_ne!(self.free_head, NonNull::dangling()); let mut cur = Some(self.free_head); @@ -331,7 +325,6 @@ impl<T> Rx<T> { while let Some(block) = cur { cur = block.as_ref().load_next(Relaxed); - debug!(" + free: block = {:p}", block); drop(Box::from_raw(block.as_ptr())); } } diff --git a/tokio/src/sync/mpsc/mod.rs b/tokio/src/sync/mpsc/mod.rs index 7927dde6..60ae60cd 100644 --- a/tokio/src/sync/mpsc/mod.rs +++ b/tokio/src/sync/mpsc/mod.rs @@ -1,3 +1,5 @@ +#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))] + //! A multi-producer, single-consumer queue for sending values across //! asynchronous tasks. //! diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs index 7b84f319..ed3801c8 100644 --- a/tokio/src/sync/oneshot.rs +++ b/tokio/src/sync/oneshot.rs @@ -1,3 +1,5 @@ +#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))] + //! A channel for sending a single message between asynchronous tasks. use crate::loom::cell::CausalCell; diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index b4a093f8..dab73a09 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -1,3 +1,5 @@ +#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))] + //! Thread-safe, asynchronous counting semaphore. //! //! A `Semaphore` instance holds a set of permits. Permits are used to @@ -24,7 +26,7 @@ use std::task::{Context, Poll}; use std::usize; /// Futures-aware semaphore. -pub struct Semaphore { +pub(crate) struct Semaphore { /// Tracks both the waiter queue tail pointer and the number of remaining /// permits. state: AtomicUsize, @@ -51,18 +53,18 @@ pub struct Semaphore { /// is the user's responsibility to ensure that `Permit::release` is called /// before dropping the permit. #[derive(Debug)] -pub struct Permit { +pub(crate) struct Permit { waiter: Option<Arc<WaiterNode>>, state: PermitState, } /// Error returned by `Permit::poll_acquire`. #[derive(Debug)] -pub struct AcquireError(()); +pub(crate) struct AcquireError(()); /// Error returned by `Permit::try_acquire`. #[derive(Debug)] -pub struct TryAcquireError { +pub(crate) struct TryAcquireError { kind: ErrorKind, } @@ -150,7 +152,7 @@ impl Semaphore { /// # Panics /// /// Panics if `permits` is zero. - pub fn new(permits: usize) -> Semaphore { + pub(crate) fn new(permits: usize) -> Semaphore { let stub = Box::new(WaiterNode::new()); let ptr = NonNull::new(&*stub as *const _ as *mut _).unwrap(); @@ -168,7 +170,7 @@ impl Semaphore { } /// Returns the current number of available permits - pub fn available_permits(&self) -> usize { + pub(crate) fn available_permits(&self) -> usize { let curr = SemState::load(&self.state, Acquire); curr.available_permits() } @@ -181,8 +183,6 @@ impl Semaphore { // Load the current state let mut curr = SemState::load(&self.state, Acquire); - debug!(" + poll_permit; sem-state = {:?}", curr); - // Tracks a *mut WaiterNode representing an Arc clone. // // This avoids having to bump the ref count unless required. @@ -210,8 +210,6 @@ impl Semaphore { } if !next.acquire_permit(&self.stub) { - debug!(" + poll_permit -- no permits"); - debug_assert!(curr.waiter().is_some()); if maybe_strong.is_none() { @@ -223,10 +221,7 @@ impl Semaphore { waiter.register(cx); - debug!(" + poll_permit -- to_queued_waiting"); - if !waiter.to_queued_waiting() { - debug!(" + poll_permit; waiter already queued"); // The node is alrady queued, there is no further work // to do. return Pending; @@ -243,14 +238,11 @@ impl Semaphore { next.set_waiter(maybe_strong.unwrap()); } - debug!(" + poll_permit -- pre-CAS; next = {:?}", next); - debug_assert_ne!(curr.0, 0); debug_assert_ne!(next.0, 0); match next.compare_exchange(&self.state, curr, AcqRel, Acquire) { Ok(_) => { - debug!(" + poll_permit -- CAS ok"); match curr.waiter() { Some(prev_waiter) => { let waiter = maybe_strong.unwrap(); @@ -260,13 +252,9 @@ impl Semaphore { prev_waiter.as_ref().next.store(waiter.as_ptr(), Release); } - debug!(" + poll_permit -- waiter pushed"); - return Pending; } None => { - debug!(" + poll_permit -- permit acquired"); - undo_strong!(); return Ready(Ok(())); @@ -282,15 +270,11 @@ impl Semaphore { /// Close the semaphore. This prevents the semaphore from issuing new /// permits and notifies all pending waiters. - pub fn close(&self) { - debug!("+ Semaphore::close"); - + pub(crate) fn close(&self) { // Acquire the `rx_lock`, setting the "closed" flag on the lock. let prev = self.rx_lock.fetch_or(1, AcqRel); - debug!(" + close -- rx_lock.fetch_add(1)"); if prev != 0 { - debug!("+ close -- locked; prev = {}", prev); // Another thread has the lock and will be responsible for notifying // pending waiters. return; @@ -300,9 +284,7 @@ impl Semaphore { } /// Add `n` new permits to the semaphore. - pub fn add_permits(&self, n: usize) { - debug!(" + add_permits; n = {}", n); - + pub(crate) fn add_permits(&self, n: usize) { if n == 0 { return; } @@ -310,10 +292,8 @@ impl Semaphore { // TODO: Handle overflow. A panic is not sufficient, the process must // abort. let prev = self.rx_lock.fetch_add(n << 1, AcqRel); - debug!(" + add_permits; rx_lock.fetch_add(n << 1); n = {}", n); if prev != 0 { - debug!(" + add_permits -- locked; prev = {}", prev); // Another thread has the lock and will be responsible for notifying // pending waiters. return; @@ -324,11 +304,6 @@ impl Semaphore { fn add_permits_locked(&self, mut rem: usize, mut closed: bool) { while rem > 0 || closed { - debug!( - " + add_permits_locked -- iter; rem = {}; closed = {:?}", - rem, closed - ); - if closed { SemState::fetch_set_closed(&self.state, AcqRel); } @@ -340,28 +315,16 @@ impl Semaphore { let actual = if closed { let actual = self.rx_lock.fetch_sub(n | 1, AcqRel); - debug!( - " + add_permits_locked; rx_lock.fetch_sub(n | 1); n = {}; actual={}", - n, actual - ); - closed = false; actual } else { let actual = self.rx_lock.fetch_sub(n, AcqRel); - debug!( - " + add_permits_locked; rx_lock.fetch_sub(n); n = {}; actual={}", - n, actual - ); - closed = actual & 1 == 1; actual }; rem = (actual >> 1) - rem; } - - debug!(" + add_permits; done"); } /// Release a specific amount of permits to the semaphore @@ -377,11 +340,8 @@ impl Semaphore { } }; - debug!(" + release_n -- notify"); - if waiter.notify(closed) { n = n.saturating_sub(1); - debug!(" + release_n -- dec"); } } } @@ -392,8 +352,6 @@ impl Semaphore { /// there are no more waiters to pop, `rem` is used to set the available /// permits. fn pop(&self, rem: usize, closed: bool) -> Option<Arc<WaiterNode>> { - debug!(" + pop; rem = {}", rem); - 'outer: loop { unsafe { let mut head = self.head.with(|head| *head); @@ -402,8 +360,6 @@ impl Semaphore { let stub = self.stub(); if head == stub { - debug!(" + pop; head == stub"); - let next = match NonNull::new(next_ptr) { Some(next) => next, None => { @@ -429,7 +385,6 @@ impl Semaphore { loop { if curr.has_waiter(&self.stub) { // Inconsistent - debug!(" + pop; inconsistent 1"); thread::yield_now(); continue 'outer; } @@ -456,8 +411,6 @@ impl Semaphore { } }; - debug!(" + pop; got next waiter"); - self.head.with_mut(|head| *head = next); head = next; next_ptr = next.as_ref().next.load(Acquire); @@ -476,7 +429,6 @@ impl Semaphore { if tail != head { // Inconsistent - debug!(" + pop; inconsistent 2"); thread::yield_now(); continue 'outer; } @@ -492,7 +444,6 @@ impl Semaphore { } // Inconsistent state, loop - debug!(" + pop; inconsistent 3"); thread::yield_now(); } } @@ -549,16 +500,7 @@ impl Permit { /// Create a new `Permit`. /// /// The permit begins in the "unacquired" state. - /// - /// # Examples - /// - /// ``` - /// use tokio::sync::semaphore::Permit; - /// - /// let permit = Permit::new(); - /// assert!(!permit.is_acquired()); - /// ``` - pub fn new() -> Permit { + pub(crate) fn new() -> Permit { Permit { waiter: None, state: PermitState::Idle, @@ -566,13 +508,13 @@ impl Permit { } /// Returns true if the permit has been acquired - pub fn is_acquired(&self) -> bool { + pub(crate) fn is_acquired(&self) -> bool { self.state == PermitState::Acquired } /// Try to acquire the permit. If no permits are available, the current task /// is notified once a new permit becomes available. - pub fn poll_acquire( + pub(crate) fn poll_acquire( &mut self, cx: &mut Context<'_>, semaphore: &Semaphore, @@ -607,7 +549,7 @@ impl Permit { } /// Try to acquire the permit. - pub fn try_acquire(&mut self, semaphore: &Semaphore) -> Result<(), TryAcquireError> { + pub(crate) fn try_acquire(&mut self, semaphore: &Semaphore) -> Result<(), TryAcquireError> { match self.state { PermitState::Idle => {} PermitState::Waiting => { @@ -635,7 +577,7 @@ impl Permit { } /// Release a permit back to the semaphore - pub fn release(&mut self, semaphore: &Semaphore) { + pub(crate) fn release(&mut self, semaphore: &Semaphore) { if self.forget2() { semaphore.add_permits(1); } @@ -648,7 +590,7 @@ impl Permit { /// /// Repeatedly calling `forget` without associated calls to `add_permit` /// will result in the semaphore losing all permits. - pub fn forget(&mut self) { + pub(crate) fn forget(&mut self) { self.forget2(); } @@ -711,7 +653,7 @@ impl TryAcquireError { } /// Returns true if the error was caused by a closed semaphore. - pub fn is_closed(&self) -> bool { + pub(crate) fn is_closed(&self) -> bool { match self.kind { ErrorKind::Closed => true, _ => false, @@ -720,7 +662,7 @@ impl TryAcquireError { /// Returns true if the error was caused by calling `try_acquire` on a /// semaphore with no available permits. - pub fn is_no_permits(&self) -> bool { + pub(crate) fn is_no_permits(&self) -> bool { match self.kind { ErrorKind::NoPermits => true, _ => false, @@ -857,14 +799,10 @@ impl WaiterNode { match next.compare_exchange(&self.state, curr, AcqRel, Acquire) { Ok(_) => match curr { QueuedWaiting => { - debug!(" + notify -- task notified"); self.waker.wake(); return true; } - other => { - debug!(" + notify -- not notified; state = {:?}", other); - return false; - } + _ => return false, }, Err(actual) => curr = actual, } @@ -1021,7 +959,6 @@ impl SemState { /// Load the state from an AtomicUsize. fn load(cell: &AtomicUsize, ordering: Ordering) -> SemState { let value = cell.load(ordering); - debug!(" + SemState::load; value = {}", value); SemState(value) } @@ -1044,13 +981,6 @@ impl SemState { let res = cell.compare_exchange(prev.to_usize(), self.to_usize(), success, failure); - debug!( - " + SemState::compare_exchange; prev = {}; next = {}; result = {:?}", - prev.to_usize(), - self.to_usize(), - res - ); - res.map(SemState).map_err(SemState) } diff --git a/tokio/src/sync/task/atomic_waker.rs b/tokio/src/sync/task/atomic_waker.rs index eaad17c1..49a0ac04 100644 --- a/tokio/src/sync/task/atomic_waker.rs +++ b/tokio/src/sync/task/atomic_waker.rs @@ -170,10 +170,8 @@ impl AtomicWaker { where W: WakerRef, { - debug!(" + register_task"); match self.state.compare_and_swap(WAITING, REGISTERING, Acquire) { WAITING => { - debug!(" + WAITING"); unsafe { // Locked acquired, update the waker cell self.waker.with_mut(|t| *t = Some(waker.into_waker())); @@ -214,7 +212,6 @@ impl AtomicWaker { } } WAKING => { - debug!(" + WAKING"); // Currently in the process of waking the task, i.e., // `wake` is currently being called on the old waker. // So, we call wake on the new waker. @@ -240,7 +237,6 @@ impl AtomicWaker { /// /// If `register` has not been called yet, then this does nothing. pub(crate) fn wake(&self) { - debug!(" + wake"); if let Some(waker) = self.take_waker() { waker.wake(); } @@ -249,24 +245,20 @@ impl AtomicWaker { /// Attempts to take the `Waker` value out of the `AtomicWaker` with the /// intention that the caller will wake the task later. pub(crate) fn take_waker(&self) -> Option<Waker> { - debug!(" + take_waker"); // AcqRel ordering is used in order to acquire the value of the `waker` // cell as well as to establish a `release` ordering with whatever // memory the `AtomicWaker` is associated with. match self.state.fetch_or(WAKING, AcqRel) { WAITING => { - debug!(" + WAITING"); // The waking lock has been acquired. let waker = unsafe { self.waker.with_mut(|t| (*t).take()) }; // Release the lock self.state.fetch_and(!WAKING, Release); - debug!(" + Done taking"); waker } state => { - debug!(" + state = {:?}", state); // There is a concurrent thread currently updating the // associated waker. // diff --git a/tokio/src/sync/tests/loom_list.rs b/tokio/src/sync/tests/loom_list.rs index 4f7746d5..4067f865 100644 --- a/tokio/src/sync/tests/loom_list.rs +++ b/tokio/src/sync/tests/loom_list.rs @@ -21,17 +21,14 @@ fn smoke() { for i in 0..NUM_MSG { tx.push((th, i)); } - debug!(" + tx thread done"); }); } let mut next = vec![0; NUM_TX]; loop { - debug!(" + rx.pop()"); match rx.pop(&tx) { Some(Value((th, v))) => { - debug!(" + pop() -> Some(Value({}))", v); assert_eq!(v, next[th]); next[th] += 1; @@ -43,7 +40,6 @@ fn smoke() { panic!(); } None => { - debug!(" + pop() -> None"); thread::yield_now(); } } diff --git a/tokio/src/sync/tests/mod.rs b/tokio/src/sync/tests/mod.rs index 8da739f9..06d18e9a 100644 --- a/tokio/src/sync/tests/mod.rs +++ b/tokio/src/sync/tests/mod.rs @@ -1,17 +1,12 @@ -#[cfg(not(loom))] -mod atomic_waker; - -#[cfg(loom)] -mod loom_atomic_waker; - -#[cfg(loom)] -mod loom_list; - -#[cfg(loom)] -mod loom_mpsc; - -#[cfg(loom)] -mod loom_oneshot; - -#[cfg(loom)] -mod loom_semaphore; +cfg_not_loom! { + mod atomic_waker; + mod semaphore; +} + +cfg_loom! { + mod loom_atomic_waker; + mod loom_list; + mod loom_mpsc; + mod loom_oneshot; + mod loom_semaphore; +} diff --git a/tokio/src/sync/tests/semaphore.rs b/tokio/src/sync/tests/semaphore.rs new file mode 100644 index 00000000..86dd7da5 --- /dev/null +++ b/tokio/src/sync/tests/semaphore.rs @@ -0,0 +1,136 @@ +use crate::sync::semaphore::{Permit, Semaphore}; +use tokio_test::task; +use tokio_test::{assert_pending, assert_ready_err, assert_ready_ok}; + +#[test] +fn available_permits() { + let s = Semaphore::new(100); + assert_eq!(s.available_permits(), 100); + + // Polling for a permit succeeds immediately + let mut permit = task::spawn(Permit::new()); + assert!(!permit.is_acquired()); + + assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, &s))); + assert_eq!(s.available_permits(), 99); + assert!(permit.is_acquired()); + + // Polling again on the same waiter does not claim a new permit + assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, &s))); + assert_eq!(s.available_permits(), 99); + assert!(permit.is_acquired()); +} + +#[test] +fn unavailable_permits() { + let s = Semaphore::new(1); + + let mut permit_1 = task::spawn(Permit::new()); + let mut permit_2 = task::spawn(Permit::new()); + + // Acquire the first permit + assert_ready_ok!(permit_1.enter(|cx, mut p| p.poll_acquire(cx, &s))); + assert_eq!(s.available_permits(), 0); + + permit_2.enter(|cx, mut p| { + // Try to acquire the second permit + assert_pending!(p.poll_acquire(cx, &s)); + }); + + permit_1.release(&s); + + assert_eq!(s.available_permits(), 0); + assert!(permit_2.is_woken()); + assert_ready_ok!(permit_2.enter(|cx, mut p| p.poll_acquire(cx, &s))); + + permit_2.release(&s); + assert_eq!(s.available_permits(), 1); +} + +#[test] +fn zero_permits() { + let s = Semaphore::new(0); + assert_eq!(s.available_permits(), 0); + + let mut permit = task::spawn(Permit::new()); + + // Try to acquire the permit + permit.enter(|cx, mut p| { + assert_pending!(p.poll_acquire(cx, &s)); + }); + + s.add_permits(1); + + assert!(permit.is_woken()); + assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, &s))); +} + +#[test] +#[should_panic] +fn validates_max_permits() { + use std::usize; + Semaphore::new((usize::MAX >> 2) + 1); +} + +#[test] +fn close_semaphore_prevents_acquire() { + let s = Semaphore::new(1); + s.close(); + + assert_eq!(1, s.available_permits()); + + let mut permit = task::spawn(Permit::new()); + + assert_ready_err!(permit.enter(|cx, mut p| p.poll_acquire(cx, &s))); + assert_eq!(1, s.available_permits()); +} + +#[test] +fn close_semaphore_notifies_permit1() { + let s = Semaphore::new(0); + let mut permit = task::spawn(Permit::new()); + + assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, &s))); + + s.close(); + + assert!(permit.is_woken()); + assert_ready_err!(permit.enter(|cx, mut p| p.poll_acquire(cx, &s))); +} + +#[test] +fn close_semaphore_notifies_permit2() { + let s = Semaphore::new(2); + + let mut permit1 = task::spawn(Permit::new()); + let mut permit2 = task::spawn(Permit::new()); + let mut permit3 = task::spawn(Permit::new()); + let mut permit4 = task::spawn(Permit::new()); + + // Acquire a couple of permits + assert_ready_ok!(permit1.enter(|cx, mut p| p.poll_acquire(cx, &s))); + assert_ready_ok!(permit2.enter(|cx, mut p| p.poll_acquire(cx, &s))); + + assert_pending!(permit3.enter(|cx, mut p| p.poll_acquire(cx, &s))); + assert_pending!(permit4.enter(|cx, mut p| p.poll_acquire(cx, &s))); + + s.close(); + + assert!(permit3.is_woken()); + assert!(permit4.is_woken()); + + assert_ready_err!(permit3.enter(|cx, mut p| p.poll_acquire(cx, &s))); + assert_ready_err!(permit4.enter(|cx, mut p| p.poll_acquire(cx, &s))); + + assert_eq!(0, s.available_permits()); + + permit1.release(&s); + + assert_eq!(1, s.available_permits()); + + assert_ready_err!(permit1.enter(|cx, mut p| p.poll_acquire(cx, &s))); + + permit2.release(&s); + + assert_eq!(2, s.available_permits()); +} |