summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync/tests
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-09-24 17:26:38 -0700
committerGitHub <noreply@github.com>2020-09-24 17:26:38 -0700
commitcf025ba45f68934ae2138bb75ee2a5ee50506d1b (patch)
tree39fa03f4b063402e84da4435ebca39bd21266ad2 /tokio/src/sync/tests
parent4186b0aa38abbec7670d53882d5cdfd4b12add5c (diff)
sync: support mpsc send with `&self` (#2861)
Updates the mpsc channel to use the intrusive waker based sempahore. This enables using `Sender` with `&self`. Instead of using `Sender::poll_ready` to ensure capacity and updating the `Sender` state, `async fn Sender::reserve()` is added. This function returns a `Permit` value representing the reserved capacity. Fixes: #2637 Refs: #2718 (intrusive waiters)
Diffstat (limited to 'tokio/src/sync/tests')
-rw-r--r--tokio/src/sync/tests/loom_mpsc.rs14
-rw-r--r--tokio/src/sync/tests/loom_semaphore_ll.rs192
-rw-r--r--tokio/src/sync/tests/mod.rs2
-rw-r--r--tokio/src/sync/tests/semaphore_ll.rs470
4 files changed, 7 insertions, 671 deletions
diff --git a/tokio/src/sync/tests/loom_mpsc.rs b/tokio/src/sync/tests/loom_mpsc.rs
index 6a1a6abe..e8db2dea 100644
--- a/tokio/src/sync/tests/loom_mpsc.rs
+++ b/tokio/src/sync/tests/loom_mpsc.rs
@@ -7,17 +7,17 @@ use loom::thread;
#[test]
fn closing_tx() {
loom::model(|| {
- let (mut tx, mut rx) = mpsc::channel(16);
+ let (tx, mut rx) = mpsc::channel(16);
thread::spawn(move || {
tx.try_send(()).unwrap();
drop(tx);
});
- let v = block_on(poll_fn(|cx| rx.poll_recv(cx)));
+ let v = block_on(rx.recv());
assert!(v.is_some());
- let v = block_on(poll_fn(|cx| rx.poll_recv(cx)));
+ let v = block_on(rx.recv());
assert!(v.is_none());
});
}
@@ -32,10 +32,10 @@ fn closing_unbounded_tx() {
drop(tx);
});
- let v = block_on(poll_fn(|cx| rx.poll_recv(cx)));
+ let v = block_on(rx.recv());
assert!(v.is_some());
- let v = block_on(poll_fn(|cx| rx.poll_recv(cx)));
+ let v = block_on(rx.recv());
assert!(v.is_none());
});
}
@@ -53,7 +53,7 @@ fn dropping_tx() {
}
drop(tx);
- let v = block_on(poll_fn(|cx| rx.poll_recv(cx)));
+ let v = block_on(rx.recv());
assert!(v.is_none());
});
}
@@ -71,7 +71,7 @@ fn dropping_unbounded_tx() {
}
drop(tx);
- let v = block_on(poll_fn(|cx| rx.poll_recv(cx)));
+ let v = block_on(rx.recv());
assert!(v.is_none());
});
}
diff --git a/tokio/src/sync/tests/loom_semaphore_ll.rs b/tokio/src/sync/tests/loom_semaphore_ll.rs
deleted file mode 100644
index b5e5efba..00000000
--- a/tokio/src/sync/tests/loom_semaphore_ll.rs
+++ /dev/null
@@ -1,192 +0,0 @@
-use crate::sync::semaphore_ll::*;
-
-use futures::future::poll_fn;
-use loom::future::block_on;
-use loom::thread;
-use std::future::Future;
-use std::pin::Pin;
-use std::sync::atomic::AtomicUsize;
-use std::sync::atomic::Ordering::SeqCst;
-use std::sync::Arc;
-use std::task::Poll::Ready;
-use std::task::{Context, Poll};
-
-#[test]
-fn basic_usage() {
- const NUM: usize = 2;
-
- struct Actor {
- waiter: Permit,
- shared: Arc<Shared>,
- }
-
- struct Shared {
- semaphore: Semaphore,
- active: AtomicUsize,
- }
-
- impl Future for Actor {
- type Output = ();
-
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
- let me = &mut *self;
-
- ready!(me.waiter.poll_acquire(cx, 1, &me.shared.semaphore)).unwrap();
-
- let actual = me.shared.active.fetch_add(1, SeqCst);
- assert!(actual <= NUM - 1);
-
- let actual = me.shared.active.fetch_sub(1, SeqCst);
- assert!(actual <= NUM);
-
- me.waiter.release(1, &me.shared.semaphore);
-
- Ready(())
- }
- }
-
- loom::model(|| {
- let shared = Arc::new(Shared {
- semaphore: Semaphore::new(NUM),
- active: AtomicUsize::new(0),
- });
-
- for _ in 0..NUM {
- let shared = shared.clone();
-
- thread::spawn(move || {
- block_on(Actor {
- waiter: Permit::new(),
- shared,
- });
- });
- }
-
- block_on(Actor {
- waiter: Permit::new(),
- shared,
- });
- });
-}
-
-#[test]
-fn release() {
- loom::model(|| {
- let semaphore = Arc::new(Semaphore::new(1));
-
- {
- let semaphore = semaphore.clone();
- thread::spawn(move || {
- let mut permit = Permit::new();
-
- block_on(poll_fn(|cx| permit.poll_acquire(cx, 1, &semaphore))).unwrap();
-
- permit.release(1, &semaphore);
- });
- }
-
- let mut permit = Permit::new();
-
- block_on(poll_fn(|cx| permit.poll_acquire(cx, 1, &semaphore))).unwrap();
-
- permit.release(1, &semaphore);
- });
-}
-
-#[test]
-fn basic_closing() {
- const NUM: usize = 2;
-
- loom::model(|| {
- let semaphore = Arc::new(Semaphore::new(1));
-
- for _ in 0..NUM {
- let semaphore = semaphore.clone();
-
- thread::spawn(move || {
- let mut permit = Permit::new();
-
- for _ in 0..2 {
- block_on(poll_fn(|cx| {
- permit.poll_acquire(cx, 1, &semaphore).map_err(|_| ())
- }))?;
-
- permit.release(1, &semaphore);
- }
-
- Ok::<(), ()>(())
- });
- }
-
- semaphore.close();
- });
-}
-
-#[test]
-fn concurrent_close() {
- const NUM: usize = 3;
-
- loom::model(|| {
- let semaphore = Arc::new(Semaphore::new(1));
-
- for _ in 0..NUM {
- let semaphore = semaphore.clone();
-
- thread::spawn(move || {
- let mut permit = Permit::new();
-
- block_on(poll_fn(|cx| {
- permit.poll_acquire(cx, 1, &semaphore).map_err(|_| ())
- }))?;
-
- permit.release(1, &semaphore);
-
- semaphore.close();
-
- Ok::<(), ()>(())
- });
- }
- });
-}
-
-#[test]
-fn batch() {
- let mut b = loom::model::Builder::new();
- b.preemption_bound = Some(1);
-
- b.check(|| {
- let semaphore = Arc::new(Semaphore::new(10));
- let active = Arc::new(AtomicUsize::new(0));
- let mut ths = vec![];
-
- for _ in 0..2 {
- let semaphore = semaphore.clone();
- let active = active.clone();
-
- ths.push(thread::spawn(move || {
- let mut permit = Permit::new();
-
- for n in &[4, 10, 8] {
- block_on(poll_fn(|cx| permit.poll_acquire(cx, *n, &semaphore))).unwrap();
-
- active.fetch_add(*n as usize, SeqCst);
-
- let num_active = active.load(SeqCst);
- assert!(num_active <= 10);
-
- thread::yield_now();
-
- active.fetch_sub(*n as usize, SeqCst);
-
- permit.release(*n, &semaphore);
- }
- }));
- }
-
- for th in ths.into_iter() {
- th.join().unwrap();
- }
-
- assert_eq!(10, semaphore.available_permits());
- });
-}
diff --git a/tokio/src/sync/tests/mod.rs b/tokio/src/sync/tests/mod.rs
index c637cb6b..a78be6f3 100644
--- a/tokio/src/sync/tests/mod.rs
+++ b/tokio/src/sync/tests/mod.rs
@@ -1,6 +1,5 @@
cfg_not_loom! {
mod atomic_waker;
- mod semaphore_ll;
mod semaphore_batch;
}
@@ -12,6 +11,5 @@ cfg_loom! {
mod loom_notify;
mod loom_oneshot;
mod loom_semaphore_batch;
- mod loom_semaphore_ll;
mod loom_watch;
}
diff --git a/tokio/src/sync/tests/semaphore_ll.rs b/tokio/src/sync/tests/semaphore_ll.rs
deleted file mode 100644
index bfb07578..00000000
--- a/tokio/src/sync/tests/semaphore_ll.rs
+++ /dev/null
@@ -1,470 +0,0 @@
-use crate::sync::semaphore_ll::{Permit, Semaphore};
-use tokio_test::*;
-
-#[test]
-fn poll_acquire_one_available() {
- let s = Semaphore::new(100);
- assert_eq!(s.available_permits(), 100);
-
- // Polling for a permit succeeds immediately
- let mut permit = task::spawn(Permit::new());
- assert!(!permit.is_acquired());
-
- assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
- assert_eq!(s.available_permits(), 99);
- assert!(permit.is_acquired());
-
- // Polling again on the same waiter does not claim a new permit
- assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
- assert_eq!(s.available_permits(), 99);
- assert!(permit.is_acquired());
-}
-
-#[test]
-fn poll_acquire_many_available() {
- let s = Semaphore::new(100);
- assert_eq!(s.available_permits(), 100);
-
- // Polling for a permit succeeds immediately
- let mut permit = task::spawn(Permit::new());
- assert!(!permit.is_acquired());
-
- assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 5, &s)));
- assert_eq!(s.available_permits(), 95);
- assert!(permit.is_acquired());
-
- // Polling again on the same waiter does not claim a new permit
- assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
- assert_eq!(s.available_permits(), 95);
- assert!(permit.is_acquired());
-
- assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 5, &s)));
- assert_eq!(s.available_permits(), 95);
- assert!(permit.is_acquired());
-
- // Polling for a larger number of permits acquires more
- assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 8, &s)));
- assert_eq!(s.available_permits(), 92);
- assert!(permit.is_acquired());
-}
-
-#[test]
-fn try_acquire_one_available() {
- let s = Semaphore::new(100);
- assert_eq!(s.available_permits(), 100);
-
- // Polling for a permit succeeds immediately
- let mut permit = Permit::new();
- assert!(!permit.is_acquired());
-
- assert_ok!(permit.try_acquire(1, &s));
- assert_eq!(s.available_permits(), 99);
- assert!(permit.is_acquired());
-
- // Polling again on the same waiter does not claim a new permit
- assert_ok!(permit.try_acquire(1, &s));
- assert_eq!(s.available_permits(), 99);
- assert!(permit.is_acquired());
-}
-
-#[test]
-fn try_acquire_many_available() {
- let s = Semaphore::new(100);
- assert_eq!(s.available_permits(), 100);
-
- // Polling for a permit succeeds immediately
- let mut permit = Permit::new();
- assert!(!permit.is_acquired());
-
- assert_ok!(permit.try_acquire(5, &s));
- assert_eq!(s.available_permits(), 95);
- assert!(permit.is_acquired());
-
- // Polling again on the same waiter does not claim a new permit
- assert_ok!(permit.try_acquire(5, &s));
- assert_eq!(s.available_permits(), 95);
- assert!(permit.is_acquired());
-}
-
-#[test]
-fn poll_acquire_one_unavailable() {
- let s = Semaphore::new(1);
-
- let mut permit_1 = task::spawn(Permit::new());
- let mut permit_2 = task::spawn(Permit::new());
-
- // Acquire the first permit
- assert_ready_ok!(permit_1.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
- assert_eq!(s.available_permits(), 0);
-
- permit_2.enter(|cx, mut p| {
- // Try to acquire the second permit
- assert_pending!(p.poll_acquire(cx, 1, &s));
- });
-
- permit_1.release(1, &s);
-
- assert_eq!(s.available_permits(), 0);
- assert!(permit_2.is_woken());
- assert_ready_ok!(permit_2.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
-
- permit_2.release(1, &s);
- assert_eq!(s.available_permits(), 1);
-}
-
-#[test]
-fn forget_acquired() {
- let s = Semaphore::new(1);
-
- // Polling for a permit succeeds immediately
- let mut permit = task::spawn(Permit::new());
-
- assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
-
- assert_eq!(s.available_permits(), 0);
-
- permit.forget(1);
- assert_eq!(s.available_permits(), 0);
-}
-
-#[test]
-fn forget_waiting() {
- let s = Semaphore::new(0);
-
- // Polling for a permit succeeds immediately
- let mut permit = task::spawn(Permit::new());
-
- assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
-
- assert_eq!(s.available_permits(), 0);
-
- permit.forget(1);
-
- s.add_permits(1);
-
- assert!(!permit.is_woken());
- assert_eq!(s.available_permits(), 1);
-}
-
-#[test]
-fn poll_acquire_many_unavailable() {
- let s = Semaphore::new(5);
-
- let mut permit_1 = task::spawn(Permit::new());
- let mut permit_2 = task::spawn(Permit::new());
- let mut permit_3 = task::spawn(Permit::new());
-
- // Acquire the first permit
- assert_ready_ok!(permit_1.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
- assert_eq!(s.available_permits(), 4);
-
- permit_2.enter(|cx, mut p| {
- // Try to acquire the second permit
- assert_pending!(p.poll_acquire(cx, 5, &s));
- });
-
- assert_eq!(s.available_permits(), 0);
-
- permit_3.enter(|cx, mut p| {
- // Try to acquire the third permit
- assert_pending!(p.poll_acquire(cx, 3, &s));
- });
-
- permit_1.release(1, &s);
-
- assert_eq!(s.available_permits(), 0);
- assert!(permit_2.is_woken());
- assert_ready_ok!(permit_2.enter(|cx, mut p| p.poll_acquire(cx, 5, &s)));
-
- assert!(!permit_3.is_woken());
- assert_eq!(s.available_permits(), 0);
-
- permit_2.release(1, &s);
- assert!(!permit_3.is_woken());
- assert_eq!(s.available_permits(), 0);
-
- permit_2.release(2, &s);
- assert!(permit_3.is_woken());
-
- assert_ready_ok!(permit_3.enter(|cx, mut p| p.poll_acquire(cx, 3, &s)));
-}
-
-#[test]
-fn try_acquire_one_unavailable() {
- let s = Semaphore::new(1);
-
- let mut permit_1 = Permit::new();
- let mut permit_2 = Permit::new();
-
- // Acquire the first permit
- assert_ok!(permit_1.try_acquire(1, &s));
- assert_eq!(s.available_permits(), 0);
-
- assert_err!(permit_2.try_acquire(1, &s));
-
- permit_1.release(1, &s);
-
- assert_eq!(s.available_permits(), 1);
- assert_ok!(permit_2.try_acquire(1, &s));
-
- permit_2.release(1, &s);
- assert_eq!(s.available_permits(), 1);
-}
-
-#[test]
-fn try_acquire_many_unavailable() {
- let s = Semaphore::new(5);
-
- let mut permit_1 = Permit::new();
- let mut permit_2 = Permit::new();
-
- // Acquire the first permit
- assert_ok!(permit_1.try_acquire(1, &s));
- assert_eq!(s.available_permits(), 4);
-
- assert_err!(permit_2.try_acquire(5, &s));
-
- permit_1.release(1, &s);
- assert_eq!(s.available_permits(), 5);
-
- assert_ok!(permit_2.try_acquire(5, &s));
-
- permit_2.release(1, &s);
- assert_eq!(s.available_permits(), 1);
-
- permit_2.release(1, &s);
- assert_eq!(s.available_permits(), 2);
-}
-
-#[test]
-fn poll_acquire_one_zero_permits() {
- let s = Semaphore::new(0);
- assert_eq!(s.available_permits(), 0);
-
- let mut permit = task::spawn(Permit::new());
-
- // Try to acquire the permit
- permit.enter(|cx, mut p| {
- assert_pending!(p.poll_acquire(cx, 1, &s));
- });
-
- s.add_permits(1);
-
- assert!(permit.is_woken());
- assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
-}
-
-#[test]
-#[should_panic]
-fn validates_max_permits() {
- use std::usize;
- Semaphore::new((usize::MAX >> 2) + 1);
-}
-
-#[test]
-fn close_semaphore_prevents_acquire() {
- let s = Semaphore::new(5);
- s.close();
-
- assert_eq!(5, s.available_permits());
-
- let mut permit_1 = task::spawn(Permit::new());
- let mut permit_2 = task::spawn(Permit::new());
-
- assert_ready_err!(permit_1.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
- assert_eq!(5, s.available_permits());
-
- assert_ready_err!(permit_2.enter(|cx, mut p| p.poll_acquire(cx, 2, &s)));
- assert_eq!(5, s.available_permits());
-}
-
-#[test]
-fn close_semaphore_notifies_permit1() {
- let s = Semaphore::new(0);
- let mut permit = task::spawn(Permit::new());
-
- assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
-
- s.close();
-
- assert!(permit.is_woken());
- assert_ready_err!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
-}
-
-#[test]
-fn close_semaphore_notifies_permit2() {
- let s = Semaphore::new(2);
-
- let mut permit1 = task::spawn(Permit::new());
- let mut permit2 = task::spawn(Permit::new());
- let mut permit3 = task::spawn(Permit::new());
- let mut permit4 = task::spawn(Permit::new());
-
- // Acquire a couple of permits
- assert_ready_ok!(permit1.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
- assert_ready_ok!(permit2.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
-
- assert_pending!(permit3.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
- assert_pending!(permit4.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
-
- s.close();
-
- assert!(permit3.is_woken());
- assert!(permit4.is_woken());
-
- assert_ready_err!(permit3.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
- assert_ready_err!(permit4.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
-
- assert_eq!(0, s.available_permits());
-
- permit1.release(1, &s);
-
- assert_eq!(1, s.available_permits());
-
- assert_ready_err!(permit1.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
-
- permit2.release(1, &s);
-
- assert_eq!(2, s.available_permits());
-}
-
-#[test]
-fn poll_acquire_additional_permits_while_waiting_before_assigned() {
- let s = Semaphore::new(1);
-
- let mut permit = task::spawn(Permit::new());
-
- assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 2, &s)));
- assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 3, &s)));
-
- s.add_permits(1);
- assert!(!permit.is_woken());
-
- s.add_permits(1);
- assert!(permit.is_woken());
-
- assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 3, &s)));
-}
-
-#[test]
-fn try_acquire_additional_permits_while_waiting_before_assigned() {
- let s = Semaphore::new(1);
-
- let mut permit = task::spawn(Permit::new());
-
- assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 2, &s)));
-
- assert_err!(permit.enter(|_, mut p| p.try_acquire(3, &s)));
-
- s.add_permits(1);
- assert!(permit.is_woken());
-
- assert_ok!(permit.enter(|_, mut p| p.try_acquire(2, &s)));
-}
-
-#[test]
-fn poll_acquire_additional_permits_while_waiting_after_assigned_success() {
- let s = Semaphore::new(1);
-
- let mut permit = task::spawn(Permit::new());
-
- assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 2, &s)));
-
- s.add_permits(2);
-
- assert!(permit.is_woken());
- assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 3, &s)));
-}
-
-#[test]
-fn poll_acquire_additional_permits_while_waiting_after_assigned_requeue() {
- let s = Semaphore::new(1);
-
- let mut permit = task::spawn(Permit::new());
-
- assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 2, &s)));
-
- s.add_permits(2);
-
- assert!(permit.is_woken());
- assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 4, &s)));
-
- s.add_permits(1);
-
- assert!(permit.is_woken());
- assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 4, &s)));
-}
-
-#[test]
-fn poll_acquire_fewer_permits_while_waiting() {
- let s = Semaphore::new(1);
-
- let mut permit = task::spawn(Permit::new());
-
- assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 2, &s)));
- assert_eq!(s.available_permits(), 0);
-
- assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
- assert_eq!(s.available_permits(), 0);
-}
-
-#[test]
-fn poll_acquire_fewer_permits_after_assigned() {
- let s = Semaphore::new(1);
-
- let mut permit1 = task::spawn(Permit::new());
- let mut permit2 = task::spawn(Permit::new());
-
- assert_pending!(permit1.enter(|cx, mut p| p.poll_acquire(cx, 5, &s)));
- assert_eq!(s.available_permits(), 0);
-
- assert_pending!(permit2.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
-
- s.add_permits(4);
- assert!(permit1.is_woken());
- assert!(!permit2.is_woken());
-
- assert_ready_ok!(permit1.enter(|cx, mut p| p.poll_acquire(cx, 3, &s)));
-
- assert!(permit2.is_woken());
- assert_eq!(s.available_permits(), 1);
-
- assert_ready_ok!(permit2.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
-}
-
-#[test]
-fn forget_partial_1() {
- let s = Semaphore::new(0);
-
- let mut permit = task::spawn(Permit::new());
-
- assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 2, &s)));
- s.add_permits(1);
-
- assert_eq!(0, s.available_permits());
-
- permit.release(1, &s);
-
- assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
-
- assert_eq!(s.available_permits(), 0);
-}
-
-#[test]
-fn forget_partial_2() {
- let s = Semaphore::new(0);
-
- let mut permit = task::spawn(Permit::new());
-
- assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 2, &s)));
- s.add_permits(1);
-
- assert_eq!(0, s.available_permits());
-
- permit.release(1, &s);
-
- s.add_permits(1);
-
- assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 2, &s)));
- assert_eq!(s.available_permits(), 0);
-}