summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbdonlan <bdonlan@gmail.com>2020-11-05 10:38:37 -0800
committerGitHub <noreply@github.com>2020-11-05 10:38:37 -0800
commitd7e3fcb9ee472d40337776cd5f5ffd51bc50272c (patch)
treecb65ecbea4f04a0c3639dd9b3982e29fea0f5b7f
parent0b3918bce956567cccc617213a56c339a5a21d6f (diff)
rt: remove last slab dependency (#2917)
This removes the last slab dependency by replacing the current slab-based JoinHandle tracking with one based on HashMap instead. Co-authored-by: Bryan Donlan <bdonlan@amazon.com>
-rw-r--r--tokio/src/runtime/blocking/pool.rs59
1 files changed, 45 insertions, 14 deletions
diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs
index 6b9fb1bf..9f98d898 100644
--- a/tokio/src/runtime/blocking/pool.rs
+++ b/tokio/src/runtime/blocking/pool.rs
@@ -10,9 +10,7 @@ use crate::runtime::context;
use crate::runtime::task::{self, JoinHandle};
use crate::runtime::{Builder, Callback, Handle};
-use slab::Slab;
-
-use std::collections::VecDeque;
+use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::time::Duration;
@@ -59,7 +57,18 @@ struct Shared {
num_notify: u32,
shutdown: bool,
shutdown_tx: Option<shutdown::Sender>,
- worker_threads: Slab<thread::JoinHandle<()>>,
+ /// Prior to shutdown, we clean up JoinHandles by having each timed-out
+ /// thread join on the previous timed-out thread. This is not strictly
+ /// necessary but helps avoid Valgrind false positives, see
+ /// https://github.com/tokio-rs/tokio/commit/646fbae76535e397ef79dbcaacb945d4c829f666
+ /// for more information.
+ last_exiting_thread: Option<thread::JoinHandle<()>>,
+ /// This holds the JoinHandles for all running threads; on shutdown, the thread
+ /// calling shutdown handles joining on these.
+ worker_threads: HashMap<usize, thread::JoinHandle<()>>,
+ /// This is a counter used to iterate worker_threads in a consistent order (for loom's
+ /// benefit)
+ worker_thread_index: usize,
}
type Task = task::Notified<NoopSchedule>;
@@ -105,7 +114,9 @@ impl BlockingPool {
num_notify: 0,
shutdown: false,
shutdown_tx: Some(shutdown_tx),
- worker_threads: Slab::new(),
+ last_exiting_thread: None,
+ worker_threads: HashMap::new(),
+ worker_thread_index: 0,
}),
condvar: Condvar::new(),
thread_name: builder.thread_name.clone(),
@@ -137,12 +148,21 @@ impl BlockingPool {
shared.shutdown = true;
shared.shutdown_tx = None;
self.spawner.inner.condvar.notify_all();
- let mut workers = std::mem::replace(&mut shared.worker_threads, Slab::new());
+
+ let last_exited_thread = std::mem::take(&mut shared.last_exiting_thread);
+ let workers = std::mem::replace(&mut shared.worker_threads, HashMap::new());
drop(shared);
if self.shutdown_rx.wait(timeout) {
- for handle in workers.drain() {
+ let _ = last_exited_thread.map(|th| th.join());
+
+ // Loom requires that execution be deterministic, so sort by thread ID before joining.
+ // (HashMaps use a randomly-seeded hash function, so the order is nondeterministic)
+ let mut workers: Vec<(usize, thread::JoinHandle<()>)> = workers.into_iter().collect();
+ workers.sort_by_key(|(id, _)| *id);
+
+ for (_id, handle) in workers.into_iter() {
let _ = handle.join();
}
}
@@ -204,11 +224,13 @@ impl Spawner {
if let Some(shutdown_tx) = shutdown_tx {
let mut shared = self.inner.shared.lock();
- let entry = shared.worker_threads.vacant_entry();
- let handle = self.spawn_thread(shutdown_tx, rt, entry.key());
+ let id = shared.worker_thread_index;
+ shared.worker_thread_index += 1;
- entry.insert(handle);
+ let handle = self.spawn_thread(shutdown_tx, rt, id);
+
+ shared.worker_threads.insert(id, handle);
}
Ok(())
@@ -218,7 +240,7 @@ impl Spawner {
&self,
shutdown_tx: shutdown::Sender,
rt: &Handle,
- worker_id: usize,
+ id: usize,
) -> thread::JoinHandle<()> {
let mut builder = thread::Builder::new().name((self.inner.thread_name)());
@@ -232,7 +254,7 @@ impl Spawner {
.spawn(move || {
// Only the reference should be moved into the closure
let _enter = crate::runtime::context::enter(rt.clone());
- rt.blocking_spawner.inner.run(worker_id);
+ rt.blocking_spawner.inner.run(id);
drop(shutdown_tx);
})
.unwrap()
@@ -240,12 +262,13 @@ impl Spawner {
}
impl Inner {
- fn run(&self, worker_id: usize) {
+ fn run(&self, worker_thread_id: usize) {
if let Some(f) = &self.after_start {
f()
}
let mut shared = self.shared.lock();
+ let mut join_on_thread = None;
'main: loop {
// BUSY
@@ -276,7 +299,11 @@ impl Inner {
// Even if the condvar "timed out", if the pool is entering the
// shutdown phase, we want to perform the cleanup logic.
if !shared.shutdown && timeout_result.timed_out() {
- shared.worker_threads.remove(worker_id);
+ // We'll join the prior timed-out thread's JoinHandle after dropping the lock.
+ // This isn't done when shutting down, because the thread calling shutdown will
+ // handle joining everything.
+ let my_handle = shared.worker_threads.remove(&worker_thread_id);
+ join_on_thread = std::mem::replace(&mut shared.last_exiting_thread, my_handle);
break 'main;
}
@@ -323,6 +350,10 @@ impl Inner {
if let Some(f) = &self.before_stop {
f()
}
+
+ if let Some(handle) = join_on_thread {
+ let _ = handle.join();
+ }
}
}