summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbdonlan <bdonlan@gmail.com>2020-12-07 20:55:02 -0800
committerGitHub <noreply@github.com>2020-12-07 20:55:02 -0800
commit57dffb9dfe9e4c0f12429246540add3975f4a754 (patch)
tree1890e495daa058f06c8a738de4c88b0aeea52f77
parent62023dffe5396ee1a0380f12c7530bf4ff59fe4a (diff)
rt: fix deadlock in shutdown (#3228)
Previously, the runtime shutdown logic would first-hand control over all cores to a single thread, which would sequentially shut down all tasks on the core and then wait for them to complete. This could deadlock when one task is waiting for a later core's task to complete. For example, in the newly added test, we have a `block_in_place` task that is waiting for another task to be dropped. If the latter task adds its core to the shutdown list later than the former, we end up waiting forever for the `block_in_place` task to complete. Additionally, there also was a bug wherein we'd attempt to park on the parker after shutting it down which was fixed as part of the refactors above. This change restructures the code to bring all tasks to a halt (and do any parking needed) before we collapse to a single thread to avoid this deadlock. There was also an issue in which canceled tasks would not unpark the originating thread, due to what appears to be some sort of optimization gone wrong. This has been fixed to be much more conservative in selecting when not to unpark the source thread (this may be too conservative; please take a look at the changes to `release()`). Fixes: #2789
-rw-r--r--tokio/src/runtime/thread_pool/worker.rs61
-rw-r--r--tokio/src/time/driver/mod.rs2
-rw-r--r--tokio/tests/rt_threaded.rs14
3 files changed, 52 insertions, 25 deletions
diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs
index 400ddee3..31712e44 100644
--- a/tokio/src/runtime/thread_pool/worker.rs
+++ b/tokio/src/runtime/thread_pool/worker.rs
@@ -78,11 +78,12 @@ pub(super) struct Shared {
/// Coordinates idle workers
idle: Idle,
- /// Workers have have observed the shutdown signal
+ /// Cores that have observed the shutdown signal
///
/// The core is **not** placed back in the worker to avoid it from being
/// stolen by a thread that was spawned as part of `block_in_place`.
- shutdown_workers: Mutex<Vec<(Box<Core>, Arc<Worker>)>>,
+ #[allow(clippy::vec_box)] // we're moving an already-boxed value
+ shutdown_cores: Mutex<Vec<Box<Core>>>,
}
/// Used to communicate with a worker from other threads.
@@ -157,7 +158,7 @@ pub(super) fn create(size: usize, park: Parker) -> (Arc<Shared>, Launch) {
remotes: remotes.into_boxed_slice(),
inject: queue::Inject::new(),
idle: Idle::new(size),
- shutdown_workers: Mutex::new(vec![]),
+ shutdown_cores: Mutex::new(vec![]),
});
let mut launch = Launch(vec![]);
@@ -328,8 +329,10 @@ impl Context {
}
}
+ core.pre_shutdown(&self.worker);
+
// Signal shutdown
- self.worker.shared.shutdown(core, self.worker.clone());
+ self.worker.shared.shutdown(core);
Err(())
}
@@ -546,11 +549,9 @@ impl Core {
}
}
- // Shutdown the core
- fn shutdown(&mut self, worker: &Worker) {
- // Take the core
- let mut park = self.park.take().expect("park missing");
-
+ // Signals all tasks to shut down, and waits for them to complete. Must run
+ // before we enter the single-threaded phase of shutdown processing.
+ fn pre_shutdown(&mut self, worker: &Worker) {
// Signal to all tasks to shut down.
for header in self.tasks.iter() {
header.shutdown();
@@ -564,8 +565,17 @@ impl Core {
}
// Wait until signalled
+ let park = self.park.as_mut().expect("park missing");
park.park().expect("park failed");
}
+ }
+
+ // Shutdown the core
+ fn shutdown(&mut self) {
+ assert!(self.tasks.is_empty());
+
+ // Take the core
+ let mut park = self.park.take().expect("park missing");
// Drain the queue
while self.next_local_task().is_some() {}
@@ -630,18 +640,23 @@ impl task::Schedule for Arc<Worker> {
use std::ptr::NonNull;
enum Immediate {
+ // Task has been synchronously removed from the Core owned by the
+ // current thread
Removed(Option<Task>),
- Core(bool),
+ // Task is owned by another thread, so we need to notify it to clean
+ // up the task later.
+ MaybeRemote,
}
let immediate = CURRENT.with(|maybe_cx| {
let cx = match maybe_cx {
Some(cx) => cx,
- None => return Immediate::Core(false),
+ None => return Immediate::MaybeRemote,
};
if !self.eq(&cx.worker) {
- return Immediate::Core(cx.core.borrow().is_some());
+ // Task owned by another core, so we need to notify it.
+ return Immediate::MaybeRemote;
}
let mut maybe_core = cx.core.borrow_mut();
@@ -656,15 +671,15 @@ impl task::Schedule for Arc<Worker> {
}
}
- Immediate::Core(false)
+ Immediate::MaybeRemote
});
// Checks if we were called from within a worker, allowing for immediate
// removal of a scheduled task. Else we have to go through the slower
// process below where we remotely mark a task as dropped.
- let worker_has_core = match immediate {
+ match immediate {
Immediate::Removed(task) => return task,
- Immediate::Core(worker_has_core) => worker_has_core,
+ Immediate::MaybeRemote => (),
};
// Track the task to be released by the worker that owns it
@@ -682,10 +697,6 @@ impl task::Schedule for Arc<Worker> {
self.remote().pending_drop.push(task);
- if worker_has_core {
- return None;
- }
-
// The worker core has been handed off to another thread. In the
// event that the scheduler is currently shutting down, the thread
// that owns the task may be waiting on the release to complete
@@ -799,16 +810,16 @@ impl Shared {
/// its core back into its handle.
///
/// If all workers have reached this point, the final cleanup is performed.
- fn shutdown(&self, core: Box<Core>, worker: Arc<Worker>) {
- let mut workers = self.shutdown_workers.lock();
- workers.push((core, worker));
+ fn shutdown(&self, core: Box<Core>) {
+ let mut cores = self.shutdown_cores.lock();
+ cores.push(core);
- if workers.len() != self.remotes.len() {
+ if cores.len() != self.remotes.len() {
return;
}
- for (mut core, worker) in workers.drain(..) {
- core.shutdown(&worker);
+ for mut core in cores.drain(..) {
+ core.shutdown();
}
// Drain the injection queue
diff --git a/tokio/src/time/driver/mod.rs b/tokio/src/time/driver/mod.rs
index 917078ef..9fbc0b3c 100644
--- a/tokio/src/time/driver/mod.rs
+++ b/tokio/src/time/driver/mod.rs
@@ -189,6 +189,8 @@ where
let mut lock = self.inner.lock();
+ assert!(!lock.is_shutdown);
+
let next_wake = lock.wheel.next_expiration_time();
lock.next_wake =
next_wake.map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap()));
diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs
index 90ebf6a6..54ec09ad 100644
--- a/tokio/tests/rt_threaded.rs
+++ b/tokio/tests/rt_threaded.rs
@@ -382,6 +382,20 @@ fn max_threads() {
.unwrap();
}
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn hang_on_shutdown() {
+ let (sync_tx, sync_rx) = std::sync::mpsc::channel::<()>();
+ tokio::spawn(async move {
+ tokio::task::block_in_place(|| sync_rx.recv().ok());
+ });
+
+ tokio::spawn(async {
+ tokio::time::sleep(std::time::Duration::from_secs(2)).await;
+ drop(sync_tx);
+ });
+ tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+}
+
fn rt() -> Runtime {
Runtime::new().unwrap()
}