summary | refs | log | tree | commit | diff | stats
path: root/tokio-threadpool
diff options
context:
space:
mode:
author: Stjepan Glavina <stjepang@gmail.com> 2018-10-10 09:05:36 +0200
committer: GitHub <noreply@github.com> 2018-10-10 09:05:36 +0200
commit: adb0ba71d4819be45f3b88ad1e8b1152ad936874 (patch)
tree: bbcd8ad4e4c7f0e656ff7458ee6be87fdfb14b6e /tokio-threadpool
parent: bfa6766f3cbe113357c730f1782f842153c3f86f (diff)
threadpool: worker threads shouldn't respect keep_alive (#692)
<!-- Thank you for your Pull Request. Please provide a description above and review the requirements below. Bug fixes and new features should include tests. Contributors guide: https://github.com/tokio-rs/tokio/blob/master/CONTRIBUTING.md --> ## Motivation Now that each worker thread drives its own reactor, reactors have to be driven until the threadpool shuts down. We mustn't use the `keep_alive` setting to shut down a worker thread if it doesn't receive an event from the reactor for a certain duration of time. <!-- Explain the context and why you're making that change. What is the problem you're trying to solve? In some cases there is not a problem and this can be thought of as being the motivation for your change. --> ## Solution Just ignore the `keep_alive` setting when parking in `Worker::sleep`. <!-- Summarize the solution and provide any necessary context needed to understand the code change. -->
Diffstat (limited to 'tokio-threadpool')
-rw-r--r--  tokio-threadpool/src/builder.rs     | 12
-rw-r--r--  tokio-threadpool/src/worker/mod.rs  | 114
-rw-r--r--  tokio-threadpool/tests/threadpool.rs | 44
3 files changed, 12 insertions, 158 deletions
diff --git a/tokio-threadpool/src/builder.rs b/tokio-threadpool/src/builder.rs
index 5d5a7e5a..a02d0e1d 100644
--- a/tokio-threadpool/src/builder.rs
+++ b/tokio-threadpool/src/builder.rs
@@ -170,14 +170,14 @@ impl Builder {
self
}
- /// Set the worker thread keep alive duration
+ /// Set the thread keep alive duration
///
- /// If set, a worker thread will wait for up to the specified duration for
- /// work, at which point the thread will shutdown. When work becomes
- /// available, a new thread will eventually be spawned to replace the one
- /// that shut down.
+ /// If set, a thread that has completed a `blocking` call will wait for up
+ /// to the specified duration to become a worker thread again. Once the
+ /// duration elapses, the thread will shutdown.
///
- /// When the value is `None`, the thread will wait for work forever.
+ /// When the value is `None`, the thread will wait to become a worker
+ /// thread forever.
///
/// The default value is `None`.
///
diff --git a/tokio-threadpool/src/worker/mod.rs b/tokio-threadpool/src/worker/mod.rs
index 72931649..0687b0ac 100644
--- a/tokio-threadpool/src/worker/mod.rs
+++ b/tokio-threadpool/src/worker/mod.rs
@@ -26,7 +26,7 @@ use std::rc::Rc;
use std::sync::atomic::Ordering::{AcqRel, Acquire};
use std::sync::Arc;
use std::thread;
-use std::time::{Duration, Instant};
+use std::time::Duration;
/// Thread worker
///
@@ -667,39 +667,14 @@ impl Worker {
trace!(" -> starting to sleep; idx={}", self.id.0);
- let sleep_until = self.inner.config.keep_alive
- .map(|dur| Instant::now() + dur);
-
// The state has been transitioned to sleeping, we can now wait by
// calling the parker. This is done in a loop as condvars can wakeup
// spuriously.
- 'sleep:
loop {
- let mut drop_thread = false;
-
- match sleep_until {
- Some(when) => {
- let now = Instant::now();
-
- if when >= now {
- drop_thread = true;
- }
-
- let dur = when - now;
-
- unsafe {
- (*self.entry().park.get())
- .park_timeout(dur)
- .unwrap();
- }
- }
- None => {
- unsafe {
- (*self.entry().park.get())
- .park()
- .unwrap();
- }
- }
+ unsafe {
+ (*self.entry().park.get())
+ .park()
+ .unwrap();
}
trace!(" -> wakeup; idx={}", self.id.0);
@@ -710,10 +685,7 @@ impl Worker {
// If the worker has been notified, transition back to running.
match state.lifecycle() {
Sleeping => {
- if !drop_thread {
- // This goes back to the outer loop.
- continue 'sleep;
- }
+ // Still sleeping. Park again.
}
Notified | Signaled => {
// Transition back to running
@@ -738,80 +710,6 @@ impl Worker {
unreachable!();
}
}
-
- // The thread has reached the maximum permitted sleep duration.
- // It is now going to begin to shutdown.
- //
- // Doing this requires first releasing the thread to the backup
- // stack. Because the moment the worker state is transitioned to
- // `Shutdown`, other threads **expect** the thread's backup
- // entry to be available on the backup stack.
- //
- // However, it is possible that the worker is notified between
- // us pushing the backup entry onto the backup stack and
- // transitioning the worker to `Shutdown`. If this happens, the
- // current thread lost the token to run the backup entry and has
- // to shutdown no matter what.
- //
- // To deal with this, the worker is transitioned to another
- // thread. This is a pretty rare condition.
- //
- // If pushing on the backup stack fails, then the pool is being
- // terminated and the thread should just shutdown
- let backup_push_err = self.inner.release_backup(self.backup_id).is_err();
-
- if backup_push_err {
- debug_assert!({
- let state: State = self.entry().state.load(Acquire).into();
- state.lifecycle() != Sleeping
- });
-
- self.should_finalize.set(true);
-
- return true;
- }
-
- loop {
- let mut next = state;
- next.set_lifecycle(Shutdown);
-
- let actual: State = self.entry().state.compare_and_swap(
- state.into(), next.into(), AcqRel).into();
-
- if actual == state {
- // Transitioned to a shutdown state
- return false;
- }
-
- match actual.lifecycle() {
- Sleeping => {
- state = actual;
- }
- Notified | Signaled => {
- // Transition back to running
- loop {
- let mut next = state;
- next.set_lifecycle(Running);
-
- let actual = self.entry().state.compare_and_swap(
- state.into(), next.into(), AcqRel).into();
-
- if actual == state {
- self.inner.spawn_thread(self.id.clone(), &self.inner);
- return false;
- }
-
- state = actual;
- }
- }
- Shutdown | Running => {
- // To get here, the block above transitioned the state to
- // `Sleeping`. No other thread can concurrently
- // transition to `Shutdown` or `Running`.
- unreachable!();
- }
- }
- }
}
}
diff --git a/tokio-threadpool/tests/threadpool.rs b/tokio-threadpool/tests/threadpool.rs
index d3ffb26a..0df1d1c2 100644
--- a/tokio-threadpool/tests/threadpool.rs
+++ b/tokio-threadpool/tests/threadpool.rs
@@ -205,50 +205,6 @@ fn drop_threadpool_drops_futures() {
}
#[test]
-fn thread_shutdown_timeout() {
- use std::sync::Mutex;
-
- let _ = ::env_logger::try_init();
-
- let (shutdown_tx, shutdown_rx) = mpsc::channel();
- let (complete_tx, complete_rx) = mpsc::channel();
-
- let t = Mutex::new(shutdown_tx);
-
- let pool = Builder::new()
- .keep_alive(Some(Duration::from_millis(200)))
- .around_worker(move |w, _| {
- w.run();
- // There could be multiple threads here
- let _ = t.lock().unwrap().send(());
- })
- .build();
- let tx = pool.sender().clone();
-
- let t = complete_tx.clone();
- tx.spawn(lazy(move || {
- t.send(()).unwrap();
- Ok(())
- })).unwrap();
-
- // The future completes
- complete_rx.recv().unwrap();
-
- // The thread shuts down eventually
- shutdown_rx.recv().unwrap();
-
- // Futures can still be run
- tx.spawn(lazy(move || {
- complete_tx.send(()).unwrap();
- Ok(())
- })).unwrap();
-
- complete_rx.recv().unwrap();
-
- pool.shutdown().wait().unwrap();
-}
-
-#[test]
fn many_oneshot_futures() {
const NUM: usize = 10_000;