summaryrefslogtreecommitdiffstats
path: root/tokio/src/runtime/thread_pool/worker.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/runtime/thread_pool/worker.rs')
-rw-r--r--tokio/src/runtime/thread_pool/worker.rs161
1 files changed, 79 insertions, 82 deletions
diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs
index c88f9954..bc544c9b 100644
--- a/tokio/src/runtime/thread_pool/worker.rs
+++ b/tokio/src/runtime/thread_pool/worker.rs
@@ -9,6 +9,7 @@ use crate::loom::rand::seed;
use crate::loom::sync::{Arc, Mutex};
use crate::park::{Park, Unpark};
use crate::runtime;
+use crate::runtime::enter::EnterContext;
use crate::runtime::park::{Parker, Unparker};
use crate::runtime::thread_pool::{AtomicCell, Idle};
use crate::runtime::{queue, task};
@@ -172,100 +173,96 @@ pub(super) fn create(size: usize, park: Parker) -> (Arc<Shared>, Launch) {
(shared, launch)
}
-cfg_blocking! {
- use crate::runtime::enter::EnterContext;
-
- pub(crate) fn block_in_place<F, R>(f: F) -> R
- where
- F: FnOnce() -> R,
- {
- // Try to steal the worker core back
- struct Reset(coop::Budget);
-
- impl Drop for Reset {
- fn drop(&mut self) {
- CURRENT.with(|maybe_cx| {
- if let Some(cx) = maybe_cx {
- let core = cx.worker.core.take();
- let mut cx_core = cx.core.borrow_mut();
- assert!(cx_core.is_none());
- *cx_core = core;
-
- // Reset the task budget as we are re-entering the
- // runtime.
- coop::set(self.0);
- }
- });
- }
+pub(crate) fn block_in_place<F, R>(f: F) -> R
+where
+ F: FnOnce() -> R,
+{
+ // Try to steal the worker core back
+ struct Reset(coop::Budget);
+
+ impl Drop for Reset {
+ fn drop(&mut self) {
+ CURRENT.with(|maybe_cx| {
+ if let Some(cx) = maybe_cx {
+ let core = cx.worker.core.take();
+ let mut cx_core = cx.core.borrow_mut();
+ assert!(cx_core.is_none());
+ *cx_core = core;
+
+ // Reset the task budget as we are re-entering the
+ // runtime.
+ coop::set(self.0);
+ }
+ });
}
+ }
- let mut had_entered = false;
+ let mut had_entered = false;
- CURRENT.with(|maybe_cx| {
- match (crate::runtime::enter::context(), maybe_cx.is_some()) {
- (EnterContext::Entered { .. }, true) => {
- // We are on a thread pool runtime thread, so we just need to set up blocking.
+ CURRENT.with(|maybe_cx| {
+ match (crate::runtime::enter::context(), maybe_cx.is_some()) {
+ (EnterContext::Entered { .. }, true) => {
+ // We are on a thread pool runtime thread, so we just need to set up blocking.
+ had_entered = true;
+ }
+ (EnterContext::Entered { allow_blocking }, false) => {
+ // We are on an executor, but _not_ on the thread pool.
+ // That is _only_ okay if we are in a thread pool runtime's block_on method:
+ if allow_blocking {
had_entered = true;
- }
- (EnterContext::Entered { allow_blocking }, false) => {
- // We are on an executor, but _not_ on the thread pool.
- // That is _only_ okay if we are in a thread pool runtime's block_on method:
- if allow_blocking {
- had_entered = true;
- return;
- } else {
- // This probably means we are on the basic_scheduler or in a LocalSet,
- // where it is _not_ okay to block.
- panic!("can call blocking only when running on the multi-threaded runtime");
- }
- }
- (EnterContext::NotEntered, true) => {
- // This is a nested call to block_in_place (we already exited).
- // All the necessary setup has already been done.
- return;
- }
- (EnterContext::NotEntered, false) => {
- // We are outside of the tokio runtime, so blocking is fine.
- // We can also skip all of the thread pool blocking setup steps.
return;
+ } else {
+ // This probably means we are on the basic_scheduler or in a LocalSet,
+ // where it is _not_ okay to block.
+ panic!("can call blocking only when running on the multi-threaded runtime");
}
}
+ (EnterContext::NotEntered, true) => {
+ // This is a nested call to block_in_place (we already exited).
+ // All the necessary setup has already been done.
+ return;
+ }
+ (EnterContext::NotEntered, false) => {
+ // We are outside of the tokio runtime, so blocking is fine.
+ // We can also skip all of the thread pool blocking setup steps.
+ return;
+ }
+ }
- let cx = maybe_cx.expect("no .is_some() == false cases above should lead here");
-
- // Get the worker core. If none is set, then blocking is fine!
- let core = match cx.core.borrow_mut().take() {
- Some(core) => core,
- None => return,
- };
-
- // The parker should be set here
- assert!(core.park.is_some());
+ let cx = maybe_cx.expect("no .is_some() == false cases above should lead here");
- // In order to block, the core must be sent to another thread for
- // execution.
- //
- // First, move the core back into the worker's shared core slot.
- cx.worker.core.set(core);
+ // Get the worker core. If none is set, then blocking is fine!
+ let core = match cx.core.borrow_mut().take() {
+ Some(core) => core,
+ None => return,
+ };
- // Next, clone the worker handle and send it to a new thread for
- // processing.
- //
- // Once the blocking task is done executing, we will attempt to
- // steal the core back.
- let worker = cx.worker.clone();
- runtime::spawn_blocking(move || run(worker));
- });
+ // The parker should be set here
+ assert!(core.park.is_some());
+
+ // In order to block, the core must be sent to another thread for
+ // execution.
+ //
+ // First, move the core back into the worker's shared core slot.
+ cx.worker.core.set(core);
+
+ // Next, clone the worker handle and send it to a new thread for
+ // processing.
+ //
+ // Once the blocking task is done executing, we will attempt to
+ // steal the core back.
+ let worker = cx.worker.clone();
+ runtime::spawn_blocking(move || run(worker));
+ });
- if had_entered {
- // Unset the current task's budget. Blocking sections are not
- // constrained by task budgets.
- let _reset = Reset(coop::stop());
+ if had_entered {
+ // Unset the current task's budget. Blocking sections are not
+ // constrained by task budgets.
+ let _reset = Reset(coop::stop());
- crate::runtime::enter::exit(f)
- } else {
- f()
- }
+ crate::runtime::enter::exit(f)
+ } else {
+ f()
}
}