diff options
author | Carl Lerche <me@carllerche.com> | 2020-01-09 11:49:18 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-01-09 11:49:18 -0800 |
commit | 275769b5b90e9cf33c8d37f1ba176cacb508399e (patch) | |
tree | bcd1dc8ff53be79c7effb48a2761b4841aee9a39 /tokio/src/runtime | |
parent | b70615b299b8f10fd9a4ff0cf3d098b0c3f5298e (diff) |
rt: fix shutdown deadlock in threaded scheduler (#2082)
Previously, when the threaded scheduler was in the shutdown process, it
would hold a lock while dropping in-flight tasks. If those tasks
included a drop handler that attempted to wake a second task, the wake
operation would attempt to acquire a lock held by the scheduler. This
results in a deadlock.
Dropping the lock before dropping tasks resolves the problem.
Fixes #2046
Diffstat (limited to 'tokio/src/runtime')
-rw-r--r-- | tokio/src/runtime/thread_pool/queue/global.rs | 1 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/tests/loom_pool.rs | 27 |
2 files changed, 28 insertions, 0 deletions
diff --git a/tokio/src/runtime/thread_pool/queue/global.rs b/tokio/src/runtime/thread_pool/queue/global.rs index a6f49c01..36dcc729 100644 --- a/tokio/src/runtime/thread_pool/queue/global.rs +++ b/tokio/src/runtime/thread_pool/queue/global.rs @@ -92,6 +92,7 @@ impl<T: 'static> Queue<T> { // Check if the queue is closed. This must happen in the lock. let len = self.len.unsync_load(); if len & CLOSED == CLOSED { + drop(p); f(Err(task)); return; } diff --git a/tokio/src/runtime/thread_pool/tests/loom_pool.rs b/tokio/src/runtime/thread_pool/tests/loom_pool.rs index aee66e7f..0394db16 100644 --- a/tokio/src/runtime/thread_pool/tests/loom_pool.rs +++ b/tokio/src/runtime/thread_pool/tests/loom_pool.rs @@ -168,6 +168,33 @@ fn complete_block_on_under_load() { }); } +#[test] +fn shutdown_with_notification() { + use crate::stream::StreamExt; + use crate::sync::{mpsc, oneshot}; + + loom::model(|| { + let rt = mk_pool(2); + let (done_tx, done_rx) = oneshot::channel::<()>(); + + rt.spawn(async move { + let (mut tx, mut rx) = mpsc::channel::<()>(10); + + crate::spawn(async move { + crate::task::spawn_blocking(move || { + let _ = tx.try_send(()); + }); + + let _ = done_rx.await; + }); + + while let Some(_) = rx.next().await {} + + let _ = done_tx.send(()); + }); + }); +} + fn mk_pool(num_threads: usize) -> Runtime { runtime::Builder::new() .threaded_scheduler() |