summaryrefslogtreecommitdiffstats
path: root/tokio/src/runtime
diff options
context:
space:
mode:
authorÉmile Grégoire <eg@emilegregoire.ca>2020-07-28 23:43:19 -0400
committerGitHub <noreply@github.com>2020-07-28 20:43:19 -0700
commit646fbae76535e397ef79dbcaacb945d4c829f666 (patch)
tree49c00b3825463cae83eef0e1b65ee3d8266980c3 /tokio/src/runtime
parent1562bb314482215eb7517e6b8b8bdecbacf10e79 (diff)
rt: fix potential leak during runtime shutdown (#2649)
JoinHandle of threads created by the pool are now tracked and properly joined at shutdown. If the thread does not return within the timeout, then it's not joined and left to the OS for cleanup. Also, break a cycle between wakers held by the timer and the runtime. Fixes #2641, #2535
Diffstat (limited to 'tokio/src/runtime')
-rw-r--r--tokio/src/runtime/blocking/pool.rs34
-rw-r--r--tokio/src/runtime/blocking/shutdown.rs11
-rw-r--r--tokio/src/runtime/mod.rs9
-rw-r--r--tokio/src/runtime/park.rs12
-rw-r--r--tokio/src/runtime/spawner.rs11
-rw-r--r--tokio/src/runtime/thread_pool/mod.rs6
-rw-r--r--tokio/src/runtime/thread_pool/worker.rs2
7 files changed, 69 insertions, 16 deletions
diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs
index 40d417b1..c5d464c8 100644
--- a/tokio/src/runtime/blocking/pool.rs
+++ b/tokio/src/runtime/blocking/pool.rs
@@ -8,6 +8,8 @@ use crate::runtime::blocking::task::BlockingTask;
use crate::runtime::task::{self, JoinHandle};
use crate::runtime::{Builder, Callback, Handle};
+use slab::Slab;
+
use std::collections::VecDeque;
use std::fmt;
use std::time::Duration;
@@ -41,6 +43,7 @@ struct Inner {
/// Call before a thread stops
before_stop: Option<Callback>,
+ // Maximum number of threads
thread_cap: usize,
}
@@ -51,6 +54,7 @@ struct Shared {
num_notify: u32,
shutdown: bool,
shutdown_tx: Option<shutdown::Sender>,
+ worker_threads: Slab<thread::JoinHandle<()>>,
}
type Task = task::Notified<NoopSchedule>;
@@ -96,6 +100,7 @@ impl BlockingPool {
num_notify: 0,
shutdown: false,
shutdown_tx: Some(shutdown_tx),
+ worker_threads: Slab::new(),
}),
condvar: Condvar::new(),
thread_name: builder.thread_name.clone(),
@@ -126,10 +131,15 @@ impl BlockingPool {
shared.shutdown = true;
shared.shutdown_tx = None;
self.spawner.inner.condvar.notify_all();
+ let mut workers = std::mem::replace(&mut shared.worker_threads, Slab::new());
drop(shared);
- self.shutdown_rx.wait(timeout);
+ if self.shutdown_rx.wait(timeout) {
+ for handle in workers.drain() {
+ let _ = handle.join();
+ }
+ }
}
}
@@ -187,13 +197,23 @@ impl Spawner {
};
if let Some(shutdown_tx) = shutdown_tx {
- self.spawn_thread(shutdown_tx, rt);
+ let mut shared = self.inner.shared.lock().unwrap();
+ let entry = shared.worker_threads.vacant_entry();
+
+ let handle = self.spawn_thread(shutdown_tx, rt, entry.key());
+
+ entry.insert(handle);
}
Ok(())
}
- fn spawn_thread(&self, shutdown_tx: shutdown::Sender, rt: &Handle) {
+ fn spawn_thread(
+ &self,
+ shutdown_tx: shutdown::Sender,
+ rt: &Handle,
+ worker_id: usize,
+ ) -> thread::JoinHandle<()> {
let mut builder = thread::Builder::new().name(self.inner.thread_name.clone());
if let Some(stack_size) = self.inner.stack_size {
@@ -207,16 +227,16 @@ impl Spawner {
// Only the reference should be moved into the closure
let rt = &rt;
rt.enter(move || {
- rt.blocking_spawner.inner.run();
+ rt.blocking_spawner.inner.run(worker_id);
drop(shutdown_tx);
})
})
- .unwrap();
+ .unwrap()
}
}
impl Inner {
- fn run(&self) {
+ fn run(&self, worker_id: usize) {
if let Some(f) = &self.after_start {
f()
}
@@ -252,6 +272,8 @@ impl Inner {
// Even if the condvar "timed out", if the pool is entering the
// shutdown phase, we want to perform the cleanup logic.
if !shared.shutdown && timeout_result.timed_out() {
+ shared.worker_threads.remove(worker_id);
+
break 'main;
}
diff --git a/tokio/src/runtime/blocking/shutdown.rs b/tokio/src/runtime/blocking/shutdown.rs
index e76a7013..3b6cc593 100644
--- a/tokio/src/runtime/blocking/shutdown.rs
+++ b/tokio/src/runtime/blocking/shutdown.rs
@@ -32,11 +32,13 @@ impl Receiver {
/// If `timeout` is `Some`, the thread is blocked for **at most** `timeout`
/// duration. If `timeout` is `None`, then the thread is blocked until the
/// shutdown signal is received.
- pub(crate) fn wait(&mut self, timeout: Option<Duration>) {
+ ///
+ /// If the timeout has elapsed, it returns `false`, otherwise it returns `true`.
+ pub(crate) fn wait(&mut self, timeout: Option<Duration>) -> bool {
use crate::runtime::enter::try_enter;
if timeout == Some(Duration::from_nanos(0)) {
- return;
+ return true;
}
let mut e = match try_enter(false) {
@@ -44,7 +46,7 @@ impl Receiver {
_ => {
if std::thread::panicking() {
// Don't panic in a panic
- return;
+ return false;
} else {
panic!(
"Cannot drop a runtime in a context where blocking is not allowed. \
@@ -60,9 +62,10 @@ impl Receiver {
// current thread (usually, shutting down a runtime stored in a
// thread-local).
if let Some(timeout) = timeout {
- let _ = e.block_on_timeout(&mut self.rx, timeout);
+ e.block_on_timeout(&mut self.rx, timeout).is_ok()
} else {
let _ = e.block_on(&mut self.rx);
+ true
}
}
}
diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs
index 300a1465..637f38ca 100644
--- a/tokio/src/runtime/mod.rs
+++ b/tokio/src/runtime/mod.rs
@@ -542,11 +542,10 @@ impl Runtime {
/// runtime.shutdown_timeout(Duration::from_millis(100));
/// }
/// ```
- pub fn shutdown_timeout(self, duration: Duration) {
- let Runtime {
- mut blocking_pool, ..
- } = self;
- blocking_pool.shutdown(Some(duration));
+ pub fn shutdown_timeout(mut self, duration: Duration) {
+ // Wakeup and shutdown all the worker threads
+ self.handle.spawner.shutdown();
+ self.blocking_pool.shutdown(Some(duration));
}
/// Shutdown the runtime, without waiting for any spawned tasks to shutdown.
diff --git a/tokio/src/runtime/park.rs b/tokio/src/runtime/park.rs
index ee437d1d..1dcf65af 100644
--- a/tokio/src/runtime/park.rs
+++ b/tokio/src/runtime/park.rs
@@ -104,6 +104,10 @@ impl Park for Parker {
Ok(())
}
}
+
+ fn shutdown(&mut self) {
+ self.inner.shutdown();
+ }
}
impl Unpark for Unparker {
@@ -242,4 +246,12 @@ impl Inner {
fn unpark_driver(&self) {
self.shared.handle.unpark();
}
+
+ fn shutdown(&self) {
+ if let Some(mut driver) = self.shared.driver.try_lock() {
+ driver.shutdown();
+ }
+
+ self.condvar.notify_all();
+ }
}
diff --git a/tokio/src/runtime/spawner.rs b/tokio/src/runtime/spawner.rs
index d136945c..c5f2d17c 100644
--- a/tokio/src/runtime/spawner.rs
+++ b/tokio/src/runtime/spawner.rs
@@ -18,6 +18,17 @@ pub(crate) enum Spawner {
ThreadPool(thread_pool::Spawner),
}
+impl Spawner {
+ pub(crate) fn shutdown(&mut self) {
+ #[cfg(feature = "rt-threaded")]
+ {
+ if let Spawner::ThreadPool(spawner) = self {
+ spawner.shutdown();
+ }
+ }
+ }
+}
+
cfg_rt_core! {
impl Spawner {
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
diff --git a/tokio/src/runtime/thread_pool/mod.rs b/tokio/src/runtime/thread_pool/mod.rs
index ced9712d..d30e8d45 100644
--- a/tokio/src/runtime/thread_pool/mod.rs
+++ b/tokio/src/runtime/thread_pool/mod.rs
@@ -91,7 +91,7 @@ impl fmt::Debug for ThreadPool {
impl Drop for ThreadPool {
fn drop(&mut self) {
- self.spawner.shared.close();
+ self.spawner.shutdown();
}
}
@@ -108,6 +108,10 @@ impl Spawner {
self.shared.schedule(task, false);
handle
}
+
+ pub(crate) fn shutdown(&mut self) {
+ self.shared.close();
+ }
}
impl fmt::Debug for Spawner {
diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs
index c53c9384..ac052854 100644
--- a/tokio/src/runtime/thread_pool/worker.rs
+++ b/tokio/src/runtime/thread_pool/worker.rs
@@ -572,6 +572,8 @@ impl Core {
// Drain the queue
while self.next_local_task().is_some() {}
+
+ park.shutdown();
}
fn drain_pending_drop(&mut self, worker: &Worker) {