author    Carl Lerche <me@carllerche.com>    2018-04-04 13:30:54 -0700
committer GitHub <noreply@github.com>        2018-04-04 13:30:54 -0700
commit    0bcf9b0ae61ec78db7ebac9853b76a2bf1c02e62
tree      21db51b4061fe7438494dbb03c9da0aa64b52db7
parent    c7157395997c7be29eeca3cb4d823266165ee63e
ThreadPool refactoring (#299)
Diffstat (limited to 'tokio-threadpool')
-rw-r--r--  tokio-threadpool/src/builder.rs         4
-rw-r--r--  tokio-threadpool/src/futures2_wake.rs   4
-rw-r--r--  tokio-threadpool/src/lib.rs             1
-rw-r--r--  tokio-threadpool/src/notifier.rs        4
-rw-r--r--  tokio-threadpool/src/pool/mod.rs      259
-rw-r--r--  tokio-threadpool/src/pool/stack.rs    252
-rw-r--r--  tokio-threadpool/src/pool/state.rs     77
-rw-r--r--  tokio-threadpool/src/sender.rs         12
-rw-r--r--  tokio-threadpool/src/shutdown.rs        4
-rw-r--r--  tokio-threadpool/src/sleep_stack.rs    83
-rw-r--r--  tokio-threadpool/src/thread_pool.rs     4
-rw-r--r--  tokio-threadpool/src/worker/entry.rs  125
-rw-r--r--  tokio-threadpool/src/worker/mod.rs     31
-rw-r--r--  tokio-threadpool/src/worker/state.rs   35
14 files changed, 522 insertions, 373 deletions
diff --git a/tokio-threadpool/src/builder.rs b/tokio-threadpool/src/builder.rs
index 8f042b6d..9583206c 100644
--- a/tokio-threadpool/src/builder.rs
+++ b/tokio-threadpool/src/builder.rs
@@ -2,7 +2,7 @@ use callback::Callback;
use config::{Config, MAX_WORKERS};
use park::{BoxPark, BoxedPark, DefaultPark};
use sender::Sender;
-use pool::Inner;
+use pool::Pool;
use thread_pool::ThreadPool;
use worker::{self, Worker, WorkerId};
@@ -329,7 +329,7 @@ impl Builder {
// Create the pool
let inner = Arc::new(
- Inner::new(
+ Pool::new(
workers.into_boxed_slice(),
self.config.clone()));
diff --git a/tokio-threadpool/src/futures2_wake.rs b/tokio-threadpool/src/futures2_wake.rs
index d114cc3c..ed9d4552 100644
--- a/tokio-threadpool/src/futures2_wake.rs
+++ b/tokio-threadpool/src/futures2_wake.rs
@@ -1,4 +1,4 @@
-use pool::Inner;
+use pool::Pool;
use notifier::Notifier;
use std::marker::PhantomData;
@@ -14,7 +14,7 @@ pub(crate) struct Futures2Wake {
}
impl Futures2Wake {
- pub(crate) fn new(id: usize, inner: &Arc<Inner>) -> Futures2Wake {
+ pub(crate) fn new(id: usize, inner: &Arc<Pool>) -> Futures2Wake {
let notifier = Arc::new(Notifier {
inner: Arc::downgrade(inner),
});
diff --git a/tokio-threadpool/src/lib.rs b/tokio-threadpool/src/lib.rs
index b37d8b79..5c33b09d 100644
--- a/tokio-threadpool/src/lib.rs
+++ b/tokio-threadpool/src/lib.rs
@@ -27,7 +27,6 @@ mod pool;
mod sender;
mod shutdown;
mod shutdown_task;
-mod sleep_stack;
mod task;
mod thread_pool;
mod worker;
diff --git a/tokio-threadpool/src/notifier.rs b/tokio-threadpool/src/notifier.rs
index 78f43593..851dc55f 100644
--- a/tokio-threadpool/src/notifier.rs
+++ b/tokio-threadpool/src/notifier.rs
@@ -1,4 +1,4 @@
-use pool::Inner;
+use pool::Pool;
use task::Task;
use std::mem;
@@ -12,7 +12,7 @@ use futures::executor::Notify;
/// to poll the future again.
#[derive(Debug)]
pub(crate) struct Notifier {
- pub inner: Weak<Inner>,
+ pub inner: Weak<Pool>,
}
impl Notify for Notifier {
diff --git a/tokio-threadpool/src/pool/mod.rs b/tokio-threadpool/src/pool/mod.rs
index e1b06b23..ba4aa17f 100644
--- a/tokio-threadpool/src/pool/mod.rs
+++ b/tokio-threadpool/src/pool/mod.rs
@@ -1,27 +1,22 @@
mod state;
+mod stack;
pub(crate) use self::state::{
- // TODO: Rename `State`
- PoolState,
- SHUTDOWN_ON_IDLE,
- SHUTDOWN_NOW,
+ State,
+ Lifecycle,
MAX_FUTURES,
};
+use self::stack::SleepStack;
-use config::{Config, MAX_WORKERS};
-use sleep_stack::{
- SleepStack,
- EMPTY,
- TERMINATED,
-};
+use config::Config;
use shutdown_task::ShutdownTask;
use task::Task;
-use worker::{self, Worker, WorkerId, WorkerState, PUSHED_MASK};
+use worker::{self, Worker, WorkerId};
use futures::task::AtomicTask;
use std::cell::UnsafeCell;
-use std::sync::atomic::Ordering::{Acquire, AcqRel, Release, Relaxed};
+use std::sync::atomic::Ordering::{Acquire, AcqRel, Relaxed};
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
@@ -29,12 +24,12 @@ use rand::{Rng, SeedableRng, XorShiftRng};
// TODO: Rename this
#[derive(Debug)]
-pub(crate) struct Inner {
+pub(crate) struct Pool {
// ThreadPool state
pub state: AtomicUsize,
// Stack tracking sleeping workers.
- pub sleep_stack: AtomicUsize,
+ sleep_stack: SleepStack,
// Number of workers who haven't reached the final state of shutdown
//
@@ -57,14 +52,14 @@ pub(crate) struct Inner {
pub config: Config,
}
-impl Inner {
- /// Create a new `Inner`
- pub fn new(workers: Box<[worker::Entry]>, config: Config) -> Inner {
+impl Pool {
+ /// Create a new `Pool`
+ pub fn new(workers: Box<[worker::Entry]>, config: Config) -> Pool {
let pool_size = workers.len();
- let ret = Inner {
- state: AtomicUsize::new(PoolState::new().into()),
- sleep_stack: AtomicUsize::new(SleepStack::new().into()),
+ let ret = Pool {
+ state: AtomicUsize::new(State::new().into()),
+ sleep_stack: SleepStack::new(),
num_workers: AtomicUsize::new(pool_size),
next_thread_id: AtomicUsize::new(0),
workers,
@@ -78,7 +73,7 @@ impl Inner {
// Now, we prime the sleeper stack
for i in 0..pool_size {
- ret.push_sleeper(i).unwrap();
+ ret.sleep_stack.push(&ret.workers, i).unwrap();
}
ret
@@ -87,20 +82,20 @@ impl Inner {
/// Start shutting down the pool. This means that no new futures will be
/// accepted.
pub fn shutdown(&self, now: bool, purge_queue: bool) {
- let mut state: PoolState = self.state.load(Acquire).into();
+ let mut state: State = self.state.load(Acquire).into();
trace!("shutdown; state={:?}", state);
// For now, this must be true
debug_assert!(!purge_queue || now);
- // Start by setting the SHUTDOWN flag
+ // Start by setting the shutdown flag
loop {
let mut next = state;
let num_futures = next.num_futures();
- if next.lifecycle() >= SHUTDOWN_NOW {
+ if next.lifecycle() == Lifecycle::ShutdownNow {
// Already transitioned to shutting down state
if !purge_queue || num_futures == 0 {
@@ -114,9 +109,9 @@ impl Inner {
} else {
next.set_lifecycle(if now || num_futures == 0 {
// If already idle, always transition to shutdown now.
- SHUTDOWN_NOW
+ Lifecycle::ShutdownNow
} else {
- SHUTDOWN_ON_IDLE
+ Lifecycle::ShutdownOnIdle
});
if purge_queue {
@@ -146,69 +141,29 @@ impl Inner {
self.terminate_sleeping_workers();
}
+ /// Called by `Worker` as it tries to enter a sleeping state. Before it
+ /// sleeps, it must push itself onto the sleep stack. This enables other
+ /// threads to see it when signaling work.
+ pub fn push_sleeper(&self, idx: usize) -> Result<(), ()> {
+ self.sleep_stack.push(&self.workers, idx)
+ }
+
pub fn terminate_sleeping_workers(&self) {
use worker::Lifecycle::Signaled;
trace!(" -> shutting down workers");
// Wakeup all sleeping workers. They will wake up, see the state
// transition, and terminate.
- while let Some((idx, worker_state)) = self.pop_sleeper(Signaled, TERMINATED) {
+ while let Some((idx, worker_state)) = self.sleep_stack.pop(&self.workers, Signaled, true) {
trace!(" -> shutdown worker; idx={:?}; state={:?}", idx, worker_state);
- self.signal_stop(idx, worker_state);
- }
- }
-
- /// Signals to the worker that it should stop
- fn signal_stop(&self, idx: usize, mut state: WorkerState) {
- use worker::Lifecycle::*;
-
- let worker = &self.workers[idx];
-
- // Transition the worker state to signaled
- loop {
- let mut next = state;
- match state.lifecycle() {
- Shutdown => {
- trace!("signal_stop -- WORKER_SHUTDOWN; idx={}", idx);
- // If the worker is in the shutdown state, then it will never be
- // started again.
- self.worker_terminated();
-
- return;
- }
- Running | Sleeping => {}
- Notified | Signaled => {
- trace!("signal_stop -- skipping; idx={}; state={:?}", idx, state);
- // These two states imply that the worker is active, thus it
- // will eventually see the shutdown signal, so we don't need
- // to do anything.
- //
- // The worker is forced to see the shutdown signal
- // eventually as:
- //
- // a) No more work will arrive
- // b) The shutdown signal is stored as the head of the
- // sleep, stack which will prevent the worker from going to
- // sleep again.
- return;
- }
+ if self.workers[idx].signal_stop(worker_state).is_err() {
+ // The worker is already in the shutdown state, immediately
+ // track that it has terminated as the worker will never work
+ // again.
+ self.worker_terminated();
}
-
- next.set_lifecycle(Signaled);
-
- let actual = worker.state.compare_and_swap(
- state.into(), next.into(), AcqRel).into();
-
- if actual == state {
- break;
- }
-
- state = actual;
}
-
- // Wakeup the worker
- worker.wakeup();
}
pub fn worker_terminated(&self) {
@@ -226,7 +181,7 @@ impl Inner {
///
/// Called from either inside or outside of the scheduler. If currently on
/// the scheduler, then a fast path is taken.
- pub fn submit(&self, task: Task, inner: &Arc<Inner>) {
+ pub fn submit(&self, task: Task, inner: &Arc<Pool>) {
Worker::with_current(|worker| {
match worker {
Some(worker) => {
@@ -248,14 +203,14 @@ impl Inner {
///
/// Called from outside of the scheduler, this function is how new tasks
/// enter the system.
- fn submit_external(&self, task: Task, inner: &Arc<Inner>) {
+ fn submit_external(&self, task: Task, inner: &Arc<Pool>) {
use worker::Lifecycle::Notified;
// First try to get a handle to a sleeping worker. This ensures that
// sleeping tasks get woken up
- if let Some((idx, state)) = self.pop_sleeper(Notified, EMPTY) {
- trace!("submit to existing worker; idx={}; state={:?}", idx, state);
- self.submit_to_external(idx, task, state, inner);
+ if let Some((idx, worker_state)) = self.sleep_stack.pop(&self.workers, Notified, false) {
+ trace!("submit to existing worker; idx={}; state={:?}", idx, worker_state);
+ self.submit_to_external(idx, task, worker_state, inner);
return;
}
@@ -266,15 +221,15 @@ impl Inner {
trace!(" -> submitting to random; idx={}", idx);
- let state: WorkerState = self.workers[idx].state.load(Acquire).into();
+ let state = self.workers[idx].load_state();
self.submit_to_external(idx, task, state, inner);
}
fn submit_to_external(&self,
idx: usize,
task: Task,
- state: WorkerState,
- inner: &Arc<Inner>)
+ state: worker::State,
+ inner: &Arc<Pool>)
{
let entry = &self.workers[idx];
@@ -283,40 +238,39 @@ impl Inner {
}
}
- fn spawn_worker(&self, idx: usize, inner: &Arc<Inner>) {
+ fn spawn_worker(&self, idx: usize, inner: &Arc<Pool>) {
Worker::spawn(WorkerId::new(idx), inner);
}
/// If there are any other workers currently relaxing, signal them that work
/// is available so that they can try to find more work to process.
- pub fn signal_work(&self, inner: &Arc<Inner>) {
+ pub fn signal_work(&self, inner: &Arc<Pool>) {
use worker::Lifecycle::*;
- if let Some((idx, mut state)) = self.pop_sleeper(Signaled, EMPTY) {
+ if let Some((idx, mut worker_state)) = self.sleep_stack.pop(&self.workers, Signaled, false) {
let entry = &self.workers[idx];
- debug_assert!(state.lifecycle() != Signaled, "actual={:?}", state.lifecycle());
+ debug_assert!(worker_state.lifecycle() != Signaled, "actual={:?}", worker_state.lifecycle());
// Transition the worker state to signaled
loop {
- let mut next = state;
+ let mut next = worker_state;
- // pop_sleeper should skip these
next.set_lifecycle(Signaled);
let actual = entry.state.compare_and_swap(
- state.into(), next.into(), AcqRel).into();
+ worker_state.into(), next.into(), AcqRel).into();
- if actual == state {
+ if actual == worker_state {
break;
}
- state = actual;
+ worker_state = actual;
}
// The state has been transitioned to signal, now we need to wake up
// the worker if necessary.
- match state.lifecycle() {
+ match worker_state.lifecycle() {
Sleeping => {
trace!("signal_work -- wakeup; idx={}", idx);
self.workers[idx].wakeup();
@@ -332,113 +286,6 @@ impl Inner {
}
}
- /// Push a worker on the sleep stack
- ///
- /// Returns `Err` if the pool has been terminated
- pub fn push_sleeper(&self, idx: usize) -> Result<(), ()> {
- let mut state: SleepStack = self.sleep_stack.load(Acquire).into();
-
- debug_assert!(WorkerState::from(self.workers[idx].state.load(Relaxed)).is_pushed());
-
- loop {
- let mut next = state;
-
- let head = state.head();
-
- if head == TERMINATED {
- // The pool is terminated, cannot push the sleeper.
- return Err(());
- }
-
- self.workers[idx].set_next_sleeper(head);
- next.set_head(idx);
-
- let actual = self.sleep_stack.compare_and_swap(
- state.into(), next.into(), AcqRel).into();
-
- if state == actual {
- return Ok(());
- }
-
- state = actual;
- }
- }
-
- /// Pop a worker from the sleep stack
- fn pop_sleeper(&self, max_lifecycle: worker::Lifecycle, terminal: usize)
- -> Option<(usize, WorkerState)>
- {
- debug_assert!(terminal == EMPTY || terminal == TERMINATED);
-
- let mut state: SleepStack = self.sleep_stack.load(Acquire).into();
-
- loop {
- let head = state.head();
-
- if head == EMPTY {
- let mut next = state;
- next.set_head(terminal);
-
- if next == state {
- debug_assert!(terminal == EMPTY);
- return None;
- }
-
- let actual = self.sleep_stack.compare_and_swap(
- state.into(), next.into(), AcqRel).into();
-
- if actual != state {
- state = actual;
- continue;
- }
-
- return None;
- } else if head == TERMINATED {
- return None;
- }
-
- debug_assert!(head < MAX_WORKERS);
-
- let mut next = state;
-
- let next_head = self.workers[head].next_sleeper();
-
- // TERMINATED can never be set as the "next pointer" on a worker.
- debug_assert!(next_head != TERMINATED);
-
- if next_head == EMPTY {
- next.set_head(terminal);
- } else {
- next.set_head(next_head);
- }
-
- let actual = self.sleep_stack.compare_and_swap(
- state.into(), next.into(), AcqRel).into();
-
- if actual == state {
- // The worker has been removed from the stack, so the pushed bit
- // can be unset. Release ordering is used to ensure that this
- // operation happens after actually popping the task.
- debug_assert_eq!(1, PUSHED_MASK);
-
- // Unset the PUSHED flag and get the current state.
- let state: WorkerState = self.workers[head].state
- // TODO This should be fetch_and(!PUSHED_MASK)
- .fetch_sub(PUSHED_MASK, Release).into();
-
- if state.lifecycle() >= max_lifecycle {
- // If the worker has already been notified, then it is
- // warming up to do more work. In this case, try to pop
- // another thread that might be in a relaxed state.
- continue;
- }
-
- return Some((head, state));
- }
-
- state = actual;
- }
- }
/// Generates a random number
///
@@ -479,5 +326,5 @@ impl Inner {
}
}
-unsafe impl Send for Inner {}
-unsafe impl Sync for Inner {}
+unsafe impl Send for Pool {}
+unsafe impl Sync for Pool {}
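
Every transition in this file (shutdown, signal_work, and the stack operations split out below) follows the same lock-free pattern: load the packed state word, modify a copy, publish it with compare_and_swap, and retry from the observed value when another thread wins the race. A minimal standalone sketch of that loop, using an illustrative two-bit lifecycle rather than the real pool::State type (compare_and_swap matches the patch's vintage; newer Rust would use compare_exchange):

use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{Acquire, AcqRel};

const LIFECYCLE_MASK: usize = 0b11;
const SHUTDOWN_NOW: usize = 2;

// Transition the lifecycle bits to SHUTDOWN_NOW while preserving the
// upper bits (the future count). Returns the previously observed state.
fn transition_to_shutdown(state: &AtomicUsize) -> usize {
    let mut curr = state.load(Acquire);
    loop {
        // Derive the desired next value from a copy of the snapshot.
        let next = (curr & !LIFECYCLE_MASK) | SHUTDOWN_NOW;

        // Publish only if no other thread touched the state meanwhile.
        let actual = state.compare_and_swap(curr, next, AcqRel);
        if actual == curr {
            return curr;
        }

        // Lost the race; retry from the value the winner installed.
        curr = actual;
    }
}

fn main() {
    let state = AtomicUsize::new(1 << 2); // one future, Running
    transition_to_shutdown(&state);
    assert_eq!(state.load(Acquire), (1 << 2) | SHUTDOWN_NOW);
}
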
diff --git a/tokio-threadpool/src/pool/stack.rs b/tokio-threadpool/src/pool/stack.rs
new file mode 100644
index 00000000..60cb75ee
--- /dev/null
+++ b/tokio-threadpool/src/pool/stack.rs
@@ -0,0 +1,252 @@
+use config::MAX_WORKERS;
+use worker;
+
+use std::{fmt, usize};
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::{Acquire, AcqRel, Relaxed};
+
+/// Lock-free stack of sleeping workers.
+///
+/// This is implemented as a Treiber stack and references to nodes are
+/// `usize` values, indexing the entry in the `[worker::Entry]` array stored by
+/// `Pool`. Each `Entry` instance maintains a `pushed` bit in its state. This
+/// bit tracks if the entry is already pushed onto the stack or not. A single
+/// entry can only be stored on the stack a single time.
+///
+/// Using indexes instead of pointers allows a much greater amount of data
+/// to be used for the ABA guard (see the correctness section of the
+/// Wikipedia page).
+///
+/// Treiber stack: https://en.wikipedia.org/wiki/Treiber_Stack
+#[derive(Debug)]
+pub(crate) struct SleepStack {
+ state: AtomicUsize,
+}
+
+/// State related to the stack of sleeping workers.
+///
+/// - Parked head: 16 bits
+/// - Sequence: remaining bits
+///
+/// The parked head value has a couple of special values:
+///
+/// - EMPTY: No sleepers
+/// - TERMINATED: Don't spawn more threads
+#[derive(Eq, PartialEq, Clone, Copy)]
+pub struct State(usize);
+
+/// Extracts the head of the worker stack from the scheduler state
+const STACK_MASK: usize = ((1 << 16) - 1);
+
+/// Used to mark the stack as empty
+pub(crate) const EMPTY: usize = MAX_WORKERS;
+
+/// Used to mark the stack as terminated
+pub(crate) const TERMINATED: usize = EMPTY + 1;
+
+/// How many bits the treiber ABA guard is offset by
+const ABA_GUARD_SHIFT: usize = 16;
+
+#[cfg(target_pointer_width = "64")]
+const ABA_GUARD_MASK: usize = (1 << (64 - ABA_GUARD_SHIFT)) - 1;
+
+#[cfg(target_pointer_width = "32")]
+const ABA_GUARD_MASK: usize = (1 << (32 - ABA_GUARD_SHIFT)) - 1;
+
+// ===== impl SleepStack =====
+
+impl SleepStack {
+ /// Create a new `SleepStack` representing the empty state.
+ pub fn new() -> SleepStack {
+ let state = AtomicUsize::new(State::new().into());
+ SleepStack { state }
+ }
+
+ /// Push a worker onto the stack
+ ///
+ /// # Return
+ ///
+ /// Returns `Ok` on success.
+ ///
+ /// Returns `Err` if the pool has transitioned to the `TERMINATED` state.
+ /// When terminated, pushing new entries is no longer permitted.
+ pub fn push(&self, entries: &[worker::Entry], idx: usize) -> Result<(), ()> {
+ let mut state: State = self.state.load(Acquire).into();
+
+ debug_assert!(worker::State::from(entries[idx].state.load(Relaxed)).is_pushed());
+
+ loop {
+ let mut next = state;
+
+ let head = state.head();
+
+ if head == TERMINATED {
+ // The pool is terminated, cannot push the sleeper.
+ return Err(());
+ }
+
+ entries[idx].set_next_sleeper(head);
+ next.set_head(idx);
+
+ let actual = self.state.compare_and_swap(
+ state.into(), next.into(), AcqRel).into();
+
+ if state == actual {
+ return Ok(());
+ }
+
+ state = actual;
+ }
+ }
+
+ /// Pop a worker off the stack.
+ ///
+ /// If `terminate` is set and the stack is empty when this function is
+ /// called, the state of the stack is transitioned to "terminated". At this
+ /// point, no further workers can be pushed onto the stack.
+ ///
+ /// # Return
+ ///
+ /// Returns the index of the popped worker and the worker's observed state.
+ ///
+ /// `None` if the stack is empty.
+ pub fn pop(&self, entries: &[worker::Entry],
+ max_lifecycle: worker::Lifecycle,
+ terminate: bool)
+ -> Option<(usize, worker::State)>
+ {
+ // Figure out the empty value
+ let terminal = match terminate {
+ true => TERMINATED,
+ false => EMPTY,
+ };
+
+ // If terminating, the max lifecycle *must* be `Signaled`, which is the
+ // highest lifecycle. By passing the greatest possible lifecycle value,
+ // no entries are skipped by this function.
+ //
+ // TODO: It would be better to terminate in a separate function that
+ // atomically takes all values and transitions to a terminated state.
+ debug_assert!(!terminate || max_lifecycle == worker::Lifecycle::Signaled);
+
+ let mut state: State = self.state.load(Acquire).into();
+
+ loop {
+ let head = state.head();
+
+ if head == EMPTY {
+ let mut next = state;
+ next.set_head(terminal);
+
+ if next == state {
+ debug_assert!(terminal == EMPTY);
+ return None;
+ }
+
+ let actual = self.state.compare_and_swap(
+ state.into(), next.into(), AcqRel).into();
+
+ if actual != state {
+ state = actual;
+ continue;
+ }
+
+ return None;
+ } else if head == TERMINATED {
+ return None;
+ }
+
+ debug_assert!(head < MAX_WORKERS);
+
+ let mut next = state;
+
+ let next_head = entries[head].next_sleeper();
+
+ // TERMINATED can never be set as the "next pointer" on a worker.
+ debug_assert!(next_head != TERMINATED);
+
+ if next_head == EMPTY {
+ next.set_head(terminal);
+ } else {
+ next.set_head(next_head);
+ }
+
+ let actual = self.state.compare_and_swap(
+ state.into(), next.into(), AcqRel).into();
+
+ if actual == state {
+ // Release ordering is needed to ensure that unsetting the
+ // `pushed` flag happens after popping the sleeper from the
+ // stack.
+ //
+ // Acquire ordering is required to acquire any memory associated
+ // with transitioning the worker's lifecycle.
+ let state = entries[head].fetch_unset_pushed(AcqRel);
+
+ if state.lifecycle() >= max_lifecycle {
+ // If the worker has already been notified, then it is
+ // warming up to do more work. In this case, try to pop
+ // another thread that might be in a relaxed state.
+ continue;
+ }
+
+ return Some((head, state));
+ }
+
+ state = actual;
+ }
+ }
+}
+
+// ===== impl State =====
+
+impl State {
+ #[inline]
+ fn new() -> State {
+ State(EMPTY)
+ }
+
+ #[inline]
+ fn head(&self) -> usize {
+ self.0 & STACK_MASK
+ }
+
+ #[inline]
+ fn set_head(&mut self, val: usize) {
+ // The ABA guard protects against the ABA problem w/ treiber stacks
+ let aba_guard = ((self.0 >> ABA_GUARD_SHIFT) + 1) & ABA_GUARD_MASK;
+
+ self.0 = (aba_guard << ABA_GUARD_SHIFT) | val;
+ }
+}
+
+impl From<usize> for State {
+ fn from(src: usize) -> Self {
+ State(src)
+ }
+}
+
+impl From<State> for usize {
+ fn from(src: State) -> Self {
+ src.0
+ }
+}
+
+impl fmt::Debug for State {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ let head = self.head();
+
+ let mut fmt = fmt.debug_struct("stack::State");
+
+ if head < MAX_WORKERS {
+ fmt.field("head", &head);
+ } else if head == EMPTY {
+ fmt.field("head", &"EMPTY");
+ } else if head == TERMINATED {
+ fmt.field("head", &"TERMINATED");
+ }
+
+ fmt.finish()
+ }
+}
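
The sequence counter is what defeats the ABA problem here: set_head bumps the guard bits on every write, so even if a pop and a push restore the same head index between a reader's load and its compare_and_swap, the packed words no longer compare equal. A small illustrative sketch (a 64-bit target is assumed; the constants mirror the ones above, the rest is not patch code):

// Demonstrates why the guard makes an "A -> B -> A" head sequence
// observable: the same head index yields a different packed word.
const ABA_GUARD_SHIFT: usize = 16;
const STACK_MASK: usize = (1 << 16) - 1;
const ABA_GUARD_MASK: usize = (1 << (64 - ABA_GUARD_SHIFT)) - 1; // 64-bit

#[derive(Clone, Copy, PartialEq, Debug)]
struct State(usize);

impl State {
    fn head(self) -> usize {
        self.0 & STACK_MASK
    }
    fn set_head(&mut self, val: usize) {
        // Increment the guard on every write.
        let guard = ((self.0 >> ABA_GUARD_SHIFT) + 1) & ABA_GUARD_MASK;
        self.0 = (guard << ABA_GUARD_SHIFT) | val;
    }
}

fn main() {
    let mut a = State(7); // head = worker 7, guard = 0
    let snapshot = a;

    a.set_head(3); // pop: head now 3
    a.set_head(7); // push: head is 7 again...

    assert_eq!(a.head(), snapshot.head()); // same head index,
    assert_ne!(a, snapshot);               // but a CAS would not match
}
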
diff --git a/tokio-threadpool/src/pool/state.rs b/tokio-threadpool/src/pool/state.rs
index de51cc43..e8f5d12e 100644
--- a/tokio-threadpool/src/pool/state.rs
+++ b/tokio-threadpool/src/pool/state.rs
@@ -6,11 +6,20 @@ use std::{fmt, usize};
/// shutdown on idle, 2 for shutting down). The remaining bits represent the
/// number of futures that still need to complete.
#[derive(Eq, PartialEq, Clone, Copy)]
-pub(crate) struct PoolState(usize);
+pub(crate) struct State(usize);
-/// Flag used to track if the pool is running
-pub(crate) const SHUTDOWN_ON_IDLE: usize = 1;
-pub(crate) const SHUTDOWN_NOW: usize = 2;
+#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Clone, Copy)]
+#[repr(usize)]
+pub(crate) enum Lifecycle {
+ /// The thread pool is currently running
+ Running = 0,
+
+ /// The thread pool should shutdown once it reaches an idle state.
+ ShutdownOnIdle = 1,
+
+ /// The thread pool should start the process of shutting down.
+ ShutdownNow = 2,
+}
/// Mask used to extract the number of futures from the state
const LIFECYCLE_MASK: usize = 0b11;
@@ -20,10 +29,12 @@ const NUM_FUTURES_OFFSET: usize = 2;
/// Max number of futures the pool can handle.
pub(crate) const MAX_FUTURES: usize = usize::MAX >> NUM_FUTURES_OFFSET;
-impl PoolState {
+// ===== impl State =====
+
+impl State {
#[inline]
- pub fn new() -> PoolState {
- PoolState(0)
+ pub fn new() -> State {
+ State(0)
}
/// Returns the number of futures still pending completion.
@@ -36,7 +47,7 @@ impl PoolState {
/// Returns false on failure.
pub fn inc_num_futures(&mut self) {
debug_assert!(self.num_futures() < MAX_FUTURES);
- debug_assert!(self.lifecycle() < SHUTDOWN_NOW);
+ debug_assert!(self.lifecycle() < Lifecycle::ShutdownNow);
self.0 += 1 << NUM_FUTURES_OFFSET;
}
@@ -52,8 +63,8 @@ impl PoolState {
self.0 -= 1 << NUM_FUTURES_OFFSET;
- if self.lifecycle() == SHUTDOWN_ON_IDLE && num_futures == 1 {
- self.0 = SHUTDOWN_NOW;
+ if self.lifecycle() == Lifecycle::ShutdownOnIdle && num_futures == 1 {
+ self.set_lifecycle(Lifecycle::ShutdownNow);
}
}
@@ -62,36 +73,60 @@ impl PoolState {
self.0 = self.0 & LIFECYCLE_MASK;
}
- pub fn lifecycle(&self) -> usize {
- self.0 & LIFECYCLE_MASK
+ pub fn lifecycle(&self) -> Lifecycle {
+ (self.0 & LIFECYCLE_MASK).into()
}
- pub fn set_lifecycle(&mut self, val: usize) {
- self.0 = (self.0 & NUM_FUTURES_MASK) | val;
+ pub fn set_lifecycle(&mut self, val: Lifecycle) {
+ self.0 = (self.0 & NUM_FUTURES_MASK) | (val as usize);
}
pub fn is_terminated(&self) -> bool {
- self.lifecycle() == SHUTDOWN_NOW && self.num_futures() == 0
+ self.lifecycle() == Lifecycle::ShutdownNow &&
+ self.num_futures() == 0
}
}
-impl From<usize> for PoolState {
+impl From<usize> for State {
fn from(src: usize) -> Self {
- PoolState(src)
+ State(src)
}
}
-impl From<PoolState> for usize {
- fn from(src: PoolState) -> Self {
+impl From<State> for usize {
+ fn from(src: State) -> Self {
src.0
}
}
-impl fmt::Debug for PoolState {
+impl fmt::Debug for State {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
- fmt.debug_struct("State")
+ fmt.debug_struct("pool::State")
.field("lifecycle", &self.lifecycle())
.field("num_futures", &self.num_futures())
.finish()
}
}
+
+// ===== impl Lifecycle =====
+
+impl From<usize> for Lifecycle {
+ fn from(src: usize) -> Lifecycle {
+ use self::Lifecycle::*;
+
+ debug_assert!(
+ src == Running as usize ||
+ src == ShutdownOnIdle as usize ||
+ src == ShutdownNow as usize);
+
+ unsafe { ::std::mem::transmute(src) }
+ }
+}
+
+impl From<Lifecycle> for usize {
+ fn from(src: Lifecycle) -> usize {
+ let v = src as usize;
+ debug_assert!(v & LIFECYCLE_MASK == v);
+ v
+ }
+}
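
The packed layout, for reference: the low two bits carry the Lifecycle discriminant and the remaining bits count spawned futures. A short sketch of packing and unpacking such a word, decoding with a plain match where the patch uses a debug-asserted transmute (illustrative code, not the patch itself):

const LIFECYCLE_MASK: usize = 0b11;
const NUM_FUTURES_OFFSET: usize = 2;

#[derive(Debug, PartialEq, Clone, Copy)]
#[repr(usize)]
enum Lifecycle {
    Running = 0,
    ShutdownOnIdle = 1,
    ShutdownNow = 2,
}

// Split a packed state word into its lifecycle and future count.
fn unpack(state: usize) -> (Lifecycle, usize) {
    let lifecycle = match state & LIFECYCLE_MASK {
        0 => Lifecycle::Running,
        1 => Lifecycle::ShutdownOnIdle,
        2 => Lifecycle::ShutdownNow,
        _ => unreachable!("the patch debug_asserts this can't happen"),
    };
    (lifecycle, state >> NUM_FUTURES_OFFSET)
}

fn main() {
    // Three futures pending, shutting down once idle:
    let state = (3 << NUM_FUTURES_OFFSET) | Lifecycle::ShutdownOnIdle as usize;
    assert_eq!(unpack(state), (Lifecycle::ShutdownOnIdle, 3));
}
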
diff --git a/tokio-threadpool/src/sender.rs b/tokio-threadpool/src/sender.rs
index fd4cc94e..c07818fe 100644
--- a/tokio-threadpool/src/sender.rs
+++ b/tokio-threadpool/src/sender.rs
@@ -1,4 +1,4 @@
-use pool::{Inner, PoolState, SHUTDOWN_NOW, MAX_FUTURES};
+use pool::{self, Pool, Lifecycle, MAX_FUTURES};
use task::Task;
use std::sync::Arc;
@@ -27,7 +27,7 @@ use futures2_wake::{into_waker, Futures2Wake};
/// [`ThreadPool::sender`]: struct.ThreadPool.html#method.sender
#[derive(Debug)]
pub struct Sender {
- pub(crate) inner: Arc<Inner>,
+ pub(crate) inner: Arc<Pool>,
}
impl Sender {
@@ -89,7 +89,7 @@ impl Sender {
/// Logic to prepare for spawning
fn prepare_for_spawn(&self) -> Result<(), SpawnError> {
- let mut state: PoolState = self.inner.state.load(Acquire).into();
+ let mut state: pool::State = self.inner.state.load(Acquire).into();
// Increment the number of futures spawned on the pool as well as
// validate that the pool is still running.
@@ -101,7 +101,7 @@ impl Sender {
return Err(SpawnError::at_capacity());
}
- if next.lifecycle() == SHUTDOWN_NOW {
+ if next.lifecycle() == Lifecycle::ShutdownNow {
// Cannot execute the future, executor is shutdown.
return Err(SpawnError::shutdown());
}
@@ -144,14 +144,14 @@ impl tokio_executor::Executor for Sender {
impl<'a> tokio_executor::Executor for &'a Sender {
fn status(&self) -> Result<(), tokio_executor::SpawnError> {
- let state: PoolState = self.inner.state.load(Acquire).into();
+ let state: pool::State = self.inner.state.load(Acquire).into();
if state.num_futures() == MAX_FUTURES {
// No capacity
return Err(SpawnError::at_capacity());
}
- if state.lifecycle() == SHUTDOWN_NOW {
+ if state.lifecycle() == Lifecycle::ShutdownNow {
// Cannot execute the future, executor is shutdown.
return Err(SpawnError::shutdown());
}
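
Both spawn paths above gate on the same two checks: the future count must be below MAX_FUTURES and the lifecycle must not have reached ShutdownNow. A condensed sketch of that gate, using a stand-in error enum rather than the real tokio_executor::SpawnError:

const LIFECYCLE_MASK: usize = 0b11;
const SHUTDOWN_NOW: usize = 2;
const NUM_FUTURES_OFFSET: usize = 2;
const MAX_FUTURES: usize = usize::MAX >> NUM_FUTURES_OFFSET;

#[derive(Debug, PartialEq)]
enum SpawnError { AtCapacity, Shutdown } // stand-in, not the real type

// Validate a packed pool state before accepting a new future.
fn check_spawn(state: usize) -> Result<(), SpawnError> {
    if state >> NUM_FUTURES_OFFSET == MAX_FUTURES {
        // The future counter is saturated; no capacity left.
        return Err(SpawnError::AtCapacity);
    }
    if state & LIFECYCLE_MASK == SHUTDOWN_NOW {
        // The executor has begun shutting down; reject new work.
        return Err(SpawnError::Shutdown);
    }
    Ok(())
}

fn main() {
    assert!(check_spawn(1 << NUM_FUTURES_OFFSET).is_ok());
    assert_eq!(check_spawn(SHUTDOWN_NOW), Err(SpawnError::Shutdown));
}
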
diff --git a/tokio-threadpool/src/shutdown.rs b/tokio-threadpool/src/shutdown.rs
index ea59287d..