summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--tokio/src/runtime/thread_pool/worker.rs61
-rw-r--r--tokio/src/time/driver/mod.rs2
-rw-r--r--tokio/tests/rt_threaded.rs14
3 files changed, 52 insertions, 25 deletions
diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs
index 400ddee3..31712e44 100644
--- a/tokio/src/runtime/thread_pool/worker.rs
+++ b/tokio/src/runtime/thread_pool/worker.rs
@@ -78,11 +78,12 @@ pub(super) struct Shared {
/// Coordinates idle workers
idle: Idle,
- /// Workers have have observed the shutdown signal
+ /// Cores that have observed the shutdown signal
///
/// The core is **not** placed back in the worker to avoid it from being
/// stolen by a thread that was spawned as part of `block_in_place`.
- shutdown_workers: Mutex<Vec<(Box<Core>, Arc<Worker>)>>,
+ #[allow(clippy::vec_box)] // we're moving an already-boxed value
+ shutdown_cores: Mutex<Vec<Box<Core>>>,
}
/// Used to communicate with a worker from other threads.
@@ -157,7 +158,7 @@ pub(super) fn create(size: usize, park: Parker) -> (Arc<Shared>, Launch) {
remotes: remotes.into_boxed_slice(),
inject: queue::Inject::new(),
idle: Idle::new(size),
- shutdown_workers: Mutex::new(vec![]),
+ shutdown_cores: Mutex::new(vec![]),
});
let mut launch = Launch(vec![]);
@@ -328,8 +329,10 @@ impl Context {
}
}
+ core.pre_shutdown(&self.worker);
+
// Signal shutdown
- self.worker.shared.shutdown(core, self.worker.clone());
+ self.worker.shared.shutdown(core);
Err(())
}
@@ -546,11 +549,9 @@ impl Core {
}
}
- // Shutdown the core
- fn shutdown(&mut self, worker: &Worker) {
- // Take the core
- let mut park = self.park.take().expect("park missing");
-
+ // Signals all tasks to shut down, and waits for them to complete. Must run
+ // before we enter the single-threaded phase of shutdown processing.
+ fn pre_shutdown(&mut self, worker: &Worker) {
// Signal to all tasks to shut down.
for header in self.tasks.iter() {
header.shutdown();
@@ -564,8 +565,17 @@ impl Core {
}
// Wait until signalled
+ let park = self.park.as_mut().expect("park missing");
park.park().expect("park failed");
}
+ }
+
+ // Shutdown the core
+ fn shutdown(&mut self) {
+ assert!(self.tasks.is_empty());
+
+ // Take the core
+ let mut park = self.park.take().expect("park missing");
// Drain the queue
while self.next_local_task().is_some() {}
@@ -630,18 +640,23 @@ impl task::Schedule for Arc<Worker> {
use std::ptr::NonNull;
enum Immediate {
+ // Task has been synchronously removed from the Core owned by the
+ // current thread
Removed(Option<Task>),
- Core(bool),
+ // Task is owned by another thread, so we need to notify it to clean
+ // up the task later.
+ MaybeRemote,
}
let immediate = CURRENT.with(|maybe_cx| {
let cx = match maybe_cx {
Some(cx) => cx,
- None => return Immediate::Core(false),
+ None => return Immediate::MaybeRemote,
};
if !self.eq(&cx.worker) {
- return Immediate::Core(cx.core.borrow().is_some());
+ // Task owned by another core, so we need to notify it.
+ return Immediate::MaybeRemote;
}
let mut maybe_core = cx.core.borrow_mut();
@@ -656,15 +671,15 @@ impl task::Schedule for Arc<Worker> {
}
}
- Immediate::Core(false)
+ Immediate::MaybeRemote
});
// Checks if we were called from within a worker, allowing for immediate
// removal of a scheduled task. Else we have to go through the slower
// process below where we remotely mark a task as dropped.
- let worker_has_core = match immediate {
+ match immediate {
Immediate::Removed(task) => return task,
- Immediate::Core(worker_has_core) => worker_has_core,
+ Immediate::MaybeRemote => (),
};
// Track the task to be released by the worker that owns it
@@ -682,10 +697,6 @@ impl task::Schedule for Arc<Worker> {
self.remote().pending_drop.push(task);
- if worker_has_core {
- return None;
- }
-
// The worker core has been handed off to another thread. In the
// event that the scheduler is currently shutting down, the thread
// that owns the task may be waiting on the release to complete
@@ -799,16 +810,16 @@ impl Shared {
/// its core back into its handle.
///
/// If all workers have reached this point, the final cleanup is performed.
- fn shutdown(&self, core: Box<Core>, worker: Arc<Worker>) {
- let mut workers = self.shutdown_workers.lock();
- workers.push((core, worker));
+ fn shutdown(&self, core: Box<Core>) {
+ let mut cores = self.shutdown_cores.lock();
+ cores.push(core);
- if workers.len() != self.remotes.len() {
+ if cores.len() != self.remotes.len() {
return;
}
- for (mut core, worker) in workers.drain(..) {
- core.shutdown(&worker);
+ for mut core in cores.drain(..) {
+ core.shutdown();
}
// Drain the injection queue
diff --git a/tokio/src/time/driver/mod.rs b/tokio/src/time/driver/mod.rs
index 917078ef..9fbc0b3c 100644
--- a/tokio/src/time/driver/mod.rs
+++ b/tokio/src/time/driver/mod.rs
@@ -189,6 +189,8 @@ where
let mut lock = self.inner.lock();
+ assert!(!lock.is_shutdown);
+
let next_wake = lock.wheel.next_expiration_time();
lock.next_wake =
next_wake.map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap()));
diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs
index 90ebf6a6..54ec09ad 100644
--- a/tokio/tests/rt_threaded.rs
+++ b/tokio/tests/rt_threaded.rs
@@ -382,6 +382,20 @@ fn max_threads() {
.unwrap();
}
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn hang_on_shutdown() {
+ let (sync_tx, sync_rx) = std::sync::mpsc::channel::<()>();
+ tokio::spawn(async move {
+ tokio::task::block_in_place(|| sync_rx.recv().ok());
+ });
+
+ tokio::spawn(async {
+ tokio::time::sleep(std::time::Duration::from_secs(2)).await;
+ drop(sync_tx);
+ });
+ tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+}
+
fn rt() -> Runtime {
Runtime::new().unwrap()
}