diff options
author | Carl Lerche <me@carllerche.com> | 2018-04-04 13:30:54 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-04-04 13:30:54 -0700 |
commit | 0bcf9b0ae61ec78db7ebac9853b76a2bf1c02e62 (patch) | |
tree | 21db51b4061fe7438494dbb03c9da0aa64b52db7 /tokio-threadpool | |
parent | c7157395997c7be29eeca3cb4d823266165ee63e (diff) |
ThreadPool refactoring (#299)
Diffstat (limited to 'tokio-threadpool')
-rw-r--r-- | tokio-threadpool/src/builder.rs | 4 | ||||
-rw-r--r-- | tokio-threadpool/src/futures2_wake.rs | 4 | ||||
-rw-r--r-- | tokio-threadpool/src/lib.rs | 1 | ||||
-rw-r--r-- | tokio-threadpool/src/notifier.rs | 4 | ||||
-rw-r--r-- | tokio-threadpool/src/pool/mod.rs | 259 | ||||
-rw-r--r-- | tokio-threadpool/src/pool/stack.rs | 252 | ||||
-rw-r--r-- | tokio-threadpool/src/pool/state.rs | 77 | ||||
-rw-r--r-- | tokio-threadpool/src/sender.rs | 12 | ||||
-rw-r--r-- | tokio-threadpool/src/shutdown.rs | 4 | ||||
-rw-r--r-- | tokio-threadpool/src/sleep_stack.rs | 83 | ||||
-rw-r--r-- | tokio-threadpool/src/thread_pool.rs | 4 | ||||
-rw-r--r-- | tokio-threadpool/src/worker/entry.rs | 125 | ||||
-rw-r--r-- | tokio-threadpool/src/worker/mod.rs | 31 | ||||
-rw-r--r-- | tokio-threadpool/src/worker/state.rs | 35 |
14 files changed, 522 insertions, 373 deletions
diff --git a/tokio-threadpool/src/builder.rs b/tokio-threadpool/src/builder.rs index 8f042b6d..9583206c 100644 --- a/tokio-threadpool/src/builder.rs +++ b/tokio-threadpool/src/builder.rs @@ -2,7 +2,7 @@ use callback::Callback; use config::{Config, MAX_WORKERS}; use park::{BoxPark, BoxedPark, DefaultPark}; use sender::Sender; -use pool::Inner; +use pool::Pool; use thread_pool::ThreadPool; use worker::{self, Worker, WorkerId}; @@ -329,7 +329,7 @@ impl Builder { // Create the pool let inner = Arc::new( - Inner::new( + Pool::new( workers.into_boxed_slice(), self.config.clone())); diff --git a/tokio-threadpool/src/futures2_wake.rs b/tokio-threadpool/src/futures2_wake.rs index d114cc3c..ed9d4552 100644 --- a/tokio-threadpool/src/futures2_wake.rs +++ b/tokio-threadpool/src/futures2_wake.rs @@ -1,4 +1,4 @@ -use inner::Inner; +use inner::Pool; use notifier::Notifier; use std::marker::PhantomData; @@ -14,7 +14,7 @@ pub(crate) struct Futures2Wake { } impl Futures2Wake { - pub(crate) fn new(id: usize, inner: &Arc<Inner>) -> Futures2Wake { + pub(crate) fn new(id: usize, inner: &Arc<Pool>) -> Futures2Wake { let notifier = Arc::new(Notifier { inner: Arc::downgrade(inner), }); diff --git a/tokio-threadpool/src/lib.rs b/tokio-threadpool/src/lib.rs index b37d8b79..5c33b09d 100644 --- a/tokio-threadpool/src/lib.rs +++ b/tokio-threadpool/src/lib.rs @@ -27,7 +27,6 @@ mod pool; mod sender; mod shutdown; mod shutdown_task; -mod sleep_stack; mod task; mod thread_pool; mod worker; diff --git a/tokio-threadpool/src/notifier.rs b/tokio-threadpool/src/notifier.rs index 78f43593..851dc55f 100644 --- a/tokio-threadpool/src/notifier.rs +++ b/tokio-threadpool/src/notifier.rs @@ -1,4 +1,4 @@ -use pool::Inner; +use pool::Pool; use task::Task; use std::mem; @@ -12,7 +12,7 @@ use futures::executor::Notify; /// to poll the future again. #[derive(Debug)] pub(crate) struct Notifier { - pub inner: Weak<Inner>, + pub inner: Weak<Pool>, } impl Notify for Notifier { diff --git a/tokio-threadpool/src/pool/mod.rs b/tokio-threadpool/src/pool/mod.rs index e1b06b23..ba4aa17f 100644 --- a/tokio-threadpool/src/pool/mod.rs +++ b/tokio-threadpool/src/pool/mod.rs @@ -1,27 +1,22 @@ mod state; +mod stack; pub(crate) use self::state::{ - // TODO: Rename `State` - PoolState, - SHUTDOWN_ON_IDLE, - SHUTDOWN_NOW, + State, + Lifecycle, MAX_FUTURES, }; +use self::stack::SleepStack; -use config::{Config, MAX_WORKERS}; -use sleep_stack::{ - SleepStack, - EMPTY, - TERMINATED, -}; +use config::Config; use shutdown_task::ShutdownTask; use task::Task; -use worker::{self, Worker, WorkerId, WorkerState, PUSHED_MASK}; +use worker::{self, Worker, WorkerId}; use futures::task::AtomicTask; use std::cell::UnsafeCell; -use std::sync::atomic::Ordering::{Acquire, AcqRel, Release, Relaxed}; +use std::sync::atomic::Ordering::{Acquire, AcqRel, Relaxed}; use std::sync::atomic::AtomicUsize; use std::sync::Arc; @@ -29,12 +24,12 @@ use rand::{Rng, SeedableRng, XorShiftRng}; // TODO: Rename this #[derive(Debug)] -pub(crate) struct Inner { +pub(crate) struct Pool { // ThreadPool state pub state: AtomicUsize, // Stack tracking sleeping workers. - pub sleep_stack: AtomicUsize, + sleep_stack: SleepStack, // Number of workers who haven't reached the final state of shutdown // @@ -57,14 +52,14 @@ pub(crate) struct Inner { pub config: Config, } -impl Inner { - /// Create a new `Inner` - pub fn new(workers: Box<[worker::Entry]>, config: Config) -> Inner { +impl Pool { + /// Create a new `Pool` + pub fn new(workers: Box<[worker::Entry]>, config: Config) -> Pool { let pool_size = workers.len(); - let ret = Inner { - state: AtomicUsize::new(PoolState::new().into()), - sleep_stack: AtomicUsize::new(SleepStack::new().into()), + let ret = Pool { + state: AtomicUsize::new(State::new().into()), + sleep_stack: SleepStack::new(), num_workers: AtomicUsize::new(pool_size), next_thread_id: AtomicUsize::new(0), workers, @@ -78,7 +73,7 @@ impl Inner { // Now, we prime the sleeper stack for i in 0..pool_size { - ret.push_sleeper(i).unwrap(); + ret.sleep_stack.push(&ret.workers, i).unwrap(); } ret @@ -87,20 +82,20 @@ impl Inner { /// Start shutting down the pool. This means that no new futures will be /// accepted. pub fn shutdown(&self, now: bool, purge_queue: bool) { - let mut state: PoolState = self.state.load(Acquire).into(); + let mut state: State = self.state.load(Acquire).into(); trace!("shutdown; state={:?}", state); // For now, this must be true debug_assert!(!purge_queue || now); - // Start by setting the SHUTDOWN flag + // Start by setting the shutdown flag loop { let mut next = state; let num_futures = next.num_futures(); - if next.lifecycle() >= SHUTDOWN_NOW { + if next.lifecycle() == Lifecycle::ShutdownNow { // Already transitioned to shutting down state if !purge_queue || num_futures == 0 { @@ -114,9 +109,9 @@ impl Inner { } else { next.set_lifecycle(if now || num_futures == 0 { // If already idle, always transition to shutdown now. - SHUTDOWN_NOW + Lifecycle::ShutdownNow } else { - SHUTDOWN_ON_IDLE + Lifecycle::ShutdownOnIdle }); if purge_queue { @@ -146,69 +141,29 @@ impl Inner { self.terminate_sleeping_workers(); } + /// Called by `Worker` as it tries to enter a sleeping state. Before it + /// sleeps, it must push itself onto the sleep stack. This enables other + /// threads to see it when signaling work. + pub fn push_sleeper(&self, idx: usize) -> Result<(), ()> { + self.sleep_stack.push(&self.workers, idx) + } + pub fn terminate_sleeping_workers(&self) { use worker::Lifecycle::Signaled; trace!(" -> shutting down workers"); // Wakeup all sleeping workers. They will wake up, see the state // transition, and terminate. - while let Some((idx, worker_state)) = self.pop_sleeper(Signaled, TERMINATED) { + while let Some((idx, worker_state)) = self.sleep_stack.pop(&self.workers, Signaled, true) { trace!(" -> shutdown worker; idx={:?}; state={:?}", idx, worker_state); - self.signal_stop(idx, worker_state); - } - } - - /// Signals to the worker that it should stop - fn signal_stop(&self, idx: usize, mut state: WorkerState) { - use worker::Lifecycle::*; - - let worker = &self.workers[idx]; - - // Transition the worker state to signaled - loop { - let mut next = state; - match state.lifecycle() { - Shutdown => { - trace!("signal_stop -- WORKER_SHUTDOWN; idx={}", idx); - // If the worker is in the shutdown state, then it will never be - // started again. - self.worker_terminated(); - - return; - } - Running | Sleeping => {} - Notified | Signaled => { - trace!("signal_stop -- skipping; idx={}; state={:?}", idx, state); - // These two states imply that the worker is active, thus it - // will eventually see the shutdown signal, so we don't need - // to do anything. - // - // The worker is forced to see the shutdown signal - // eventually as: - // - // a) No more work will arrive - // b) The shutdown signal is stored as the head of the - // sleep, stack which will prevent the worker from going to - // sleep again. - return; - } + if self.workers[idx].signal_stop(worker_state).is_err() { + // The worker is already in the shutdown state, immediately + // track that it has terminated as the worker will never work + // again. + self.worker_terminated(); } - - next.set_lifecycle(Signaled); - - let actual = worker.state.compare_and_swap( - state.into(), next.into(), AcqRel).into(); - - if actual == state { - break; - } - - state = actual; } - - // Wakeup the worker - worker.wakeup(); } pub fn worker_terminated(&self) { @@ -226,7 +181,7 @@ impl Inner { /// /// Called from either inside or outside of the scheduler. If currently on /// the scheduler, then a fast path is taken. - pub fn submit(&self, task: Task, inner: &Arc<Inner>) { + pub fn submit(&self, task: Task, inner: &Arc<Pool>) { Worker::with_current(|worker| { match worker { Some(worker) => { @@ -248,14 +203,14 @@ impl Inner { /// /// Called from outside of the scheduler, this function is how new tasks /// enter the system. - fn submit_external(&self, task: Task, inner: &Arc<Inner>) { + fn submit_external(&self, task: Task, inner: &Arc<Pool>) { use worker::Lifecycle::Notified; // First try to get a handle to a sleeping worker. This ensures that // sleeping tasks get woken up - if let Some((idx, state)) = self.pop_sleeper(Notified, EMPTY) { - trace!("submit to existing worker; idx={}; state={:?}", idx, state); - self.submit_to_external(idx, task, state, inner); + if let Some((idx, worker_state)) = self.sleep_stack.pop(&self.workers, Notified, false) { + trace!("submit to existing worker; idx={}; state={:?}", idx, worker_state); + self.submit_to_external(idx, task, worker_state, inner); return; } @@ -266,15 +221,15 @@ impl Inner { trace!(" -> submitting to random; idx={}", idx); - let state: WorkerState = self.workers[idx].state.load(Acquire).into(); + let state = self.workers[idx].load_state(); self.submit_to_external(idx, task, state, inner); } fn submit_to_external(&self, idx: usize, task: Task, - state: WorkerState, - inner: &Arc<Inner>) + state: worker::State, + inner: &Arc<Pool>) { let entry = &self.workers[idx]; @@ -283,40 +238,39 @@ impl Inner { } } - fn spawn_worker(&self, idx: usize, inner: &Arc<Inner>) { + fn spawn_worker(&self, idx: usize, inner: &Arc<Pool>) { Worker::spawn(WorkerId::new(idx), inner); } /// If there are any other workers currently relaxing, signal them that work /// is available so that they can try to find more work to process. - pub fn signal_work(&self, inner: &Arc<Inner>) { + pub fn signal_work(&self, inner: &Arc<Pool>) { use worker::Lifecycle::*; - if let Some((idx, mut state)) = self.pop_sleeper(Signaled, EMPTY) { + if let Some((idx, mut worker_state)) = self.sleep_stack.pop(&self.workers, Signaled, false) { let entry = &self.workers[idx]; - debug_assert!(state.lifecycle() != Signaled, "actual={:?}", state.lifecycle()); + debug_assert!(worker_state.lifecycle() != Signaled, "actual={:?}", worker_state.lifecycle()); // Transition the worker state to signaled loop { - let mut next = state; + let mut next = worker_state; - // pop_sleeper should skip these next.set_lifecycle(Signaled); let actual = entry.state.compare_and_swap( - state.into(), next.into(), AcqRel).into(); + worker_state.into(), next.into(), AcqRel).into(); - if actual == state { + if actual == worker_state { break; } - state = actual; + worker_state = actual; } // The state has been transitioned to signal, now we need to wake up // the worker if necessary. - match state.lifecycle() { + match worker_state.lifecycle() { Sleeping => { trace!("signal_work -- wakeup; idx={}", idx); self.workers[idx].wakeup(); @@ -332,113 +286,6 @@ impl Inner { } } - /// Push a worker on the sleep stack - /// - /// Returns `Err` if the pool has been terminated - pub fn push_sleeper(&self, idx: usize) -> Result<(), ()> { - let mut state: SleepStack = self.sleep_stack.load(Acquire).into(); - - debug_assert!(WorkerState::from(self.workers[idx].state.load(Relaxed)).is_pushed()); - - loop { - let mut next = state; - - let head = state.head(); - - if head == TERMINATED { - // The pool is terminated, cannot push the sleeper. - return Err(()); - } - - self.workers[idx].set_next_sleeper(head); - next.set_head(idx); - - let actual = self.sleep_stack.compare_and_swap( - state.into(), next.into(), AcqRel).into(); - - if state == actual { - return Ok(()); - } - - state = actual; - } - } - - /// Pop a worker from the sleep stack - fn pop_sleeper(&self, max_lifecycle: worker::Lifecycle, terminal: usize) - -> Option<(usize, WorkerState)> - { - debug_assert!(terminal == EMPTY || terminal == TERMINATED); - - let mut state: SleepStack = self.sleep_stack.load(Acquire).into(); - - loop { - let head = state.head(); - - if head == EMPTY { - let mut next = state; - next.set_head(terminal); - - if next == state { - debug_assert!(terminal == EMPTY); - return None; - } - - let actual = self.sleep_stack.compare_and_swap( - state.into(), next.into(), AcqRel).into(); - - if actual != state { - state = actual; - continue; - } - - return None; - } else if head == TERMINATED { - return None; - } - - debug_assert!(head < MAX_WORKERS); - - let mut next = state; - - let next_head = self.workers[head].next_sleeper(); - - // TERMINATED can never be set as the "next pointer" on a worker. - debug_assert!(next_head != TERMINATED); - - if next_head == EMPTY { - next.set_head(terminal); - } else { - next.set_head(next_head); - } - - let actual = self.sleep_stack.compare_and_swap( - state.into(), next.into(), AcqRel).into(); - - if actual == state { - // The worker has been removed from the stack, so the pushed bit - // can be unset. Release ordering is used to ensure that this - // operation happens after actually popping the task. - debug_assert_eq!(1, PUSHED_MASK); - - // Unset the PUSHED flag and get the current state. - let state: WorkerState = self.workers[head].state - // TODO This should be fetch_and(!PUSHED_MASK) - .fetch_sub(PUSHED_MASK, Release).into(); - - if state.lifecycle() >= max_lifecycle { - // If the worker has already been notified, then it is - // warming up to do more work. In this case, try to pop - // another thread that might be in a relaxed state. - continue; - } - - return Some((head, state)); - } - - state = actual; - } - } /// Generates a random number /// @@ -479,5 +326,5 @@ impl Inner { } } -unsafe impl Send for Inner {} -unsafe impl Sync for Inner {} +unsafe impl Send for Pool {} +unsafe impl Sync for Pool {} diff --git a/tokio-threadpool/src/pool/stack.rs b/tokio-threadpool/src/pool/stack.rs new file mode 100644 index 00000000..60cb75ee --- /dev/null +++ b/tokio-threadpool/src/pool/stack.rs @@ -0,0 +1,252 @@ +use config::MAX_WORKERS; +use worker; + +use std::{fmt, usize}; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::{Acquire, AcqRel, Relaxed}; + +/// Lock-free stack of sleeping workers. +/// +/// This is implemented as a Treiber stack and references to nodes are +/// `usize` values, indexing the entry in the `[worker::Entry]` array stored by +/// `Pool`. Each `Entry` instance maintains a `pushed` bit in its state. This +/// bit tracks if the entry is already pushed onto the stack or not. A single +/// entry can only be stored on the stack a single time. +/// +/// By using indexes instead of pointers, that allows a much greater amount of +/// data to be used for the ABA guard (see correctness section of wikipedia +/// page). +/// +/// Treiber stack: https://en.wikipedia.org/wiki/Treiber_Stack +#[derive(Debug)] +pub(crate) struct SleepStack { + state: AtomicUsize, +} + +/// State related to the stack of sleeping workers. +/// +/// - Parked head 16 bits +/// - Sequence remaining +/// +/// The parked head value has a couple of special values: +/// +/// - EMPTY: No sleepers +/// - TERMINATED: Don't spawn more threads +#[derive(Eq, PartialEq, Clone, Copy)] +pub struct State(usize); + +/// Extracts the head of the worker stack from the scheduler state +const STACK_MASK: usize = ((1 << 16) - 1); + +/// Used to mark the stack as empty +pub(crate) const EMPTY: usize = MAX_WORKERS; + +/// Used to mark the stack as terminated +pub(crate) const TERMINATED: usize = EMPTY + 1; + +/// How many bits the treiber ABA guard is offset by +const ABA_GUARD_SHIFT: usize = 16; + +#[cfg(target_pointer_width = "64")] +const ABA_GUARD_MASK: usize = (1 << (64 - ABA_GUARD_SHIFT)) - 1; + +#[cfg(target_pointer_width = "32")] +const ABA_GUARD_MASK: usize = (1 << (32 - ABA_GUARD_SHIFT)) - 1; + +// ===== impl SleepStack ===== + +impl SleepStack { + /// Create a new `SleepStack` representing the empty state. + pub fn new() -> SleepStack { + let state = AtomicUsize::new(State::new().into()); + SleepStack { state } + } + + /// Push a worker onto the stack + /// + /// # Return + /// + /// Returns `Ok` on success. + /// + /// Returns `Err` if the pool has transitioned to the `TERMINATED` state. + /// Whene terminated, pushing new entries is no longer permitted. + pub fn push(&self, entries: &[worker::Entry], idx: usize) -> Result<(), ()> { + let mut state: State = self.state.load(Acquire).into(); + + debug_assert!(worker::State::from(entries[idx].state.load(Relaxed)).is_pushed()); + + loop { + let mut next = state; + + let head = state.head(); + + if head == TERMINATED { + // The pool is terminated, cannot push the sleeper. + return Err(()); + } + + entries[idx].set_next_sleeper(head); + next.set_head(idx); + + let actual = self.state.compare_and_swap( + state.into(), next.into(), AcqRel).into(); + + if state == actual { + return Ok(()); + } + + state = actual; + } + } + + + /// Pop a worker off the stack. + /// + /// If `terminate` is set and the stack is empty when this function is + /// called, the state of the stack is transitioned to "terminated". At this + /// point, no further workers can be pusheed onto the stack. + /// + /// # Return + /// + /// Returns the index of the popped worker and the worker's observed state. + /// + /// `None` if the stack is empty. + pub fn pop(&self, entries: &[worker::Entry], + max_lifecycle: worker::Lifecycle, + terminate: bool) + -> Option<(usize, worker::State)> + { + // Figure out the empty value + let terminal = match terminate { + true => TERMINATED, + false => EMPTY, + }; + + // If terminating, the max lifecycle *must* be `Signaled`, which is the + // highest lifecycle. By passing the greatest possible lifecycle value, + // no entries are skipped by this function. + // + // TODO: It would be better to terminate in a separate function that + // atomically takes all values and transitions to a terminated state. + debug_assert!(!terminate || max_lifecycle == worker::Lifecycle::Signaled); + + let mut state: State = self.state.load(Acquire).into(); + + loop { + let head = state.head(); + + if head == EMPTY { + let mut next = state; + next.set_head(terminal); + + if next == state { + debug_assert!(terminal == EMPTY); + return None; + } + + let actual = self.state.compare_and_swap( + state.into(), next.into(), AcqRel).into(); + + if actual != state { + state = actual; + continue; + } + + return None; + } else if head == TERMINATED { + return None; + } + + debug_assert!(head < MAX_WORKERS); + + let mut next = state; + + let next_head = entries[head].next_sleeper(); + + // TERMINATED can never be set as the "next pointer" on a worker. + debug_assert!(next_head != TERMINATED); + + if next_head == EMPTY { + next.set_head(terminal); + } else { + next.set_head(next_head); + } + + let actual = self.state.compare_and_swap( + state.into(), next.into(), AcqRel).into(); + + if actual == state { + // Release ordering is needed to ensure that unsetting the + // `pushed` flag happens after popping the sleeper from the + // stack. + // + // Acquire ordering is required to acquire any memory associated + // with transitioning the worker's lifecycle. + let state = entries[head].fetch_unset_pushed(AcqRel); + + if state.lifecycle() >= max_lifecycle { + // If the worker has already been notified, then it is + // warming up to do more work. In this case, try to pop + // another thread that might be in a relaxed state. + continue; + } + + return Some((head, state)); + } + + state = actual; + } + } +} + +// ===== impl State ===== + +impl State { + #[inline] + fn new() -> State { + State(EMPTY) + } + + #[inline] + fn head(&self) -> usize { + self.0 & STACK_MASK + } + + #[inline] + fn set_head(&mut self, val: usize) { + // The ABA guard protects against the ABA problem w/ treiber stacks + let aba_guard = ((self.0 >> ABA_GUARD_SHIFT) + 1) & ABA_GUARD_MASK; + + self.0 = (aba_guard << ABA_GUARD_SHIFT) | val; + } +} + +impl From<usize> for State { + fn from(src: usize) -> Self { + State(src) + } +} + +impl From<State> for usize { + fn from(src: State) -> Self { + src.0 + } +} + +impl fmt::Debug for State { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + let head = self.head(); + + let mut fmt = fmt.debug_struct("stack::State"); + + if head < MAX_WORKERS { + fmt.field("head", &head); + } else if head == EMPTY { + fmt.field("head", &"EMPTY"); + } else if head == TERMINATED { + fmt.field("head", &"TERMINATED"); + } + + fmt.finish() + } +} diff --git a/tokio-threadpool/src/pool/state.rs b/tokio-threadpool/src/pool/state.rs index de51cc43..e8f5d12e 100644 --- a/tokio-threadpool/src/pool/state.rs +++ b/tokio-threadpool/src/pool/state.rs @@ -6,11 +6,20 @@ use std::{fmt, usize}; /// shutdown on idle, 2 for shutting down). The remaining bits represent the /// number of futures that still need to complete. #[derive(Eq, PartialEq, Clone, Copy)] -pub(crate) struct PoolState(usize); +pub(crate) struct State(usize); -/// Flag used to track if the pool is running -pub(crate) const SHUTDOWN_ON_IDLE: usize = 1; -pub(crate) const SHUTDOWN_NOW: usize = 2; +#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Clone, Copy)] +#[repr(usize)] +pub(crate) enum Lifecycle { + /// The thread pool is currently running + Running = 0, + + /// The thread pool should shutdown once it reaches an idle state. + ShutdownOnIdle = 1, + + /// The thread pool should start the process of shutting down. + ShutdownNow = 2, +} /// Mask used to extract the number of futures from the state const LIFECYCLE_MASK: usize = 0b11; @@ -20,10 +29,12 @@ const NUM_FUTURES_OFFSET: usize = 2; /// Max number of futures the pool can handle. pub(crate) const MAX_FUTURES: usize = usize::MAX >> NUM_FUTURES_OFFSET; -impl PoolState { +// ===== impl State ===== + +impl State { #[inline] - pub fn new() -> PoolState { - PoolState(0) + pub fn new() -> State { + State(0) } /// Returns the number of futures still pending completion. @@ -36,7 +47,7 @@ impl PoolState { /// Returns false on failure. pub fn inc_num_futures(&mut self) { debug_assert!(self.num_futures() < MAX_FUTURES); - debug_assert!(self.lifecycle() < SHUTDOWN_NOW); + debug_assert!(self.lifecycle() < Lifecycle::ShutdownNow); self.0 += 1 << NUM_FUTURES_OFFSET; } @@ -52,8 +63,8 @@ impl PoolState { self.0 -= 1 << NUM_FUTURES_OFFSET; - if self.lifecycle() == SHUTDOWN_ON_IDLE && num_futures == 1 { - self.0 = SHUTDOWN_NOW; + if self.lifecycle() == Lifecycle::ShutdownOnIdle && num_futures == 1 { + self.set_lifecycle(Lifecycle::ShutdownNow); } } @@ -62,36 +73,60 @@ impl PoolState { self.0 = self.0 & LIFECYCLE_MASK; } - pub fn lifecycle(&self) -> usize { - self.0 & LIFECYCLE_MASK + pub fn lifecycle(&self) -> Lifecycle { + (self.0 & LIFECYCLE_MASK).into() } - pub fn set_lifecycle(&mut self, val: usize) { - self.0 = (self.0 & NUM_FUTURES_MASK) | val; + pub fn set_lifecycle(&mut self, val: Lifecycle) { + self.0 = (self.0 & NUM_FUTURES_MASK) | (val as usize); } pub fn is_terminated(&self) -> bool { - self.lifecycle() == SHUTDOWN_NOW && self.num_futures() == 0 + self.lifecycle() == Lifecycle::ShutdownNow && + self.num_futures() == 0 } } -impl From<usize> for PoolState { +impl From<usize> for State { fn from(src: usize) -> Self { - PoolState(src) + State(src) } } -impl From<PoolState> for usize { - fn from(src: PoolState) -> Self { +impl From<State> for usize { + fn from(src: State) -> Self { src.0 } } -impl fmt::Debug for PoolState { +impl fmt::Debug for State { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.debug_struct("State") + fmt.debug_struct("pool::State") .field("lifecycle", &self.lifecycle()) .field("num_futures", &self.num_futures()) .finish() } } + +// ===== impl Lifecycle ===== + +impl From<usize> for Lifecycle { + fn from(src: usize) -> Lifecycle { + use self::Lifecycle::*; + + debug_assert!( + src == Running as usize || + src == ShutdownOnIdle as usize || + src == ShutdownNow as usize); + + unsafe { ::std::mem::transmute(src) } + } +} + +impl From<Lifecycle> for usize { + fn from(src: Lifecycle) -> usize { + let v = src as usize; + debug_assert!(v & LIFECYCLE_MASK == v); + v + } +} diff --git a/tokio-threadpool/src/sender.rs b/tokio-threadpool/src/sender.rs index fd4cc94e..c07818fe 100644 --- a/tokio-threadpool/src/sender.rs +++ b/tokio-threadpool/src/sender.rs @@ -1,4 +1,4 @@ -use pool::{Inner, PoolState, SHUTDOWN_NOW, MAX_FUTURES}; +use pool::{self, Pool, Lifecycle, MAX_FUTURES}; use task::Task; use std::sync::Arc; @@ -27,7 +27,7 @@ use futures2_wake::{into_waker, Futures2Wake}; /// [`ThreadPool::sender`]: struct.ThreadPool.html#method.sender #[derive(Debug)] pub struct Sender { - pub(crate) inner: Arc<Inner>, + pub(crate) inner: Arc<Pool>, } impl Sender { @@ -89,7 +89,7 @@ impl Sender { /// Logic to prepare for spawning fn prepare_for_spawn(&self) -> Result<(), SpawnError> { - let mut state: PoolState = self.inner.state.load(Acquire).into(); + let mut state: pool::State = self.inner.state.load(Acquire).into(); // Increment the number of futures spawned on the pool as well as // validate that the pool is still running/ @@ -101,7 +101,7 @@ impl Sender { return Err(SpawnError::at_capacity()); } - if next.lifecycle() == SHUTDOWN_NOW { + if next.lifecycle() == Lifecycle::ShutdownNow { // Cannot execute the future, executor is shutdown. return Err(SpawnError::shutdown()); } @@ -144,14 +144,14 @@ impl tokio_executor::Executor for Sender { impl<'a> tokio_executor::Executor for &'a Sender { fn status(&self) -> Result<(), tokio_executor::SpawnError> { - let state: PoolState = self.inner.state.load(Acquire).into(); + let state: pool::State = self.inner.state.load(Acquire).into(); if state.num_futures() == MAX_FUTURES { // No capacity return Err(SpawnError::at_capacity()); } - if state.lifecycle() == SHUTDOWN_NOW { + if state.lifecycle() == Lifecycle::ShutdownNow { // Cannot execute the future, executor is shutdown. return Err(SpawnError::shutdown()); } diff --git a/tokio-threadpool/src/shutdown.rs b/tokio-threadpool/src/shutdown.rs index ea59287d.. |