summaryrefslogtreecommitdiffstats
path: root/tokio/src/runtime
diff options
context:
space:
mode:
authorLucio Franco <luciofranco14@gmail.com>2020-10-12 13:44:54 -0400
committerGitHub <noreply@github.com>2020-10-12 13:44:54 -0400
commit8880222036f37c6204c8466f25e828447f16dacb (patch)
treefd623afc20f73bbce65746a3d1b1b2731ecf30a5 /tokio/src/runtime
parent0893841f31542b2b04c5050a8a4a3c45cf867e55 (diff)
rt: Remove `threaded_scheduler()` and `basic_scheduler()` (#2876)
Co-authored-by: Alice Ryhl <alice@ryhl.io> Co-authored-by: Carl Lerche <me@carllerche.com>
Diffstat (limited to 'tokio/src/runtime')
-rw-r--r--tokio/src/runtime/basic_scheduler.rs2
-rw-r--r--tokio/src/runtime/blocking/mod.rs20
-rw-r--r--tokio/src/runtime/blocking/pool.rs3
-rw-r--r--tokio/src/runtime/builder.rs262
-rw-r--r--tokio/src/runtime/context.rs6
-rw-r--r--tokio/src/runtime/driver.rs20
-rw-r--r--tokio/src/runtime/enter.rs214
-rw-r--r--tokio/src/runtime/mod.rs648
-rw-r--r--tokio/src/runtime/spawner.rs2
-rw-r--r--tokio/src/runtime/task/error.rs2
-rw-r--r--tokio/src/runtime/task/join.rs2
-rw-r--r--tokio/src/runtime/task/mod.rs34
-rw-r--r--tokio/src/runtime/tests/loom_blocking.rs5
-rw-r--r--tokio/src/runtime/tests/loom_pool.rs5
-rw-r--r--tokio/src/runtime/thread_pool/atomic_cell.rs1
-rw-r--r--tokio/src/runtime/thread_pool/mod.rs4
-rw-r--r--tokio/src/runtime/thread_pool/worker.rs161
17 files changed, 631 insertions, 760 deletions
diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs
index 60fe92c3..5ca84671 100644
--- a/tokio/src/runtime/basic_scheduler.rs
+++ b/tokio/src/runtime/basic_scheduler.rs
@@ -2,7 +2,7 @@ use crate::future::poll_fn;
use crate::loom::sync::Mutex;
use crate::park::{Park, Unpark};
use crate::runtime::task::{self, JoinHandle, Schedule, Task};
-use crate::sync::Notify;
+use crate::sync::notify::Notify;
use crate::util::linked_list::{Link, LinkedList};
use crate::util::{waker_ref, Wake, WakerRef};
diff --git a/tokio/src/runtime/blocking/mod.rs b/tokio/src/runtime/blocking/mod.rs
index a819e9e9..fece3c27 100644
--- a/tokio/src/runtime/blocking/mod.rs
+++ b/tokio/src/runtime/blocking/mod.rs
@@ -3,21 +3,20 @@
//! shells. This isolates the complexity of dealing with conditional
//! compilation.
-cfg_blocking_impl! {
- mod pool;
- pub(crate) use pool::{spawn_blocking, try_spawn_blocking, BlockingPool, Spawner};
+mod pool;
+pub(crate) use pool::{spawn_blocking, BlockingPool, Spawner};
- mod schedule;
- mod shutdown;
- pub(crate) mod task;
+mod schedule;
+mod shutdown;
+pub(crate) mod task;
- use crate::runtime::Builder;
+use crate::runtime::Builder;
- pub(crate) fn create_blocking_pool(builder: &Builder, thread_cap: usize) -> BlockingPool {
- BlockingPool::new(builder, thread_cap)
- }
+pub(crate) fn create_blocking_pool(builder: &Builder, thread_cap: usize) -> BlockingPool {
+ BlockingPool::new(builder, thread_cap)
}
+/*
cfg_not_blocking_impl! {
use crate::runtime::Builder;
use std::time::Duration;
@@ -40,3 +39,4 @@ cfg_not_blocking_impl! {
}
}
}
+*/
diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs
index df0175b1..2d44f896 100644
--- a/tokio/src/runtime/blocking/pool.rs
+++ b/tokio/src/runtime/blocking/pool.rs
@@ -94,10 +94,7 @@ where
impl BlockingPool {
pub(crate) fn new(builder: &Builder, thread_cap: usize) -> BlockingPool {
let (shutdown_tx, shutdown_rx) = shutdown::channel();
- #[cfg(feature = "blocking")]
let keep_alive = builder.keep_alive.unwrap_or(KEEP_ALIVE);
- #[cfg(not(feature = "blocking"))]
- let keep_alive = KEEP_ALIVE;
BlockingPool {
spawner: Spawner {
diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs
index d43666d3..8e76f52b 100644
--- a/tokio/src/runtime/builder.rs
+++ b/tokio/src/runtime/builder.rs
@@ -1,9 +1,8 @@
use crate::runtime::handle::Handle;
-use crate::runtime::shell::Shell;
-use crate::runtime::{blocking, driver, io, Callback, Runtime, Spawner};
+use crate::runtime::{blocking, driver, Callback, Runtime, Spawner};
use std::fmt;
-#[cfg(feature = "blocking")]
+use std::io;
use std::time::Duration;
/// Builds Tokio Runtime with custom configuration values.
@@ -26,9 +25,8 @@ use std::time::Duration;
///
/// fn main() {
/// // build runtime
-/// let runtime = Builder::new()
-/// .threaded_scheduler()
-/// .core_threads(4)
+/// let runtime = Builder::new_multi_thread()
+/// .worker_threads(4)
/// .thread_name("my-custom-name")
/// .thread_stack_size(3 * 1024 * 1024)
/// .build()
@@ -38,7 +36,7 @@ use std::time::Duration;
/// }
/// ```
pub struct Builder {
- /// The task execution model to use.
+ /// Runtime type
kind: Kind,
/// Whether or not to enable the I/O driver
@@ -50,7 +48,7 @@ pub struct Builder {
/// The number of worker threads, used by Runtime.
///
/// Only used when not using the current-thread executor.
- core_threads: Option<usize>,
+ worker_threads: Option<usize>,
/// Cap on thread usage.
max_threads: usize,
@@ -67,32 +65,37 @@ pub struct Builder {
/// To run before each worker thread stops
pub(super) before_stop: Option<Callback>,
- #[cfg(feature = "blocking")]
- #[cfg_attr(docsrs, doc(cfg(feature = "blocking")))]
/// Customizable keep alive timeout for BlockingPool
pub(super) keep_alive: Option<Duration>,
}
pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
-#[derive(Debug, Clone, Copy)]
-enum Kind {
- Shell,
- #[cfg(feature = "rt-core")]
- Basic,
+pub(crate) enum Kind {
+ CurrentThread,
#[cfg(feature = "rt-threaded")]
- ThreadPool,
+ MultiThread,
}
impl Builder {
+ /// TODO
+ pub fn new_current_thread() -> Builder {
+ Builder::new(Kind::CurrentThread)
+ }
+
+ /// TODO
+ #[cfg(feature = "rt-threaded")]
+ pub fn new_multi_thread() -> Builder {
+ Builder::new(Kind::MultiThread)
+ }
+
/// Returns a new runtime builder initialized with default configuration
/// values.
///
/// Configuration methods can be chained on the return value.
- pub fn new() -> Builder {
+ pub(crate) fn new(kind: Kind) -> Builder {
Builder {
- // No task execution by default
- kind: Kind::Shell,
+ kind,
// I/O defaults to "off"
enable_io: false,
@@ -101,7 +104,7 @@ impl Builder {
enable_time: false,
// Default to lazy auto-detection (one thread per CPU core)
- core_threads: None,
+ worker_threads: None,
max_threads: 512,
@@ -115,7 +118,6 @@ impl Builder {
after_start: None,
before_stop: None,
- #[cfg(feature = "blocking")]
keep_alive: None,
}
}
@@ -131,8 +133,7 @@ impl Builder {
/// ```
/// use tokio::runtime;
///
- /// let rt = runtime::Builder::new()
- /// .threaded_scheduler()
+ /// let rt = runtime::Builder::new_multi_thread()
/// .enable_all()
/// .build()
/// .unwrap();
@@ -152,51 +153,63 @@ impl Builder {
self
}
- #[deprecated(note = "In future will be replaced by core_threads method")]
- /// Sets the maximum number of worker threads for the `Runtime`'s thread pool.
+ /// Sets the number of worker threads the `Runtime` will use.
+ ///
+ /// This should be a number between 0 and 32,768 though it is advised to
+ /// keep this value on the smaller side.
///
- /// This must be a number between 1 and 32,768 though it is advised to keep
- /// this value on the smaller side.
+ /// # Default
///
/// The default value is the number of cores available to the system.
- pub fn num_threads(&mut self, val: usize) -> &mut Self {
- self.core_threads = Some(val);
- self
- }
-
- /// Sets the core number of worker threads for the `Runtime`'s thread pool.
///
- /// This should be a number between 1 and 32,768 though it is advised to keep
- /// this value on the smaller side.
+ /// # Panic
///
- /// The default value is the number of cores available to the system.
+ /// When using the `current_thread` runtime this method will panic, since
+ /// those variants do not allow setting worker thread counts.
///
- /// These threads will be always active and running.
///
/// # Examples
///
+ /// ## Multi threaded runtime with 4 threads
+ ///
/// ```
/// use tokio::runtime;
///
- /// let rt = runtime::Builder::new()
- /// .threaded_scheduler()
- /// .core_threads(4)
+ /// // This will spawn a work-stealing runtime with 4 worker threads.
+ /// let rt = runtime::Builder::new_multi_thread()
+ /// .worker_threads(4)
/// .build()
/// .unwrap();
+ ///
+ /// rt.spawn(async move {});
/// ```
- pub fn core_threads(&mut self, val: usize) -> &mut Self {
- assert_ne!(val, 0, "Core threads cannot be zero");
- self.core_threads = Some(val);
+ ///
+ /// ## Current thread runtime (will only run on the current thread via `Runtime::block_on`)
+ ///
+ /// ```
+ /// use tokio::runtime;
+ ///
+ /// // Create a runtime that _must_ be driven from a call
+ /// // to `Runtime::block_on`.
+ /// let rt = runtime::Builder::new_current_thread()
+ /// .build()
+ /// .unwrap();
+ ///
+ /// // This will run the runtime and future on the current thread
+ /// rt.block_on(async move {});
+ /// ```
+ pub fn worker_threads(&mut self, val: usize) -> &mut Self {
+ self.worker_threads = Some(val);
self
}
/// Specifies limit for threads, spawned by the Runtime.
///
/// This is number of threads to be used by Runtime, including `core_threads`
- /// Having `max_threads` less than `core_threads` results in invalid configuration
+ /// Having `max_threads` less than `worker_threads` results in invalid configuration
/// when building multi-threaded `Runtime`, which would cause a panic.
///
- /// Similarly to the `core_threads`, this number should be between 1 and 32,768.
+ /// Similarly to the `worker_threads`, this number should be between 0 and 32,768.
///
/// The default value is 512.
///
@@ -205,7 +218,6 @@ impl Builder {
/// Otherwise as `core_threads` are always active, it limits additional threads (e.g. for
/// blocking annotations) as `max_threads - core_threads`.
pub fn max_threads(&mut self, val: usize) -> &mut Self {
- assert_ne!(val, 0, "Thread limit cannot be zero");
self.max_threads = val;
self
}
@@ -220,7 +232,7 @@ impl Builder {
/// # use tokio::runtime;
///
/// # pub fn main() {
- /// let rt = runtime::Builder::new()
+ /// let rt = runtime::Builder::new_multi_thread()
/// .thread_name("my-pool")
/// .build();
/// # }
@@ -242,7 +254,7 @@ impl Builder {
/// # use std::sync::atomic::{AtomicUsize, Ordering};
///
/// # pub fn main() {
- /// let rt = runtime::Builder::new()
+ /// let rt = runtime::Builder::new_multi_thread()
/// .thread_name_fn(|| {
/// static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
/// let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
@@ -273,8 +285,7 @@ impl Builder {
/// # use tokio::runtime;
///
/// # pub fn main() {
- /// let rt = runtime::Builder::new()
- /// .threaded_scheduler()
+ /// let rt = runtime::Builder::new_multi_thread()
/// .thread_stack_size(32 * 1024)
/// .build();
/// # }
@@ -295,8 +306,7 @@ impl Builder {
/// # use tokio::runtime;
///
/// # pub fn main() {
- /// let runtime = runtime::Builder::new()
- /// .threaded_scheduler()
+ /// let runtime = runtime::Builder::new_multi_thread()
/// .on_thread_start(|| {
/// println!("thread started");
/// })
@@ -322,8 +332,7 @@ impl Builder {
/// # use tokio::runtime;
///
/// # pub fn main() {
- /// let runtime = runtime::Builder::new()
- /// .threaded_scheduler()
+ /// let runtime = runtime::Builder::new_multi_thread()
/// .on_thread_stop(|| {
/// println!("thread stopping");
/// })
@@ -341,26 +350,24 @@ impl Builder {
/// Creates the configured `Runtime`.
///
- /// The returned `ThreadPool` instance is ready to spawn tasks.
+ /// The returned `Runtime` instance is ready to spawn tasks.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Builder;
///
- /// let rt = Builder::new().build().unwrap();
+ /// let rt = Builder::new_multi_thread().build().unwrap();
///
/// rt.block_on(async {
/// println!("Hello from the Tokio runtime");
/// });
/// ```
pub fn build(&mut self) -> io::Result<Runtime> {
- match self.kind {
- Kind::Shell => self.build_shell_runtime(),
- #[cfg(feature = "rt-core")]
- Kind::Basic => self.build_basic_runtime(),
+ match &self.kind {
+ Kind::CurrentThread => self.build_basic_runtime(),
#[cfg(feature = "rt-threaded")]
- Kind::ThreadPool => self.build_threaded_runtime(),
+ Kind::MultiThread => self.build_threaded_runtime(),
}
}
@@ -371,32 +378,6 @@ impl Builder {
}
}
- fn build_shell_runtime(&mut self) -> io::Result<Runtime> {
- use crate::runtime::Kind;
-
- let (driver, resources) = driver::Driver::new(self.get_cfg())?;
-
- let spawner = Spawner::Shell;
-
- let blocking_pool = blocking::create_blocking_pool(self, self.max_threads);
- let blocking_spawner = blocking_pool.spawner().clone();
-
- Ok(Runtime {
- kind: Kind::Shell(Shell::new(driver)),
- handle: Handle {
- spawner,
- io_handle: resources.io_handle,
- time_handle: resources.time_handle,
- signal_handle: resources.signal_handle,
- clock: resources.clock,
- blocking_spawner,
- },
- blocking_pool,
- })
- }
-
- #[cfg(feature = "blocking")]
- #[cfg_attr(docsrs, doc(cfg(feature = "blocking")))]
/// Sets a custom timeout for a thread in the blocking pool.
///
/// By default, the timeout for a thread is set to 10 seconds. This can
@@ -409,7 +390,7 @@ impl Builder {
/// # use std::time::Duration;
///
/// # pub fn main() {
- /// let rt = runtime::Builder::new()
+ /// let rt = runtime::Builder::new_multi_thread()
/// .thread_keep_alive(Duration::from_millis(100))
/// .build();
/// # }
@@ -418,6 +399,36 @@ impl Builder {
self.keep_alive = Some(duration);
self
}
+
+ fn build_basic_runtime(&mut self) -> io::Result<Runtime> {
+ use crate::runtime::{BasicScheduler, Kind};
+
+ let (driver, resources) = driver::Driver::new(self.get_cfg())?;
+
+ // And now put a single-threaded scheduler on top of the timer. When
+ // there are no futures ready to do something, it'll let the timer or
+ // the reactor to generate some new stimuli for the futures to continue
+ // in their life.
+ let scheduler = BasicScheduler::new(driver);
+ let spawner = Spawner::Basic(scheduler.spawner().clone());
+
+ // Blocking pool
+ let blocking_pool = blocking::create_blocking_pool(self, self.max_threads);
+ let blocking_spawner = blocking_pool.spawner().clone();
+
+ Ok(Runtime {
+ kind: Kind::CurrentThread(scheduler),
+ handle: Handle {
+ spawner,
+ io_handle: resources.io_handle,
+ time_handle: resources.time_handle,
+ signal_handle: resources.signal_handle,
+ clock: resources.clock,
+ blocking_spawner,
+ },
+ blocking_pool,
+ })
+ }
}
cfg_io_driver! {
@@ -432,7 +443,7 @@ cfg_io_driver! {
/// ```
/// use tokio::runtime;
///
- /// let rt = runtime::Builder::new()
+ /// let rt = runtime::Builder::new_multi_thread()
/// .enable_io()
/// .build()
/// .unwrap();
@@ -455,7 +466,7 @@ cfg_time! {
/// ```
/// use tokio::runtime;
///
- /// let rt = runtime::Builder::new()
+ /// let rt = runtime::Builder::new_multi_thread()
/// .enable_time()
/// .build()
/// .unwrap();
@@ -467,75 +478,15 @@ cfg_time! {
}
}
-cfg_rt_core! {
- impl Builder {
- /// Sets runtime to use a simpler scheduler that runs all tasks on the current-thread.
- ///
- /// The executor and all necessary drivers will all be run on the current
- /// thread during [`block_on`] calls.
- ///
- /// See also [the module level documentation][1], which has a section on scheduler
- /// types.
- ///
- /// [1]: index.html#runtime-configurations
- /// [`block_on`]: Runtime::block_on
- pub fn basic_scheduler(&mut self) -> &mut Self {
- self.kind = Kind::Basic;
- self
- }
-
- fn build_basic_runtime(&mut self) -> io::Result<Runtime> {
- use crate::runtime::{BasicScheduler, Kind};
-
- let (driver, resources) = driver::Driver::new(self.get_cfg())?;
-
- // And now put a single-threaded scheduler on top of the timer. When
- // there are no futures ready to do something, it'll let the timer or
- // the reactor to generate some new stimuli for the futures to continue
- // in their life.
- let scheduler = BasicScheduler::new(driver);
- let spawner = Spawner::Basic(scheduler.spawner().clone());
-
- // Blocking pool
- let blocking_pool = blocking::create_blocking_pool(self, self.max_threads);
- let blocking_spawner = blocking_pool.spawner().clone();
-
- Ok(Runtime {
- kind: Kind::Basic(scheduler),
- handle: Handle {
- spawner,
- io_handle: resources.io_handle,
- time_handle: resources.time_handle,
- signal_handle: resources.signal_handle,
- clock: resources.clock,
- blocking_spawner,
- },
- blocking_pool,
- })
- }
- }
-}
-
cfg_rt_threaded! {
impl Builder {
- /// Sets runtime to use a multi-threaded scheduler for executing tasks.
- ///
- /// See also [the module level documentation][1], which has a section on scheduler
- /// types.
- ///
- /// [1]: index.html#runtime-configurations
- pub fn threaded_scheduler(&mut self) -> &mut Self {
- self.kind = Kind::ThreadPool;
- self
- }
-
fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
use crate::loom::sys::num_cpus;
use crate::runtime::{Kind, ThreadPool};
use crate::runtime::park::Parker;
use std::cmp;
- let core_threads = self.core_threads.unwrap_or_else(|| cmp::min(self.max_threads, num_cpus()));
+ let core_threads = self.worker_threads.unwrap_or_else(|| cmp::min(self.max_threads, num_cpus()));
assert!(core_threads <= self.max_threads, "Core threads number cannot be above max limit");
let (driver, resources) = driver::Driver::new(self.get_cfg())?;
@@ -569,17 +520,10 @@ cfg_rt_threaded! {
}
}
-impl Default for Builder {
- fn default() -> Self {
- Self::new()
- }
-}
-
impl fmt::Debug for Builder {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Builder")
- .field("kind", &self.kind)
- .field("core_threads", &self.core_threads)
+ .field("worker_threads", &self.worker_threads)
.field("max_threads", &self.max_threads)
.field(
"thread_name",
diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs
index a4f88e90..e28d5282 100644
--- a/tokio/src/runtime/context.rs
+++ b/tokio/src/runtime/context.rs
@@ -7,10 +7,8 @@ thread_local! {
static CONTEXT: RefCell<Option<Handle>> = RefCell::new(None)
}
-cfg_blocking_impl! {
- pub(crate) fn current() -> Option<Handle> {
- CONTEXT.with(|ctx| ctx.borrow().clone())
- }
+pub(crate) fn current() -> Option<Handle> {
+ CONTEXT.with(|ctx| ctx.borrow().clone())
}
cfg_io_driver! {
diff --git a/tokio/src/runtime/driver.rs b/tokio/src/runtime/driver.rs
index af8e17a3..6fccb11e 100644
--- a/tokio/src/runtime/driver.rs
+++ b/tokio/src/runtime/driver.rs
@@ -1,16 +1,18 @@
//! Abstracts out the entire chain of runtime sub-drivers into common types.
-use crate::park::{Park, ParkThread};
+use crate::park::thread::ParkThread;
+use crate::park::Park;
+
use std::io;
use std::time::Duration;
// ===== io driver =====
cfg_io_driver! {
- type IoDriver = crate::park::Either<crate::io::driver::Driver, crate::park::ParkThread>;
+ type IoDriver = crate::park::either::Either<crate::io::driver::Driver, ParkThread>;
pub(crate) type IoHandle = Option<crate::io::driver::Handle>;
fn create_io_driver(enable: bool) -> io::Result<(IoDriver, IoHandle)> {
- use crate::park::Either;
+ use crate::park::either::Either;
#[cfg(loom)]
assert!(!enable);
@@ -47,11 +49,11 @@ macro_rules! cfg_signal_internal_and_unix {
}
cfg_signal_internal_and_unix! {
- type SignalDriver = crate::park::Either<crate::signal::unix::driver::Driver, IoDriver>;
+ type SignalDriver = crate::park::either::Either<crate::signal::unix::driver::Driver, IoDriver>;
pub(crate) type SignalHandle = Option<crate::signal::unix::driver::Handle>;
fn create_signal_driver(io_driver: IoDriver) -> io::Result<(SignalDriver, SignalHandle)> {
- use crate::park::Either;
+ use crate::park::either::Either;
// Enable the signal driver if IO is also enabled
match io_driver {
@@ -77,10 +79,10 @@ cfg_not_signal_internal! {
// ===== process driver =====
cfg_process_driver! {
- type ProcessDriver = crate::park::Either<crate::process::unix::driver::Driver, SignalDriver>;
+ type ProcessDriver = crate::park::either::Either<crate::process::unix::driver::Driver, SignalDriver>;
fn create_process_driver(signal_driver: SignalDriver) -> io::Result<ProcessDriver> {
- use crate::park::Either;
+ use crate::park::either::Either;
// Enable the signal driver if IO is also enabled
match signal_driver {
@@ -104,7 +106,7 @@ cfg_not_process_driver! {
// ===== time driver =====
cfg_time! {
- type TimeDriver = crate::park::Either<crate::time::driver::Driver<ProcessDriver>, ProcessDriver>;
+ type TimeDriver = crate::park::either::Either<crate::time::driver::Driver<ProcessDriver>, ProcessDriver>;
pub(crate) type Clock = crate::time::Clock;
pub(crate) type TimeHandle = Option<crate::time::driver::Handle>;
@@ -118,7 +120,7 @@ cfg_time! {
process_driver: ProcessDriver,
clock: Clock,
) -> (TimeDriver, TimeHandle) {
- use crate::park::Either;
+ use crate::park::either::Either;
if enable {
let driver = crate::time::driver::Driver::new(process_driver, clock);
diff --git a/tokio/src/runtime/enter.rs b/tokio/src/runtime/enter.rs
index f934162b..79ed4d17 100644
--- a/tokio/src/runtime/enter.rs
+++ b/tokio/src/runtime/enter.rs
@@ -4,8 +4,8 @@ use std::marker::PhantomData;
#[derive(Debug, Clone, Copy)]
pub(crate) enum EnterContext {
+ #[cfg_attr(not(feature = "rt-core"), allow(dead_code))]
Entered {
- #[allow(dead_code)]
allow_blocking: bool,
},
NotEntered,
@@ -24,32 +24,38 @@ pub(crate) struct Enter {
_p: PhantomData<RefCell<()>>,
}
-/// Marks the current thread as being within the dynamic extent of an
-/// executor.
-pub(crate) fn enter(allow_blocking: bool) -> Enter {
- if let Some(enter) = try_enter(allow_blocking) {
- return enter;
- }
+cfg_rt_core! {
+ use crate::park::thread::ParkError;
- panic!(
- "Cannot start a runtime from within a runtime. This happens \
- because a function (like `block_on`) attempted to block the \
- current thread while the thread is being used to drive \
- asynchronous tasks."
- );
-}
+ use std::time::Duration;
-/// Tries to enter a runtime context, returns `None` if already in a runtime
-/// context.
-pub(crate) fn try_enter(allow_blocking: bool) -> Option<Enter> {
- ENTERED.with(|c| {
- if c.get().is_entered() {
- None
- } else {
- c.set(EnterContext::Entered { allow_blocking });
- Some(Enter { _p: PhantomData })
+ /// Marks the current thread as being within the dynamic extent of an
+ /// executor.
+ pub(crate) fn enter(allow_blocking: bool) -> Enter {
+ if let Some(enter) = try_enter(allow_blocking) {
+ return enter;
}
- })
+
+ panic!(
+ "Cannot start a runtime from within a runtime. This happens \
+ because a function (like `block_on`) attempted to block the \
+ current thread while the thread is being used to drive \
+ asynchronous tasks."
+ );
+ }
+
+ /// Tries to enter a runtime context, returns `None` if already in a runtime
+ /// context.
+ pub(crate) fn try_enter(allow_blocking: bool) -> Option<Enter> {
+ ENTERED.with(|c| {
+ if c.get().is_entered() {
+ None
+ } else {
+ c.set(EnterContext::Entered { allow_blocking });
+ Some(Enter { _p: PhantomData })
+ }
+ })
+ }
}
// Forces the current "entered" state to be cleared while the closure
@@ -59,113 +65,92 @@ pub(crate) fn try_enter(allow_blocking: bool) -> Option<Enter> {
//
// This is hidden for a reason. Do not use without fully understanding
// executors. Misuing can easily cause your program to deadlock.
-#[cfg(all(feature = "rt-threaded", feature = "blocking"))]
-pub(crate) fn exit<F: FnOnce() -> R, R>(f: F) -> R {
- // Reset in case the closure panics
- struct Reset(EnterContext);
- impl Drop for Reset {
- fn drop(&mut self) {
- ENTERED.with(|c| {
- assert!(!c.get().is_entered(), "closure claimed permanent executor");
- c.set(self.0);
- });
+cfg_rt_threaded! {
+ pub(crate) fn exit<F: FnOnce() -> R, R>(f: F) -> R {
+ // Reset in case the closure panics
+ struct Reset(EnterContext);
+ impl Drop for Reset {
+ fn drop(&mut self) {
+ ENTERED.with(|c| {
+ assert!(!c.get().is_entered(), "closure claimed permanent executor");
+ c.set(self.0);
+ });
+ }
}
- }
- let was = ENTERED.with(|c| {
- let e = c.get();
- assert!(e.is_entered(), "asked to exit when not entered");
- c.set(EnterContext::NotEntered);
- e
- });
+ let was = ENTERED.with(|c| {
+ let e = c.get();
+ assert!(e.is_entered(), "asked to exit when not entered");
+ c.set(EnterContext::NotEntered);
+ e
+ });
- let _reset = Reset(was);
- // dropping _reset after f() will reset ENTERED
- f()
+ let _reset = Reset(was);
+ // dropping _reset after f() will reset ENTERED
+ f()
+ }
}
-cfg_rt_core! {
- cfg_rt_util! {
- /// Disallow blocking in the current runtime context until the guard is dropped.
- pub(crate) fn disallow_blocking() -> DisallowBlockingGuard {
- let reset = ENTERED.with(|c| {
- if let EnterContext::Entered {
- allow_blocking: true,
- } = c.get()
- {
- c.set(EnterContext::Entered {
- allow_blocking: false,
- });
- true
- } else {
- false
- }
- });
- DisallowBlockingGuard(reset)
- }
+cfg_rt_util! {
+ /// Disallow blocking in the current runtime context until the guard is dropped.
+ pub(crate) fn disallow_blocking() -> DisallowBlockingGuard {
+ let reset = ENTERED.with(|c| {
+ if let EnterContext::Entered {
+ allow_blocking: true,
+ } = c.get()
+ {
+ c.set(EnterContext::Entered {
+ allow_blocking: false,
+ });
+ true
+ } else {
+ false
+ }
+ });
+ DisallowBlockingGuard(reset)
+ }
- pub(crate) struct DisallowBlockingGuard(bool);
- impl Drop for DisallowBlockingGuard {
- fn drop(&mut self) {
- if self.0 {
- // XXX: Do we want some kind of assertion here, or is "best effort" okay?
- ENTERED.with(|c| {
- if let EnterContext::Entered {
- allow_blocking: false,
- } = c.get()
- {
- c.set(EnterContext::Entered {
- allow_blocking: true,
- });
- }
- })
- }
+ pub(crate) struct DisallowBlockingGuard(bool);
+ impl Drop for DisallowBlockingGuard {
+ fn drop(&mut self) {
+ if self.0 {
+ // XXX: Do we want some kind of assertion here, or is "best effort" okay?
+ ENTERED.with(|c| {
+ if let EnterContext::Entered {
+ allow_blocking: false,
+ } = c.get()
+ {
+ c.set(EnterContext::Entered {
+ allow_blocking: true,
+ });
+ }
+ })
}
}
}
}
cfg_rt_threaded! {
- cfg_blocking! {
- /// Returns true if in a runtime context.
- pub(crate) fn context() -> EnterContext {
- ENTERED.with(|c| c.get())
- }
+ /// Returns true if in a runtime context.
+ pub(crate) fn context() -> EnterContext {
+ ENTERED.with(|c| c.get())
}
}
-impl Enter {
- /// Blocks the thread on the specified future, returning the value with
- /// which that future completes.
- pub(crate) fn block_on<F>(&mut self, f: F) -> Result<F::Output, crate::park::ParkError>
- where
- F: std::future::Future,
- {
- use crate::park::{CachedParkThread, Park};
- use std::task::Context;
- use std::task::Poll::Ready;
-
- let mut park = CachedParkThread::new();
- let waker = park.get_unpark()?.into_waker();
- let mut cx = Context::from_waker(&waker);
-
- pin!(f);
-
- loop {
- if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) {
- return Ok(v);
- }
+cfg_rt_core! {
+ impl Enter {
+ /// Blocks the thread on the specified future, returning the value with
+ /// which that future completes.
+ pub(crate) fn block_on<F>(&mut self, f: F) -> Result<F::Output, ParkError>
+ where
+ F: std::future::Future,
+ {
+ use crate::park::thread::CachedParkThread;
- park.park()?;
+ let mut park = CachedParkThread::new();
+ park.block_on(f)
}
- }
-}