summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync/batch_semaphore.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/sync/batch_semaphore.rs')
-rw-r--r--tokio/src/sync/batch_semaphore.rs146
1 files changed, 66 insertions, 80 deletions
diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs
index 3656c109..5d15311d 100644
--- a/tokio/src/sync/batch_semaphore.rs
+++ b/tokio/src/sync/batch_semaphore.rs
@@ -129,13 +129,8 @@ impl Semaphore {
return;
}
- // Assign permits to the wait queue, returning a list containing all the
- // waiters at the back of the queue that received enough permits to wake
- // up.
- let notified = self.add_permits_locked(added, self.waiters.lock().unwrap());
-
- // Once we release the lock, notify all woken waiters.
- notify_all(notified);
+ // Assign permits to the wait queue
+ self.add_permits_locked(added, self.waiters.lock().unwrap());
}
/// Closes the semaphore. This prevents the semaphore from issuing new
@@ -144,20 +139,22 @@ impl Semaphore {
// semaphore implementation.
#[allow(dead_code)]
pub(crate) fn close(&self) {
- let notified = {
- let mut waiters = self.waiters.lock().unwrap();
- // If the semaphore's permits counter has enough permits for an
- // unqueued waiter to acquire all the permits it needs immediately,
- // it won't touch the wait list. Therefore, we have to set a bit on
- // the permit counter as well. However, we must do this while
- // holding the lock --- otherwise, if we set the bit and then wait
- // to acquire the lock we'll enter an inconsistent state where the
- // permit counter is closed, but the wait list is not.
- self.permits.fetch_or(Self::CLOSED, Release);
- waiters.closed = true;
- waiters.queue.take_all()
- };
- notify_all(notified)
+ let mut waiters = self.waiters.lock().unwrap();
+ // If the semaphore's permits counter has enough permits for an
+ // unqueued waiter to acquire all the permits it needs immediately,
+ // it won't touch the wait list. Therefore, we have to set a bit on
+ // the permit counter as well. However, we must do this while
+ // holding the lock --- otherwise, if we set the bit and then wait
+ // to acquire the lock we'll enter an inconsistent state where the
+ // permit counter is closed, but the wait list is not.
+ self.permits.fetch_or(Self::CLOSED, Release);
+ waiters.closed = true;
+ while let Some(mut waiter) = waiters.queue.pop_back() {
+ let waker = unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) };
+ if let Some(waker) = waker {
+ waker.wake();
+ }
+ }
}
pub(crate) fn try_acquire(&self, num_permits: u16) -> Result<(), TryAcquireError> {
@@ -189,58 +186,60 @@ impl Semaphore {
/// Release `rem` permits to the semaphore's wait list, starting from the
/// end of the queue.
- ///
- /// This returns a new `LinkedList` containing all the waiters that received
- /// enough permits to be notified. Once the lock on the wait list is
- /// released, this list should be drained and the waiters in it notified.
///
/// If `rem` exceeds the number of permits needed by the wait list, the
/// remainder are assigned back to the semaphore.
- fn add_permits_locked(
- &self,
- mut rem: usize,
- mut waiters: MutexGuard<'_, Waitlist>,
- ) -> LinkedList<Waiter> {
- // Starting from the back of the wait queue, assign each waiter as many
- // permits as it needs until we run out of permits to assign.
- let mut last = None;
- for waiter in waiters.queue.iter().rev() {
- // Was the waiter assigned enough permits to wake it?
- if !waiter.assign_permits(&mut rem) {
- break;
+ fn add_permits_locked(&self, mut rem: usize, waiters: MutexGuard<'_, Waitlist>) {
+ let mut wakers: [Option<Waker>; 8] = Default::default();
+ let mut lock = Some(waiters);
+ let mut is_empty = false;
+ while rem > 0 {
+ let mut waiters = lock.take().unwrap_or_else(|| self.waiters.lock().unwrap());
+ 'inner: for slot in &mut wakers[..] {
+ // Was the waiter assigned enough permits to wake it?
+ match waiters.queue.last() {
+ Some(waiter) => {
+ if !waiter.assign_permits(&mut rem) {
+ break 'inner;
+ }
+ }
+ None => {
+ is_empty = true;
+ // If we assigned permits to all the waiters in the queue, and there are
+ // still permits left over, assign them back to the semaphore.
+ break 'inner;
+ }
+ };
+ let mut waiter = waiters.queue.pop_back().unwrap();
+ *slot = unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) };
}
- last = Some(NonNull::from(waiter));
- }
- // If we assigned permits to all the waiters in the queue, and there are
- // still permits left over, assign them back to the semaphore.
- if rem > 0 {
- let permits = rem << Self::PERMIT_SHIFT;
- assert!(
- permits < Self::MAX_PERMITS,
- "cannot add more than MAX_PERMITS permits ({})",
- Self::MAX_PERMITS
- );
- let prev = self.permits.fetch_add(rem << Self::PERMIT_SHIFT, Release);
- assert!(
- prev + permits <= Self::MAX_PERMITS,
- "number of added permits ({}) would overflow MAX_PERMITS ({})",
- rem,
- Self::MAX_PERMITS
- );
- }
+ if rem > 0 && is_empty {
+ let permits = rem << Self::PERMIT_SHIFT;
+ assert!(
+ permits < Self::MAX_PERMITS,
+ "cannot add more than MAX_PERMITS permits ({})",
+ Self::MAX_PERMITS
+ );
+ let prev = self.permits.fetch_add(rem << Self::PERMIT_SHIFT, Release);
+ assert!(
+ prev + permits <= Self::MAX_PERMITS,
+ "number of added permits ({}) would overflow MAX_PERMITS ({})",
+ rem,
+ Self::MAX_PERMITS
+ );
+ rem = 0;
+ }
- // Split off the queue at the last waiter that was satisfied, creating a
- // new list. Once we release the lock, we'll drain this list and notify
- // the waiters in it.
- if let Some(waiter) = last {
- // Safety: it's only safe to call `split_back` with a pointer to a
- // node in the same list as the one we call `split_back` on. Since
- // we got the waiter pointer from the list's iterator, this is fine.
- unsafe { waiters.queue.split_back(waiter) }
- } else {
- LinkedList::new()
+ drop(waiters); // release the lock
+
+ wakers
+ .iter_mut()
+ .filter_map(Option::take)
+ .for_each(Waker::wake);
}
+
+ assert_eq!(rem, 0);
}
fn poll_acquire(
@@ -354,18 +353,6 @@ impl fmt::Debug for Semaphore {
}
}
-/// Pop all waiters from `list`, starting at the end of the queue, and notify
-/// them.
-fn notify_all(mut list: LinkedList<Waiter>) {
- while let Some(waiter) = list.pop_back() {
- let waker = unsafe { waiter.as_ref().waker.with_mut(|waker| (*waker).take()) };
-
- waker
- .expect("if a node is in the wait list, it must have a waker")
- .wake();
- }
-}
-
impl Waiter {
fn new(num_permits: u16) -> Self {
Waiter {
@@ -471,8 +458,7 @@ impl Drop for Acquire<'_> {
let acquired_permits = self.num_permits as usize - self.node.state.load(Acquire);
if acquired_permits > 0 {
- let notified = self.semaphore.add_permits_locked(acquired_permits, waiters);
- notify_all(notified);
+ self.semaphore.add_permits_locked(acquired_permits, waiters);
}
}
}