diff options
author | Stjepan Glavina <stjepang@gmail.com> | 2018-08-09 21:14:13 +0200 |
---|---|---|
committer | Carl Lerche <me@carllerche.com> | 2018-08-09 12:14:13 -0700 |
commit | 96b556fbff07dc0df32ac3d9a9150bde1a2b75db (patch) | |
tree | 484421420a682c136ecd5504e2d824b7100d9462 /tokio-threadpool | |
parent | fd36054ae4357686c33104619bfe0d04447f69ba (diff) |
Steal multiple tasks from another worker at a time (#534)
* Steal multiple tasks from another worker at a time
* Better spinning and failing pop
* Update crossbeam-deque and simplify spinning
Diffstat (limited to 'tokio-threadpool')
-rw-r--r-- | tokio-threadpool/Cargo.toml | 2 | ||||
-rw-r--r-- | tokio-threadpool/src/worker/entry.rs | 21 | ||||
-rw-r--r-- | tokio-threadpool/src/worker/mod.rs | 44 |
3 files changed, 44 insertions, 23 deletions
diff --git a/tokio-threadpool/Cargo.toml b/tokio-threadpool/Cargo.toml index 9f21f6ca..4c7b8653 100644 --- a/tokio-threadpool/Cargo.toml +++ b/tokio-threadpool/Cargo.toml @@ -19,7 +19,7 @@ categories = ["concurrency", "asynchronous"] [dependencies] tokio-executor = { version = "0.1.2", path = "../tokio-executor" } futures = "0.1.19" -crossbeam-deque = "0.5.0" +crossbeam-deque = "0.6.0" crossbeam-utils = "0.5.0" num_cpus = "1.2" rand = "0.5" diff --git a/tokio-threadpool/src/worker/entry.rs b/tokio-threadpool/src/worker/entry.rs index 4892342b..459e7cc1 100644 --- a/tokio-threadpool/src/worker/entry.rs +++ b/tokio-threadpool/src/worker/entry.rs @@ -188,23 +188,34 @@ impl WorkerEntry { /// /// This **must** only be called by the thread that owns the worker entry. /// This function is not `Sync`. - pub fn pop_task(&self) -> Option<Arc<Task>> { + pub fn pop_task(&self) -> deque::Pop<Arc<Task>> { self.worker.pop() } - /// Steal a task + /// Steal tasks /// /// This is called by *other* workers to steal a task for processing. This /// function is `Sync`. - pub fn steal_task(&self) -> Option<Arc<Task>> { - self.stealer.steal() + /// + /// At the same time, this method steals some additional tasks and moves + /// them into `dest` in order to balance the work distribution among + /// workers. + pub fn steal_tasks(&self, dest: &Self) -> deque::Steal<Arc<Task>> { + self.stealer.steal_many(&dest.worker) } /// Drain (and drop) all tasks that are queued for work. /// /// This is called when the pool is shutting down. pub fn drain_tasks(&self) { - while let Some(_) = self.worker.pop() { + use deque::Pop; + + loop { + match self.worker.pop() { + Pop::Data(_) => {} + Pop::Empty => break, + Pop::Retry => {} + } } } diff --git a/tokio-threadpool/src/worker/mod.rs b/tokio-threadpool/src/worker/mod.rs index 2fc79d93..037441f3 100644 --- a/tokio-threadpool/src/worker/mod.rs +++ b/tokio-threadpool/src/worker/mod.rs @@ -217,7 +217,7 @@ impl Worker { /// /// This function blocks until the worker is shutting down. pub fn run(&self) { - const MAX_SPINS: usize = 60; + const MAX_SPINS: usize = 3; const LIGHT_SLEEP_INTERVAL: usize = 32; // Get the notifier. @@ -256,14 +256,14 @@ impl Worker { } if !consistent { - thread::yield_now(); spin_cnt = 0; continue; } spin_cnt += 1; - if spin_cnt < MAX_SPINS { + // Yield the thread several times before it actually goes to sleep. + if spin_cnt <= MAX_SPINS { thread::yield_now(); continue; } @@ -384,13 +384,16 @@ impl Worker { /// /// Returns `true` if work was found. fn try_run_owned_task(&self, notify: &Arc<Notifier>, sender: &mut Sender) -> bool { + use deque::Pop; + // Poll the internal queue for a task to run match self.entry().pop_task() { - Some(task) => { + Pop::Data(task) => { self.run_task(task, notify, sender); true } - None => false, + Pop::Empty => false, + Pop::Retry => true, } } @@ -398,29 +401,36 @@ impl Worker { /// /// Returns `true` if work was found fn try_steal_task(&self, notify: &Arc<Notifier>, sender: &mut Sender) -> bool { + use deque::Steal; + debug_assert!(!self.is_blocking.get()); let len = self.inner.workers.len(); let mut idx = self.inner.rand_usize() % len; + let mut found_work = false; let start = idx; loop { if idx < len { - if let Some(task) = self.inner.workers[idx].steal_task() { - trace!("stole task"); + match self.inner.workers[idx].steal_tasks(self.entry()) { + Steal::Data(task) => { + trace!("stole task"); - self.run_task(task, notify, sender); + self.run_task(task, notify, sender); - trace!("try_steal_task -- signal_work; self={}; from={}", - self.id.0, idx); + trace!("try_steal_task -- signal_work; self={}; from={}", + self.id.0, idx); - // Signal other workers that work is available - // - // TODO: Should this be called here or before - // `run_task`? - self.inner.signal_work(&self.inner); + // Signal other workers that work is available + // + // TODO: Should this be called here or before + // `run_task`? + self.inner.signal_work(&self.inner); - return true; + return true; + } + Steal::Empty => {} + Steal::Retry => found_work = true, } idx += 1; @@ -433,7 +443,7 @@ impl Worker { } } - false + found_work } fn run_task(&self, task: Arc<Task>, notify: &Arc<Notifier>, sender: &mut Sender) { |