summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Gjengset <jon@thesquareplanet.com>2020-04-16 16:40:11 -0400
committerGitHub <noreply@github.com>2020-04-16 16:40:11 -0400
commit67c4cc03919a58076c139f0930f28c3c41dad1e5 (patch)
tree01a0d9f5fd68e9dbfe2a64aff62b4eaddb04e70b
parent8381dff39b845bc3b74c5e5c1c7ce78e39c34fe2 (diff)
Support nested block_in_place (#2409)
-rw-r--r--tokio/src/runtime/enter.rs16
-rw-r--r--tokio/src/runtime/thread_pool/worker.rs27
-rw-r--r--tokio/src/task/blocking.rs4
-rw-r--r--tokio/tests/task_blocking.rs20
4 files changed, 48 insertions, 19 deletions
diff --git a/tokio/src/runtime/enter.rs b/tokio/src/runtime/enter.rs
index afdb67a3..440941e1 100644
--- a/tokio/src/runtime/enter.rs
+++ b/tokio/src/runtime/enter.rs
@@ -51,26 +51,20 @@ pub(crate) fn exit<F: FnOnce() -> R, R>(f: F) -> R {
impl Drop for Reset {
fn drop(&mut self) {
ENTERED.with(|c| {
+ assert!(!c.get(), "closure claimed permanent executor");
c.set(true);
});
}
}
ENTERED.with(|c| {
- debug_assert!(c.get());
+ assert!(c.get(), "asked to exit when not entered");
c.set(false);
});
- let reset = Reset;
- let ret = f();
- std::mem::forget(reset);
-
- ENTERED.with(|c| {
- assert!(!c.get(), "closure claimed permanent executor");
- c.set(true);
- });
-
- ret
+ let _reset = Reset;
+ // dropping reset after f() will do c.set(true)
+ f()
}
cfg_blocking_impl! {
diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs
index 400e2a93..2213ec6c 100644
--- a/tokio/src/runtime/thread_pool/worker.rs
+++ b/tokio/src/runtime/thread_pool/worker.rs
@@ -177,21 +177,33 @@ cfg_blocking! {
F: FnOnce() -> R,
{
// Try to steal the worker core back
- struct Reset;
+ struct Reset(bool);
impl Drop for Reset {
fn drop(&mut self) {
CURRENT.with(|maybe_cx| {
+ if !self.0 {
+ // We were not the ones to give away the core,
+ // so we do not get to restore it either.
+ // This is necessary so that with a nested
+ // block_in_place, the inner block_in_place
+ // does not restore the core.
+ return;
+ }
+
if let Some(cx) = maybe_cx {
let core = cx.worker.core.take();
- *cx.core.borrow_mut() = core;
+ let mut cx_core = cx.core.borrow_mut();
+ assert!(cx_core.is_none());
+ *cx_core = core;
}
});
}
}
+ let mut had_core = false;
CURRENT.with(|maybe_cx| {
- let cx = maybe_cx.expect("can call blocking only when running in a spawned task");
+ let cx = maybe_cx.expect("can call blocking only when running in a spawned task on the multi-threaded runtime");
// Get the worker core. If none is set, then blocking is fine!
let core = match cx.core.borrow_mut().take() {
@@ -212,6 +224,7 @@ cfg_blocking! {
//
// First, move the core back into the worker's shared core slot.
cx.worker.core.set(core);
+ had_core = true;
// Next, clone the worker handle and send it to a new thread for
// processing.
@@ -222,9 +235,13 @@ cfg_blocking! {
runtime::spawn_blocking(move || run(worker));
});
- let _reset = Reset;
+ let _reset = Reset(had_core);
- f()
+ if had_core {
+ crate::runtime::enter::exit(f)
+ } else {
+ f()
+ }
}
}
diff --git a/tokio/src/task/blocking.rs b/tokio/src/task/blocking.rs
index 0069b10a..9a22b0ff 100644
--- a/tokio/src/task/blocking.rs
+++ b/tokio/src/task/blocking.rs
@@ -32,9 +32,7 @@ cfg_rt_threaded! {
where
F: FnOnce() -> R,
{
- use crate::runtime::{enter, thread_pool};
-
- enter::exit(|| thread_pool::block_in_place(f))
+ crate::runtime::thread_pool::block_in_place(f)
}
}
diff --git a/tokio/tests/task_blocking.rs b/tokio/tests/task_blocking.rs
index 4cd83d8a..edcb005d 100644
--- a/tokio/tests/task_blocking.rs
+++ b/tokio/tests/task_blocking.rs
@@ -27,3 +27,23 @@ async fn basic_blocking() {
assert_eq!(out, "hello");
}
}
+
+#[tokio::test(threaded_scheduler)]
+async fn block_in_block() {
+ // Run a few times
+ for _ in 0..100 {
+ let out = assert_ok!(
+ tokio::spawn(async {
+ task::block_in_place(|| {
+ task::block_in_place(|| {
+ thread::sleep(Duration::from_millis(5));
+ });
+ "hello"
+ })
+ })
+ .await
+ );
+
+ assert_eq!(out, "hello");
+ }
+}