diff options
Diffstat (limited to 'tokio/tests/rt_threaded.rs')
-rw-r--r-- | tokio/tests/rt_threaded.rs | 59 |
1 files changed, 59 insertions, 0 deletions
diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs index ad063348..b5ec96de 100644 --- a/tokio/tests/rt_threaded.rs +++ b/tokio/tests/rt_threaded.rs @@ -7,6 +7,7 @@ use tokio::runtime::{self, Runtime}; use tokio::sync::oneshot; use tokio_test::{assert_err, assert_ok}; +use futures::future::poll_fn; use std::future::Future; use std::pin::Pin; use std::sync::atomic::AtomicUsize; @@ -322,6 +323,64 @@ fn multi_threadpool() { done_rx.recv().unwrap(); } +// When `block_in_place` returns, it attempts to reclaim the yielded runtime +// worker. In this case, the remainder of the task is on the runtime worker and +// must take part in the cooperative task budgeting system. +// +// The test ensures that, when this happens, attempting to consume from a +// channel yields occasionally even if there are values ready to receive. +#[test] +fn coop_and_block_in_place() { + use tokio::sync::mpsc; + + let mut rt = tokio::runtime::Builder::new() + .threaded_scheduler() + // Setting max threads to 1 prevents another thread from claiming the + // runtime worker yielded as part of `block_in_place` and guarantees the + // same thread will reclaim the worker at the end of the + // `block_in_place` call. + .max_threads(1) + .build() + .unwrap(); + + rt.block_on(async move { + let (mut tx, mut rx) = mpsc::channel(1024); + + // Fill the channel + for _ in 0..1024 { + tx.send(()).await.unwrap(); + } + + drop(tx); + + tokio::spawn(async move { + // Block in place without doing anything + tokio::task::block_in_place(|| {}); + + // Receive all the values, this should trigger a `Pending` as the + // coop limit will be reached. + poll_fn(|cx| { + while let Poll::Ready(v) = { + tokio::pin! { + let fut = rx.recv(); + } + + Pin::new(&mut fut).poll(cx) + } { + if v.is_none() { + panic!("did not yield"); + } + } + + Poll::Ready(()) + }) + .await + }) + .await + .unwrap(); + }); +} + // Testing this does not panic #[test] fn max_threads() { |