diff options
-rw-r--r-- | tokio/src/runtime/thread_pool/worker.rs | 61 | ||||
-rw-r--r-- | tokio/src/time/driver/mod.rs | 2 | ||||
-rw-r--r-- | tokio/tests/rt_threaded.rs | 14 |
3 files changed, 52 insertions, 25 deletions
diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs index 400ddee3..31712e44 100644 --- a/tokio/src/runtime/thread_pool/worker.rs +++ b/tokio/src/runtime/thread_pool/worker.rs @@ -78,11 +78,12 @@ pub(super) struct Shared { /// Coordinates idle workers idle: Idle, - /// Workers have have observed the shutdown signal + /// Cores that have observed the shutdown signal /// /// The core is **not** placed back in the worker to avoid it from being /// stolen by a thread that was spawned as part of `block_in_place`. - shutdown_workers: Mutex<Vec<(Box<Core>, Arc<Worker>)>>, + #[allow(clippy::vec_box)] // we're moving an already-boxed value + shutdown_cores: Mutex<Vec<Box<Core>>>, } /// Used to communicate with a worker from other threads. @@ -157,7 +158,7 @@ pub(super) fn create(size: usize, park: Parker) -> (Arc<Shared>, Launch) { remotes: remotes.into_boxed_slice(), inject: queue::Inject::new(), idle: Idle::new(size), - shutdown_workers: Mutex::new(vec![]), + shutdown_cores: Mutex::new(vec![]), }); let mut launch = Launch(vec![]); @@ -328,8 +329,10 @@ impl Context { } } + core.pre_shutdown(&self.worker); + // Signal shutdown - self.worker.shared.shutdown(core, self.worker.clone()); + self.worker.shared.shutdown(core); Err(()) } @@ -546,11 +549,9 @@ impl Core { } } - // Shutdown the core - fn shutdown(&mut self, worker: &Worker) { - // Take the core - let mut park = self.park.take().expect("park missing"); - + // Signals all tasks to shut down, and waits for them to complete. Must run + // before we enter the single-threaded phase of shutdown processing. + fn pre_shutdown(&mut self, worker: &Worker) { // Signal to all tasks to shut down. for header in self.tasks.iter() { header.shutdown(); @@ -564,8 +565,17 @@ impl Core { } // Wait until signalled + let park = self.park.as_mut().expect("park missing"); park.park().expect("park failed"); } + } + + // Shutdown the core + fn shutdown(&mut self) { + assert!(self.tasks.is_empty()); + + // Take the core + let mut park = self.park.take().expect("park missing"); // Drain the queue while self.next_local_task().is_some() {} @@ -630,18 +640,23 @@ impl task::Schedule for Arc<Worker> { use std::ptr::NonNull; enum Immediate { + // Task has been synchronously removed from the Core owned by the + // current thread Removed(Option<Task>), - Core(bool), + // Task is owned by another thread, so we need to notify it to clean + // up the task later. + MaybeRemote, } let immediate = CURRENT.with(|maybe_cx| { let cx = match maybe_cx { Some(cx) => cx, - None => return Immediate::Core(false), + None => return Immediate::MaybeRemote, }; if !self.eq(&cx.worker) { - return Immediate::Core(cx.core.borrow().is_some()); + // Task owned by another core, so we need to notify it. + return Immediate::MaybeRemote; } let mut maybe_core = cx.core.borrow_mut(); @@ -656,15 +671,15 @@ impl task::Schedule for Arc<Worker> { } } - Immediate::Core(false) + Immediate::MaybeRemote }); // Checks if we were called from within a worker, allowing for immediate // removal of a scheduled task. Else we have to go through the slower // process below where we remotely mark a task as dropped. - let worker_has_core = match immediate { + match immediate { Immediate::Removed(task) => return task, - Immediate::Core(worker_has_core) => worker_has_core, + Immediate::MaybeRemote => (), }; // Track the task to be released by the worker that owns it @@ -682,10 +697,6 @@ impl task::Schedule for Arc<Worker> { self.remote().pending_drop.push(task); - if worker_has_core { - return None; - } - // The worker core has been handed off to another thread. In the // event that the scheduler is currently shutting down, the thread // that owns the task may be waiting on the release to complete @@ -799,16 +810,16 @@ impl Shared { /// its core back into its handle. /// /// If all workers have reached this point, the final cleanup is performed. - fn shutdown(&self, core: Box<Core>, worker: Arc<Worker>) { - let mut workers = self.shutdown_workers.lock(); - workers.push((core, worker)); + fn shutdown(&self, core: Box<Core>) { + let mut cores = self.shutdown_cores.lock(); + cores.push(core); - if workers.len() != self.remotes.len() { + if cores.len() != self.remotes.len() { return; } - for (mut core, worker) in workers.drain(..) { - core.shutdown(&worker); + for mut core in cores.drain(..) { + core.shutdown(); } // Drain the injection queue diff --git a/tokio/src/time/driver/mod.rs b/tokio/src/time/driver/mod.rs index 917078ef..9fbc0b3c 100644 --- a/tokio/src/time/driver/mod.rs +++ b/tokio/src/time/driver/mod.rs @@ -189,6 +189,8 @@ where let mut lock = self.inner.lock(); + assert!(!lock.is_shutdown); + let next_wake = lock.wheel.next_expiration_time(); lock.next_wake = next_wake.map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap())); diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs index 90ebf6a6..54ec09ad 100644 --- a/tokio/tests/rt_threaded.rs +++ b/tokio/tests/rt_threaded.rs @@ -382,6 +382,20 @@ fn max_threads() { .unwrap(); } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn hang_on_shutdown() { + let (sync_tx, sync_rx) = std::sync::mpsc::channel::<()>(); + tokio::spawn(async move { + tokio::task::block_in_place(|| sync_rx.recv().ok()); + }); + + tokio::spawn(async { + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + drop(sync_tx); + }); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; +} + fn rt() -> Runtime { Runtime::new().unwrap() } |