diff options
Diffstat (limited to 'tokio/src/sync/batch_semaphore.rs')
-rw-r--r-- | tokio/src/sync/batch_semaphore.rs | 146 |
1 files changed, 66 insertions, 80 deletions
diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs index 3656c109..5d15311d 100644 --- a/tokio/src/sync/batch_semaphore.rs +++ b/tokio/src/sync/batch_semaphore.rs @@ -129,13 +129,8 @@ impl Semaphore { return; } - // Assign permits to the wait queue, returning a list containing all the - // waiters at the back of the queue that received enough permits to wake - // up. - let notified = self.add_permits_locked(added, self.waiters.lock().unwrap()); - - // Once we release the lock, notify all woken waiters. - notify_all(notified); + // Assign permits to the wait queue + self.add_permits_locked(added, self.waiters.lock().unwrap()); } /// Closes the semaphore. This prevents the semaphore from issuing new @@ -144,20 +139,22 @@ impl Semaphore { // semaphore implementation. #[allow(dead_code)] pub(crate) fn close(&self) { - let notified = { - let mut waiters = self.waiters.lock().unwrap(); - // If the semaphore's permits counter has enough permits for an - // unqueued waiter to acquire all the permits it needs immediately, - // it won't touch the wait list. Therefore, we have to set a bit on - // the permit counter as well. However, we must do this while - // holding the lock --- otherwise, if we set the bit and then wait - // to acquire the lock we'll enter an inconsistent state where the - // permit counter is closed, but the wait list is not. - self.permits.fetch_or(Self::CLOSED, Release); - waiters.closed = true; - waiters.queue.take_all() - }; - notify_all(notified) + let mut waiters = self.waiters.lock().unwrap(); + // If the semaphore's permits counter has enough permits for an + // unqueued waiter to acquire all the permits it needs immediately, + // it won't touch the wait list. Therefore, we have to set a bit on + // the permit counter as well. However, we must do this while + // holding the lock --- otherwise, if we set the bit and then wait + // to acquire the lock we'll enter an inconsistent state where the + // permit counter is closed, but the wait list is not. + self.permits.fetch_or(Self::CLOSED, Release); + waiters.closed = true; + while let Some(mut waiter) = waiters.queue.pop_back() { + let waker = unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) }; + if let Some(waker) = waker { + waker.wake(); + } + } } pub(crate) fn try_acquire(&self, num_permits: u16) -> Result<(), TryAcquireError> { @@ -189,58 +186,60 @@ impl Semaphore { /// Release `rem` permits to the semaphore's wait list, starting from the /// end of the queue. - /// - /// This returns a new `LinkedList` containing all the waiters that received - /// enough permits to be notified. Once the lock on the wait list is - /// released, this list should be drained and the waiters in it notified. /// /// If `rem` exceeds the number of permits needed by the wait list, the /// remainder are assigned back to the semaphore. - fn add_permits_locked( - &self, - mut rem: usize, - mut waiters: MutexGuard<'_, Waitlist>, - ) -> LinkedList<Waiter> { - // Starting from the back of the wait queue, assign each waiter as many - // permits as it needs until we run out of permits to assign. - let mut last = None; - for waiter in waiters.queue.iter().rev() { - // Was the waiter assigned enough permits to wake it? - if !waiter.assign_permits(&mut rem) { - break; + fn add_permits_locked(&self, mut rem: usize, waiters: MutexGuard<'_, Waitlist>) { + let mut wakers: [Option<Waker>; 8] = Default::default(); + let mut lock = Some(waiters); + let mut is_empty = false; + while rem > 0 { + let mut waiters = lock.take().unwrap_or_else(|| self.waiters.lock().unwrap()); + 'inner: for slot in &mut wakers[..] { + // Was the waiter assigned enough permits to wake it? + match waiters.queue.last() { + Some(waiter) => { + if !waiter.assign_permits(&mut rem) { + break 'inner; + } + } + None => { + is_empty = true; + // If we assigned permits to all the waiters in the queue, and there are + // still permits left over, assign them back to the semaphore. + break 'inner; + } + }; + let mut waiter = waiters.queue.pop_back().unwrap(); + *slot = unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) }; } - last = Some(NonNull::from(waiter)); - } - // If we assigned permits to all the waiters in the queue, and there are - // still permits left over, assign them back to the semaphore. - if rem > 0 { - let permits = rem << Self::PERMIT_SHIFT; - assert!( - permits < Self::MAX_PERMITS, - "cannot add more than MAX_PERMITS permits ({})", - Self::MAX_PERMITS - ); - let prev = self.permits.fetch_add(rem << Self::PERMIT_SHIFT, Release); - assert!( - prev + permits <= Self::MAX_PERMITS, - "number of added permits ({}) would overflow MAX_PERMITS ({})", - rem, - Self::MAX_PERMITS - ); - } + if rem > 0 && is_empty { + let permits = rem << Self::PERMIT_SHIFT; + assert!( + permits < Self::MAX_PERMITS, + "cannot add more than MAX_PERMITS permits ({})", + Self::MAX_PERMITS + ); + let prev = self.permits.fetch_add(rem << Self::PERMIT_SHIFT, Release); + assert!( + prev + permits <= Self::MAX_PERMITS, + "number of added permits ({}) would overflow MAX_PERMITS ({})", + rem, + Self::MAX_PERMITS + ); + rem = 0; + } - // Split off the queue at the last waiter that was satisfied, creating a - // new list. Once we release the lock, we'll drain this list and notify - // the waiters in it. - if let Some(waiter) = last { - // Safety: it's only safe to call `split_back` with a pointer to a - // node in the same list as the one we call `split_back` on. Since - // we got the waiter pointer from the list's iterator, this is fine. - unsafe { waiters.queue.split_back(waiter) } - } else { - LinkedList::new() + drop(waiters); // release the lock + + wakers + .iter_mut() + .filter_map(Option::take) + .for_each(Waker::wake); } + + assert_eq!(rem, 0); } fn poll_acquire( @@ -354,18 +353,6 @@ impl fmt::Debug for Semaphore { } } -/// Pop all waiters from `list`, starting at the end of the queue, and notify -/// them. -fn notify_all(mut list: LinkedList<Waiter>) { - while let Some(waiter) = list.pop_back() { - let waker = unsafe { waiter.as_ref().waker.with_mut(|waker| (*waker).take()) }; - - waker - .expect("if a node is in the wait list, it must have a waker") - .wake(); - } -} - impl Waiter { fn new(num_permits: u16) -> Self { Waiter { @@ -471,8 +458,7 @@ impl Drop for Acquire<'_> { let acquired_permits = self.num_permits as usize - self.node.state.load(Acquire); if acquired_permits > 0 { - let notified = self.semaphore.add_permits_locked(acquired_permits, waiters); - notify_all(notified); + self.semaphore.add_permits_locked(acquired_permits, waiters); } } } |