From 67c4cc03919a58076c139f0930f28c3c41dad1e5 Mon Sep 17 00:00:00 2001 From: Jon Gjengset Date: Thu, 16 Apr 2020 16:40:11 -0400 Subject: Support nested block_in_place (#2409) --- tokio/src/runtime/enter.rs | 16 +++++----------- tokio/src/runtime/thread_pool/worker.rs | 27 ++++++++++++++++++++++----- tokio/src/task/blocking.rs | 4 +--- tokio/tests/task_blocking.rs | 20 ++++++++++++++++++++ 4 files changed, 48 insertions(+), 19 deletions(-) diff --git a/tokio/src/runtime/enter.rs b/tokio/src/runtime/enter.rs index afdb67a3..440941e1 100644 --- a/tokio/src/runtime/enter.rs +++ b/tokio/src/runtime/enter.rs @@ -51,26 +51,20 @@ pub(crate) fn exit R, R>(f: F) -> R { impl Drop for Reset { fn drop(&mut self) { ENTERED.with(|c| { + assert!(!c.get(), "closure claimed permanent executor"); c.set(true); }); } } ENTERED.with(|c| { - debug_assert!(c.get()); + assert!(c.get(), "asked to exit when not entered"); c.set(false); }); - let reset = Reset; - let ret = f(); - std::mem::forget(reset); - - ENTERED.with(|c| { - assert!(!c.get(), "closure claimed permanent executor"); - c.set(true); - }); - - ret + let _reset = Reset; + // dropping reset after f() will do c.set(true) + f() } cfg_blocking_impl! { diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs index 400e2a93..2213ec6c 100644 --- a/tokio/src/runtime/thread_pool/worker.rs +++ b/tokio/src/runtime/thread_pool/worker.rs @@ -177,21 +177,33 @@ cfg_blocking! { F: FnOnce() -> R, { // Try to steal the worker core back - struct Reset; + struct Reset(bool); impl Drop for Reset { fn drop(&mut self) { CURRENT.with(|maybe_cx| { + if !self.0 { + // We were not the ones to give away the core, + // so we do not get to restore it either. + // This is necessary so that with a nested + // block_in_place, the inner block_in_place + // does not restore the core. + return; + } + if let Some(cx) = maybe_cx { let core = cx.worker.core.take(); - *cx.core.borrow_mut() = core; + let mut cx_core = cx.core.borrow_mut(); + assert!(cx_core.is_none()); + *cx_core = core; } }); } } + let mut had_core = false; CURRENT.with(|maybe_cx| { - let cx = maybe_cx.expect("can call blocking only when running in a spawned task"); + let cx = maybe_cx.expect("can call blocking only when running in a spawned task on the multi-threaded runtime"); // Get the worker core. If none is set, then blocking is fine! let core = match cx.core.borrow_mut().take() { @@ -212,6 +224,7 @@ cfg_blocking! { // // First, move the core back into the worker's shared core slot. cx.worker.core.set(core); + had_core = true; // Next, clone the worker handle and send it to a new thread for // processing. @@ -222,9 +235,13 @@ cfg_blocking! { runtime::spawn_blocking(move || run(worker)); }); - let _reset = Reset; + let _reset = Reset(had_core); - f() + if had_core { + crate::runtime::enter::exit(f) + } else { + f() + } } } diff --git a/tokio/src/task/blocking.rs b/tokio/src/task/blocking.rs index 0069b10a..9a22b0ff 100644 --- a/tokio/src/task/blocking.rs +++ b/tokio/src/task/blocking.rs @@ -32,9 +32,7 @@ cfg_rt_threaded! { where F: FnOnce() -> R, { - use crate::runtime::{enter, thread_pool}; - - enter::exit(|| thread_pool::block_in_place(f)) + crate::runtime::thread_pool::block_in_place(f) } } diff --git a/tokio/tests/task_blocking.rs b/tokio/tests/task_blocking.rs index 4cd83d8a..edcb005d 100644 --- a/tokio/tests/task_blocking.rs +++ b/tokio/tests/task_blocking.rs @@ -27,3 +27,23 @@ async fn basic_blocking() { assert_eq!(out, "hello"); } } + +#[tokio::test(threaded_scheduler)] +async fn block_in_block() { + // Run a few times + for _ in 0..100 { + let out = assert_ok!( + tokio::spawn(async { + task::block_in_place(|| { + task::block_in_place(|| { + thread::sleep(Duration::from_millis(5)); + }); + "hello" + }) + }) + .await + ); + + assert_eq!(out, "hello"); + } +} -- cgit v1.2.3