diff options
Diffstat (limited to 'tokio/src/sync/tests/loom_semaphore_ll.rs')
-rw-r--r-- | tokio/src/sync/tests/loom_semaphore_ll.rs | 62 |
1 files changed, 52 insertions, 10 deletions
diff --git a/tokio/src/sync/tests/loom_semaphore_ll.rs b/tokio/src/sync/tests/loom_semaphore_ll.rs index cd4314ca..b5e5efba 100644 --- a/tokio/src/sync/tests/loom_semaphore_ll.rs +++ b/tokio/src/sync/tests/loom_semaphore_ll.rs @@ -31,7 +31,7 @@ fn basic_usage() { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { let me = &mut *self; - ready!(me.waiter.poll_acquire(cx, &me.shared.semaphore)).unwrap(); + ready!(me.waiter.poll_acquire(cx, 1, &me.shared.semaphore)).unwrap(); let actual = me.shared.active.fetch_add(1, SeqCst); assert!(actual <= NUM - 1); @@ -39,7 +39,7 @@ fn basic_usage() { let actual = me.shared.active.fetch_sub(1, SeqCst); assert!(actual <= NUM); - me.waiter.release(&me.shared.semaphore); + me.waiter.release(1, &me.shared.semaphore); Ready(()) } @@ -79,17 +79,17 @@ fn release() { thread::spawn(move || { let mut permit = Permit::new(); - block_on(poll_fn(|cx| permit.poll_acquire(cx, &semaphore))).unwrap(); + block_on(poll_fn(|cx| permit.poll_acquire(cx, 1, &semaphore))).unwrap(); - permit.release(&semaphore); + permit.release(1, &semaphore); }); } let mut permit = Permit::new(); - block_on(poll_fn(|cx| permit.poll_acquire(cx, &semaphore))).unwrap(); + block_on(poll_fn(|cx| permit.poll_acquire(cx, 1, &semaphore))).unwrap(); - permit.release(&semaphore); + permit.release(1, &semaphore); }); } @@ -108,10 +108,10 @@ fn basic_closing() { for _ in 0..2 { block_on(poll_fn(|cx| { - permit.poll_acquire(cx, &semaphore).map_err(|_| ()) + permit.poll_acquire(cx, 1, &semaphore).map_err(|_| ()) }))?; - permit.release(&semaphore); + permit.release(1, &semaphore); } Ok::<(), ()>(()) @@ -136,10 +136,10 @@ fn concurrent_close() { let mut permit = Permit::new(); block_on(poll_fn(|cx| { - permit.poll_acquire(cx, &semaphore).map_err(|_| ()) + permit.poll_acquire(cx, 1, &semaphore).map_err(|_| ()) }))?; - permit.release(&semaphore); + permit.release(1, &semaphore); semaphore.close(); @@ -148,3 +148,45 @@ fn concurrent_close() { } }); } + +#[test] +fn batch() { + let mut b = loom::model::Builder::new(); + b.preemption_bound = Some(1); + + b.check(|| { + let semaphore = Arc::new(Semaphore::new(10)); + let active = Arc::new(AtomicUsize::new(0)); + let mut ths = vec![]; + + for _ in 0..2 { + let semaphore = semaphore.clone(); + let active = active.clone(); + + ths.push(thread::spawn(move || { + let mut permit = Permit::new(); + + for n in &[4, 10, 8] { + block_on(poll_fn(|cx| permit.poll_acquire(cx, *n, &semaphore))).unwrap(); + + active.fetch_add(*n as usize, SeqCst); + + let num_active = active.load(SeqCst); + assert!(num_active <= 10); + + thread::yield_now(); + + active.fetch_sub(*n as usize, SeqCst); + + permit.release(*n, &semaphore); + } + })); + } + + for th in ths.into_iter() { + th.join().unwrap(); + } + + assert_eq!(10, semaphore.available_permits()); + }); +} |