diff options
author | Carl Lerche <me@carllerche.com> | 2018-03-30 11:50:02 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-03-30 11:50:02 -0700 |
commit | baa2502ec64587710fd8db460c5751b838138a63 (patch) | |
tree | 4aa10b0ebef0f08433f493ce430c2ec9923d6e9c /tokio-threadpool | |
parent | d4d17392fef4f6161cdfbfc4fbe1d0d05db020ad (diff) |
Integrate timers with runtime. (#266)
This patch integrate the new timer implementation with the runtime by
initializing a timer per worker thread. This allows minimizing the
amount of synchronization needed for using timers.
Diffstat (limited to 'tokio-threadpool')
-rw-r--r-- | tokio-threadpool/src/builder.rs | 20 | ||||
-rw-r--r-- | tokio-threadpool/src/inner.rs | 8 | ||||
-rw-r--r-- | tokio-threadpool/src/worker.rs | 53 |
3 files changed, 54 insertions, 27 deletions
diff --git a/tokio-threadpool/src/builder.rs b/tokio-threadpool/src/builder.rs index b2788e83..a5c719a1 100644 --- a/tokio-threadpool/src/builder.rs +++ b/tokio-threadpool/src/builder.rs @@ -7,7 +7,7 @@ use sleep_stack::SleepStack; use state::State; use thread_pool::ThreadPool; use inner::Inner; -use worker::Worker; +use worker::{Worker, WorkerId}; use worker_entry::WorkerEntry; use std::error::Error; @@ -70,7 +70,7 @@ pub struct Builder { pool_size: usize, /// Generates the `Park` instances - new_park: Box<Fn() -> BoxPark>, + new_park: Box<Fn(&WorkerId) -> BoxPark>, } impl Builder { @@ -98,7 +98,7 @@ impl Builder { pub fn new() -> Builder { let num_cpus = num_cpus::get(); - let new_park = Box::new(|| { + let new_park = Box::new(|_: &WorkerId| { Box::new(BoxedPark::new(DefaultPark::new())) as BoxPark }); @@ -277,7 +277,7 @@ impl Builder { /// # pub fn main() { /// // Create a thread pool with default configuration values /// let thread_pool = Builder::new() - /// .custom_park(|| { + /// .custom_park(|_| { /// use tokio_threadpool::park::DefaultPark; /// /// // This is the default park type that the worker would use if we @@ -292,11 +292,14 @@ impl Builder { /// # } /// ``` pub fn custom_park<F, P>(&mut self, f: F) -> &mut Self - where F: Fn() -> P + 'static, + where F: Fn(&WorkerId) -> P + 'static, P: Park + Send + 'static, P::Error: Error, { - self.new_park = Box::new(move || Box::new(BoxedPark::new(f()))); + self.new_park = Box::new(move |id| { + Box::new(BoxedPark::new(f(id))) + }); + self } @@ -322,8 +325,9 @@ impl Builder { trace!("build; num-workers={}", self.pool_size); - for _ in 0..self.pool_size { - let park = (self.new_park)(); + for i in 0..self.pool_size { + let id = WorkerId::new(i); + let park = (self.new_park)(&id); let unpark = park.unpark(); workers.push(WorkerEntry::new(park, unpark)); diff --git a/tokio-threadpool/src/inner.rs b/tokio-threadpool/src/inner.rs index 5fc55d19..cd6ba5b5 100644 --- a/tokio-threadpool/src/inner.rs +++ b/tokio-threadpool/src/inner.rs @@ -7,7 +7,7 @@ use sleep_stack::{ use shutdown_task::ShutdownTask; use state::{State, SHUTDOWN_ON_IDLE, SHUTDOWN_NOW}; use task::Task; -use worker::Worker; +use worker::{Worker, WorkerId}; use worker_entry::WorkerEntry; use worker_state::{ WorkerState, @@ -189,7 +189,7 @@ impl Inner { Worker::with_current(|worker| { match worker { Some(worker) => { - let idx = worker.idx; + let idx = worker.id.idx; trace!(" -> submit internal; idx={}", idx); @@ -236,7 +236,7 @@ impl Inner { let entry = &self.workers[idx]; if !entry.submit_external(task, state) { - Worker::spawn(idx, inner); + Worker::spawn(WorkerId::new(idx), inner); } } @@ -273,7 +273,7 @@ impl Inner { } WORKER_SHUTDOWN => { trace!("signal_work -- spawn; idx={}", idx); - Worker::spawn(idx, inner); + Worker::spawn(WorkerId::new(idx), inner); } _ => {} } diff --git a/tokio-threadpool/src/worker.rs b/tokio-threadpool/src/worker.rs index a1439cdc..3fb18f6f 100644 --- a/tokio-threadpool/src/worker.rs +++ b/tokio-threadpool/src/worker.rs @@ -33,7 +33,7 @@ pub struct Worker { pub(crate) inner: Arc<Inner>, // WorkerEntry index - pub(crate) idx: usize, + pub(crate) id: WorkerId, // Set when the worker should finalize on drop should_finalize: Cell<bool>, @@ -42,17 +42,26 @@ pub struct Worker { _p: PhantomData<Rc<()>>, } +/// Identifiers a thread pool worker. +/// +/// This identifier is unique scoped by the thread pool. It is possible that +/// different thread pool instances share worker identifier values. +#[derive(Debug, Clone, Hash, Eq, PartialEq)] +pub struct WorkerId { + pub(crate) idx: usize, +} + // Pointer to the current worker info thread_local!(static CURRENT_WORKER: Cell<*const Worker> = Cell::new(0 as *const _)); impl Worker { - pub(crate) fn spawn(idx: usize, inner: &Arc<Inner>) { - trace!("spawning new worker thread; idx={}", idx); + pub(crate) fn spawn(id: WorkerId, inner: &Arc<Inner>) { + trace!("spawning new worker thread; id={}", id.idx); let mut th = thread::Builder::new(); if let Some(ref prefix) = inner.config.name_prefix { - th = th.name(format!("{}{}", prefix, idx)); + th = th.name(format!("{}{}", prefix, id.idx)); } if let Some(stack) = inner.config.stack_size { @@ -63,8 +72,8 @@ impl Worker { th.spawn(move || { let worker = Worker { - inner: inner, - idx: idx, + inner, + id, should_finalize: Cell::new(false), _p: PhantomData, }; @@ -106,6 +115,14 @@ impl Worker { }) } + /// Returns a reference to the worker's identifier. + /// + /// This identifier is unique scoped by the thread pool. It is possible that + /// different thread pool instances share worker identifier values. + pub fn id(&self) -> &WorkerId { + &self.id + } + /// Run the worker /// /// This function blocks until the worker is shutting down. @@ -261,7 +278,7 @@ impl Worker { self.run_task(task, notify, sender); trace!("try_steal_task -- signal_work; self={}; from={}", - self.idx, idx); + self.id.idx, idx); // Signal other workers that work is available self.inner.signal_work(&self.inner); @@ -370,7 +387,7 @@ impl Worker { /// /// Returns `true` if woken up due to new work arriving. fn sleep(&self) -> bool { - trace!("Worker::sleep; idx={}", self.idx); + trace!("Worker::sleep; idx={}", self.id.idx); let mut state: WorkerState = self.entry().state.load(Acquire).into(); @@ -409,12 +426,12 @@ impl Worker { if !state.is_pushed() { debug_assert!(next.is_pushed()); - trace!(" sleeping -- push to stack; idx={}", self.idx); + trace!(" sleeping -- push to stack; idx={}", self.id.idx); // We obtained permission to push the worker into the // sleeper queue. - if let Err(_) = self.inner.push_sleeper(self.idx) { - trace!(" sleeping -- push to stack failed; idx={}", self.idx); + if let Err(_) = self.inner.push_sleeper(self.id.idx) { + trace!(" sleeping -- push to stack failed; idx={}", self.id.idx); // The push failed due to the pool being terminated. // // This is true because the "work" being woken up for is @@ -429,7 +446,7 @@ impl Worker { state = actual; } - trace!(" -> starting to sleep; idx={}", self.idx); + trace!(" -> starting to sleep; idx={}", self.id.idx); let sleep_until = self.inner.config.keep_alive .map(|dur| Instant::now() + dur); @@ -465,7 +482,7 @@ impl Worker { } } - trace!(" -> wakeup; idx={}", self.idx); + trace!(" -> wakeup; idx={}", self.id.idx); // Reload the state state = self.entry().state.load(Acquire).into(); @@ -527,13 +544,13 @@ impl Worker { } fn entry(&self) -> &WorkerEntry { - &self.inner.workers[self.idx] + &self.inner.workers[self.id.idx] } } impl Drop for Worker { fn drop(&mut self) { - trace!("shutting down thread; idx={}", self.idx); + trace!("shutting down thread; idx={}", self.id.idx); if self.should_finalize.get() { // Drain all work @@ -547,3 +564,9 @@ impl Drop for Worker { } } } + +impl WorkerId { + pub(crate) fn new(idx: usize) -> WorkerId { + WorkerId { idx } + } +} |