diff options
author | Émile Grégoire <eg@emilegregoire.ca> | 2020-07-28 23:43:19 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-07-28 20:43:19 -0700 |
commit | 646fbae76535e397ef79dbcaacb945d4c829f666 (patch) | |
tree | 49c00b3825463cae83eef0e1b65ee3d8266980c3 /tokio/src/time | |
parent | 1562bb314482215eb7517e6b8b8bdecbacf10e79 (diff) |
rt: fix potential leak during runtime shutdown (#2649)
JoinHandle of threads created by the pool are now tracked and properly joined at
shutdown. If the thread does not return within the timeout, then it's not joined and
left to the OS for cleanup.
Also, break a cycle between wakers held by the timer and the runtime.
Fixes #2641, #2535
Diffstat (limited to 'tokio/src/time')
-rw-r--r-- | tokio/src/time/driver/atomic_stack.rs | 2 | ||||
-rw-r--r-- | tokio/src/time/driver/mod.rs | 27 | ||||
-rw-r--r-- | tokio/src/time/tests/test_delay.rs | 4 |
3 files changed, 28 insertions, 5 deletions
diff --git a/tokio/src/time/driver/atomic_stack.rs b/tokio/src/time/driver/atomic_stack.rs index d27579f9..c1972a76 100644 --- a/tokio/src/time/driver/atomic_stack.rs +++ b/tokio/src/time/driver/atomic_stack.rs @@ -95,7 +95,7 @@ impl Iterator for AtomicStackEntries { type Item = Arc<Entry>; fn next(&mut self) -> Option<Self::Item> { - if self.ptr.is_null() { + if self.ptr.is_null() || self.ptr == SHUTDOWN { return None; } diff --git a/tokio/src/time/driver/mod.rs b/tokio/src/time/driver/mod.rs index 92a8474a..bb6c28b3 100644 --- a/tokio/src/time/driver/mod.rs +++ b/tokio/src/time/driver/mod.rs @@ -82,7 +82,7 @@ use std::{cmp, fmt}; /// [timeout]: crate::time::Timeout /// [interval]: crate::time::Interval #[derive(Debug)] -pub(crate) struct Driver<T> { +pub(crate) struct Driver<T: Park> { /// Shared state inner: Arc<Inner>, @@ -94,6 +94,9 @@ pub(crate) struct Driver<T> { /// Source of "now" instances clock: Clock, + + /// True if the driver is being shutdown + is_shutdown: bool, } /// Timer state shared between `Driver`, `Handle`, and `Registration`. @@ -135,6 +138,7 @@ where wheel: wheel::Wheel::new(), park, clock, + is_shutdown: false, } } @@ -303,10 +307,12 @@ where Ok(()) } -} -impl<T> Drop for Driver<T> { - fn drop(&mut self) { + fn shutdown(&mut self) { + if self.is_shutdown { + return; + } + use std::u64; // Shutdown the stack of entries to process, preventing any new entries @@ -319,6 +325,19 @@ impl<T> Drop for Driver<T> { while let Some(entry) = self.wheel.poll(&mut poll, &mut ()) { entry.error(Error::shutdown()); } + + self.park.shutdown(); + + self.is_shutdown = true; + } +} + +impl<T> Drop for Driver<T> +where + T: Park, +{ + fn drop(&mut self) { + self.shutdown(); } } diff --git a/tokio/src/time/tests/test_delay.rs b/tokio/src/time/tests/test_delay.rs index b708f6fc..b732e458 100644 --- a/tokio/src/time/tests/test_delay.rs +++ b/tokio/src/time/tests/test_delay.rs @@ -351,6 +351,8 @@ fn unpark_is_delayed() { self.0.advance(ms(436)); Ok(()) } + + fn shutdown(&mut self) {} } impl Unpark for MockUnpark { @@ -434,6 +436,8 @@ impl Park for MockPark { self.0.advance(duration); Ok(()) } + + fn shutdown(&mut self) {} } impl Unpark for MockUnpark { |