summaryrefslogtreecommitdiffstats
path: root/tokio/src/time
diff options
context:
space:
mode:
authorÉmile Grégoire <eg@emilegregoire.ca>2020-07-28 23:43:19 -0400
committerGitHub <noreply@github.com>2020-07-28 20:43:19 -0700
commit646fbae76535e397ef79dbcaacb945d4c829f666 (patch)
tree49c00b3825463cae83eef0e1b65ee3d8266980c3 /tokio/src/time
parent1562bb314482215eb7517e6b8b8bdecbacf10e79 (diff)
rt: fix potential leak during runtime shutdown (#2649)
JoinHandle of threads created by the pool are now tracked and properly joined at shutdown. If the thread does not return within the timeout, then it's not joined and left to the OS for cleanup. Also, break a cycle between wakers held by the timer and the runtime. Fixes #2641, #2535
Diffstat (limited to 'tokio/src/time')
-rw-r--r--tokio/src/time/driver/atomic_stack.rs2
-rw-r--r--tokio/src/time/driver/mod.rs27
-rw-r--r--tokio/src/time/tests/test_delay.rs4
3 files changed, 28 insertions, 5 deletions
diff --git a/tokio/src/time/driver/atomic_stack.rs b/tokio/src/time/driver/atomic_stack.rs
index d27579f9..c1972a76 100644
--- a/tokio/src/time/driver/atomic_stack.rs
+++ b/tokio/src/time/driver/atomic_stack.rs
@@ -95,7 +95,7 @@ impl Iterator for AtomicStackEntries {
type Item = Arc<Entry>;
fn next(&mut self) -> Option<Self::Item> {
- if self.ptr.is_null() {
+ if self.ptr.is_null() || self.ptr == SHUTDOWN {
return None;
}
diff --git a/tokio/src/time/driver/mod.rs b/tokio/src/time/driver/mod.rs
index 92a8474a..bb6c28b3 100644
--- a/tokio/src/time/driver/mod.rs
+++ b/tokio/src/time/driver/mod.rs
@@ -82,7 +82,7 @@ use std::{cmp, fmt};
/// [timeout]: crate::time::Timeout
/// [interval]: crate::time::Interval
#[derive(Debug)]
-pub(crate) struct Driver<T> {
+pub(crate) struct Driver<T: Park> {
/// Shared state
inner: Arc<Inner>,
@@ -94,6 +94,9 @@ pub(crate) struct Driver<T> {
/// Source of "now" instances
clock: Clock,
+
+ /// True if the driver is being shutdown
+ is_shutdown: bool,
}
/// Timer state shared between `Driver`, `Handle`, and `Registration`.
@@ -135,6 +138,7 @@ where
wheel: wheel::Wheel::new(),
park,
clock,
+ is_shutdown: false,
}
}
@@ -303,10 +307,12 @@ where
Ok(())
}
-}
-impl<T> Drop for Driver<T> {
- fn drop(&mut self) {
+ fn shutdown(&mut self) {
+ if self.is_shutdown {
+ return;
+ }
+
use std::u64;
// Shutdown the stack of entries to process, preventing any new entries
@@ -319,6 +325,19 @@ impl<T> Drop for Driver<T> {
while let Some(entry) = self.wheel.poll(&mut poll, &mut ()) {
entry.error(Error::shutdown());
}
+
+ self.park.shutdown();
+
+ self.is_shutdown = true;
+ }
+}
+
+impl<T> Drop for Driver<T>
+where
+ T: Park,
+{
+ fn drop(&mut self) {
+ self.shutdown();
}
}
diff --git a/tokio/src/time/tests/test_delay.rs b/tokio/src/time/tests/test_delay.rs
index b708f6fc..b732e458 100644
--- a/tokio/src/time/tests/test_delay.rs
+++ b/tokio/src/time/tests/test_delay.rs
@@ -351,6 +351,8 @@ fn unpark_is_delayed() {
self.0.advance(ms(436));
Ok(())
}
+
+ fn shutdown(&mut self) {}
}
impl Unpark for MockUnpark {
@@ -434,6 +436,8 @@ impl Park for MockPark {
self.0.advance(duration);
Ok(())
}
+
+ fn shutdown(&mut self) {}
}
impl Unpark for MockUnpark {