summaryrefslogtreecommitdiffstats
path: root/tokio/tests/rt_threaded.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/tests/rt_threaded.rs')
-rw-r--r--tokio/tests/rt_threaded.rs59
1 files changed, 59 insertions, 0 deletions
diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs
index ad063348..b5ec96de 100644
--- a/tokio/tests/rt_threaded.rs
+++ b/tokio/tests/rt_threaded.rs
@@ -7,6 +7,7 @@ use tokio::runtime::{self, Runtime};
use tokio::sync::oneshot;
use tokio_test::{assert_err, assert_ok};
+use futures::future::poll_fn;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
@@ -322,6 +323,64 @@ fn multi_threadpool() {
done_rx.recv().unwrap();
}
+// When `block_in_place` returns, it attempts to reclaim the yielded runtime
+// worker. In this case, the remainder of the task is on the runtime worker and
+// must take part in the cooperative task budgeting system.
+//
+// The test ensures that, when this happens, attempting to consume from a
+// channel yields occasionally even if there are values ready to receive.
+#[test]
+fn coop_and_block_in_place() {
+ use tokio::sync::mpsc;
+
+ let mut rt = tokio::runtime::Builder::new()
+ .threaded_scheduler()
+ // Setting max threads to 1 prevents another thread from claiming the
+ // runtime worker yielded as part of `block_in_place` and guarantees the
+ // same thread will reclaim the worker at the end of the
+ // `block_in_place` call.
+ .max_threads(1)
+ .build()
+ .unwrap();
+
+ rt.block_on(async move {
+ let (mut tx, mut rx) = mpsc::channel(1024);
+
+ // Fill the channel
+ for _ in 0..1024 {
+ tx.send(()).await.unwrap();
+ }
+
+ drop(tx);
+
+ tokio::spawn(async move {
+ // Block in place without doing anything
+ tokio::task::block_in_place(|| {});
+
+ // Receive all the values, this should trigger a `Pending` as the
+ // coop limit will be reached.
+ poll_fn(|cx| {
+ while let Poll::Ready(v) = {
+ tokio::pin! {
+ let fut = rx.recv();
+ }
+
+ Pin::new(&mut fut).poll(cx)
+ } {
+ if v.is_none() {
+ panic!("did not yield");
+ }
+ }
+
+ Poll::Ready(())
+ })
+ .await
+ })
+ .await
+ .unwrap();
+ });
+}
+
// Testing this does not panic
#[test]
fn max_threads() {