summaryrefslogtreecommitdiffstats
path: root/tokio-threadpool
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2018-03-30 11:50:02 -0700
committerGitHub <noreply@github.com>2018-03-30 11:50:02 -0700
commitbaa2502ec64587710fd8db460c5751b838138a63 (patch)
tree4aa10b0ebef0f08433f493ce430c2ec9923d6e9c /tokio-threadpool
parentd4d17392fef4f6161cdfbfc4fbe1d0d05db020ad (diff)
Integrate timers with runtime. (#266)
This patch integrate the new timer implementation with the runtime by initializing a timer per worker thread. This allows minimizing the amount of synchronization needed for using timers.
Diffstat (limited to 'tokio-threadpool')
-rw-r--r--tokio-threadpool/src/builder.rs20
-rw-r--r--tokio-threadpool/src/inner.rs8
-rw-r--r--tokio-threadpool/src/worker.rs53
3 files changed, 54 insertions, 27 deletions
diff --git a/tokio-threadpool/src/builder.rs b/tokio-threadpool/src/builder.rs
index b2788e83..a5c719a1 100644
--- a/tokio-threadpool/src/builder.rs
+++ b/tokio-threadpool/src/builder.rs
@@ -7,7 +7,7 @@ use sleep_stack::SleepStack;
use state::State;
use thread_pool::ThreadPool;
use inner::Inner;
-use worker::Worker;
+use worker::{Worker, WorkerId};
use worker_entry::WorkerEntry;
use std::error::Error;
@@ -70,7 +70,7 @@ pub struct Builder {
pool_size: usize,
/// Generates the `Park` instances
- new_park: Box<Fn() -> BoxPark>,
+ new_park: Box<Fn(&WorkerId) -> BoxPark>,
}
impl Builder {
@@ -98,7 +98,7 @@ impl Builder {
pub fn new() -> Builder {
let num_cpus = num_cpus::get();
- let new_park = Box::new(|| {
+ let new_park = Box::new(|_: &WorkerId| {
Box::new(BoxedPark::new(DefaultPark::new()))
as BoxPark
});
@@ -277,7 +277,7 @@ impl Builder {
/// # pub fn main() {
/// // Create a thread pool with default configuration values
/// let thread_pool = Builder::new()
- /// .custom_park(|| {
+ /// .custom_park(|_| {
/// use tokio_threadpool::park::DefaultPark;
///
/// // This is the default park type that the worker would use if we
@@ -292,11 +292,14 @@ impl Builder {
/// # }
/// ```
pub fn custom_park<F, P>(&mut self, f: F) -> &mut Self
- where F: Fn() -> P + 'static,
+ where F: Fn(&WorkerId) -> P + 'static,
P: Park + Send + 'static,
P::Error: Error,
{
- self.new_park = Box::new(move || Box::new(BoxedPark::new(f())));
+ self.new_park = Box::new(move |id| {
+ Box::new(BoxedPark::new(f(id)))
+ });
+
self
}
@@ -322,8 +325,9 @@ impl Builder {
trace!("build; num-workers={}", self.pool_size);
- for _ in 0..self.pool_size {
- let park = (self.new_park)();
+ for i in 0..self.pool_size {
+ let id = WorkerId::new(i);
+ let park = (self.new_park)(&id);
let unpark = park.unpark();
workers.push(WorkerEntry::new(park, unpark));
diff --git a/tokio-threadpool/src/inner.rs b/tokio-threadpool/src/inner.rs
index 5fc55d19..cd6ba5b5 100644
--- a/tokio-threadpool/src/inner.rs
+++ b/tokio-threadpool/src/inner.rs
@@ -7,7 +7,7 @@ use sleep_stack::{
use shutdown_task::ShutdownTask;
use state::{State, SHUTDOWN_ON_IDLE, SHUTDOWN_NOW};
use task::Task;
-use worker::Worker;
+use worker::{Worker, WorkerId};
use worker_entry::WorkerEntry;
use worker_state::{
WorkerState,
@@ -189,7 +189,7 @@ impl Inner {
Worker::with_current(|worker| {
match worker {
Some(worker) => {
- let idx = worker.idx;
+ let idx = worker.id.idx;
trace!(" -> submit internal; idx={}", idx);
@@ -236,7 +236,7 @@ impl Inner {
let entry = &self.workers[idx];
if !entry.submit_external(task, state) {
- Worker::spawn(idx, inner);
+ Worker::spawn(WorkerId::new(idx), inner);
}
}
@@ -273,7 +273,7 @@ impl Inner {
}
WORKER_SHUTDOWN => {
trace!("signal_work -- spawn; idx={}", idx);
- Worker::spawn(idx, inner);
+ Worker::spawn(WorkerId::new(idx), inner);
}
_ => {}
}
diff --git a/tokio-threadpool/src/worker.rs b/tokio-threadpool/src/worker.rs
index a1439cdc..3fb18f6f 100644
--- a/tokio-threadpool/src/worker.rs
+++ b/tokio-threadpool/src/worker.rs
@@ -33,7 +33,7 @@ pub struct Worker {
pub(crate) inner: Arc<Inner>,
// WorkerEntry index
- pub(crate) idx: usize,
+ pub(crate) id: WorkerId,
// Set when the worker should finalize on drop
should_finalize: Cell<bool>,
@@ -42,17 +42,26 @@ pub struct Worker {
_p: PhantomData<Rc<()>>,
}
+/// Identifiers a thread pool worker.
+///
+/// This identifier is unique scoped by the thread pool. It is possible that
+/// different thread pool instances share worker identifier values.
+#[derive(Debug, Clone, Hash, Eq, PartialEq)]
+pub struct WorkerId {
+ pub(crate) idx: usize,
+}
+
// Pointer to the current worker info
thread_local!(static CURRENT_WORKER: Cell<*const Worker> = Cell::new(0 as *const _));
impl Worker {
- pub(crate) fn spawn(idx: usize, inner: &Arc<Inner>) {
- trace!("spawning new worker thread; idx={}", idx);
+ pub(crate) fn spawn(id: WorkerId, inner: &Arc<Inner>) {
+ trace!("spawning new worker thread; id={}", id.idx);
let mut th = thread::Builder::new();
if let Some(ref prefix) = inner.config.name_prefix {
- th = th.name(format!("{}{}", prefix, idx));
+ th = th.name(format!("{}{}", prefix, id.idx));
}
if let Some(stack) = inner.config.stack_size {
@@ -63,8 +72,8 @@ impl Worker {
th.spawn(move || {
let worker = Worker {
- inner: inner,
- idx: idx,
+ inner,
+ id,
should_finalize: Cell::new(false),
_p: PhantomData,
};
@@ -106,6 +115,14 @@ impl Worker {
})
}
+ /// Returns a reference to the worker's identifier.
+ ///
+ /// This identifier is unique scoped by the thread pool. It is possible that
+ /// different thread pool instances share worker identifier values.
+ pub fn id(&self) -> &WorkerId {
+ &self.id
+ }
+
/// Run the worker
///
/// This function blocks until the worker is shutting down.
@@ -261,7 +278,7 @@ impl Worker {
self.run_task(task, notify, sender);
trace!("try_steal_task -- signal_work; self={}; from={}",
- self.idx, idx);
+ self.id.idx, idx);
// Signal other workers that work is available
self.inner.signal_work(&self.inner);
@@ -370,7 +387,7 @@ impl Worker {
///
/// Returns `true` if woken up due to new work arriving.
fn sleep(&self) -> bool {
- trace!("Worker::sleep; idx={}", self.idx);
+ trace!("Worker::sleep; idx={}", self.id.idx);
let mut state: WorkerState = self.entry().state.load(Acquire).into();
@@ -409,12 +426,12 @@ impl Worker {
if !state.is_pushed() {
debug_assert!(next.is_pushed());
- trace!(" sleeping -- push to stack; idx={}", self.idx);
+ trace!(" sleeping -- push to stack; idx={}", self.id.idx);
// We obtained permission to push the worker into the
// sleeper queue.
- if let Err(_) = self.inner.push_sleeper(self.idx) {
- trace!(" sleeping -- push to stack failed; idx={}", self.idx);
+ if let Err(_) = self.inner.push_sleeper(self.id.idx) {
+ trace!(" sleeping -- push to stack failed; idx={}", self.id.idx);
// The push failed due to the pool being terminated.
//
// This is true because the "work" being woken up for is
@@ -429,7 +446,7 @@ impl Worker {
state = actual;
}
- trace!(" -> starting to sleep; idx={}", self.idx);
+ trace!(" -> starting to sleep; idx={}", self.id.idx);
let sleep_until = self.inner.config.keep_alive
.map(|dur| Instant::now() + dur);
@@ -465,7 +482,7 @@ impl Worker {
}
}
- trace!(" -> wakeup; idx={}", self.idx);
+ trace!(" -> wakeup; idx={}", self.id.idx);
// Reload the state
state = self.entry().state.load(Acquire).into();
@@ -527,13 +544,13 @@ impl Worker {
}
fn entry(&self) -> &WorkerEntry {
- &self.inner.workers[self.idx]
+ &self.inner.workers[self.id.idx]
}
}
impl Drop for Worker {
fn drop(&mut self) {
- trace!("shutting down thread; idx={}", self.idx);
+ trace!("shutting down thread; idx={}", self.id.idx);
if self.should_finalize.get() {
// Drain all work
@@ -547,3 +564,9 @@ impl Drop for Worker {
}
}
}
+
+impl WorkerId {
+ pub(crate) fn new(idx: usize) -> WorkerId {
+ WorkerId { idx }
+ }
+}