diff options
author | Lucio Franco <luciofranco14@gmail.com> | 2020-10-12 13:44:54 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-10-12 13:44:54 -0400 |
commit | 8880222036f37c6204c8466f25e828447f16dacb (patch) | |
tree | fd623afc20f73bbce65746a3d1b1b2731ecf30a5 /tokio/src/runtime | |
parent | 0893841f31542b2b04c5050a8a4a3c45cf867e55 (diff) |
rt: Remove `threaded_scheduler()` and `basic_scheduler()` (#2876)
Co-authored-by: Alice Ryhl <alice@ryhl.io>
Co-authored-by: Carl Lerche <me@carllerche.com>
Diffstat (limited to 'tokio/src/runtime')
-rw-r--r-- | tokio/src/runtime/basic_scheduler.rs | 2 | ||||
-rw-r--r-- | tokio/src/runtime/blocking/mod.rs | 20 | ||||
-rw-r--r-- | tokio/src/runtime/blocking/pool.rs | 3 | ||||
-rw-r--r-- | tokio/src/runtime/builder.rs | 262 | ||||
-rw-r--r-- | tokio/src/runtime/context.rs | 6 | ||||
-rw-r--r-- | tokio/src/runtime/driver.rs | 20 | ||||
-rw-r--r-- | tokio/src/runtime/enter.rs | 214 | ||||
-rw-r--r-- | tokio/src/runtime/mod.rs | 648 | ||||
-rw-r--r-- | tokio/src/runtime/spawner.rs | 2 | ||||
-rw-r--r-- | tokio/src/runtime/task/error.rs | 2 | ||||
-rw-r--r-- | tokio/src/runtime/task/join.rs | 2 | ||||
-rw-r--r-- | tokio/src/runtime/task/mod.rs | 34 | ||||
-rw-r--r-- | tokio/src/runtime/tests/loom_blocking.rs | 5 | ||||
-rw-r--r-- | tokio/src/runtime/tests/loom_pool.rs | 5 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/atomic_cell.rs | 1 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/mod.rs | 4 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/worker.rs | 161 |
17 files changed, 631 insertions, 760 deletions
diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index 60fe92c3..5ca84671 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -2,7 +2,7 @@ use crate::future::poll_fn; use crate::loom::sync::Mutex; use crate::park::{Park, Unpark}; use crate::runtime::task::{self, JoinHandle, Schedule, Task}; -use crate::sync::Notify; +use crate::sync::notify::Notify; use crate::util::linked_list::{Link, LinkedList}; use crate::util::{waker_ref, Wake, WakerRef}; diff --git a/tokio/src/runtime/blocking/mod.rs b/tokio/src/runtime/blocking/mod.rs index a819e9e9..fece3c27 100644 --- a/tokio/src/runtime/blocking/mod.rs +++ b/tokio/src/runtime/blocking/mod.rs @@ -3,21 +3,20 @@ //! shells. This isolates the complexity of dealing with conditional //! compilation. -cfg_blocking_impl! { - mod pool; - pub(crate) use pool::{spawn_blocking, try_spawn_blocking, BlockingPool, Spawner}; +mod pool; +pub(crate) use pool::{spawn_blocking, BlockingPool, Spawner}; - mod schedule; - mod shutdown; - pub(crate) mod task; +mod schedule; +mod shutdown; +pub(crate) mod task; - use crate::runtime::Builder; +use crate::runtime::Builder; - pub(crate) fn create_blocking_pool(builder: &Builder, thread_cap: usize) -> BlockingPool { - BlockingPool::new(builder, thread_cap) - } +pub(crate) fn create_blocking_pool(builder: &Builder, thread_cap: usize) -> BlockingPool { + BlockingPool::new(builder, thread_cap) } +/* cfg_not_blocking_impl! { use crate::runtime::Builder; use std::time::Duration; @@ -40,3 +39,4 @@ cfg_not_blocking_impl! { } } } +*/ diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index df0175b1..2d44f896 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -94,10 +94,7 @@ where impl BlockingPool { pub(crate) fn new(builder: &Builder, thread_cap: usize) -> BlockingPool { let (shutdown_tx, shutdown_rx) = shutdown::channel(); - #[cfg(feature = "blocking")] let keep_alive = builder.keep_alive.unwrap_or(KEEP_ALIVE); - #[cfg(not(feature = "blocking"))] - let keep_alive = KEEP_ALIVE; BlockingPool { spawner: Spawner { diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index d43666d3..8e76f52b 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -1,9 +1,8 @@ use crate::runtime::handle::Handle; -use crate::runtime::shell::Shell; -use crate::runtime::{blocking, driver, io, Callback, Runtime, Spawner}; +use crate::runtime::{blocking, driver, Callback, Runtime, Spawner}; use std::fmt; -#[cfg(feature = "blocking")] +use std::io; use std::time::Duration; /// Builds Tokio Runtime with custom configuration values. @@ -26,9 +25,8 @@ use std::time::Duration; /// /// fn main() { /// // build runtime -/// let runtime = Builder::new() -/// .threaded_scheduler() -/// .core_threads(4) +/// let runtime = Builder::new_multi_thread() +/// .worker_threads(4) /// .thread_name("my-custom-name") /// .thread_stack_size(3 * 1024 * 1024) /// .build() @@ -38,7 +36,7 @@ use std::time::Duration; /// } /// ``` pub struct Builder { - /// The task execution model to use. + /// Runtime type kind: Kind, /// Whether or not to enable the I/O driver @@ -50,7 +48,7 @@ pub struct Builder { /// The number of worker threads, used by Runtime. /// /// Only used when not using the current-thread executor. - core_threads: Option<usize>, + worker_threads: Option<usize>, /// Cap on thread usage. max_threads: usize, @@ -67,32 +65,37 @@ pub struct Builder { /// To run before each worker thread stops pub(super) before_stop: Option<Callback>, - #[cfg(feature = "blocking")] - #[cfg_attr(docsrs, doc(cfg(feature = "blocking")))] /// Customizable keep alive timeout for BlockingPool pub(super) keep_alive: Option<Duration>, } pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>; -#[derive(Debug, Clone, Copy)] -enum Kind { - Shell, - #[cfg(feature = "rt-core")] - Basic, +pub(crate) enum Kind { + CurrentThread, #[cfg(feature = "rt-threaded")] - ThreadPool, + MultiThread, } impl Builder { + /// TODO + pub fn new_current_thread() -> Builder { + Builder::new(Kind::CurrentThread) + } + + /// TODO + #[cfg(feature = "rt-threaded")] + pub fn new_multi_thread() -> Builder { + Builder::new(Kind::MultiThread) + } + /// Returns a new runtime builder initialized with default configuration /// values. /// /// Configuration methods can be chained on the return value. - pub fn new() -> Builder { + pub(crate) fn new(kind: Kind) -> Builder { Builder { - // No task execution by default - kind: Kind::Shell, + kind, // I/O defaults to "off" enable_io: false, @@ -101,7 +104,7 @@ impl Builder { enable_time: false, // Default to lazy auto-detection (one thread per CPU core) - core_threads: None, + worker_threads: None, max_threads: 512, @@ -115,7 +118,6 @@ impl Builder { after_start: None, before_stop: None, - #[cfg(feature = "blocking")] keep_alive: None, } } @@ -131,8 +133,7 @@ impl Builder { /// ``` /// use tokio::runtime; /// - /// let rt = runtime::Builder::new() - /// .threaded_scheduler() + /// let rt = runtime::Builder::new_multi_thread() /// .enable_all() /// .build() /// .unwrap(); @@ -152,51 +153,63 @@ impl Builder { self } - #[deprecated(note = "In future will be replaced by core_threads method")] - /// Sets the maximum number of worker threads for the `Runtime`'s thread pool. + /// Sets the number of worker threads the `Runtime` will use. + /// + /// This should be a number between 0 and 32,768 though it is advised to + /// keep this value on the smaller side. /// - /// This must be a number between 1 and 32,768 though it is advised to keep - /// this value on the smaller side. + /// # Default /// /// The default value is the number of cores available to the system. - pub fn num_threads(&mut self, val: usize) -> &mut Self { - self.core_threads = Some(val); - self - } - - /// Sets the core number of worker threads for the `Runtime`'s thread pool. /// - /// This should be a number between 1 and 32,768 though it is advised to keep - /// this value on the smaller side. + /// # Panic /// - /// The default value is the number of cores available to the system. + /// When using the `current_thread` runtime this method will panic, since + /// those variants do not allow setting worker thread counts. /// - /// These threads will be always active and running. /// /// # Examples /// + /// ## Multi threaded runtime with 4 threads + /// /// ``` /// use tokio::runtime; /// - /// let rt = runtime::Builder::new() - /// .threaded_scheduler() - /// .core_threads(4) + /// // This will spawn a work-stealing runtime with 4 worker threads. + /// let rt = runtime::Builder::new_multi_thread() + /// .worker_threads(4) /// .build() /// .unwrap(); + /// + /// rt.spawn(async move {}); /// ``` - pub fn core_threads(&mut self, val: usize) -> &mut Self { - assert_ne!(val, 0, "Core threads cannot be zero"); - self.core_threads = Some(val); + /// + /// ## Current thread runtime (will only run on the current thread via `Runtime::block_on`) + /// + /// ``` + /// use tokio::runtime; + /// + /// // Create a runtime that _must_ be driven from a call + /// // to `Runtime::block_on`. + /// let rt = runtime::Builder::new_current_thread() + /// .build() + /// .unwrap(); + /// + /// // This will run the runtime and future on the current thread + /// rt.block_on(async move {}); + /// ``` + pub fn worker_threads(&mut self, val: usize) -> &mut Self { + self.worker_threads = Some(val); self } /// Specifies limit for threads, spawned by the Runtime. /// /// This is number of threads to be used by Runtime, including `core_threads` - /// Having `max_threads` less than `core_threads` results in invalid configuration + /// Having `max_threads` less than `worker_threads` results in invalid configuration /// when building multi-threaded `Runtime`, which would cause a panic. /// - /// Similarly to the `core_threads`, this number should be between 1 and 32,768. + /// Similarly to the `worker_threads`, this number should be between 0 and 32,768. /// /// The default value is 512. /// @@ -205,7 +218,6 @@ impl Builder { /// Otherwise as `core_threads` are always active, it limits additional threads (e.g. for /// blocking annotations) as `max_threads - core_threads`. pub fn max_threads(&mut self, val: usize) -> &mut Self { - assert_ne!(val, 0, "Thread limit cannot be zero"); self.max_threads = val; self } @@ -220,7 +232,7 @@ impl Builder { /// # use tokio::runtime; /// /// # pub fn main() { - /// let rt = runtime::Builder::new() + /// let rt = runtime::Builder::new_multi_thread() /// .thread_name("my-pool") /// .build(); /// # } @@ -242,7 +254,7 @@ impl Builder { /// # use std::sync::atomic::{AtomicUsize, Ordering}; /// /// # pub fn main() { - /// let rt = runtime::Builder::new() + /// let rt = runtime::Builder::new_multi_thread() /// .thread_name_fn(|| { /// static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); /// let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst); @@ -273,8 +285,7 @@ impl Builder { /// # use tokio::runtime; /// /// # pub fn main() { - /// let rt = runtime::Builder::new() - /// .threaded_scheduler() + /// let rt = runtime::Builder::new_multi_thread() /// .thread_stack_size(32 * 1024) /// .build(); /// # } @@ -295,8 +306,7 @@ impl Builder { /// # use tokio::runtime; /// /// # pub fn main() { - /// let runtime = runtime::Builder::new() - /// .threaded_scheduler() + /// let runtime = runtime::Builder::new_multi_thread() /// .on_thread_start(|| { /// println!("thread started"); /// }) @@ -322,8 +332,7 @@ impl Builder { /// # use tokio::runtime; /// /// # pub fn main() { - /// let runtime = runtime::Builder::new() - /// .threaded_scheduler() + /// let runtime = runtime::Builder::new_multi_thread() /// .on_thread_stop(|| { /// println!("thread stopping"); /// }) @@ -341,26 +350,24 @@ impl Builder { /// Creates the configured `Runtime`. /// - /// The returned `ThreadPool` instance is ready to spawn tasks. + /// The returned `Runtime` instance is ready to spawn tasks. /// /// # Examples /// /// ``` /// use tokio::runtime::Builder; /// - /// let rt = Builder::new().build().unwrap(); + /// let rt = Builder::new_multi_thread().build().unwrap(); /// /// rt.block_on(async { /// println!("Hello from the Tokio runtime"); /// }); /// ``` pub fn build(&mut self) -> io::Result<Runtime> { - match self.kind { - Kind::Shell => self.build_shell_runtime(), - #[cfg(feature = "rt-core")] - Kind::Basic => self.build_basic_runtime(), + match &self.kind { + Kind::CurrentThread => self.build_basic_runtime(), #[cfg(feature = "rt-threaded")] - Kind::ThreadPool => self.build_threaded_runtime(), + Kind::MultiThread => self.build_threaded_runtime(), } } @@ -371,32 +378,6 @@ impl Builder { } } - fn build_shell_runtime(&mut self) -> io::Result<Runtime> { - use crate::runtime::Kind; - - let (driver, resources) = driver::Driver::new(self.get_cfg())?; - - let spawner = Spawner::Shell; - - let blocking_pool = blocking::create_blocking_pool(self, self.max_threads); - let blocking_spawner = blocking_pool.spawner().clone(); - - Ok(Runtime { - kind: Kind::Shell(Shell::new(driver)), - handle: Handle { - spawner, - io_handle: resources.io_handle, - time_handle: resources.time_handle, - signal_handle: resources.signal_handle, - clock: resources.clock, - blocking_spawner, - }, - blocking_pool, - }) - } - - #[cfg(feature = "blocking")] - #[cfg_attr(docsrs, doc(cfg(feature = "blocking")))] /// Sets a custom timeout for a thread in the blocking pool. /// /// By default, the timeout for a thread is set to 10 seconds. This can @@ -409,7 +390,7 @@ impl Builder { /// # use std::time::Duration; /// /// # pub fn main() { - /// let rt = runtime::Builder::new() + /// let rt = runtime::Builder::new_multi_thread() /// .thread_keep_alive(Duration::from_millis(100)) /// .build(); /// # } @@ -418,6 +399,36 @@ impl Builder { self.keep_alive = Some(duration); self } + + fn build_basic_runtime(&mut self) -> io::Result<Runtime> { + use crate::runtime::{BasicScheduler, Kind}; + + let (driver, resources) = driver::Driver::new(self.get_cfg())?; + + // And now put a single-threaded scheduler on top of the timer. When + // there are no futures ready to do something, it'll let the timer or + // the reactor to generate some new stimuli for the futures to continue + // in their life. + let scheduler = BasicScheduler::new(driver); + let spawner = Spawner::Basic(scheduler.spawner().clone()); + + // Blocking pool + let blocking_pool = blocking::create_blocking_pool(self, self.max_threads); + let blocking_spawner = blocking_pool.spawner().clone(); + + Ok(Runtime { + kind: Kind::CurrentThread(scheduler), + handle: Handle { + spawner, + io_handle: resources.io_handle, + time_handle: resources.time_handle, + signal_handle: resources.signal_handle, + clock: resources.clock, + blocking_spawner, + }, + blocking_pool, + }) + } } cfg_io_driver! { @@ -432,7 +443,7 @@ cfg_io_driver! { /// ``` /// use tokio::runtime; /// - /// let rt = runtime::Builder::new() + /// let rt = runtime::Builder::new_multi_thread() /// .enable_io() /// .build() /// .unwrap(); @@ -455,7 +466,7 @@ cfg_time! { /// ``` /// use tokio::runtime; /// - /// let rt = runtime::Builder::new() + /// let rt = runtime::Builder::new_multi_thread() /// .enable_time() /// .build() /// .unwrap(); @@ -467,75 +478,15 @@ cfg_time! { } } -cfg_rt_core! { - impl Builder { - /// Sets runtime to use a simpler scheduler that runs all tasks on the current-thread. - /// - /// The executor and all necessary drivers will all be run on the current - /// thread during [`block_on`] calls. - /// - /// See also [the module level documentation][1], which has a section on scheduler - /// types. - /// - /// [1]: index.html#runtime-configurations - /// [`block_on`]: Runtime::block_on - pub fn basic_scheduler(&mut self) -> &mut Self { - self.kind = Kind::Basic; - self - } - - fn build_basic_runtime(&mut self) -> io::Result<Runtime> { - use crate::runtime::{BasicScheduler, Kind}; - - let (driver, resources) = driver::Driver::new(self.get_cfg())?; - - // And now put a single-threaded scheduler on top of the timer. When - // there are no futures ready to do something, it'll let the timer or - // the reactor to generate some new stimuli for the futures to continue - // in their life. - let scheduler = BasicScheduler::new(driver); - let spawner = Spawner::Basic(scheduler.spawner().clone()); - - // Blocking pool - let blocking_pool = blocking::create_blocking_pool(self, self.max_threads); - let blocking_spawner = blocking_pool.spawner().clone(); - - Ok(Runtime { - kind: Kind::Basic(scheduler), - handle: Handle { - spawner, - io_handle: resources.io_handle, - time_handle: resources.time_handle, - signal_handle: resources.signal_handle, - clock: resources.clock, - blocking_spawner, - }, - blocking_pool, - }) - } - } -} - cfg_rt_threaded! { impl Builder { - /// Sets runtime to use a multi-threaded scheduler for executing tasks. - /// - /// See also [the module level documentation][1], which has a section on scheduler - /// types. - /// - /// [1]: index.html#runtime-configurations - pub fn threaded_scheduler(&mut self) -> &mut Self { - self.kind = Kind::ThreadPool; - self - } - fn build_threaded_runtime(&mut self) -> io::Result<Runtime> { use crate::loom::sys::num_cpus; use crate::runtime::{Kind, ThreadPool}; use crate::runtime::park::Parker; use std::cmp; - let core_threads = self.core_threads.unwrap_or_else(|| cmp::min(self.max_threads, num_cpus())); + let core_threads = self.worker_threads.unwrap_or_else(|| cmp::min(self.max_threads, num_cpus())); assert!(core_threads <= self.max_threads, "Core threads number cannot be above max limit"); let (driver, resources) = driver::Driver::new(self.get_cfg())?; @@ -569,17 +520,10 @@ cfg_rt_threaded! { } } -impl Default for Builder { - fn default() -> Self { - Self::new() - } -} - impl fmt::Debug for Builder { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("Builder") - .field("kind", &self.kind) - .field("core_threads", &self.core_threads) + .field("worker_threads", &self.worker_threads) .field("max_threads", &self.max_threads) .field( "thread_name", diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs index a4f88e90..e28d5282 100644 --- a/tokio/src/runtime/context.rs +++ b/tokio/src/runtime/context.rs @@ -7,10 +7,8 @@ thread_local! { static CONTEXT: RefCell<Option<Handle>> = RefCell::new(None) } -cfg_blocking_impl! { - pub(crate) fn current() -> Option<Handle> { - CONTEXT.with(|ctx| ctx.borrow().clone()) - } +pub(crate) fn current() -> Option<Handle> { + CONTEXT.with(|ctx| ctx.borrow().clone()) } cfg_io_driver! { diff --git a/tokio/src/runtime/driver.rs b/tokio/src/runtime/driver.rs index af8e17a3..6fccb11e 100644 --- a/tokio/src/runtime/driver.rs +++ b/tokio/src/runtime/driver.rs @@ -1,16 +1,18 @@ //! Abstracts out the entire chain of runtime sub-drivers into common types. -use crate::park::{Park, ParkThread}; +use crate::park::thread::ParkThread; +use crate::park::Park; + use std::io; use std::time::Duration; // ===== io driver ===== cfg_io_driver! { - type IoDriver = crate::park::Either<crate::io::driver::Driver, crate::park::ParkThread>; + type IoDriver = crate::park::either::Either<crate::io::driver::Driver, ParkThread>; pub(crate) type IoHandle = Option<crate::io::driver::Handle>; fn create_io_driver(enable: bool) -> io::Result<(IoDriver, IoHandle)> { - use crate::park::Either; + use crate::park::either::Either; #[cfg(loom)] assert!(!enable); @@ -47,11 +49,11 @@ macro_rules! cfg_signal_internal_and_unix { } cfg_signal_internal_and_unix! { - type SignalDriver = crate::park::Either<crate::signal::unix::driver::Driver, IoDriver>; + type SignalDriver = crate::park::either::Either<crate::signal::unix::driver::Driver, IoDriver>; pub(crate) type SignalHandle = Option<crate::signal::unix::driver::Handle>; fn create_signal_driver(io_driver: IoDriver) -> io::Result<(SignalDriver, SignalHandle)> { - use crate::park::Either; + use crate::park::either::Either; // Enable the signal driver if IO is also enabled match io_driver { @@ -77,10 +79,10 @@ cfg_not_signal_internal! { // ===== process driver ===== cfg_process_driver! { - type ProcessDriver = crate::park::Either<crate::process::unix::driver::Driver, SignalDriver>; + type ProcessDriver = crate::park::either::Either<crate::process::unix::driver::Driver, SignalDriver>; fn create_process_driver(signal_driver: SignalDriver) -> io::Result<ProcessDriver> { - use crate::park::Either; + use crate::park::either::Either; // Enable the signal driver if IO is also enabled match signal_driver { @@ -104,7 +106,7 @@ cfg_not_process_driver! { // ===== time driver ===== cfg_time! { - type TimeDriver = crate::park::Either<crate::time::driver::Driver<ProcessDriver>, ProcessDriver>; + type TimeDriver = crate::park::either::Either<crate::time::driver::Driver<ProcessDriver>, ProcessDriver>; pub(crate) type Clock = crate::time::Clock; pub(crate) type TimeHandle = Option<crate::time::driver::Handle>; @@ -118,7 +120,7 @@ cfg_time! { process_driver: ProcessDriver, clock: Clock, ) -> (TimeDriver, TimeHandle) { - use crate::park::Either; + use crate::park::either::Either; if enable { let driver = crate::time::driver::Driver::new(process_driver, clock); diff --git a/tokio/src/runtime/enter.rs b/tokio/src/runtime/enter.rs index f934162b..79ed4d17 100644 --- a/tokio/src/runtime/enter.rs +++ b/tokio/src/runtime/enter.rs @@ -4,8 +4,8 @@ use std::marker::PhantomData; #[derive(Debug, Clone, Copy)] pub(crate) enum EnterContext { + #[cfg_attr(not(feature = "rt-core"), allow(dead_code))] Entered { - #[allow(dead_code)] allow_blocking: bool, }, NotEntered, @@ -24,32 +24,38 @@ pub(crate) struct Enter { _p: PhantomData<RefCell<()>>, } -/// Marks the current thread as being within the dynamic extent of an -/// executor. -pub(crate) fn enter(allow_blocking: bool) -> Enter { - if let Some(enter) = try_enter(allow_blocking) { - return enter; - } +cfg_rt_core! { + use crate::park::thread::ParkError; - panic!( - "Cannot start a runtime from within a runtime. This happens \ - because a function (like `block_on`) attempted to block the \ - current thread while the thread is being used to drive \ - asynchronous tasks." - ); -} + use std::time::Duration; -/// Tries to enter a runtime context, returns `None` if already in a runtime -/// context. -pub(crate) fn try_enter(allow_blocking: bool) -> Option<Enter> { - ENTERED.with(|c| { - if c.get().is_entered() { - None - } else { - c.set(EnterContext::Entered { allow_blocking }); - Some(Enter { _p: PhantomData }) + /// Marks the current thread as being within the dynamic extent of an + /// executor. + pub(crate) fn enter(allow_blocking: bool) -> Enter { + if let Some(enter) = try_enter(allow_blocking) { + return enter; } - }) + + panic!( + "Cannot start a runtime from within a runtime. This happens \ + because a function (like `block_on`) attempted to block the \ + current thread while the thread is being used to drive \ + asynchronous tasks." + ); + } + + /// Tries to enter a runtime context, returns `None` if already in a runtime + /// context. + pub(crate) fn try_enter(allow_blocking: bool) -> Option<Enter> { + ENTERED.with(|c| { + if c.get().is_entered() { + None + } else { + c.set(EnterContext::Entered { allow_blocking }); + Some(Enter { _p: PhantomData }) + } + }) + } } // Forces the current "entered" state to be cleared while the closure @@ -59,113 +65,92 @@ pub(crate) fn try_enter(allow_blocking: bool) -> Option<Enter> { // // This is hidden for a reason. Do not use without fully understanding // executors. Misuing can easily cause your program to deadlock. -#[cfg(all(feature = "rt-threaded", feature = "blocking"))] -pub(crate) fn exit<F: FnOnce() -> R, R>(f: F) -> R { - // Reset in case the closure panics - struct Reset(EnterContext); - impl Drop for Reset { - fn drop(&mut self) { - ENTERED.with(|c| { - assert!(!c.get().is_entered(), "closure claimed permanent executor"); - c.set(self.0); - }); +cfg_rt_threaded! { + pub(crate) fn exit<F: FnOnce() -> R, R>(f: F) -> R { + // Reset in case the closure panics + struct Reset(EnterContext); + impl Drop for Reset { + fn drop(&mut self) { + ENTERED.with(|c| { + assert!(!c.get().is_entered(), "closure claimed permanent executor"); + c.set(self.0); + }); + } } - } - let was = ENTERED.with(|c| { - let e = c.get(); - assert!(e.is_entered(), "asked to exit when not entered"); - c.set(EnterContext::NotEntered); - e - }); + let was = ENTERED.with(|c| { + let e = c.get(); + assert!(e.is_entered(), "asked to exit when not entered"); + c.set(EnterContext::NotEntered); + e + }); - let _reset = Reset(was); - // dropping _reset after f() will reset ENTERED - f() + let _reset = Reset(was); + // dropping _reset after f() will reset ENTERED + f() + } } -cfg_rt_core! { - cfg_rt_util! { - /// Disallow blocking in the current runtime context until the guard is dropped. - pub(crate) fn disallow_blocking() -> DisallowBlockingGuard { - let reset = ENTERED.with(|c| { - if let EnterContext::Entered { - allow_blocking: true, - } = c.get() - { - c.set(EnterContext::Entered { - allow_blocking: false, - }); - true - } else { - false - } - }); - DisallowBlockingGuard(reset) - } +cfg_rt_util! { + /// Disallow blocking in the current runtime context until the guard is dropped. + pub(crate) fn disallow_blocking() -> DisallowBlockingGuard { + let reset = ENTERED.with(|c| { + if let EnterContext::Entered { + allow_blocking: true, + } = c.get() + { + c.set(EnterContext::Entered { + allow_blocking: false, + }); + true + } else { + false + } + }); + DisallowBlockingGuard(reset) + } - pub(crate) struct DisallowBlockingGuard(bool); - impl Drop for DisallowBlockingGuard { - fn drop(&mut self) { - if self.0 { - // XXX: Do we want some kind of assertion here, or is "best effort" okay? - ENTERED.with(|c| { - if let EnterContext::Entered { - allow_blocking: false, - } = c.get() - { - c.set(EnterContext::Entered { - allow_blocking: true, - }); - } - }) - } + pub(crate) struct DisallowBlockingGuard(bool); + impl Drop for DisallowBlockingGuard { + fn drop(&mut self) { + if self.0 { + // XXX: Do we want some kind of assertion here, or is "best effort" okay? + ENTERED.with(|c| { + if let EnterContext::Entered { + allow_blocking: false, + } = c.get() + { + c.set(EnterContext::Entered { + allow_blocking: true, + }); + } + }) } } } } cfg_rt_threaded! { - cfg_blocking! { - /// Returns true if in a runtime context. - pub(crate) fn context() -> EnterContext { - ENTERED.with(|c| c.get()) - } + /// Returns true if in a runtime context. + pub(crate) fn context() -> EnterContext { + ENTERED.with(|c| c.get()) } } -impl Enter { - /// Blocks the thread on the specified future, returning the value with - /// which that future completes. - pub(crate) fn block_on<F>(&mut self, f: F) -> Result<F::Output, crate::park::ParkError> - where - F: std::future::Future, - { - use crate::park::{CachedParkThread, Park}; - use std::task::Context; - use std::task::Poll::Ready; - - let mut park = CachedParkThread::new(); - let waker = park.get_unpark()?.into_waker(); - let mut cx = Context::from_waker(&waker); - - pin!(f); - - loop { - if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) { - return Ok(v); - } +cfg_rt_core! { + impl Enter { + /// Blocks the thread on the specified future, returning the value with + /// which that future completes. + pub(crate) fn block_on<F>(&mut self, f: F) -> Result<F::Output, ParkError> + where + F: std::future::Future, + { + use crate::park::thread::CachedParkThread; - park.park()?; + let mut park = CachedParkThread::new(); + park.block_on(f) } - } -} |