summaryrefslogtreecommitdiffstats
path: root/tokio-threadpool
diff options
context:
space:
mode:
authorStjepan Glavina <stjepang@gmail.com>2018-08-09 21:14:13 +0200
committerCarl Lerche <me@carllerche.com>2018-08-09 12:14:13 -0700
commit96b556fbff07dc0df32ac3d9a9150bde1a2b75db (patch)
tree484421420a682c136ecd5504e2d824b7100d9462 /tokio-threadpool
parentfd36054ae4357686c33104619bfe0d04447f69ba (diff)
Steal multiple tasks from another worker at a time (#534)
* Steal multiple tasks from another worker at a time * Better spinning and failing pop * Update crossbeam-deque and simplify spinning
Diffstat (limited to 'tokio-threadpool')
-rw-r--r--tokio-threadpool/Cargo.toml2
-rw-r--r--tokio-threadpool/src/worker/entry.rs21
-rw-r--r--tokio-threadpool/src/worker/mod.rs44
3 files changed, 44 insertions, 23 deletions
diff --git a/tokio-threadpool/Cargo.toml b/tokio-threadpool/Cargo.toml
index 9f21f6ca..4c7b8653 100644
--- a/tokio-threadpool/Cargo.toml
+++ b/tokio-threadpool/Cargo.toml
@@ -19,7 +19,7 @@ categories = ["concurrency", "asynchronous"]
[dependencies]
tokio-executor = { version = "0.1.2", path = "../tokio-executor" }
futures = "0.1.19"
-crossbeam-deque = "0.5.0"
+crossbeam-deque = "0.6.0"
crossbeam-utils = "0.5.0"
num_cpus = "1.2"
rand = "0.5"
diff --git a/tokio-threadpool/src/worker/entry.rs b/tokio-threadpool/src/worker/entry.rs
index 4892342b..459e7cc1 100644
--- a/tokio-threadpool/src/worker/entry.rs
+++ b/tokio-threadpool/src/worker/entry.rs
@@ -188,23 +188,34 @@ impl WorkerEntry {
///
/// This **must** only be called by the thread that owns the worker entry.
/// This function is not `Sync`.
- pub fn pop_task(&self) -> Option<Arc<Task>> {
+ pub fn pop_task(&self) -> deque::Pop<Arc<Task>> {
self.worker.pop()
}
- /// Steal a task
+ /// Steal tasks
///
/// This is called by *other* workers to steal a task for processing. This
/// function is `Sync`.
- pub fn steal_task(&self) -> Option<Arc<Task>> {
- self.stealer.steal()
+ ///
+ /// At the same time, this method steals some additional tasks and moves
+ /// them into `dest` in order to balance the work distribution among
+ /// workers.
+ pub fn steal_tasks(&self, dest: &Self) -> deque::Steal<Arc<Task>> {
+ self.stealer.steal_many(&dest.worker)
}
/// Drain (and drop) all tasks that are queued for work.
///
/// This is called when the pool is shutting down.
pub fn drain_tasks(&self) {
- while let Some(_) = self.worker.pop() {
+ use deque::Pop;
+
+ loop {
+ match self.worker.pop() {
+ Pop::Data(_) => {}
+ Pop::Empty => break,
+ Pop::Retry => {}
+ }
}
}
diff --git a/tokio-threadpool/src/worker/mod.rs b/tokio-threadpool/src/worker/mod.rs
index 2fc79d93..037441f3 100644
--- a/tokio-threadpool/src/worker/mod.rs
+++ b/tokio-threadpool/src/worker/mod.rs
@@ -217,7 +217,7 @@ impl Worker {
///
/// This function blocks until the worker is shutting down.
pub fn run(&self) {
- const MAX_SPINS: usize = 60;
+ const MAX_SPINS: usize = 3;
const LIGHT_SLEEP_INTERVAL: usize = 32;
// Get the notifier.
@@ -256,14 +256,14 @@ impl Worker {
}
if !consistent {
- thread::yield_now();
spin_cnt = 0;
continue;
}
spin_cnt += 1;
- if spin_cnt < MAX_SPINS {
+ // Yield the thread several times before it actually goes to sleep.
+ if spin_cnt <= MAX_SPINS {
thread::yield_now();
continue;
}
@@ -384,13 +384,16 @@ impl Worker {
///
/// Returns `true` if work was found.
fn try_run_owned_task(&self, notify: &Arc<Notifier>, sender: &mut Sender) -> bool {
+ use deque::Pop;
+
// Poll the internal queue for a task to run
match self.entry().pop_task() {
- Some(task) => {
+ Pop::Data(task) => {
self.run_task(task, notify, sender);
true
}
- None => false,
+ Pop::Empty => false,
+ Pop::Retry => true,
}
}
@@ -398,29 +401,36 @@ impl Worker {
///
/// Returns `true` if work was found
fn try_steal_task(&self, notify: &Arc<Notifier>, sender: &mut Sender) -> bool {
+ use deque::Steal;
+
debug_assert!(!self.is_blocking.get());
let len = self.inner.workers.len();
let mut idx = self.inner.rand_usize() % len;
+ let mut found_work = false;
let start = idx;
loop {
if idx < len {
- if let Some(task) = self.inner.workers[idx].steal_task() {
- trace!("stole task");
+ match self.inner.workers[idx].steal_tasks(self.entry()) {
+ Steal::Data(task) => {
+ trace!("stole task");
- self.run_task(task, notify, sender);
+ self.run_task(task, notify, sender);
- trace!("try_steal_task -- signal_work; self={}; from={}",
- self.id.0, idx);
+ trace!("try_steal_task -- signal_work; self={}; from={}",
+ self.id.0, idx);
- // Signal other workers that work is available
- //
- // TODO: Should this be called here or before
- // `run_task`?
- self.inner.signal_work(&self.inner);
+ // Signal other workers that work is available
+ //
+ // TODO: Should this be called here or before
+ // `run_task`?
+ self.inner.signal_work(&self.inner);
- return true;
+ return true;
+ }
+ Steal::Empty => {}
+ Steal::Retry => found_work = true,
}
idx += 1;
@@ -433,7 +443,7 @@ impl Worker {
}
}
- false
+ found_work
}
fn run_task(&self, task: Arc<Task>, notify: &Arc<Notifier>, sender: &mut Sender) {