summaryrefslogtreecommitdiffstats
path: root/tokio/src/runtime
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-01-09 11:49:18 -0800
committerGitHub <noreply@github.com>2020-01-09 11:49:18 -0800
commit275769b5b90e9cf33c8d37f1ba176cacb508399e (patch)
treebcd1dc8ff53be79c7effb48a2761b4841aee9a39 /tokio/src/runtime
parentb70615b299b8f10fd9a4ff0cf3d098b0c3f5298e (diff)
rt: fix shutdown deadlock in threaded scheduler (#2082)
Previously, when the threaded scheduler was in the shutdown process, it would hold a lock while dropping in-flight tasks. If those tasks included a drop handler that attempted to wake a second task, the wake operation would attempt to acquire a lock held by the scheduler. This results in a deadlock. Dropping the lock before dropping tasks resolves the problem. Fixes #2046
Diffstat (limited to 'tokio/src/runtime')
-rw-r--r--tokio/src/runtime/thread_pool/queue/global.rs1
-rw-r--r--tokio/src/runtime/thread_pool/tests/loom_pool.rs27
2 files changed, 28 insertions, 0 deletions
diff --git a/tokio/src/runtime/thread_pool/queue/global.rs b/tokio/src/runtime/thread_pool/queue/global.rs
index a6f49c01..36dcc729 100644
--- a/tokio/src/runtime/thread_pool/queue/global.rs
+++ b/tokio/src/runtime/thread_pool/queue/global.rs
@@ -92,6 +92,7 @@ impl<T: 'static> Queue<T> {
// Check if the queue is closed. This must happen in the lock.
let len = self.len.unsync_load();
if len & CLOSED == CLOSED {
+ drop(p);
f(Err(task));
return;
}
diff --git a/tokio/src/runtime/thread_pool/tests/loom_pool.rs b/tokio/src/runtime/thread_pool/tests/loom_pool.rs
index aee66e7f..0394db16 100644
--- a/tokio/src/runtime/thread_pool/tests/loom_pool.rs
+++ b/tokio/src/runtime/thread_pool/tests/loom_pool.rs
@@ -168,6 +168,33 @@ fn complete_block_on_under_load() {
});
}
+#[test]
+fn shutdown_with_notification() {
+ use crate::stream::StreamExt;
+ use crate::sync::{mpsc, oneshot};
+
+ loom::model(|| {
+ let rt = mk_pool(2);
+ let (done_tx, done_rx) = oneshot::channel::<()>();
+
+ rt.spawn(async move {
+ let (mut tx, mut rx) = mpsc::channel::<()>(10);
+
+ crate::spawn(async move {
+ crate::task::spawn_blocking(move || {
+ let _ = tx.try_send(());
+ });
+
+ let _ = done_rx.await;
+ });
+
+ while let Some(_) = rx.next().await {}
+
+ let _ = done_tx.send(());
+ });
+ });
+}
+
fn mk_pool(num_threads: usize) -> Runtime {
runtime::Builder::new()
.threaded_scheduler()