summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync/tests
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-01-03 10:34:15 -0800
committerGitHub <noreply@github.com>2020-01-03 10:34:15 -0800
commitefcbf9613f2d5048550f9c828e3be422644f1391 (patch)
treef8c77a9d21c57f80ed989d16839acffe850baf77 /tokio/src/sync/tests
parent3736467dbb74ea6d14091cf1cac3ce08e1fcb911 (diff)
sync: add batch op support to internal semaphore (#2004)
Extend internal semaphore to support batch operations. With this PR, consumers of the semaphore are able to atomically request more than one permit. This is useful for implementing a RwLock.
Diffstat (limited to 'tokio/src/sync/tests')
-rw-r--r--tokio/src/sync/tests/loom_semaphore_ll.rs62
-rw-r--r--tokio/src/sync/tests/semaphore_ll.rs394
2 files changed, 416 insertions, 40 deletions
diff --git a/tokio/src/sync/tests/loom_semaphore_ll.rs b/tokio/src/sync/tests/loom_semaphore_ll.rs
index cd4314ca..b5e5efba 100644
--- a/tokio/src/sync/tests/loom_semaphore_ll.rs
+++ b/tokio/src/sync/tests/loom_semaphore_ll.rs
@@ -31,7 +31,7 @@ fn basic_usage() {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let me = &mut *self;
- ready!(me.waiter.poll_acquire(cx, &me.shared.semaphore)).unwrap();
+ ready!(me.waiter.poll_acquire(cx, 1, &me.shared.semaphore)).unwrap();
let actual = me.shared.active.fetch_add(1, SeqCst);
assert!(actual <= NUM - 1);
@@ -39,7 +39,7 @@ fn basic_usage() {
let actual = me.shared.active.fetch_sub(1, SeqCst);
assert!(actual <= NUM);
- me.waiter.release(&me.shared.semaphore);
+ me.waiter.release(1, &me.shared.semaphore);
Ready(())
}
@@ -79,17 +79,17 @@ fn release() {
thread::spawn(move || {
let mut permit = Permit::new();
- block_on(poll_fn(|cx| permit.poll_acquire(cx, &semaphore))).unwrap();
+ block_on(poll_fn(|cx| permit.poll_acquire(cx, 1, &semaphore))).unwrap();
- permit.release(&semaphore);
+ permit.release(1, &semaphore);
});
}
let mut permit = Permit::new();
- block_on(poll_fn(|cx| permit.poll_acquire(cx, &semaphore))).unwrap();
+ block_on(poll_fn(|cx| permit.poll_acquire(cx, 1, &semaphore))).unwrap();
- permit.release(&semaphore);
+ permit.release(1, &semaphore);
});
}
@@ -108,10 +108,10 @@ fn basic_closing() {
for _ in 0..2 {
block_on(poll_fn(|cx| {
- permit.poll_acquire(cx, &semaphore).map_err(|_| ())
+ permit.poll_acquire(cx, 1, &semaphore).map_err(|_| ())
}))?;
- permit.release(&semaphore);
+ permit.release(1, &semaphore);
}
Ok::<(), ()>(())
@@ -136,10 +136,10 @@ fn concurrent_close() {
let mut permit = Permit::new();
block_on(poll_fn(|cx| {
- permit.poll_acquire(cx, &semaphore).map_err(|_| ())
+ permit.poll_acquire(cx, 1, &semaphore).map_err(|_| ())
}))?;
- permit.release(&semaphore);
+ permit.release(1, &semaphore);
semaphore.close();
@@ -148,3 +148,45 @@ fn concurrent_close() {
}
});
}
+
+#[test]
+fn batch() {
+ let mut b = loom::model::Builder::new();
+ b.preemption_bound = Some(1);
+
+ b.check(|| {
+ let semaphore = Arc::new(Semaphore::new(10));
+ let active = Arc::new(AtomicUsize::new(0));
+ let mut ths = vec![];
+
+ for _ in 0..2 {
+ let semaphore = semaphore.clone();
+ let active = active.clone();
+
+ ths.push(thread::spawn(move || {
+ let mut permit = Permit::new();
+
+ for n in &[4, 10, 8] {
+ block_on(poll_fn(|cx| permit.poll_acquire(cx, *n, &semaphore))).unwrap();
+
+ active.fetch_add(*n as usize, SeqCst);
+
+ let num_active = active.load(SeqCst);
+ assert!(num_active <= 10);
+
+ thread::yield_now();
+
+ active.fetch_sub(*n as usize, SeqCst);
+
+ permit.release(*n, &semaphore);
+ }
+ }));
+ }
+
+ for th in ths.into_iter() {
+ th.join().unwrap();
+ }
+
+ assert_eq!(10, semaphore.available_permits());
+ });
+}
diff --git a/tokio/src/sync/tests/semaphore_ll.rs b/tokio/src/sync/tests/semaphore_ll.rs
index 8dd56c85..bfb07578 100644
--- a/tokio/src/sync/tests/semaphore_ll.rs
+++ b/tokio/src/sync/tests/semaphore_ll.rs
@@ -1,9 +1,8 @@
use crate::sync::semaphore_ll::{Permit, Semaphore};
-use tokio_test::task;
-use tokio_test::{assert_pending, assert_ready_err, assert_ready_ok};
+use tokio_test::*;
#[test]
-fn available_permits() {
+fn poll_acquire_one_available() {
let s = Semaphore::new(100);
assert_eq!(s.available_permits(), 100);
@@ -11,44 +10,234 @@ fn available_permits() {
let mut permit = task::spawn(Permit::new());
assert!(!permit.is_acquired());
- assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, &s)));
+ assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
assert_eq!(s.available_permits(), 99);
assert!(permit.is_acquired());
// Polling again on the same waiter does not claim a new permit
- assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, &s)));
+ assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
assert_eq!(s.available_permits(), 99);
assert!(permit.is_acquired());
}
#[test]
-fn unavailable_permits() {
+fn poll_acquire_many_available() {
+ let s = Semaphore::new(100);
+ assert_eq!(s.available_permits(), 100);
+
+ // Polling for a permit succeeds immediately
+ let mut permit = task::spawn(Permit::new());
+ assert!(!permit.is_acquired());
+
+ assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 5, &s)));
+ assert_eq!(s.available_permits(), 95);
+ assert!(permit.is_acquired());
+
+ // Polling again on the same waiter does not claim a new permit
+ assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
+ assert_eq!(s.available_permits(), 95);
+ assert!(permit.is_acquired());
+
+ assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 5, &s)));
+ assert_eq!(s.available_permits(), 95);
+ assert!(permit.is_acquired());
+
+ // Polling for a larger number of permits acquires more
+ assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 8, &s)));
+ assert_eq!(s.available_permits(), 92);
+ assert!(permit.is_acquired());
+}
+
+#[test]
+fn try_acquire_one_available() {
+ let s = Semaphore::new(100);
+ assert_eq!(s.available_permits(), 100);
+
+ // Polling for a permit succeeds immediately
+ let mut permit = Permit::new();
+ assert!(!permit.is_acquired());
+
+ assert_ok!(permit.try_acquire(1, &s));
+ assert_eq!(s.available_permits(), 99);
+ assert!(permit.is_acquired());
+
+ // Polling again on the same waiter does not claim a new permit
+ assert_ok!(permit.try_acquire(1, &s));
+ assert_eq!(s.available_permits(), 99);
+ assert!(permit.is_acquired());
+}
+
+#[test]
+fn try_acquire_many_available() {
+ let s = Semaphore::new(100);
+ assert_eq!(s.available_permits(), 100);
+
+ // Polling for a permit succeeds immediately
+ let mut permit = Permit::new();
+ assert!(!permit.is_acquired());
+
+ assert_ok!(permit.try_acquire(5, &s));
+ assert_eq!(s.available_permits(), 95);
+ assert!(permit.is_acquired());
+
+ // Polling again on the same waiter does not claim a new permit
+ assert_ok!(permit.try_acquire(5, &s));
+ assert_eq!(s.available_permits(), 95);
+ assert!(permit.is_acquired());
+}
+
+#[test]
+fn poll_acquire_one_unavailable() {
let s = Semaphore::new(1);
let mut permit_1 = task::spawn(Permit::new());
let mut permit_2 = task::spawn(Permit::new());
// Acquire the first permit
- assert_ready_ok!(permit_1.enter(|cx, mut p| p.poll_acquire(cx, &s)));
+ assert_ready_ok!(permit_1.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
assert_eq!(s.available_permits(), 0);
permit_2.enter(|cx, mut p| {
// Try to acquire the second permit
- assert_pending!(p.poll_acquire(cx, &s));
+ assert_pending!(p.poll_acquire(cx, 1, &s));
});
- permit_1.release(&s);
+ permit_1.release(1, &s);
assert_eq!(s.available_permits(), 0);
assert!(permit_2.is_woken());
- assert_ready_ok!(permit_2.enter(|cx, mut p| p.poll_acquire(cx, &s)));
+ assert_ready_ok!(permit_2.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
- permit_2.release(&s);
+ permit_2.release(1, &s);
assert_eq!(s.available_permits(), 1);
}
#[test]
-fn zero_permits() {
+fn forget_acquired() {
+ let s = Semaphore::new(1);
+
+ // Polling for a permit succeeds immediately
+ let mut permit = task::spawn(Permit::new());
+
+ assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
+
+ assert_eq!(s.available_permits(), 0);
+
+ permit.forget(1);
+ assert_eq!(s.available_permits(), 0);
+}
+
+#[test]
+fn forget_waiting() {
+ let s = Semaphore::new(0);
+
+ // Polling for a permit succeeds immediately
+ let mut permit = task::spawn(Permit::new());
+
+ assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
+
+ assert_eq!(s.available_permits(), 0);
+
+ permit.forget(1);
+
+ s.add_permits(1);
+
+ assert!(!permit.is_woken());
+ assert_eq!(s.available_permits(), 1);
+}
+
+#[test]
+fn poll_acquire_many_unavailable() {
+ let s = Semaphore::new(5);
+
+ let mut permit_1 = task::spawn(Permit::new());
+ let mut permit_2 = task::spawn(Permit::new());
+ let mut permit_3 = task::spawn(Permit::new());
+
+ // Acquire the first permit
+ assert_ready_ok!(permit_1.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
+ assert_eq!(s.available_permits(), 4);
+
+ permit_2.enter(|cx, mut p| {
+ // Try to acquire the second permit
+ assert_pending!(p.poll_acquire(cx, 5, &s));
+ });
+
+ assert_eq!(s.available_permits(), 0);
+
+ permit_3.enter(|cx, mut p| {
+ // Try to acquire the third permit
+ assert_pending!(p.poll_acquire(cx, 3, &s));
+ });
+
+ permit_1.release(1, &s);
+
+ assert_eq!(s.available_permits(), 0);
+ assert!(permit_2.is_woken());
+ assert_ready_ok!(permit_2.enter(|cx, mut p| p.poll_acquire(cx, 5, &s)));
+
+ assert!(!permit_3.is_woken());
+ assert_eq!(s.available_permits(), 0);
+
+ permit_2.release(1, &s);
+ assert!(!permit_3.is_woken());
+ assert_eq!(s.available_permits(), 0);
+
+ permit_2.release(2, &s);
+ assert!(permit_3.is_woken());
+
+ assert_ready_ok!(permit_3.enter(|cx, mut p| p.poll_acquire(cx, 3, &s)));
+}
+
+#[test]
+fn try_acquire_one_unavailable() {
+ let s = Semaphore::new(1);
+
+ let mut permit_1 = Permit::new();
+ let mut permit_2 = Permit::new();
+
+ // Acquire the first permit
+ assert_ok!(permit_1.try_acquire(1, &s));
+ assert_eq!(s.available_permits(), 0);
+
+ assert_err!(permit_2.try_acquire(1, &s));
+
+ permit_1.release(1, &s);
+
+ assert_eq!(s.available_permits(), 1);
+ assert_ok!(permit_2.try_acquire(1, &s));
+
+ permit_2.release(1, &s);
+ assert_eq!(s.available_permits(), 1);
+}
+
+#[test]
+fn try_acquire_many_unavailable() {
+ let s = Semaphore::new(5);
+
+ let mut permit_1 = Permit::new();
+ let mut permit_2 = Permit::new();
+
+ // Acquire the first permit
+ assert_ok!(permit_1.try_acquire(1, &s));
+ assert_eq!(s.available_permits(), 4);
+
+ assert_err!(permit_2.try_acquire(5, &s));
+
+ permit_1.release(1, &s);
+ assert_eq!(s.available_permits(), 5);
+
+ assert_ok!(permit_2.try_acquire(5, &s));
+
+ permit_2.release(1, &s);
+ assert_eq!(s.available_permits(), 1);
+
+ permit_2.release(1, &s);
+ assert_eq!(s.available_permits(), 2);
+}
+
+#[test]
+fn poll_acquire_one_zero_permits() {
let s = Semaphore::new(0);
assert_eq!(s.available_permits(), 0);
@@ -56,13 +245,13 @@ fn zero_permits() {
// Try to acquire the permit
permit.enter(|cx, mut p| {
- assert_pending!(p.poll_acquire(cx, &s));
+ assert_pending!(p.poll_acquire(cx, 1, &s));
});
s.add_permits(1);
assert!(permit.is_woken());
- assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, &s)));
+ assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
}
#[test]
@@ -74,15 +263,19 @@ fn validates_max_permits() {
#[test]
fn close_semaphore_prevents_acquire() {
- let s = Semaphore::new(1);
+ let s = Semaphore::new(5);
s.close();
- assert_eq!(1, s.available_permits());
+ assert_eq!(5, s.available_permits());
- let mut permit = task::spawn(Permit::new());
+ let mut permit_1 = task::spawn(Permit::new());
+ let mut permit_2 = task::spawn(Permit::new());
- assert_ready_err!(permit.enter(|cx, mut p| p.poll_acquire(cx, &s)));
- assert_eq!(1, s.available_permits());
+ assert_ready_err!(permit_1.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
+ assert_eq!(5, s.available_permits());
+
+ assert_ready_err!(permit_2.enter(|cx, mut p| p.poll_acquire(cx, 2, &s)));
+ assert_eq!(5, s.available_permits());
}
#[test]
@@ -90,12 +283,12 @@ fn close_semaphore_notifies_permit1() {
let s = Semaphore::new(0);
let mut permit = task::spawn(Permit::new());
- assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, &s)));
+ assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
s.close();
assert!(permit.is_woken());
- assert_ready_err!(permit.enter(|cx, mut p| p.poll_acquire(cx, &s)));
+ assert_ready_err!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
}
#[test]
@@ -108,29 +301,170 @@ fn close_semaphore_notifies_permit2() {
let mut permit4 = task::spawn(Permit::new());
// Acquire a couple of permits
- assert_ready_ok!(permit1.enter(|cx, mut p| p.poll_acquire(cx, &s)));
- assert_ready_ok!(permit2.enter(|cx, mut p| p.poll_acquire(cx, &s)));
+ assert_ready_ok!(permit1.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
+ assert_ready_ok!(permit2.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
- assert_pending!(permit3.enter(|cx, mut p| p.poll_acquire(cx, &s)));
- assert_pending!(permit4.enter(|cx, mut p| p.poll_acquire(cx, &s)));
+ assert_pending!(permit3.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
+ assert_pending!(permit4.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
s.close();
assert!(permit3.is_woken());
assert!(permit4.is_woken());
- assert_ready_err!(permit3.enter(|cx, mut p| p.poll_acquire(cx, &s)));
- assert_ready_err!(permit4.enter(|cx, mut p| p.poll_acquire(cx, &s)));
+ assert_ready_err!(permit3.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
+ assert_ready_err!(permit4.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
assert_eq!(0, s.available_permits());
- permit1.release(&s);
+ permit1.release(1, &s);
assert_eq!(1, s.available_permits());
- assert_ready_err!(permit1.enter(|cx, mut p| p.poll_acquire(cx, &s)));
+ assert_ready_err!(permit1.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
- permit2.release(&s);
+ permit2.release(1, &s);
assert_eq!(2, s.available_permits());
}
+
+#[test]
+fn poll_acquire_additional_permits_while_waiting_before_assigned() {
+ let s = Semaphore::new(1);
+
+ let mut permit = task::spawn(Permit::new());
+
+ assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 2, &s)));
+ assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 3, &s)));
+
+ s.add_permits(1);
+ assert!(!permit.is_woken());
+
+ s.add_permits(1);
+ assert!(permit.is_woken());
+
+ assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 3, &s)));
+}
+
+#[test]
+fn try_acquire_additional_permits_while_waiting_before_assigned() {
+ let s = Semaphore::new(1);
+
+ let mut permit = task::spawn(Permit::new());
+
+ assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 2, &s)));
+
+ assert_err!(permit.enter(|_, mut p| p.try_acquire(3, &s)));
+
+ s.add_permits(1);
+ assert!(permit.is_woken());
+
+ assert_ok!(permit.enter(|_, mut p| p.try_acquire(2, &s)));
+}
+
+#[test]
+fn poll_acquire_additional_permits_while_waiting_after_assigned_success() {
+ let s = Semaphore::new(1);
+
+ let mut permit = task::spawn(Permit::new());
+
+ assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 2, &s)));
+
+ s.add_permits(2);
+
+ assert!(permit.is_woken());
+ assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 3, &s)));
+}
+
+#[test]
+fn poll_acquire_additional_permits_while_waiting_after_assigned_requeue() {
+ let s = Semaphore::new(1);
+
+ let mut permit = task::spawn(Permit::new());
+
+ assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 2, &s)));
+
+ s.add_permits(2);
+
+ assert!(permit.is_woken());
+ assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 4, &s)));
+
+ s.add_permits(1);
+
+ assert!(permit.is_woken());
+ assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 4, &s)));
+}
+
+#[test]
+fn poll_acquire_fewer_permits_while_waiting() {
+ let s = Semaphore::new(1);
+
+ let mut permit = task::spawn(Permit::new());
+
+ assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 2, &s)));
+ assert_eq!(s.available_permits(), 0);
+
+ assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
+ assert_eq!(s.available_permits(), 0);
+}
+
+#[test]
+fn poll_acquire_fewer_permits_after_assigned() {
+ let s = Semaphore::new(1);
+
+ let mut permit1 = task::spawn(Permit::new());
+ let mut permit2 = task::spawn(Permit::new());
+
+ assert_pending!(permit1.enter(|cx, mut p| p.poll_acquire(cx, 5, &s)));
+ assert_eq!(s.available_permits(), 0);
+
+ assert_pending!(permit2.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
+
+ s.add_permits(4);
+ assert!(permit1.is_woken());
+ assert!(!permit2.is_woken());
+
+ assert_ready_ok!(permit1.enter(|cx, mut p| p.poll_acquire(cx, 3, &s)));
+
+ assert!(permit2.is_woken());
+ assert_eq!(s.available_permits(), 1);
+
+ assert_ready_ok!(permit2.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
+}
+
+#[test]
+fn forget_partial_1() {
+ let s = Semaphore::new(0);
+
+ let mut permit = task::spawn(Permit::new());
+
+ assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 2, &s)));
+ s.add_permits(1);
+
+ assert_eq!(0, s.available_permits());
+
+ permit.release(1, &s);
+
+ assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
+
+ assert_eq!(s.available_permits(), 0);
+}
+
+#[test]
+fn forget_partial_2() {
+ let s = Semaphore::new(0);
+
+ let mut permit = task::spawn(Permit::new());
+
+ assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 2, &s)));
+ s.add_permits(1);
+
+ assert_eq!(0, s.available_permits());
+
+ permit.release(1, &s);
+
+ s.add_permits(1);
+
+ assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 2, &s)));
+ assert_eq!(s.available_permits(), 0);
+}