diff options
author | Carl Lerche <me@carllerche.com> | 2020-05-07 16:25:04 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-05-07 16:25:04 -0700 |
commit | bff21aba6c1568f260fdf48d2eb3c0566e293f5a (patch) | |
tree | 31f6dd4cde77e16a840a729694d0f57f30fc4f7d /tokio/tests/rt_threaded.rs | |
parent | 07533a5255a6516b6e92c45e571a9ba497cb25d4 (diff) |
rt: set task budget after block_in_place call (#2502)
In some cases, when a call to `block_in_place` completes, the runtime is
reinstated on the thread. In this case, the task budget must also be set
in order to avoid starving other tasks on the worker.
Diffstat (limited to 'tokio/tests/rt_threaded.rs')
-rw-r--r-- | tokio/tests/rt_threaded.rs | 59 |
1 files changed, 59 insertions, 0 deletions
diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs index ad063348..b5ec96de 100644 --- a/tokio/tests/rt_threaded.rs +++ b/tokio/tests/rt_threaded.rs @@ -7,6 +7,7 @@ use tokio::runtime::{self, Runtime}; use tokio::sync::oneshot; use tokio_test::{assert_err, assert_ok}; +use futures::future::poll_fn; use std::future::Future; use std::pin::Pin; use std::sync::atomic::AtomicUsize; @@ -322,6 +323,64 @@ fn multi_threadpool() { done_rx.recv().unwrap(); } +// When `block_in_place` returns, it attempts to reclaim the yielded runtime +// worker. In this case, the remainder of the task is on the runtime worker and +// must take part in the cooperative task budgeting system. +// +// The test ensures that, when this happens, attempting to consume from a +// channel yields occasionally even if there are values ready to receive. +#[test] +fn coop_and_block_in_place() { + use tokio::sync::mpsc; + + let mut rt = tokio::runtime::Builder::new() + .threaded_scheduler() + // Setting max threads to 1 prevents another thread from claiming the + // runtime worker yielded as part of `block_in_place` and guarantees the + // same thread will reclaim the worker at the end of the + // `block_in_place` call. + .max_threads(1) + .build() + .unwrap(); + + rt.block_on(async move { + let (mut tx, mut rx) = mpsc::channel(1024); + + // Fill the channel + for _ in 0..1024 { + tx.send(()).await.unwrap(); + } + + drop(tx); + + tokio::spawn(async move { + // Block in place without doing anything + tokio::task::block_in_place(|| {}); + + // Receive all the values, this should trigger a `Pending` as the + // coop limit will be reached. + poll_fn(|cx| { + while let Poll::Ready(v) = { + tokio::pin! { + let fut = rx.recv(); + } + + Pin::new(&mut fut).poll(cx) + } { + if v.is_none() { + panic!("did not yield"); + } + } + + Poll::Ready(()) + }) + .await + }) + .await + .unwrap(); + }); +} + // Testing this does not panic #[test] fn max_threads() { |