author    Eliza Weisman <eliza@buoyant.io>  2020-03-27 16:14:07 -0700
committer GitHub <noreply@github.com>       2020-03-27 16:14:07 -0700
commit    00725f6876821f2ec5246a807563e35c5e53f3e1 (patch)
tree      690d5f18d3c7c84489a4e25ce51986232842fb2a /tokio/src/sync
parent    5c71268bb88a1125e822f5a0a68ff996f6811736 (diff)
sync: fix possible dangling pointer in semaphore (#2340)
## Motivation

When cancelling futures that are waiting to acquire semaphore permits, a dangling pointer is possible if notified futures are dropped after the notified wakers have been split into a separate list. Because those futures' wait-queue nodes are no longer in the main list guarded by the lock, their `Drop` impls complete immediately, and they may be dropped while still in the list of tasks to notify.

## Solution

This branch fixes the issue by popping from the wait list inside the lock. The wakers of popped nodes are temporarily stored in a stack array so that they can be notified after the lock is released. Since the size of the stack array is fixed, we may in some cases have to loop multiple times, acquiring and releasing the lock, until all permits have been released. This may also have the side benefit of preventing a thread that releases a very large number of permits from starving other threads that need to enqueue waiters.

I've also added a loom test that reliably reproduces a segfault on master but passes on this branch (after a lot of iterations).

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
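The core of the fix is a pattern that can be sketched independently of the semaphore internals: drain waiters into a small, fixed-size stack buffer while holding the wait-list lock, then invoke their wakers only after the lock is dropped, looping if there are more waiters than buffer slots. Below is a minimal, self-contained model of that pattern, not the tokio code itself; `WaitList`, `FakeWaker`, and `release_permits` are made-up names, the queue is an ordinary `VecDeque` rather than tokio's intrusive list, and each waiter is assumed to need exactly one permit.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

// Stand-in for a queued waiter's `Waker`: something that can be invoked once
// to wake the task. (In the real code this is `std::task::Waker`.)
type FakeWaker = Box<dyn FnOnce() + Send>;

struct WaitList {
    queue: VecDeque<FakeWaker>,
}

// Release `rem` permits, assuming (for simplicity) that every waiter needs
// exactly one permit. The shape mirrors the patched `add_permits_locked`:
// waiters are popped *inside* the lock, their wakers are parked in a small
// on-stack buffer, and the actual wake calls happen only after the lock is
// released.
fn release_permits(list: &Mutex<WaitList>, mut rem: usize) {
    while rem > 0 {
        // At most 8 wakers per lock acquisition, as in the patch.
        let mut wakers: [Option<FakeWaker>; 8] = Default::default();
        let mut waiters = list.lock().unwrap();

        for slot in wakers.iter_mut() {
            if rem == 0 {
                break;
            }
            match waiters.queue.pop_back() {
                // Popping under the lock means a concurrently cancelled
                // (dropped) waiter can never be left in a side list that the
                // lock no longer protects.
                Some(waker) => {
                    *slot = Some(waker);
                    rem -= 1;
                }
                None => {
                    // Queue exhausted; the real implementation hands any
                    // leftover permits back to the semaphore's counter here.
                    rem = 0;
                    break;
                }
            }
        }

        drop(waiters); // release the lock first...

        // ...then wake, so no waker runs while we hold the wait-list lock.
        for waker in wakers.iter_mut().filter_map(Option::take) {
            waker();
        }
    }
}

fn main() {
    let list = Mutex::new(WaitList {
        queue: (0..20)
            .map(|i| Box::new(move || println!("woke waiter {i}")) as FakeWaker)
            .collect(),
    });
    // Wakes 12 waiters in two batches (8, then 4), reacquiring the lock once.
    release_permits(&list, 12);
}
```

The fixed-size buffer is also what gives the secondary benefit mentioned above: each lock acquisition touches at most eight waiters, so a caller releasing a very large number of permits loops instead of holding the lock for an unbounded time.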
Diffstat (limited to 'tokio/src/sync')
-rw-r--r--  tokio/src/sync/batch_semaphore.rs            | 146
-rw-r--r--  tokio/src/sync/tests/loom_semaphore_batch.rs |  44
2 files changed, 110 insertions(+), 80 deletions(-)
diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs
index 3656c109..5d15311d 100644
--- a/tokio/src/sync/batch_semaphore.rs
+++ b/tokio/src/sync/batch_semaphore.rs
@@ -129,13 +129,8 @@ impl Semaphore {
return;
}
- // Assign permits to the wait queue, returning a list containing all the
- // waiters at the back of the queue that received enough permits to wake
- // up.
- let notified = self.add_permits_locked(added, self.waiters.lock().unwrap());
-
- // Once we release the lock, notify all woken waiters.
- notify_all(notified);
+ // Assign permits to the wait queue
+ self.add_permits_locked(added, self.waiters.lock().unwrap());
}
/// Closes the semaphore. This prevents the semaphore from issuing new
@@ -144,20 +139,22 @@ impl Semaphore {
// semaphore implementation.
#[allow(dead_code)]
pub(crate) fn close(&self) {
- let notified = {
- let mut waiters = self.waiters.lock().unwrap();
- // If the semaphore's permits counter has enough permits for an
- // unqueued waiter to acquire all the permits it needs immediately,
- // it won't touch the wait list. Therefore, we have to set a bit on
- // the permit counter as well. However, we must do this while
- // holding the lock --- otherwise, if we set the bit and then wait
- // to acquire the lock we'll enter an inconsistent state where the
- // permit counter is closed, but the wait list is not.
- self.permits.fetch_or(Self::CLOSED, Release);
- waiters.closed = true;
- waiters.queue.take_all()
- };
- notify_all(notified)
+ let mut waiters = self.waiters.lock().unwrap();
+ // If the semaphore's permits counter has enough permits for an
+ // unqueued waiter to acquire all the permits it needs immediately,
+ // it won't touch the wait list. Therefore, we have to set a bit on
+ // the permit counter as well. However, we must do this while
+ // holding the lock --- otherwise, if we set the bit and then wait
+ // to acquire the lock we'll enter an inconsistent state where the
+ // permit counter is closed, but the wait list is not.
+ self.permits.fetch_or(Self::CLOSED, Release);
+ waiters.closed = true;
+ while let Some(mut waiter) = waiters.queue.pop_back() {
+ let waker = unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) };
+ if let Some(waker) = waker {
+ waker.wake();
+ }
+ }
}
pub(crate) fn try_acquire(&self, num_permits: u16) -> Result<(), TryAcquireError> {
@@ -189,58 +186,60 @@ impl Semaphore {
/// Release `rem` permits to the semaphore's wait list, starting from the
/// end of the queue.
- ///
- /// This returns a new `LinkedList` containing all the waiters that received
- /// enough permits to be notified. Once the lock on the wait list is
- /// released, this list should be drained and the waiters in it notified.
///
/// If `rem` exceeds the number of permits needed by the wait list, the
/// remainder are assigned back to the semaphore.
- fn add_permits_locked(
- &self,
- mut rem: usize,
- mut waiters: MutexGuard<'_, Waitlist>,
- ) -> LinkedList<Waiter> {
- // Starting from the back of the wait queue, assign each waiter as many
- // permits as it needs until we run out of permits to assign.
- let mut last = None;
- for waiter in waiters.queue.iter().rev() {
- // Was the waiter assigned enough permits to wake it?
- if !waiter.assign_permits(&mut rem) {
- break;
+ fn add_permits_locked(&self, mut rem: usize, waiters: MutexGuard<'_, Waitlist>) {
+ let mut wakers: [Option<Waker>; 8] = Default::default();
+ let mut lock = Some(waiters);
+ let mut is_empty = false;
+ while rem > 0 {
+ let mut waiters = lock.take().unwrap_or_else(|| self.waiters.lock().unwrap());
+ 'inner: for slot in &mut wakers[..] {
+ // Was the waiter assigned enough permits to wake it?
+ match waiters.queue.last() {
+ Some(waiter) => {
+ if !waiter.assign_permits(&mut rem) {
+ break 'inner;
+ }
+ }
+ None => {
+ is_empty = true;
+ // If we assigned permits to all the waiters in the queue, and there are
+ // still permits left over, assign them back to the semaphore.
+ break 'inner;
+ }
+ };
+ let mut waiter = waiters.queue.pop_back().unwrap();
+ *slot = unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) };
}
- last = Some(NonNull::from(waiter));
- }
- // If we assigned permits to all the waiters in the queue, and there are
- // still permits left over, assign them back to the semaphore.
- if rem > 0 {
- let permits = rem << Self::PERMIT_SHIFT;
- assert!(
- permits < Self::MAX_PERMITS,
- "cannot add more than MAX_PERMITS permits ({})",
- Self::MAX_PERMITS
- );
- let prev = self.permits.fetch_add(rem << Self::PERMIT_SHIFT, Release);
- assert!(
- prev + permits <= Self::MAX_PERMITS,
- "number of added permits ({}) would overflow MAX_PERMITS ({})",
- rem,
- Self::MAX_PERMITS
- );
- }
+ if rem > 0 && is_empty {
+ let permits = rem << Self::PERMIT_SHIFT;
+ assert!(
+ permits < Self::MAX_PERMITS,
+ "cannot add more than MAX_PERMITS permits ({})",
+ Self::MAX_PERMITS
+ );
+ let prev = self.permits.fetch_add(rem << Self::PERMIT_SHIFT, Release);
+ assert!(
+ prev + permits <= Self::MAX_PERMITS,
+ "number of added permits ({}) would overflow MAX_PERMITS ({})",
+ rem,
+ Self::MAX_PERMITS
+ );
+ rem = 0;
+ }
- // Split off the queue at the last waiter that was satisfied, creating a
- // new list. Once we release the lock, we'll drain this list and notify
- // the waiters in it.
- if let Some(waiter) = last {
- // Safety: it's only safe to call `split_back` with a pointer to a
- // node in the same list as the one we call `split_back` on. Since
- // we got the waiter pointer from the list's iterator, this is fine.
- unsafe { waiters.queue.split_back(waiter) }
- } else {
- LinkedList::new()
+ drop(waiters); // release the lock
+
+ wakers
+ .iter_mut()
+ .filter_map(Option::take)
+ .for_each(Waker::wake);
}
+
+ assert_eq!(rem, 0);
}
fn poll_acquire(
@@ -354,18 +353,6 @@ impl fmt::Debug for Semaphore {
}
}
-/// Pop all waiters from `list`, starting at the end of the queue, and notify
-/// them.
-fn notify_all(mut list: LinkedList<Waiter>) {
- while let Some(waiter) = list.pop_back() {
- let waker = unsafe { waiter.as_ref().waker.with_mut(|waker| (*waker).take()) };
-
- waker
- .expect("if a node is in the wait list, it must have a waker")
- .wake();
- }
-}
-
impl Waiter {
fn new(num_permits: u16) -> Self {
Waiter {
@@ -471,8 +458,7 @@ impl Drop for Acquire<'_> {
let acquired_permits = self.num_permits as usize - self.node.state.load(Acquire);
if acquired_permits > 0 {
- let notified = self.semaphore.add_permits_locked(acquired_permits, waiters);
- notify_all(notified);
+ self.semaphore.add_permits_locked(acquired_permits, waiters);
}
}
}
diff --git a/tokio/src/sync/tests/loom_semaphore_batch.rs b/tokio/src/sync/tests/loom_semaphore_batch.rs
index 4c1936c5..76a1bc00 100644
--- a/tokio/src/sync/tests/loom_semaphore_batch.rs
+++ b/tokio/src/sync/tests/loom_semaphore_batch.rs
@@ -114,6 +114,50 @@ fn concurrent_close() {
}
#[test]
+fn concurrent_cancel() {
+ async fn poll_and_cancel(semaphore: Arc<Semaphore>) {
+ let mut acquire1 = Some(semaphore.acquire(1));
+ let mut acquire2 = Some(semaphore.acquire(1));
+ poll_fn(|cx| {
+ // poll the acquire future once, and then immediately throw
+ // it away. this simulates a situation where a future is
+ // polled and then cancelled, such as by a timeout.
+ if let Some(acquire) = acquire1.take() {
+ pin!(acquire);
+ let _ = acquire.poll(cx);
+ }
+ if let Some(acquire) = acquire2.take() {
+ pin!(acquire);
+ let _ = acquire.poll(cx);
+ }
+ Poll::Ready(())
+ })
+ .await
+ }
+
+ loom::model(|| {
+ let semaphore = Arc::new(Semaphore::new(0));
+ let t1 = {
+ let semaphore = semaphore.clone();
+ thread::spawn(move || block_on(poll_and_cancel(semaphore)))
+ };
+ let t2 = {
+ let semaphore = semaphore.clone();
+ thread::spawn(move || block_on(poll_and_cancel(semaphore)))
+ };
+ let t3 = {
+ let semaphore = semaphore.clone();
+ thread::spawn(move || block_on(poll_and_cancel(semaphore)))
+ };
+
+ t1.join().unwrap();
+ semaphore.release(10);
+ t2.join().unwrap();
+ t3.join().unwrap();
+ });
+}
+
+#[test]
fn batch() {
let mut b = loom::model::Builder::new();
b.preemption_bound = Some(1);