diff options
author | Carl Lerche <me@carllerche.com> | 2020-03-28 13:55:12 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-03-28 13:55:12 -0700 |
commit | caa7e180e43fdf914774de86f01f88e6b41f4a32 (patch) | |
tree | acd63c2a01e11f2c728f2d7527efafbc99c66132 /tokio/tests/rt_common.rs | |
parent | 7b2438e7441e98b2a3f72eb239b1c51489b7d9b8 (diff) |
rt: cap fifo scheduler slot to avoid starvation (#2349)
The work-stealing scheduler includes an optimization where each worker
includes a single slot to store the **last** scheduled task. Tasks in
the scheduler's LIFO slot are executed next. This speeds up and reduces
latency with message passing patterns.
Previously, this optimization was susceptible to starving other tasks in
certain cases. If two tasks ping-pong between each other without ever
yielding, the worker would never execute other tasks.
An earlier PR (#2160) introduced a form of pre-emption. Each task is
allocated a per-poll operation budget. Tokio resources will return ready
until the budget is depleted, at which point, Tokio resources will
always return `Pending`.
This patch leverages the operation budget to limit the LIFO scheduler
optimization. When executing tasks from the LIFO slot, the budget is
**not** reset. Once the budget goes to zero, the task in the LIFO slot
is pushed to the back of the queue.
Diffstat (limited to 'tokio/tests/rt_common.rs')
-rw-r--r-- | tokio/tests/rt_common.rs | 54 |
1 files changed, 54 insertions, 0 deletions
diff --git a/tokio/tests/rt_common.rs b/tokio/tests/rt_common.rs index 0355a6e7..ae16721e 100644 --- a/tokio/tests/rt_common.rs +++ b/tokio/tests/rt_common.rs @@ -878,4 +878,58 @@ rt_test! { }).await; }); } + + // Tests that the "next task" scheduler optimization is not able to starve + // other tasks. + #[test] + fn ping_pong_saturation() { + use tokio::sync::mpsc; + + const NUM: usize = 100; + + let mut rt = rt(); + + rt.block_on(async { + let (spawned_tx, mut spawned_rx) = mpsc::unbounded_channel(); + + // Spawn a bunch of tasks that ping-pong between each other to + // saturate the runtime. + for _ in 0..NUM { + let (tx1, mut rx1) = mpsc::unbounded_channel(); + let (tx2, mut rx2) = mpsc::unbounded_channel(); + let spawned_tx = spawned_tx.clone(); + + task::spawn(async move { + spawned_tx.send(()).unwrap(); + + tx1.send(()).unwrap(); + + loop { + rx2.recv().await.unwrap(); + tx1.send(()).unwrap(); + } + }); + + task::spawn(async move { + loop { + rx1.recv().await.unwrap(); + tx2.send(()).unwrap(); + } + }); + } + + for _ in 0..NUM { + spawned_rx.recv().await.unwrap(); + } + + // Spawn another task and wait for it to complete. + let handle = task::spawn(async { + for _ in 0..5 { + // Yielding forces it back into the local queue. + task::yield_now().await; + } + }); + handle.await.unwrap(); + }); + } } |