diff options
author    Sean McArthur <sean@seanmonstar.com>      2020-01-06 15:37:03 -0800
committer GitHub <noreply@github.com>               2020-01-06 15:37:03 -0800
commit    855d39f849cc16d3c68df5abf0bbb28e3351cdf0 (patch)
tree      8b3c01a60f1eb589afed829547bffce5f6d9cc97 /tokio
parent    798e86821f6e06fba552bd670c5887ce3b6ff698 (diff)
Fix basic_scheduler deadlock when waking during drop (#2062)
Diffstat (limited to 'tokio')
-rw-r--r--  tokio/src/task/queue.rs |  33
-rw-r--r--  tokio/tests/rt_basic.rs |  59
2 files changed, 79 insertions(+), 13 deletions(-)
diff --git a/tokio/src/task/queue.rs b/tokio/src/task/queue.rs index a2badc8a..048960a4 100644 --- a/tokio/src/task/queue.rs +++ b/tokio/src/task/queue.rs @@ -261,19 +261,26 @@ where /// If the mutex around the remote queue is poisoned _and_ the current /// thread is not already panicking. This is safe to call in a `Drop` impl. fn close_remote(&self) { - #[allow(clippy::match_wild_err_arm)] - let mut lock = match self.remote_queue.lock() { - // If the lock is poisoned, but the thread is already panicking, - // avoid a double panic. This is necessary since this fn can be - // called in a drop impl. - Err(_) if std::thread::panicking() => return, - Err(_) => panic!("mutex poisoned"), - Ok(lock) => lock, - }; - lock.open = false; - - while let Some(task) = lock.queue.pop_front() { - task.shutdown(); + loop { + #[allow(clippy::match_wild_err_arm)] + let mut lock = match self.remote_queue.lock() { + // If the lock is poisoned, but the thread is already panicking, + // avoid a double panic. This is necessary since this fn can be + // called in a drop impl. + Err(_) if std::thread::panicking() => return, + Err(_) => panic!("mutex poisoned"), + Ok(lock) => lock, + }; + lock.open = false; + + if let Some(task) = lock.queue.pop_front() { + // Release lock before dropping task, in case + // task tries to re-schedule in its Drop. 
+ drop(lock); + task.shutdown(); + } else { + return; + } } } diff --git a/tokio/tests/rt_basic.rs b/tokio/tests/rt_basic.rs index 38a72692..39250c4c 100644 --- a/tokio/tests/rt_basic.rs +++ b/tokio/tests/rt_basic.rs @@ -63,6 +63,65 @@ fn acquire_mutex_in_drop() { drop(rt); } +#[test] +fn wake_while_rt_is_dropping() { + use tokio::task; + + struct OnDrop<F: FnMut()>(F); + + impl<F: FnMut()> Drop for OnDrop<F> { + fn drop(&mut self) { + (self.0)() + } + } + + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + let (tx3, rx3) = oneshot::channel(); + + let mut rt = rt(); + + let h1 = rt.handle().clone(); + + rt.handle().spawn(async move { + // Ensure a waker gets stored in oneshot 1. + let _ = rx1.await; + tx3.send(()).unwrap(); + }); + + rt.handle().spawn(async move { + // When this task is dropped, we'll be "closing remotes". + // We spawn a new task that owns the `tx1`, to move its Drop + // out of here. + // + // Importantly, the oneshot 1 has a waker already stored, so + // the eventual drop here will try to re-schedule again. + let mut opt_tx1 = Some(tx1); + let _d = OnDrop(move || { + let tx1 = opt_tx1.take().unwrap(); + h1.spawn(async move { + tx1.send(()).unwrap(); + }); + }); + let _ = rx2.await; + }); + + rt.handle().spawn(async move { + let _ = rx3.await; + // We'll never get here, but once task 3 drops, this will + // force task 2 to re-schedule since it's waiting on oneshot 2. + tx2.send(()).unwrap(); + }); + + // Tick the loop + rt.block_on(async { + task::yield_now().await; + }); + + // Drop the rt + drop(rt); +} + fn rt() -> Runtime { tokio::runtime::Builder::new() .basic_scheduler() |