diff options
author | Carl Lerche <me@carllerche.com> | 2019-11-06 09:51:15 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-11-06 09:51:15 -0800 |
commit | 1a7f6fb201c04e8bb02c6e59ddaabadceb8413c2 (patch) | |
tree | 22cf0775346af87d0dd59cab8471e6df23589f9d /tokio | |
parent | 0da23aad772afb22db8edf73ac0f034c5ada3bde (diff) |
simplify enter (#1736)
Diffstat (limited to 'tokio')
-rw-r--r-- | tokio/src/runtime/current_thread/mod.rs | 3 | ||||
-rw-r--r-- | tokio/src/runtime/enter.rs | 53 | ||||
-rw-r--r-- | tokio/src/runtime/mod.rs | 7 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/pool.rs | 3 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/shutdown.rs | 19 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/worker.rs | 3 |
6 files changed, 30 insertions, 58 deletions
diff --git a/tokio/src/runtime/current_thread/mod.rs b/tokio/src/runtime/current_thread/mod.rs index 6c5fdda8..d2cf4581 100644 --- a/tokio/src/runtime/current_thread/mod.rs +++ b/tokio/src/runtime/current_thread/mod.rs @@ -139,8 +139,7 @@ where let scheduler = &*self.scheduler; runtime::global::with_current_thread(scheduler, || { - let mut _enter = - runtime::enter::enter().expect("attempting to block while on a Tokio executor"); + let mut _enter = runtime::enter(); let raw_waker = RawWaker::new( scheduler as *const Scheduler as *const (), diff --git a/tokio/src/runtime/enter.rs b/tokio/src/runtime/enter.rs index 78ddf8cc..5206c7e3 100644 --- a/tokio/src/runtime/enter.rs +++ b/tokio/src/runtime/enter.rs @@ -1,5 +1,4 @@ use std::cell::{Cell, RefCell}; -use std::error::Error; use std::fmt; use std::future::Future; use std::marker::PhantomData; @@ -13,50 +12,30 @@ pub(crate) struct Enter { _p: PhantomData<RefCell<()>>, } -/// An error returned by `enter` if an execution scope has already been -/// entered. -pub(crate) struct EnterError { - _a: (), -} - -impl fmt::Debug for EnterError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("EnterError") - .field("reason", &format!("{}", self)) - .finish() +/// Marks the current thread as being within the dynamic extent of an +/// executor. +pub(crate) fn enter() -> Enter { + if let Some(enter) = try_enter() { + return enter; } -} -impl fmt::Display for EnterError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - fmt, - "attempted to run an executor while another executor is already running" - ) - } + panic!( + "Cannot start a runtime from within a runtime. This happens \ + because a function (like `block_on`) attempted to block the \ + current thread while the thread is being used to drive \ + asynchronous tasks." + ); } -impl Error for EnterError {} - -/// Marks the current thread as being within the dynamic extent of an -/// executor. -/// -/// Executor implementations should call this function before blocking the -/// thread. If `None` is returned, the executor should fail by panicking or -/// taking some other action without blocking the current thread. This prevents -/// deadlocks due to multiple executors competing for the same thread. -/// -/// # Error -/// -/// Returns an error if the current thread is already marked -pub(crate) fn enter() -> Result<Enter, EnterError> { +/// Tries to enter a runtime context, returns `None` if already in a runtime +/// context. +pub(crate) fn try_enter() -> Option<Enter> { ENTERED.with(|c| { if c.get() { - Err(EnterError { _a: () }) + None } else { c.set(true); - - Ok(Enter { _p: PhantomData }) + Some(Enter { _p: PhantomData }) } }) } diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index ac5bee97..9dbc857a 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -146,6 +146,8 @@ mod current_thread; #[cfg(feature = "blocking")] mod enter; +#[cfg(feature = "blocking")] +pub(crate) use self::enter::enter; mod global; pub use self::global::spawn; @@ -340,10 +342,7 @@ impl Runtime { let kind = &mut self.kind; blocking::with_pool(&self.blocking_pool, || match kind { - Kind::Shell => { - let mut enter = enter::enter().unwrap(); - enter.block_on(future) - } + Kind::Shell => enter().block_on(future), #[cfg(feature = "rt-current-thread")] Kind::CurrentThread(exec) => exec.block_on(future), #[cfg(feature = "rt-full")] diff --git a/tokio/src/runtime/thread_pool/pool.rs b/tokio/src/runtime/thread_pool/pool.rs index 06d5a61a..a6ef4346 100644 --- a/tokio/src/runtime/thread_pool/pool.rs +++ b/tokio/src/runtime/thread_pool/pool.rs @@ -55,8 +55,7 @@ impl ThreadPool { F: Future, { crate::runtime::global::with_thread_pool(self.spawner(), || { - let mut enter = crate::runtime::enter::enter() - .expect("attempting to block while on a Tokio executor"); + let mut enter = crate::runtime::enter(); crate::runtime::blocking::with_pool(self.spawner.blocking_pool(), || { enter.block_on(future) }) diff --git a/tokio/src/runtime/thread_pool/shutdown.rs b/tokio/src/runtime/thread_pool/shutdown.rs index 7f554898..d9f5eb0f 100644 --- a/tokio/src/runtime/thread_pool/shutdown.rs +++ b/tokio/src/runtime/thread_pool/shutdown.rs @@ -27,18 +27,15 @@ pub(super) fn channel() -> (Sender, Receiver) { impl Receiver { /// Block the current thread until all `Sender` handles drop. pub(crate) fn wait(&mut self) { - use crate::runtime::enter::enter; - - let mut e = match enter() { - Ok(e) => e, - Err(_) => { - if std::thread::panicking() { - // Already panicking, avoid a double panic - return; - } else { - panic!("cannot block on shutdown from the Tokio runtime"); - } + use crate::runtime::enter::{enter, try_enter}; + + let mut e = if std::thread::panicking() { + match try_enter() { + Some(enter) => enter, + _ => return, } + } else { + enter() }; // The oneshot completes with an Err diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs index 986f4534..5abdba24 100644 --- a/tokio/src/runtime/thread_pool/worker.rs +++ b/tokio/src/runtime/thread_pool/worker.rs @@ -131,8 +131,7 @@ where // Track the current worker current::set(&pool, index, || { - let _enter = - crate::runtime::enter::enter().expect("executor already running on thread"); + let _enter = crate::runtime::enter(); crate::runtime::global::with_thread_pool(&spawner, || { crate::runtime::blocking::with_pool(blocking, || { |