summaryrefslogtreecommitdiffstats
path: root/tokio/src
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-03-28 13:55:12 -0700
committerGitHub <noreply@github.com>2020-03-28 13:55:12 -0700
commitcaa7e180e43fdf914774de86f01f88e6b41f4a32 (patch)
treeacd63c2a01e11f2c728f2d7527efafbc99c66132 /tokio/src
parent7b2438e7441e98b2a3f72eb239b1c51489b7d9b8 (diff)
rt: cap fifo scheduler slot to avoid starvation (#2349)
The work-stealing scheduler includes an optimization where each worker includes a single slot to store the **last** scheduled task. Tasks in scheduler's LIFO slot are executed next. This speeds up and reduces latency with message passing patterns. Previously, this optimization was susceptible to starving other tasks in certain cases. If two tasks ping-ping between each other without ever yielding, the worker would never execute other tasks. An early PR (#2160) introduced a form of pre-emption. Each task is allocated a per-poll operation budget. Tokio resources will return ready until the budget is depleted, at which point, Tokio resources will always return `Pending`. This patch leverages the operation budget to limit the LIFO scheduler optimization. When executing tasks from the LIFO slot, the budget is **not** reset. Once the budget goes to zero, the task in the LIFO slot is pushed to the back of the queue.
Diffstat (limited to 'tokio/src')
-rw-r--r--tokio/src/coop.rs7
-rw-r--r--tokio/src/runtime/basic_scheduler.rs2
-rw-r--r--tokio/src/runtime/queue.rs29
-rw-r--r--tokio/src/runtime/task/core.rs2
-rw-r--r--tokio/src/runtime/thread_pool/worker.rs72
-rw-r--r--tokio/src/task/local.rs2
6 files changed, 69 insertions, 45 deletions
diff --git a/tokio/src/coop.rs b/tokio/src/coop.rs
index 19302559..78905e3c 100644
--- a/tokio/src/coop.rs
+++ b/tokio/src/coop.rs
@@ -98,6 +98,13 @@ where
})
}
+cfg_rt_threaded! {
+ #[inline(always)]
+ pub(crate) fn has_budget_remaining() -> bool {
+ HITS.with(|hits| hits.get() > 0)
+ }
+}
+
cfg_blocking_impl! {
/// Forcibly remove the budgeting constraints early.
pub(crate) fn stop() {
diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs
index 0419c209..30155428 100644
--- a/tokio/src/runtime/basic_scheduler.rs
+++ b/tokio/src/runtime/basic_scheduler.rs
@@ -152,7 +152,7 @@ where
};
match next {
- Some(task) => task.run(),
+ Some(task) => crate::coop::budget(|| task.run()),
None => {
// Park until the thread is signaled
scheduler.park.park().ok().expect("failed to park");
diff --git a/tokio/src/runtime/queue.rs b/tokio/src/runtime/queue.rs
index 233fe454..81408135 100644
--- a/tokio/src/runtime/queue.rs
+++ b/tokio/src/runtime/queue.rs
@@ -13,9 +13,6 @@ use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
/// Producer handle. May only be used from a single thread.
pub(super) struct Local<T: 'static> {
inner: Arc<Inner<T>>,
-
- /// LIFO slot. Cannot be stolen.
- next: Option<task::Notified<T>>,
}
/// Consumer handle. May be used from many threads.
@@ -96,7 +93,6 @@ pub(super) fn local<T: 'static>() -> (Steal<T>, Local<T>) {
let local = Local {
inner: inner.clone(),
- next: None,
};
let remote = Steal(inner);
@@ -110,26 +106,6 @@ impl<T> Local<T> {
!self.inner.is_empty()
}
- /// Returns true if the queue has an unstealable entry.
- pub(super) fn has_unstealable(&self) -> bool {
- self.next.is_some()
- }
-
- /// Push a task to the local queue. Returns `true` if a stealer should be
- /// notified.
- pub(super) fn push(&mut self, task: task::Notified<T>, inject: &Inject<T>) -> bool {
- let prev = self.next.take();
- let ret = prev.is_some();
-
- if let Some(prev) = prev {
- self.push_back(prev, inject);
- }
-
- self.next = Some(task);
-
- ret
- }
-
/// Pushes a task to the back of the local queue, skipping the LIFO slot.
pub(super) fn push_back(&mut self, mut task: task::Notified<T>, inject: &Inject<T>) {
let tail = loop {
@@ -270,11 +246,6 @@ impl<T> Local<T> {
/// Pops a task from the local queue.
pub(super) fn pop(&mut self) -> Option<task::Notified<T>> {
- // If a task is available in the FIFO slot, return that.
- if let Some(task) = self.next.take() {
- return Some(task);
- }
-
let mut head = self.inner.head.load(Acquire);
let idx = loop {
diff --git a/tokio/src/runtime/task/core.rs b/tokio/src/runtime/task/core.rs
index 2092c0aa..573b9f3c 100644
--- a/tokio/src/runtime/task/core.rs
+++ b/tokio/src/runtime/task/core.rs
@@ -160,7 +160,7 @@ impl<T: Future, S: Schedule> Core<T, S> {
let waker_ref = waker_ref::<T, S>(header);
let mut cx = Context::from_waker(&*waker_ref);
- crate::coop::budget(|| future.poll(&mut cx))
+ future.poll(&mut cx)
})
};
diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs
index c07aa054..400e2a93 100644
--- a/tokio/src/runtime/thread_pool/worker.rs
+++ b/tokio/src/runtime/thread_pool/worker.rs
@@ -34,6 +34,13 @@ struct Core {
/// Used to schedule bookkeeping tasks every so often.
tick: u8,
+ /// When a task is scheduled from a worker, it is stored in this slot. The
+ /// worker will check this slot for a task **before** checking the run
+ /// queue. This effectively results in the **last** scheduled task to be run
+ /// next (LIFO). This is an optimization for message passing patterns and
+ /// helps to reduce latency.
+ lifo_slot: Option<Notified>,
+
/// The worker-local run queue.
run_queue: queue::Local<Arc<Worker>>,
@@ -128,6 +135,7 @@ pub(super) fn create(size: usize, park: Parker) -> (Arc<Shared>, Launch) {
cores.push(Box::new(Core {
tick: 0,
+ lifo_slot: None,
run_queue,
is_searching: false,
is_shutdown: false,
@@ -296,13 +304,37 @@ impl Context {
*self.core.borrow_mut() = Some(core);
// Run the task
- task.run();
-
- // Try to take the core back
- match self.core.borrow_mut().take() {
- Some(core) => Ok(core),
- None => Err(()),
- }
+ crate::coop::budget(|| {
+ task.run();
+
+ // As long as there is budget remaining and a task exists in the
+ // `lifo_slot`, then keep running.
+ loop {
+ // Check if we still have the core. If not, the core was stolen
+ // by another worker.
+ let mut core = match self.core.borrow_mut().take() {
+ Some(core) => core,
+ None => return Err(()),
+ };
+
+ // Check for a task in the LIFO slot
+ let task = match core.lifo_slot.take() {
+ Some(task) => task,
+ None => return Ok(core),
+ };
+
+ if crate::coop::has_budget_remaining() {
+ // Run the LIFO task, then loop
+ *self.core.borrow_mut() = Some(core);
+ task.run();
+ } else {
+ // Not enough budget left to run the LIFO task, push it to
+ // the back of the queue and return.
+ core.run_queue.push_back(task, self.worker.inject());
+ return Ok(core);
+ }
+ }
+ })
}
fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
@@ -373,12 +405,16 @@ impl Core {
/// Return the next notified task available to this worker.
fn next_task(&mut self, worker: &Worker) -> Option<Notified> {
if self.tick % GLOBAL_POLL_INTERVAL == 0 {
- worker.inject().pop().or_else(|| self.run_queue.pop())
+ worker.inject().pop().or_else(|| self.next_local_task())
} else {
- self.run_queue.pop().or_else(|| worker.inject().pop())
+ self.next_local_task().or_else(|| worker.inject().pop())
}
}
+ fn next_local_task(&mut self) -> Option<Notified> {
+ self.lifo_slot.take().or_else(|| self.run_queue.pop())
+ }
+
fn steal_work(&mut self, worker: &Worker) -> Option<Notified> {
if !self.transition_to_searching(worker) {
return None;
@@ -444,9 +480,9 @@ impl Core {
/// Returns `true` if the transition happened.
fn transition_from_parked(&mut self, worker: &Worker) -> bool {
- // If there is a non-stealable task, then we must unpark regardless of
+ // If a task is in the lifo slot, then we must unpark regardless of
// being notified
- if self.run_queue.has_unstealable() {
+ if self.lifo_slot.is_some() {
worker.shared.idle.unpark_worker_by_id(worker.index);
self.is_searching = true;
return true;
@@ -494,7 +530,7 @@ impl Core {
}
// Drain the queue
- while let Some(_) = self.run_queue.pop() {}
+ while let Some(_) = self.next_local_task() {}
}
fn drain_pending_drop(&mut self, worker: &Worker) {
@@ -639,7 +675,17 @@ impl Shared {
core.run_queue.push_back(task, &self.inject);
true
} else {
- core.run_queue.push(task, &self.inject)
+ // Push to the LIFO slot
+ let prev = core.lifo_slot.take();
+ let ret = prev.is_some();
+
+ if let Some(prev) = prev {
+ core.run_queue.push_back(prev, &self.inject);
+ }
+
+ core.lifo_slot = Some(task);
+
+ ret
};
// Only notify if not currently parked. If `park` is `None`, then the
diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs
index a2d1ceb2..fcb8c789 100644
--- a/tokio/src/task/local.rs
+++ b/tokio/src/task/local.rs
@@ -398,7 +398,7 @@ impl LocalSet {
// task initially. Because `LocalSet` itself is `!Send`, and
// `spawn_local` spawns into the `LocalSet` on the current
// thread, the invariant is maintained.
- Some(task) => task.run(),
+ Some(task) => crate::coop::budget(|| task.run()),
// We have fully drained the queue of notified tasks, so the
// local future doesn't need to be notified again — it can wait
// until something else wakes a task in the local set.