From d7e3fcb9ee472d40337776cd5f5ffd51bc50272c Mon Sep 17 00:00:00 2001 From: bdonlan Date: Thu, 5 Nov 2020 10:38:37 -0800 Subject: rt: remove last slab dependency (#2917) This removes the last slab dependency by replacing the current slab-based JoinHandle tracking with one based on HashMap instead. Co-authored-by: Bryan Donlan --- tokio/src/runtime/blocking/pool.rs | 59 +++++++++++++++++++++++++++++--------- 1 file changed, 45 insertions(+), 14 deletions(-) (limited to 'tokio') diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index 6b9fb1bf..9f98d898 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -10,9 +10,7 @@ use crate::runtime::context; use crate::runtime::task::{self, JoinHandle}; use crate::runtime::{Builder, Callback, Handle}; -use slab::Slab; - -use std::collections::VecDeque; +use std::collections::{HashMap, VecDeque}; use std::fmt; use std::time::Duration; @@ -59,7 +57,18 @@ struct Shared { num_notify: u32, shutdown: bool, shutdown_tx: Option, - worker_threads: Slab>, + /// Prior to shutdown, we clean up JoinHandles by having each timed-out + /// thread join on the previous timed-out thread. This is not strictly + /// necessary but helps avoid Valgrind false positives, see + /// https://github.com/tokio-rs/tokio/commit/646fbae76535e397ef79dbcaacb945d4c829f666 + /// for more information. + last_exiting_thread: Option>, + /// This holds the JoinHandles for all running threads; on shutdown, the thread + /// calling shutdown handles joining on these. + worker_threads: HashMap>, + /// This is a counter used to iterate worker_threads in a consistent order (for loom's + /// benefit) + worker_thread_index: usize, } type Task = task::Notified; @@ -105,7 +114,9 @@ impl BlockingPool { num_notify: 0, shutdown: false, shutdown_tx: Some(shutdown_tx), - worker_threads: Slab::new(), + last_exiting_thread: None, + worker_threads: HashMap::new(), + worker_thread_index: 0, }), condvar: Condvar::new(), thread_name: builder.thread_name.clone(), @@ -137,12 +148,21 @@ impl BlockingPool { shared.shutdown = true; shared.shutdown_tx = None; self.spawner.inner.condvar.notify_all(); - let mut workers = std::mem::replace(&mut shared.worker_threads, Slab::new()); + + let last_exited_thread = std::mem::take(&mut shared.last_exiting_thread); + let workers = std::mem::replace(&mut shared.worker_threads, HashMap::new()); drop(shared); if self.shutdown_rx.wait(timeout) { - for handle in workers.drain() { + let _ = last_exited_thread.map(|th| th.join()); + + // Loom requires that execution be deterministic, so sort by thread ID before joining. + // (HashMaps use a randomly-seeded hash function, so the order is nondeterministic) + let mut workers: Vec<(usize, thread::JoinHandle<()>)> = workers.into_iter().collect(); + workers.sort_by_key(|(id, _)| *id); + + for (_id, handle) in workers.into_iter() { let _ = handle.join(); } } @@ -204,11 +224,13 @@ impl Spawner { if let Some(shutdown_tx) = shutdown_tx { let mut shared = self.inner.shared.lock(); - let entry = shared.worker_threads.vacant_entry(); - let handle = self.spawn_thread(shutdown_tx, rt, entry.key()); + let id = shared.worker_thread_index; + shared.worker_thread_index += 1; - entry.insert(handle); + let handle = self.spawn_thread(shutdown_tx, rt, id); + + shared.worker_threads.insert(id, handle); } Ok(()) @@ -218,7 +240,7 @@ impl Spawner { &self, shutdown_tx: shutdown::Sender, rt: &Handle, - worker_id: usize, + id: usize, ) -> thread::JoinHandle<()> { let mut builder = thread::Builder::new().name((self.inner.thread_name)()); @@ -232,7 +254,7 @@ impl Spawner { .spawn(move || { // Only the reference should be moved into the closure let _enter = crate::runtime::context::enter(rt.clone()); - rt.blocking_spawner.inner.run(worker_id); + rt.blocking_spawner.inner.run(id); drop(shutdown_tx); }) .unwrap() @@ -240,12 +262,13 @@ impl Spawner { } impl Inner { - fn run(&self, worker_id: usize) { + fn run(&self, worker_thread_id: usize) { if let Some(f) = &self.after_start { f() } let mut shared = self.shared.lock(); + let mut join_on_thread = None; 'main: loop { // BUSY @@ -276,7 +299,11 @@ impl Inner { // Even if the condvar "timed out", if the pool is entering the // shutdown phase, we want to perform the cleanup logic. if !shared.shutdown && timeout_result.timed_out() { - shared.worker_threads.remove(worker_id); + // We'll join the prior timed-out thread's JoinHandle after dropping the lock. + // This isn't done when shutting down, because the thread calling shutdown will + // handle joining everything. + let my_handle = shared.worker_threads.remove(&worker_thread_id); + join_on_thread = std::mem::replace(&mut shared.last_exiting_thread, my_handle); break 'main; } @@ -323,6 +350,10 @@ impl Inner { if let Some(f) = &self.before_stop { f() } + + if let Some(handle) = join_on_thread { + let _ = handle.join(); + } } } -- cgit v1.2.3