author    Carl Lerche <me@carllerche.com>   2020-01-08 21:23:10 -0800
committer GitHub <noreply@github.com>       2020-01-08 21:23:10 -0800
commit    6406328176cdecf15cad69b327597a4d4d0b8e20 (patch)
tree      b802f2ac711188d4338169d7ff175bf917147126 /tokio
parent    f28c9f0d17a4dca2003bbee57a09f62c3795c2d2 (diff)
rt: fix threaded scheduler shutdown deadlock (#2074)
Previously, if an IO event was received during the runtime shutdown process, it was possible to enter a deadlock. This was due to the scheduler shutdown logic not expecting tasks to get scheduled once the worker was in the shutdown process.

This patch fixes the deadlock by checking the queues for new tasks after each call to park. If a new task is received, it is forcefully shut down.

Fixes #2061
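In outline, the fix turns the shutdown path into a drain / park / re-check loop. The sketch below is a rough, standalone illustration of that pattern, not tokio's internal API: `Task`, `shutdown_worker`, and the mpsc channel are stand-ins for the worker's queues and parker, and the real worker additionally notifies its peers so they can drain their own pending-drop stacks.

use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
use std::thread;
use std::time::Duration;

// Hypothetical task handle; `shutdown` stands in for forcefully cancelling it.
struct Task(u32);

impl Task {
    fn shutdown(self) {
        println!("forcefully shut down task {}", self.0);
    }
}

// Shutdown loop in the style the patch introduces: drain, park, re-check,
// and only exit once the queue is closed and empty.
fn shutdown_worker(queue: Receiver<Task>) {
    loop {
        // Drain every task currently queued, including tasks scheduled
        // after shutdown began (e.g. by a late IO event).
        while let Ok(task) = queue.try_recv() {
            task.shutdown();
        }

        // "Park" with a timeout, then loop back and re-check the queue.
        match queue.recv_timeout(Duration::from_millis(10)) {
            Ok(task) => task.shutdown(),
            Err(RecvTimeoutError::Timeout) => continue,
            // Every sender is gone and the queue is empty: shutdown is done.
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }
}

fn main() {
    let (tx, rx) = channel();

    // Simulate an IO event scheduling one more task while shutdown runs.
    let late: Sender<Task> = tx.clone();
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(5));
        let _ = late.send(Task(2));
    });

    tx.send(Task(1)).unwrap();
    drop(tx);

    shutdown_worker(rx);
}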
Diffstat (limited to 'tokio')
-rw-r--r--   tokio/src/runtime/thread_pool/worker.rs   40
-rw-r--r--   tokio/tests/rt_basic.rs                    59
-rw-r--r--   tokio/tests/rt_common.rs                   94
3 files changed, 120 insertions(+), 73 deletions(-)
diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs
index 44df8b74..298ef06d 100644
--- a/tokio/src/runtime/thread_pool/worker.rs
+++ b/tokio/src/runtime/thread_pool/worker.rs
@@ -533,27 +533,41 @@ impl GenerationGuard<'_> {
// Transition all tasks owned by the worker to canceled.
self.owned().owned_tasks.shutdown();
- // First, drain all tasks from both the local & global queue.
- while let Some(task) = self.owned().work_queue.pop_local_first() {
- task.shutdown();
- }
-
- // Notify all workers in case they have pending tasks to drop
- //
- // Not super efficient, but we are also shutting down.
- self.worker.slices.notify_all();
+ // Always notify the first time around. This flushes any released tasks
+ // that happened before the call to `Worker::shutdown`
+ let mut notify = true;
// The worker can only shutdown once there are no further owned tasks.
- while !self.owned().owned_tasks.is_empty() {
+ loop {
+ // First, drain all tasks from both the local & global queue.
+ while let Some(task) = self.owned().work_queue.pop_local_first() {
+ notify = true;
+ task.shutdown();
+ }
+
+ if notify {
+ // If any tasks are shutdown, they may be pushed on another
+ // worker's `pending_drop` stack. However, we don't know which
+ // workers need to be notified, so we just notify all of them.
+ // Since this is a shutdown process, excessive notification is
+ // not a huge deal.
+ self.worker.slices.notify_all();
+ notify = false;
+ }
+
+ // Try draining more tasks
+ self.drain_tasks_pending_drop();
+
+ if self.owned().owned_tasks.is_empty() {
+ break;
+ }
+
// Wait until task that this worker owns are released.
//
// `transition_to_parked` is not called as we are not working
// anymore. When a task is released, the owning worker is unparked
// directly.
self.park_mut().park().expect("park failed");
-
- // Try draining more tasks
- self.drain_tasks_pending_drop();
}
}
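One detail worth spelling out from the hunk above: when this worker forcefully shuts a task down, the task may be released onto another worker's `pending_drop` stack, and that worker has to wake up to actually drop it, hence the unconditional `notify_all` during shutdown. The toy model below shows just that hand-off; `Worker`, `push_pending_drop`, and `notify` are illustrative names, not tokio's internal types.

use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// Illustrative stand-in for a worker; not tokio's internal type.
struct Worker {
    // Tasks released by peer workers that this worker still has to drop.
    pending_drop: Mutex<Vec<String>>,
    unparker: Condvar,
}

impl Worker {
    fn new() -> Self {
        Worker {
            pending_drop: Mutex::new(Vec::new()),
            unparker: Condvar::new(),
        }
    }

    // Called by a peer after it shut down a task owned by this worker.
    fn push_pending_drop(&self, task: String) {
        self.pending_drop.lock().unwrap().push(task);
    }

    // The role `notify_all` plays in the patch: wake the worker so it
    // re-runs its drain loop instead of parking forever.
    fn notify(&self) {
        self.unparker.notify_all();
    }
}

fn main() {
    let worker_b = Arc::new(Worker::new());

    // Worker B parks until notified, then drains its pending_drop stack.
    let b = Arc::clone(&worker_b);
    let handle = thread::spawn(move || {
        let mut pending = b.pending_drop.lock().unwrap();
        while pending.is_empty() {
            pending = b.unparker.wait(pending).unwrap();
        }
        for task in pending.drain(..) {
            println!("worker B dropping task released by a peer: {}", task);
        }
    });

    // Worker A shuts down a task owned by B, pushes it onto B's stack, and
    // notifies B; without the notification B would never re-check the stack.
    worker_b.push_pending_drop("task-1".to_string());
    worker_b.notify();

    handle.join().unwrap();
}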
diff --git a/tokio/tests/rt_basic.rs b/tokio/tests/rt_basic.rs
index 39250c4c..38a72692 100644
--- a/tokio/tests/rt_basic.rs
+++ b/tokio/tests/rt_basic.rs
@@ -63,65 +63,6 @@ fn acquire_mutex_in_drop() {
drop(rt);
}
-#[test]
-fn wake_while_rt_is_dropping() {
- use tokio::task;
-
- struct OnDrop<F: FnMut()>(F);
-
- impl<F: FnMut()> Drop for OnDrop<F> {
- fn drop(&mut self) {
- (self.0)()
- }
- }
-
- let (tx1, rx1) = oneshot::channel();
- let (tx2, rx2) = oneshot::channel();
- let (tx3, rx3) = oneshot::channel();
-
- let mut rt = rt();
-
- let h1 = rt.handle().clone();
-
- rt.handle().spawn(async move {
- // Ensure a waker gets stored in oneshot 1.
- let _ = rx1.await;
- tx3.send(()).unwrap();
- });
-
- rt.handle().spawn(async move {
- // When this task is dropped, we'll be "closing remotes".
- // We spawn a new task that owns the `tx1`, to move its Drop
- // out of here.
- //
- // Importantly, the oneshot 1 has a waker already stored, so
- // the eventual drop here will try to re-schedule again.
- let mut opt_tx1 = Some(tx1);
- let _d = OnDrop(move || {
- let tx1 = opt_tx1.take().unwrap();
- h1.spawn(async move {
- tx1.send(()).unwrap();
- });
- });
- let _ = rx2.await;
- });
-
- rt.handle().spawn(async move {
- let _ = rx3.await;
- // We'll never get here, but once task 3 drops, this will
- // force task 2 to re-schedule since it's waiting on oneshot 2.
- tx2.send(()).unwrap();
- });
-
- // Tick the loop
- rt.block_on(async {
- task::yield_now().await;
- });
-
- // Drop the rt
- drop(rt);
-}
-
fn rt() -> Runtime {
tokio::runtime::Builder::new()
.basic_scheduler()
diff --git a/tokio/tests/rt_common.rs b/tokio/tests/rt_common.rs
index 15f5de6c..31edd10a 100644
--- a/tokio/tests/rt_common.rs
+++ b/tokio/tests/rt_common.rs
@@ -41,7 +41,7 @@ fn send_sync_bound() {
}
rt_test! {
- use tokio::net::{TcpListener, TcpStream};
+ use tokio::net::{TcpListener, TcpStream, UdpSocket};
use tokio::prelude::*;
use tokio::runtime::Runtime;
use tokio::sync::oneshot;
@@ -618,6 +618,98 @@ rt_test! {
}
#[test]
+ fn wake_while_rt_is_dropping() {
+ use tokio::task;
+
+ struct OnDrop<F: FnMut()>(F);
+
+ impl<F: FnMut()> Drop for OnDrop<F> {
+ fn drop(&mut self) {
+ (self.0)()
+ }
+ }
+
+ let (tx1, rx1) = oneshot::channel();
+ let (tx2, rx2) = oneshot::channel();
+ let (tx3, rx3) = oneshot::channel();
+
+ let mut rt = rt();
+
+ let h1 = rt.handle().clone();
+
+ rt.handle().spawn(async move {
+ // Ensure a waker gets stored in oneshot 1.
+ let _ = rx1.await;
+ tx3.send(()).unwrap();
+ });
+
+ rt.handle().spawn(async move {
+ // When this task is dropped, we'll be "closing remotes".
+ // We spawn a new task that owns the `tx1`, to move its Drop
+ // out of here.
+ //
+ // Importantly, the oneshot 1 has a waker already stored, so
+ // the eventual drop here will try to re-schedule again.
+ let mut opt_tx1 = Some(tx1);
+ let _d = OnDrop(move || {
+ let tx1 = opt_tx1.take().unwrap();
+ h1.spawn(async move {
+ tx1.send(()).unwrap();
+ });
+ });
+ let _ = rx2.await;
+ });
+
+ rt.handle().spawn(async move {
+ let _ = rx3.await;
+ // We'll never get here, but once task 3 drops, this will
+ // force task 2 to re-schedule since it's waiting on oneshot 2.
+ tx2.send(()).unwrap();
+ });
+
+ // Tick the loop
+ rt.block_on(async {
+ task::yield_now().await;
+ });
+
+ // Drop the rt
+ drop(rt);
+ }
+
+ #[test]
+ fn io_notify_while_shutting_down() {
+ use std::net::Ipv6Addr;
+
+ for _ in 1..100 {
+ let mut runtime = rt();
+
+ runtime.block_on(async {
+ let socket = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap();
+ let addr = socket.local_addr().unwrap();
+ let (mut recv_half, mut send_half) = socket.split();
+
+ tokio::spawn(async move {
+ let mut buf = [0];
+ loop {
+ recv_half.recv_from(&mut buf).await.unwrap();
+ std::thread::sleep(Duration::from_millis(2));
+ }
+ });
+
+ tokio::spawn(async move {
+ let buf = [0];
+ loop {
+ send_half.send_to(&buf, &addr).await.unwrap();
+ tokio::time::delay_for(Duration::from_millis(1)).await;
+ }
+ });
+
+ tokio::time::delay_for(Duration::from_millis(5)).await;
+ });
+ }
+ }
+
+ #[test]
fn runtime_in_thread_local() {
use std::cell::RefCell;
use std::thread;