summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync/tests/loom_semaphore_ll.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/sync/tests/loom_semaphore_ll.rs')
-rw-r--r--tokio/src/sync/tests/loom_semaphore_ll.rs62
1 files changed, 52 insertions, 10 deletions
diff --git a/tokio/src/sync/tests/loom_semaphore_ll.rs b/tokio/src/sync/tests/loom_semaphore_ll.rs
index cd4314ca..b5e5efba 100644
--- a/tokio/src/sync/tests/loom_semaphore_ll.rs
+++ b/tokio/src/sync/tests/loom_semaphore_ll.rs
@@ -31,7 +31,7 @@ fn basic_usage() {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let me = &mut *self;
- ready!(me.waiter.poll_acquire(cx, &me.shared.semaphore)).unwrap();
+ ready!(me.waiter.poll_acquire(cx, 1, &me.shared.semaphore)).unwrap();
let actual = me.shared.active.fetch_add(1, SeqCst);
assert!(actual <= NUM - 1);
@@ -39,7 +39,7 @@ fn basic_usage() {
let actual = me.shared.active.fetch_sub(1, SeqCst);
assert!(actual <= NUM);
- me.waiter.release(&me.shared.semaphore);
+ me.waiter.release(1, &me.shared.semaphore);
Ready(())
}
@@ -79,17 +79,17 @@ fn release() {
thread::spawn(move || {
let mut permit = Permit::new();
- block_on(poll_fn(|cx| permit.poll_acquire(cx, &semaphore))).unwrap();
+ block_on(poll_fn(|cx| permit.poll_acquire(cx, 1, &semaphore))).unwrap();
- permit.release(&semaphore);
+ permit.release(1, &semaphore);
});
}
let mut permit = Permit::new();
- block_on(poll_fn(|cx| permit.poll_acquire(cx, &semaphore))).unwrap();
+ block_on(poll_fn(|cx| permit.poll_acquire(cx, 1, &semaphore))).unwrap();
- permit.release(&semaphore);
+ permit.release(1, &semaphore);
});
}
@@ -108,10 +108,10 @@ fn basic_closing() {
for _ in 0..2 {
block_on(poll_fn(|cx| {
- permit.poll_acquire(cx, &semaphore).map_err(|_| ())
+ permit.poll_acquire(cx, 1, &semaphore).map_err(|_| ())
}))?;
- permit.release(&semaphore);
+ permit.release(1, &semaphore);
}
Ok::<(), ()>(())
@@ -136,10 +136,10 @@ fn concurrent_close() {
let mut permit = Permit::new();
block_on(poll_fn(|cx| {
- permit.poll_acquire(cx, &semaphore).map_err(|_| ())
+ permit.poll_acquire(cx, 1, &semaphore).map_err(|_| ())
}))?;
- permit.release(&semaphore);
+ permit.release(1, &semaphore);
semaphore.close();
@@ -148,3 +148,45 @@ fn concurrent_close() {
}
});
}
+
+#[test]
+fn batch() {
+ let mut b = loom::model::Builder::new();
+ b.preemption_bound = Some(1);
+
+ b.check(|| {
+ let semaphore = Arc::new(Semaphore::new(10));
+ let active = Arc::new(AtomicUsize::new(0));
+ let mut ths = vec![];
+
+ for _ in 0..2 {
+ let semaphore = semaphore.clone();
+ let active = active.clone();
+
+ ths.push(thread::spawn(move || {
+ let mut permit = Permit::new();
+
+ for n in &[4, 10, 8] {
+ block_on(poll_fn(|cx| permit.poll_acquire(cx, *n, &semaphore))).unwrap();
+
+ active.fetch_add(*n as usize, SeqCst);
+
+ let num_active = active.load(SeqCst);
+ assert!(num_active <= 10);
+
+ thread::yield_now();
+
+ active.fetch_sub(*n as usize, SeqCst);
+
+ permit.release(*n, &semaphore);
+ }
+ }));
+ }
+
+ for th in ths.into_iter() {
+ th.join().unwrap();
+ }
+
+ assert_eq!(10, semaphore.available_permits());
+ });
+}