summaryrefslogtreecommitdiffstats
path: root/tokio/tests/rt_threaded.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-05-07 16:25:04 -0700
committerGitHub <noreply@github.com>2020-05-07 16:25:04 -0700
commitbff21aba6c1568f260fdf48d2eb3c0566e293f5a (patch)
tree31f6dd4cde77e16a840a729694d0f57f30fc4f7d /tokio/tests/rt_threaded.rs
parent07533a5255a6516b6e92c45e571a9ba497cb25d4 (diff)
rt: set task budget after block_in_place call (#2502)
In some cases, when a call to `block_in_place` completes, the runtime is reinstated on the thread. In this case, the task budget must also be set in order to avoid starving other tasks on the worker.
Diffstat (limited to 'tokio/tests/rt_threaded.rs')
-rw-r--r--tokio/tests/rt_threaded.rs59
1 files changed, 59 insertions, 0 deletions
diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs
index ad063348..b5ec96de 100644
--- a/tokio/tests/rt_threaded.rs
+++ b/tokio/tests/rt_threaded.rs
@@ -7,6 +7,7 @@ use tokio::runtime::{self, Runtime};
use tokio::sync::oneshot;
use tokio_test::{assert_err, assert_ok};
+use futures::future::poll_fn;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
@@ -322,6 +323,64 @@ fn multi_threadpool() {
done_rx.recv().unwrap();
}
+// When `block_in_place` returns, it attempts to reclaim the yielded runtime
+// worker. In this case, the remainder of the task is on the runtime worker and
+// must take part in the cooperative task budgeting system.
+//
+// The test ensures that, when this happens, attempting to consume from a
+// channel yields occasionally even if there are values ready to receive.
+#[test]
+fn coop_and_block_in_place() {
+ use tokio::sync::mpsc;
+
+ let mut rt = tokio::runtime::Builder::new()
+ .threaded_scheduler()
+ // Setting max threads to 1 prevents another thread from claiming the
+ // runtime worker yielded as part of `block_in_place` and guarantees the
+ // same thread will reclaim the worker at the end of the
+ // `block_in_place` call.
+ .max_threads(1)
+ .build()
+ .unwrap();
+
+ rt.block_on(async move {
+ let (mut tx, mut rx) = mpsc::channel(1024);
+
+ // Fill the channel
+ for _ in 0..1024 {
+ tx.send(()).await.unwrap();
+ }
+
+ drop(tx);
+
+ tokio::spawn(async move {
+ // Block in place without doing anything
+ tokio::task::block_in_place(|| {});
+
+ // Receive all the values, this should trigger a `Pending` as the
+ // coop limit will be reached.
+ poll_fn(|cx| {
+ while let Poll::Ready(v) = {
+ tokio::pin! {
+ let fut = rx.recv();
+ }
+
+ Pin::new(&mut fut).poll(cx)
+ } {
+ if v.is_none() {
+ panic!("did not yield");
+ }
+ }
+
+ Poll::Ready(())
+ })
+ .await
+ })
+ .await
+ .unwrap();
+ });
+}
+
// Testing this does not panic
#[test]
fn max_threads() {