diff options
Diffstat (limited to 'tokio/src/runtime/thread_pool/worker.rs')
-rw-r--r-- | tokio/src/runtime/thread_pool/worker.rs | 161 |
1 files changed, 79 insertions, 82 deletions
diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs index c88f9954..bc544c9b 100644 --- a/tokio/src/runtime/thread_pool/worker.rs +++ b/tokio/src/runtime/thread_pool/worker.rs @@ -9,6 +9,7 @@ use crate::loom::rand::seed; use crate::loom::sync::{Arc, Mutex}; use crate::park::{Park, Unpark}; use crate::runtime; +use crate::runtime::enter::EnterContext; use crate::runtime::park::{Parker, Unparker}; use crate::runtime::thread_pool::{AtomicCell, Idle}; use crate::runtime::{queue, task}; @@ -172,100 +173,96 @@ pub(super) fn create(size: usize, park: Parker) -> (Arc<Shared>, Launch) { (shared, launch) } -cfg_blocking! { - use crate::runtime::enter::EnterContext; - - pub(crate) fn block_in_place<F, R>(f: F) -> R - where - F: FnOnce() -> R, - { - // Try to steal the worker core back - struct Reset(coop::Budget); - - impl Drop for Reset { - fn drop(&mut self) { - CURRENT.with(|maybe_cx| { - if let Some(cx) = maybe_cx { - let core = cx.worker.core.take(); - let mut cx_core = cx.core.borrow_mut(); - assert!(cx_core.is_none()); - *cx_core = core; - - // Reset the task budget as we are re-entering the - // runtime. - coop::set(self.0); - } - }); - } +pub(crate) fn block_in_place<F, R>(f: F) -> R +where + F: FnOnce() -> R, +{ + // Try to steal the worker core back + struct Reset(coop::Budget); + + impl Drop for Reset { + fn drop(&mut self) { + CURRENT.with(|maybe_cx| { + if let Some(cx) = maybe_cx { + let core = cx.worker.core.take(); + let mut cx_core = cx.core.borrow_mut(); + assert!(cx_core.is_none()); + *cx_core = core; + + // Reset the task budget as we are re-entering the + // runtime. + coop::set(self.0); + } + }); } + } - let mut had_entered = false; + let mut had_entered = false; - CURRENT.with(|maybe_cx| { - match (crate::runtime::enter::context(), maybe_cx.is_some()) { - (EnterContext::Entered { .. }, true) => { - // We are on a thread pool runtime thread, so we just need to set up blocking. + CURRENT.with(|maybe_cx| { + match (crate::runtime::enter::context(), maybe_cx.is_some()) { + (EnterContext::Entered { .. }, true) => { + // We are on a thread pool runtime thread, so we just need to set up blocking. + had_entered = true; + } + (EnterContext::Entered { allow_blocking }, false) => { + // We are on an executor, but _not_ on the thread pool. + // That is _only_ okay if we are in a thread pool runtime's block_on method: + if allow_blocking { had_entered = true; - } - (EnterContext::Entered { allow_blocking }, false) => { - // We are on an executor, but _not_ on the thread pool. - // That is _only_ okay if we are in a thread pool runtime's block_on method: - if allow_blocking { - had_entered = true; - return; - } else { - // This probably means we are on the basic_scheduler or in a LocalSet, - // where it is _not_ okay to block. - panic!("can call blocking only when running on the multi-threaded runtime"); - } - } - (EnterContext::NotEntered, true) => { - // This is a nested call to block_in_place (we already exited). - // All the necessary setup has already been done. - return; - } - (EnterContext::NotEntered, false) => { - // We are outside of the tokio runtime, so blocking is fine. - // We can also skip all of the thread pool blocking setup steps. return; + } else { + // This probably means we are on the basic_scheduler or in a LocalSet, + // where it is _not_ okay to block. + panic!("can call blocking only when running on the multi-threaded runtime"); } } + (EnterContext::NotEntered, true) => { + // This is a nested call to block_in_place (we already exited). + // All the necessary setup has already been done. + return; + } + (EnterContext::NotEntered, false) => { + // We are outside of the tokio runtime, so blocking is fine. + // We can also skip all of the thread pool blocking setup steps. + return; + } + } - let cx = maybe_cx.expect("no .is_some() == false cases above should lead here"); - - // Get the worker core. If none is set, then blocking is fine! - let core = match cx.core.borrow_mut().take() { - Some(core) => core, - None => return, - }; - - // The parker should be set here - assert!(core.park.is_some()); + let cx = maybe_cx.expect("no .is_some() == false cases above should lead here"); - // In order to block, the core must be sent to another thread for - // execution. - // - // First, move the core back into the worker's shared core slot. - cx.worker.core.set(core); + // Get the worker core. If none is set, then blocking is fine! + let core = match cx.core.borrow_mut().take() { + Some(core) => core, + None => return, + }; - // Next, clone the worker handle and send it to a new thread for - // processing. - // - // Once the blocking task is done executing, we will attempt to - // steal the core back. - let worker = cx.worker.clone(); - runtime::spawn_blocking(move || run(worker)); - }); + // The parker should be set here + assert!(core.park.is_some()); + + // In order to block, the core must be sent to another thread for + // execution. + // + // First, move the core back into the worker's shared core slot. + cx.worker.core.set(core); + + // Next, clone the worker handle and send it to a new thread for + // processing. + // + // Once the blocking task is done executing, we will attempt to + // steal the core back. + let worker = cx.worker.clone(); + runtime::spawn_blocking(move || run(worker)); + }); - if had_entered { - // Unset the current task's budget. Blocking sections are not - // constrained by task budgets. - let _reset = Reset(coop::stop()); + if had_entered { + // Unset the current task's budget. Blocking sections are not + // constrained by task budgets. + let _reset = Reset(coop::stop()); - crate::runtime::enter::exit(f) - } else { - f() - } + crate::runtime::enter::exit(f) + } else { + f() } } |