From efcbf9613f2d5048550f9c828e3be422644f1391 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 3 Jan 2020 10:34:15 -0800 Subject: sync: add batch op support to internal semaphore (#2004) Extend internal semaphore to support batch operations. With this PR, consumers of the semaphore are able to atomically request more than one permit. This is useful for implementing a RwLock. --- tokio/src/sync/tests/loom_semaphore_ll.rs | 62 ++++- tokio/src/sync/tests/semaphore_ll.rs | 394 +++++++++++++++++++++++++++--- 2 files changed, 416 insertions(+), 40 deletions(-) (limited to 'tokio/src/sync/tests') diff --git a/tokio/src/sync/tests/loom_semaphore_ll.rs b/tokio/src/sync/tests/loom_semaphore_ll.rs index cd4314ca..b5e5efba 100644 --- a/tokio/src/sync/tests/loom_semaphore_ll.rs +++ b/tokio/src/sync/tests/loom_semaphore_ll.rs @@ -31,7 +31,7 @@ fn basic_usage() { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { let me = &mut *self; - ready!(me.waiter.poll_acquire(cx, &me.shared.semaphore)).unwrap(); + ready!(me.waiter.poll_acquire(cx, 1, &me.shared.semaphore)).unwrap(); let actual = me.shared.active.fetch_add(1, SeqCst); assert!(actual <= NUM - 1); @@ -39,7 +39,7 @@ fn basic_usage() { let actual = me.shared.active.fetch_sub(1, SeqCst); assert!(actual <= NUM); - me.waiter.release(&me.shared.semaphore); + me.waiter.release(1, &me.shared.semaphore); Ready(()) } @@ -79,17 +79,17 @@ fn release() { thread::spawn(move || { let mut permit = Permit::new(); - block_on(poll_fn(|cx| permit.poll_acquire(cx, &semaphore))).unwrap(); + block_on(poll_fn(|cx| permit.poll_acquire(cx, 1, &semaphore))).unwrap(); - permit.release(&semaphore); + permit.release(1, &semaphore); }); } let mut permit = Permit::new(); - block_on(poll_fn(|cx| permit.poll_acquire(cx, &semaphore))).unwrap(); + block_on(poll_fn(|cx| permit.poll_acquire(cx, 1, &semaphore))).unwrap(); - permit.release(&semaphore); + permit.release(1, &semaphore); }); } @@ -108,10 +108,10 @@ fn basic_closing() { for _ in 0..2 { block_on(poll_fn(|cx| { - permit.poll_acquire(cx, &semaphore).map_err(|_| ()) + permit.poll_acquire(cx, 1, &semaphore).map_err(|_| ()) }))?; - permit.release(&semaphore); + permit.release(1, &semaphore); } Ok::<(), ()>(()) @@ -136,10 +136,10 @@ fn concurrent_close() { let mut permit = Permit::new(); block_on(poll_fn(|cx| { - permit.poll_acquire(cx, &semaphore).map_err(|_| ()) + permit.poll_acquire(cx, 1, &semaphore).map_err(|_| ()) }))?; - permit.release(&semaphore); + permit.release(1, &semaphore); semaphore.close(); @@ -148,3 +148,45 @@ fn concurrent_close() { } }); } + +#[test] +fn batch() { + let mut b = loom::model::Builder::new(); + b.preemption_bound = Some(1); + + b.check(|| { + let semaphore = Arc::new(Semaphore::new(10)); + let active = Arc::new(AtomicUsize::new(0)); + let mut ths = vec![]; + + for _ in 0..2 { + let semaphore = semaphore.clone(); + let active = active.clone(); + + ths.push(thread::spawn(move || { + let mut permit = Permit::new(); + + for n in &[4, 10, 8] { + block_on(poll_fn(|cx| permit.poll_acquire(cx, *n, &semaphore))).unwrap(); + + active.fetch_add(*n as usize, SeqCst); + + let num_active = active.load(SeqCst); + assert!(num_active <= 10); + + thread::yield_now(); + + active.fetch_sub(*n as usize, SeqCst); + + permit.release(*n, &semaphore); + } + })); + } + + for th in ths.into_iter() { + th.join().unwrap(); + } + + assert_eq!(10, semaphore.available_permits()); + }); +} diff --git a/tokio/src/sync/tests/semaphore_ll.rs b/tokio/src/sync/tests/semaphore_ll.rs index 8dd56c85..bfb07578 100644 --- a/tokio/src/sync/tests/semaphore_ll.rs +++ b/tokio/src/sync/tests/semaphore_ll.rs @@ -1,9 +1,8 @@ use crate::sync::semaphore_ll::{Permit, Semaphore}; -use tokio_test::task; -use tokio_test::{assert_pending, assert_ready_err, assert_ready_ok}; +use tokio_test::*; #[test] -fn available_permits() { +fn poll_acquire_one_available() { let s = Semaphore::new(100); assert_eq!(s.available_permits(), 100); @@ -11,44 +10,234 @@ fn available_permits() { let mut permit = task::spawn(Permit::new()); assert!(!permit.is_acquired()); - assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, &s))); + assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); assert_eq!(s.available_permits(), 99); assert!(permit.is_acquired()); // Polling again on the same waiter does not claim a new permit - assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, &s))); + assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); assert_eq!(s.available_permits(), 99); assert!(permit.is_acquired()); } #[test] -fn unavailable_permits() { +fn poll_acquire_many_available() { + let s = Semaphore::new(100); + assert_eq!(s.available_permits(), 100); + + // Polling for a permit succeeds immediately + let mut permit = task::spawn(Permit::new()); + assert!(!permit.is_acquired()); + + assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 5, &s))); + assert_eq!(s.available_permits(), 95); + assert!(permit.is_acquired()); + + // Polling again on the same waiter does not claim a new permit + assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); + assert_eq!(s.available_permits(), 95); + assert!(permit.is_acquired()); + + assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 5, &s))); + assert_eq!(s.available_permits(), 95); + assert!(permit.is_acquired()); + + // Polling for a larger number of permits acquires more + assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 8, &s))); + assert_eq!(s.available_permits(), 92); + assert!(permit.is_acquired()); +} + +#[test] +fn try_acquire_one_available() { + let s = Semaphore::new(100); + assert_eq!(s.available_permits(), 100); + + // Polling for a permit succeeds immediately + let mut permit = Permit::new(); + assert!(!permit.is_acquired()); + + assert_ok!(permit.try_acquire(1, &s)); + assert_eq!(s.available_permits(), 99); + assert!(permit.is_acquired()); + + // Polling again on the same waiter does not claim a new permit + assert_ok!(permit.try_acquire(1, &s)); + assert_eq!(s.available_permits(), 99); + assert!(permit.is_acquired()); +} + +#[test] +fn try_acquire_many_available() { + let s = Semaphore::new(100); + assert_eq!(s.available_permits(), 100); + + // Polling for a permit succeeds immediately + let mut permit = Permit::new(); + assert!(!permit.is_acquired()); + + assert_ok!(permit.try_acquire(5, &s)); + assert_eq!(s.available_permits(), 95); + assert!(permit.is_acquired()); + + // Polling again on the same waiter does not claim a new permit + assert_ok!(permit.try_acquire(5, &s)); + assert_eq!(s.available_permits(), 95); + assert!(permit.is_acquired()); +} + +#[test] +fn poll_acquire_one_unavailable() { let s = Semaphore::new(1); let mut permit_1 = task::spawn(Permit::new()); let mut permit_2 = task::spawn(Permit::new()); // Acquire the first permit - assert_ready_ok!(permit_1.enter(|cx, mut p| p.poll_acquire(cx, &s))); + assert_ready_ok!(permit_1.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); assert_eq!(s.available_permits(), 0); permit_2.enter(|cx, mut p| { // Try to acquire the second permit - assert_pending!(p.poll_acquire(cx, &s)); + assert_pending!(p.poll_acquire(cx, 1, &s)); }); - permit_1.release(&s); + permit_1.release(1, &s); assert_eq!(s.available_permits(), 0); assert!(permit_2.is_woken()); - assert_ready_ok!(permit_2.enter(|cx, mut p| p.poll_acquire(cx, &s))); + assert_ready_ok!(permit_2.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); - permit_2.release(&s); + permit_2.release(1, &s); assert_eq!(s.available_permits(), 1); } #[test] -fn zero_permits() { +fn forget_acquired() { + let s = Semaphore::new(1); + + // Polling for a permit succeeds immediately + let mut permit = task::spawn(Permit::new()); + + assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); + + assert_eq!(s.available_permits(), 0); + + permit.forget(1); + assert_eq!(s.available_permits(), 0); +} + +#[test] +fn forget_waiting() { + let s = Semaphore::new(0); + + // Polling for a permit succeeds immediately + let mut permit = task::spawn(Permit::new()); + + assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); + + assert_eq!(s.available_permits(), 0); + + permit.forget(1); + + s.add_permits(1); + + assert!(!permit.is_woken()); + assert_eq!(s.available_permits(), 1); +} + +#[test] +fn poll_acquire_many_unavailable() { + let s = Semaphore::new(5); + + let mut permit_1 = task::spawn(Permit::new()); + let mut permit_2 = task::spawn(Permit::new()); + let mut permit_3 = task::spawn(Permit::new()); + + // Acquire the first permit + assert_ready_ok!(permit_1.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); + assert_eq!(s.available_permits(), 4); + + permit_2.enter(|cx, mut p| { + // Try to acquire the second permit + assert_pending!(p.poll_acquire(cx, 5, &s)); + }); + + assert_eq!(s.available_permits(), 0); + + permit_3.enter(|cx, mut p| { + // Try to acquire the third permit + assert_pending!(p.poll_acquire(cx, 3, &s)); + }); + + permit_1.release(1, &s); + + assert_eq!(s.available_permits(), 0); + assert!(permit_2.is_woken()); + assert_ready_ok!(permit_2.enter(|cx, mut p| p.poll_acquire(cx, 5, &s))); + + assert!(!permit_3.is_woken()); + assert_eq!(s.available_permits(), 0); + + permit_2.release(1, &s); + assert!(!permit_3.is_woken()); + assert_eq!(s.available_permits(), 0); + + permit_2.release(2, &s); + assert!(permit_3.is_woken()); + + assert_ready_ok!(permit_3.enter(|cx, mut p| p.poll_acquire(cx, 3, &s))); +} + +#[test] +fn try_acquire_one_unavailable() { + let s = Semaphore::new(1); + + let mut permit_1 = Permit::new(); + let mut permit_2 = Permit::new(); + + // Acquire the first permit + assert_ok!(permit_1.try_acquire(1, &s)); + assert_eq!(s.available_permits(), 0); + + assert_err!(permit_2.try_acquire(1, &s)); + + permit_1.release(1, &s); + + assert_eq!(s.available_permits(), 1); + assert_ok!(permit_2.try_acquire(1, &s)); + + permit_2.release(1, &s); + assert_eq!(s.available_permits(), 1); +} + +#[test] +fn try_acquire_many_unavailable() { + let s = Semaphore::new(5); + + let mut permit_1 = Permit::new(); + let mut permit_2 = Permit::new(); + + // Acquire the first permit + assert_ok!(permit_1.try_acquire(1, &s)); + assert_eq!(s.available_permits(), 4); + + assert_err!(permit_2.try_acquire(5, &s)); + + permit_1.release(1, &s); + assert_eq!(s.available_permits(), 5); + + assert_ok!(permit_2.try_acquire(5, &s)); + + permit_2.release(1, &s); + assert_eq!(s.available_permits(), 1); + + permit_2.release(1, &s); + assert_eq!(s.available_permits(), 2); +} + +#[test] +fn poll_acquire_one_zero_permits() { let s = Semaphore::new(0); assert_eq!(s.available_permits(), 0); @@ -56,13 +245,13 @@ fn zero_permits() { // Try to acquire the permit permit.enter(|cx, mut p| { - assert_pending!(p.poll_acquire(cx, &s)); + assert_pending!(p.poll_acquire(cx, 1, &s)); }); s.add_permits(1); assert!(permit.is_woken()); - assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, &s))); + assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); } #[test] @@ -74,15 +263,19 @@ fn validates_max_permits() { #[test] fn close_semaphore_prevents_acquire() { - let s = Semaphore::new(1); + let s = Semaphore::new(5); s.close(); - assert_eq!(1, s.available_permits()); + assert_eq!(5, s.available_permits()); - let mut permit = task::spawn(Permit::new()); + let mut permit_1 = task::spawn(Permit::new()); + let mut permit_2 = task::spawn(Permit::new()); - assert_ready_err!(permit.enter(|cx, mut p| p.poll_acquire(cx, &s))); - assert_eq!(1, s.available_permits()); + assert_ready_err!(permit_1.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); + assert_eq!(5, s.available_permits()); + + assert_ready_err!(permit_2.enter(|cx, mut p| p.poll_acquire(cx, 2, &s))); + assert_eq!(5, s.available_permits()); } #[test] @@ -90,12 +283,12 @@ fn close_semaphore_notifies_permit1() { let s = Semaphore::new(0); let mut permit = task::spawn(Permit::new()); - assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, &s))); + assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); s.close(); assert!(permit.is_woken()); - assert_ready_err!(permit.enter(|cx, mut p| p.poll_acquire(cx, &s))); + assert_ready_err!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); } #[test] @@ -108,29 +301,170 @@ fn close_semaphore_notifies_permit2() { let mut permit4 = task::spawn(Permit::new()); // Acquire a couple of permits - assert_ready_ok!(permit1.enter(|cx, mut p| p.poll_acquire(cx, &s))); - assert_ready_ok!(permit2.enter(|cx, mut p| p.poll_acquire(cx, &s))); + assert_ready_ok!(permit1.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); + assert_ready_ok!(permit2.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); - assert_pending!(permit3.enter(|cx, mut p| p.poll_acquire(cx, &s))); - assert_pending!(permit4.enter(|cx, mut p| p.poll_acquire(cx, &s))); + assert_pending!(permit3.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); + assert_pending!(permit4.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); s.close(); assert!(permit3.is_woken()); assert!(permit4.is_woken()); - assert_ready_err!(permit3.enter(|cx, mut p| p.poll_acquire(cx, &s))); - assert_ready_err!(permit4.enter(|cx, mut p| p.poll_acquire(cx, &s))); + assert_ready_err!(permit3.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); + assert_ready_err!(permit4.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); assert_eq!(0, s.available_permits()); - permit1.release(&s); + permit1.release(1, &s); assert_eq!(1, s.available_permits()); - assert_ready_err!(permit1.enter(|cx, mut p| p.poll_acquire(cx, &s))); + assert_ready_err!(permit1.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); - permit2.release(&s); + permit2.release(1, &s); assert_eq!(2, s.available_permits()); } + +#[test] +fn poll_acquire_additional_permits_while_waiting_before_assigned() { + let s = Semaphore::new(1); + + let mut permit = task::spawn(Permit::new()); + + assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 2, &s))); + assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 3, &s))); + + s.add_permits(1); + assert!(!permit.is_woken()); + + s.add_permits(1); + assert!(permit.is_woken()); + + assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 3, &s))); +} + +#[test] +fn try_acquire_additional_permits_while_waiting_before_assigned() { + let s = Semaphore::new(1); + + let mut permit = task::spawn(Permit::new()); + + assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 2, &s))); + + assert_err!(permit.enter(|_, mut p| p.try_acquire(3, &s))); + + s.add_permits(1); + assert!(permit.is_woken()); + + assert_ok!(permit.enter(|_, mut p| p.try_acquire(2, &s))); +} + +#[test] +fn poll_acquire_additional_permits_while_waiting_after_assigned_success() { + let s = Semaphore::new(1); + + let mut permit = task::spawn(Permit::new()); + + assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 2, &s))); + + s.add_permits(2); + + assert!(permit.is_woken()); + assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 3, &s))); +} + +#[test] +fn poll_acquire_additional_permits_while_waiting_after_assigned_requeue() { + let s = Semaphore::new(1); + + let mut permit = task::spawn(Permit::new()); + + assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 2, &s))); + + s.add_permits(2); + + assert!(permit.is_woken()); + assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 4, &s))); + + s.add_permits(1); + + assert!(permit.is_woken()); + assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 4, &s))); +} + +#[test] +fn poll_acquire_fewer_permits_while_waiting() { + let s = Semaphore::new(1); + + let mut permit = task::spawn(Permit::new()); + + assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 2, &s))); + assert_eq!(s.available_permits(), 0); + + assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); + assert_eq!(s.available_permits(), 0); +} + +#[test] +fn poll_acquire_fewer_permits_after_assigned() { + let s = Semaphore::new(1); + + let mut permit1 = task::spawn(Permit::new()); + let mut permit2 = task::spawn(Permit::new()); + + assert_pending!(permit1.enter(|cx, mut p| p.poll_acquire(cx, 5, &s))); + assert_eq!(s.available_permits(), 0); + + assert_pending!(permit2.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); + + s.add_permits(4); + assert!(permit1.is_woken()); + assert!(!permit2.is_woken()); + + assert_ready_ok!(permit1.enter(|cx, mut p| p.poll_acquire(cx, 3, &s))); + + assert!(permit2.is_woken()); + assert_eq!(s.available_permits(), 1); + + assert_ready_ok!(permit2.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); +} + +#[test] +fn forget_partial_1() { + let s = Semaphore::new(0); + + let mut permit = task::spawn(Permit::new()); + + assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 2, &s))); + s.add_permits(1); + + assert_eq!(0, s.available_permits()); + + permit.release(1, &s); + + assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); + + assert_eq!(s.available_permits(), 0); +} + +#[test] +fn forget_partial_2() { + let s = Semaphore::new(0); + + let mut permit = task::spawn(Permit::new()); + + assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 2, &s))); + s.add_permits(1); + + assert_eq!(0, s.available_permits()); + + permit.release(1, &s); + + s.add_permits(1); + + assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 2, &s))); + assert_eq!(s.available_permits(), 0); +} -- cgit v1.2.3