diff options
author | Stjepan Glavina <stjepang@gmail.com> | 2018-10-10 09:05:36 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-10-10 09:05:36 +0200 |
commit | adb0ba71d4819be45f3b88ad1e8b1152ad936874 (patch) | |
tree | bbcd8ad4e4c7f0e656ff7458ee6be87fdfb14b6e /tokio-threadpool | |
parent | bfa6766f3cbe113357c730f1782f842153c3f86f (diff) |
threadpool: worker threads shouldn't respect keep_alive (#692)
<!--
Thank you for your Pull Request. Please provide a description above and review
the requirements below.
Bug fixes and new features should include tests.
Contributors guide: https://github.com/tokio-rs/tokio/blob/master/CONTRIBUTING.md
-->
## Motivation
Now that each worker thread drives its own reactor, reactors have to be driven until the threadpool shuts down. We mustn't use the `keep_alive` setting to shut down a worker thread if it doesn't receive an event from the reactor for a certain duration of time.
<!--
Explain the context and why you're making that change. What is the problem
you're trying to solve? In some cases there is not a problem and this can be
thought of as being the motivation for your change.
-->
## Solution
Just ignore the `keep_alive` setting when parking in `Worker::sleep`.
<!--
Summarize the solution and provide any necessary context needed to understand
the code change.
-->
Diffstat (limited to 'tokio-threadpool')
-rw-r--r-- | tokio-threadpool/src/builder.rs | 12 | ||||
-rw-r--r-- | tokio-threadpool/src/worker/mod.rs | 114 | ||||
-rw-r--r-- | tokio-threadpool/tests/threadpool.rs | 44 |
3 files changed, 12 insertions, 158 deletions
diff --git a/tokio-threadpool/src/builder.rs b/tokio-threadpool/src/builder.rs index 5d5a7e5a..a02d0e1d 100644 --- a/tokio-threadpool/src/builder.rs +++ b/tokio-threadpool/src/builder.rs @@ -170,14 +170,14 @@ impl Builder { self } - /// Set the worker thread keep alive duration + /// Set the thread keep alive duration /// - /// If set, a worker thread will wait for up to the specified duration for - /// work, at which point the thread will shutdown. When work becomes - /// available, a new thread will eventually be spawned to replace the one - /// that shut down. + /// If set, a thread that has completed a `blocking` call will wait for up + /// to the specified duration to become a worker thread again. Once the + /// duration elapses, the thread will shutdown. /// - /// When the value is `None`, the thread will wait for work forever. + /// When the value is `None`, the thread will wait to become a worker + /// thread forever. /// /// The default value is `None`. /// diff --git a/tokio-threadpool/src/worker/mod.rs b/tokio-threadpool/src/worker/mod.rs index 72931649..0687b0ac 100644 --- a/tokio-threadpool/src/worker/mod.rs +++ b/tokio-threadpool/src/worker/mod.rs @@ -26,7 +26,7 @@ use std::rc::Rc; use std::sync::atomic::Ordering::{AcqRel, Acquire}; use std::sync::Arc; use std::thread; -use std::time::{Duration, Instant}; +use std::time::Duration; /// Thread worker /// @@ -667,39 +667,14 @@ impl Worker { trace!(" -> starting to sleep; idx={}", self.id.0); - let sleep_until = self.inner.config.keep_alive - .map(|dur| Instant::now() + dur); - // The state has been transitioned to sleeping, we can now wait by // calling the parker. This is done in a loop as condvars can wakeup // spuriously. - 'sleep: loop { - let mut drop_thread = false; - - match sleep_until { - Some(when) => { - let now = Instant::now(); - - if when >= now { - drop_thread = true; - } - - let dur = when - now; - - unsafe { - (*self.entry().park.get()) - .park_timeout(dur) - .unwrap(); - } - } - None => { - unsafe { - (*self.entry().park.get()) - .park() - .unwrap(); - } - } + unsafe { + (*self.entry().park.get()) + .park() + .unwrap(); } trace!(" -> wakeup; idx={}", self.id.0); @@ -710,10 +685,7 @@ impl Worker { // If the worker has been notified, transition back to running. match state.lifecycle() { Sleeping => { - if !drop_thread { - // This goes back to the outer loop. - continue 'sleep; - } + // Still sleeping. Park again. } Notified | Signaled => { // Transition back to running @@ -738,80 +710,6 @@ impl Worker { unreachable!(); } } - - // The thread has reached the maximum permitted sleep duration. - // It is now going to begin to shutdown. - // - // Doing this requires first releasing the thread to the backup - // stack. Because the moment the worker state is transitioned to - // `Shutdown`, other threads **expect** the thread's backup - // entry to be available on the backup stack. - // - // However, it is possible that the worker is notified between - // us pushing the backup entry onto the backup stack and - // transitioning the worker to `Shutdown`. If this happens, the - // current thread lost the token to run the backup entry and has - // to shutdown no matter what. - // - // To deal with this, the worker is transitioned to another - // thread. This is a pretty rare condition. - // - // If pushing on the backup stack fails, then the pool is being - // terminated and the thread should just shutdown - let backup_push_err = self.inner.release_backup(self.backup_id).is_err(); - - if backup_push_err { - debug_assert!({ - let state: State = self.entry().state.load(Acquire).into(); - state.lifecycle() != Sleeping - }); - - self.should_finalize.set(true); - - return true; - } - - loop { - let mut next = state; - next.set_lifecycle(Shutdown); - - let actual: State = self.entry().state.compare_and_swap( - state.into(), next.into(), AcqRel).into(); - - if actual == state { - // Transitioned to a shutdown state - return false; - } - - match actual.lifecycle() { - Sleeping => { - state = actual; - } - Notified | Signaled => { - // Transition back to running - loop { - let mut next = state; - next.set_lifecycle(Running); - - let actual = self.entry().state.compare_and_swap( - state.into(), next.into(), AcqRel).into(); - - if actual == state { - self.inner.spawn_thread(self.id.clone(), &self.inner); - return false; - } - - state = actual; - } - } - Shutdown | Running => { - // To get here, the block above transitioned the state to - // `Sleeping`. No other thread can concurrently - // transition to `Shutdown` or `Running`. - unreachable!(); - } - } - } } } diff --git a/tokio-threadpool/tests/threadpool.rs b/tokio-threadpool/tests/threadpool.rs index d3ffb26a..0df1d1c2 100644 --- a/tokio-threadpool/tests/threadpool.rs +++ b/tokio-threadpool/tests/threadpool.rs @@ -205,50 +205,6 @@ fn drop_threadpool_drops_futures() { } #[test] -fn thread_shutdown_timeout() { - use std::sync::Mutex; - - let _ = ::env_logger::try_init(); - - let (shutdown_tx, shutdown_rx) = mpsc::channel(); - let (complete_tx, complete_rx) = mpsc::channel(); - - let t = Mutex::new(shutdown_tx); - - let pool = Builder::new() - .keep_alive(Some(Duration::from_millis(200))) - .around_worker(move |w, _| { - w.run(); - // There could be multiple threads here - let _ = t.lock().unwrap().send(()); - }) - .build(); - let tx = pool.sender().clone(); - - let t = complete_tx.clone(); - tx.spawn(lazy(move || { - t.send(()).unwrap(); - Ok(()) - })).unwrap(); - - // The future completes - complete_rx.recv().unwrap(); - - // The thread shuts down eventually - shutdown_rx.recv().unwrap(); - - // Futures can still be run - tx.spawn(lazy(move || { - complete_tx.send(()).unwrap(); - Ok(()) - })).unwrap(); - - complete_rx.recv().unwrap(); - - pool.shutdown().wait().unwrap(); -} - -#[test] fn many_oneshot_futures() { const NUM: usize = 10_000; |