summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGardner Vickers <gardner@vickers.me>2019-12-24 18:34:47 -0500
committerCarl Lerche <me@carllerche.com>2019-12-24 15:34:47 -0800
commit67bf9c36f347031ca05872d102a7f9abc8b465f0 (patch)
tree22c0f472b594382a1bb6fa186b7f40fe4c17013d
parent101f770af33ae65820e1cc0e9b89d998b3c1317a (diff)
rt: coalesce thread-locals used by the runtime (#1925)
Previously, thread-locals used by the various drivers were situated with the driver code. This resulted in state being spread out and many thread-locals being required to run a runtime. This PR coalesces the thread-locals into a single struct.
-rw-r--r--tokio/Cargo.toml5
-rw-r--r--tokio/benches/spawn.rs70
-rw-r--r--tokio/src/io/driver/mod.rs54
-rw-r--r--tokio/src/runtime/basic_scheduler.rs71
-rw-r--r--tokio/src/runtime/blocking/pool.rs18
-rw-r--r--tokio/src/runtime/context.rs146
-rw-r--r--tokio/src/runtime/global.rs102
-rw-r--r--tokio/src/runtime/handle.rs15
-rw-r--r--tokio/src/runtime/io.rs6
-rw-r--r--tokio/src/runtime/mod.rs6
-rw-r--r--tokio/src/runtime/spawner.rs16
-rw-r--r--tokio/src/runtime/thread_pool/mod.rs6
-rw-r--r--tokio/src/runtime/thread_pool/spawner.rs8
-rw-r--r--tokio/src/runtime/thread_pool/worker.rs54
-rw-r--r--tokio/src/runtime/time.rs15
-rw-r--r--tokio/src/task/spawn.rs4
-rw-r--r--tokio/src/time/clock.rs120
-rw-r--r--tokio/src/time/driver/handle.rs48
-rw-r--r--tokio/src/time/driver/mod.rs2
-rw-r--r--tokio/src/time/tests/mock_clock.rs27
20 files changed, 345 insertions, 448 deletions
diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml
index 48b76c74..6f0591ce 100644
--- a/tokio/Cargo.toml
+++ b/tokio/Cargo.toml
@@ -127,6 +127,7 @@ futures = { version = "0.3.0", features = ["async-await"] }
loom = { version = "0.2.13", features = ["futures", "checkpoint"] }
proptest = "0.9.4"
tempfile = "3.1.0"
+bencher = "0.1.5"
[package.metadata.docs.rs]
all-features = true
@@ -134,3 +135,7 @@ rustdoc-args = ["--cfg", "docsrs"]
[package.metadata.playground]
features = ["full"]
+
+[[bench]]
+name = "spawn"
+harness = false \ No newline at end of file
diff --git a/tokio/benches/spawn.rs b/tokio/benches/spawn.rs
new file mode 100644
index 00000000..78e0b784
--- /dev/null
+++ b/tokio/benches/spawn.rs
@@ -0,0 +1,70 @@
+//! Benchmark spawning a task onto the basic and threaded Tokio executors.
+//! This essentially measure the time to enqueue a task in the local and remote
+//! case.
+
+use bencher::{black_box, Bencher};
+
+async fn work() -> usize {
+ let val = 1 + 1;
+ black_box(val)
+}
+
+fn basic_scheduler_local_spawn(bench: &mut Bencher) {
+ let mut runtime = tokio::runtime::Builder::new()
+ .basic_scheduler()
+ .build()
+ .unwrap();
+ runtime.block_on(async {
+ bench.iter(|| {
+ let h = tokio::spawn(work());
+ black_box(h);
+ })
+ });
+}
+
+fn threaded_scheduler_local_spawn(bench: &mut Bencher) {
+ let mut runtime = tokio::runtime::Builder::new()
+ .threaded_scheduler()
+ .build()
+ .unwrap();
+ runtime.block_on(async {
+ bench.iter(|| {
+ let h = tokio::spawn(work());
+ black_box(h);
+ })
+ });
+}
+
+fn basic_scheduler_remote_spawn(bench: &mut Bencher) {
+ let runtime = tokio::runtime::Builder::new()
+ .basic_scheduler()
+ .build()
+ .unwrap();
+ let handle = runtime.handle();
+ bench.iter(|| {
+ let h = handle.spawn(work());
+ black_box(h);
+ });
+}
+
+fn threaded_scheduler_remote_spawn(bench: &mut Bencher) {
+ let runtime = tokio::runtime::Builder::new()
+ .threaded_scheduler()
+ .build()
+ .unwrap();
+ let handle = runtime.handle();
+ bench.iter(|| {
+ let h = handle.spawn(work());
+ black_box(h);
+ });
+}
+
+bencher::benchmark_group!(
+ benches,
+ basic_scheduler_local_spawn,
+ threaded_scheduler_local_spawn,
+ basic_scheduler_remote_spawn,
+ threaded_scheduler_remote_spawn
+);
+
+bencher::benchmark_main!(benches);
diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs
index 58ce5124..dfb741be 100644
--- a/tokio/src/io/driver/mod.rs
+++ b/tokio/src/io/driver/mod.rs
@@ -5,13 +5,13 @@ pub(crate) use scheduled_io::ScheduledIo; // pub(crate) for tests
use crate::loom::sync::atomic::AtomicUsize;
use crate::park::{Park, Unpark};
+#[cfg(all(feature = "io-driver", not(loom)))]
+use crate::runtime::context;
use crate::util::slab::{Address, Slab};
use mio::event::Evented;
-use std::cell::RefCell;
use std::fmt;
use std::io;
-use std::marker::PhantomData;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::{Arc, Weak};
use std::task::Waker;
@@ -54,11 +54,6 @@ pub(super) enum Direction {
Write,
}
-thread_local! {
- /// Tracks the reactor for the current execution context.
- static CURRENT_REACTOR: RefCell<Option<Handle>> = RefCell::new(None)
-}
-
const TOKEN_WAKEUP: mio::Token = mio::Token(Address::NULL);
fn _assert_kinds() {
@@ -69,40 +64,6 @@ fn _assert_kinds() {
// ===== impl Driver =====
-#[derive(Debug)]
-/// Guard that resets current reactor on drop.
-pub(crate) struct DefaultGuard<'a> {
- _lifetime: PhantomData<&'a u8>,
-}
-
-impl Drop for DefaultGuard<'_> {
- fn drop(&mut self) {
- CURRENT_REACTOR.with(|current| {
- let mut current = current.borrow_mut();
- *current = None;
- });
- }
-}
-
-/// Sets handle for a default reactor, returning guard that unsets it on drop.
-pub(crate) fn set_default(handle: &Handle) -> DefaultGuard<'_> {
- CURRENT_REACTOR.with(|current| {
- let mut current = current.borrow_mut();
-
- assert!(
- current.is_none(),
- "default Tokio reactor already set \
- for execution context"
- );
-
- *current = Some(handle.clone());
- });
-
- DefaultGuard {
- _lifetime: PhantomData,
- }
-}
-
impl Driver {
/// Creates a new event loop, returning any error that happened during the
/// creation.
@@ -237,11 +198,14 @@ impl Handle {
/// # Panics
///
/// This function panics if there is no current reactor set.
+ #[cfg(all(feature = "io-driver", not(loom)))]
pub(super) fn current() -> Self {
- CURRENT_REACTOR.with(|current| match *current.borrow() {
- Some(ref handle) => handle.clone(),
- None => panic!("no current reactor"),
- })
+ context::ThreadContext::io_handle().expect("no current reactor")
+ }
+
+ #[cfg(any(not(feature = "io-driver"), loom))]
+ pub(super) fn current() -> Self {
+ panic!("no current reactor")
}
/// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise
diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs
index 53b8bcc9..f809db41 100644
--- a/tokio/src/runtime/basic_scheduler.rs
+++ b/tokio/src/runtime/basic_scheduler.rs
@@ -119,37 +119,35 @@ where
guard
});
- runtime::global::with_basic_scheduler(scheduler, || {
- let mut _enter = runtime::enter();
+ let mut _enter = runtime::enter();
- let raw_waker = RawWaker::new(
- scheduler as *const SchedulerPriv as *const (),
- &RawWakerVTable::new(sched_clone_waker, sched_noop, sched_wake_by_ref, sched_noop),
- );
+ let raw_waker = RawWaker::new(
+ scheduler as *const SchedulerPriv as *const (),
+ &RawWakerVTable::new(sched_clone_waker, sched_noop, sched_wake_by_ref, sched_noop),
+ );
- let waker = ManuallyDrop::new(unsafe { Waker::from_raw(raw_waker) });
- let mut cx = Context::from_waker(&waker);
+ let waker = ManuallyDrop::new(unsafe { Waker::from_raw(raw_waker) });
+ let mut cx = Context::from_waker(&waker);
- // `block_on` takes ownership of `f`. Once it is pinned here, the
- // original `f` binding can no longer be accessed, making the
- // pinning safe.
- let mut future = unsafe { Pin::new_unchecked(&mut future) };
+ // `block_on` takes ownership of `f`. Once it is pinned here, the
+ // original `f` binding can no longer be accessed, making the
+ // pinning safe.
+ let mut future = unsafe { Pin::new_unchecked(&mut future) };
- loop {
- if let Ready(v) = future.as_mut().poll(&mut cx) {
- return v;
- }
+ loop {
+ if let Ready(v) = future.as_mut().poll(&mut cx) {
+ return v;
+ }
- scheduler.tick(local);
+ scheduler.tick(local);
- // Maintenance work
- unsafe {
- // safety: this function is safe to call only from the
- // thread the basic scheduler is running on (which we are).
- scheduler.queues.drain_pending_drop();
- }
+ // Maintenance work
+ unsafe {
+ // safety: this function is safe to call only from the
+ // thread the basic scheduler is running on (which we are).
+ scheduler.queues.drain_pending_drop();
}
- })
+ }
}
}
@@ -164,15 +162,6 @@ impl Spawner {
self.scheduler.schedule(task, true);
handle
}
-
- /// Enter the executor context
- pub(crate) fn enter<F, R>(&self, f: F) -> R
- where
- F: FnOnce() -> R,
- {
- use crate::runtime::global;
- global::with_basic_scheduler(&*self.scheduler, f)
- }
}
// === impl SchedulerPriv ===
@@ -217,20 +206,10 @@ impl SchedulerPriv {
.expect("failed to park");
}
- /// # Safety
+ /// Schedule the provided task on the scheduler.
///
- /// Must be called from the same thread that holds the `BasicScheduler`
- /// value.
- pub(super) unsafe fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
- where
- F: Future + Send + 'static,
- F::Output: Send + 'static,
- {
- let (task, handle) = task::joinable(future);
- self.queues.push_local(task);
- handle
- }
-
+ /// If this scheduler is the `ACTIVE` scheduler, enqueue this task on the local queue, otherwise
+ /// the task is enqueued on the remote queue.
fn schedule(&self, task: Task<Self>, spawn: bool) {
let is_current = ACTIVE.with(|cell| cell.get() == self as *const SchedulerPriv);
diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs
index 43f94a48..2aa4c8bf 100644
--- a/tokio/src/runtime/blocking/pool.rs
+++ b/tokio/src/runtime/blocking/pool.rs
@@ -5,7 +5,7 @@ use crate::loom::thread;
use crate::runtime::blocking::schedule::NoopSchedule;
use crate::runtime::blocking::shutdown;
use crate::runtime::blocking::task::BlockingTask;
-use crate::runtime::{self, io, time, Builder, Callback};
+use crate::runtime::{self, context::ThreadContext, io, time, Builder, Callback};
use crate::task::{self, JoinHandle};
use std::cell::Cell;
@@ -243,13 +243,17 @@ impl Spawner {
if let Some(stack_size) = self.inner.stack_size {
builder = builder.stack_size(stack_size);
}
-
+ let thread_context = ThreadContext::new(
+ self.inner.spawner.clone(),
+ self.inner.io_handle.clone(),
+ self.inner.time_handle.clone(),
+ Some(self.inner.clock.clone()),
+ );
let spawner = self.clone();
-
builder
.spawn(move || {
+ let _e = thread_context.enter();
run_thread(spawner);
-
drop(shutdown_tx);
})
.unwrap();
@@ -259,11 +263,7 @@ impl Spawner {
fn run_thread(spawner: Spawner) {
spawner.enter(|| {
let inner = &*spawner.inner;
- let _io = io::set_default(&inner.io_handle);
-
- time::with_default(&inner.time_handle, &inner.clock, || {
- inner.spawner.enter(|| inner.run());
- });
+ inner.run()
});
}
diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs
new file mode 100644
index 00000000..6ac60b07
--- /dev/null
+++ b/tokio/src/runtime/context.rs
@@ -0,0 +1,146 @@
+//! Thread local runtime context
+use crate::runtime::Spawner;
+use std::cell::RefCell;
+
+thread_local! {
+ static CONTEXT: RefCell<Option<ThreadContext>> = RefCell::new(None)
+}
+
+/// ThreadContext makes Runtime context accessible to each Runtime thread.
+#[derive(Debug, Clone)]
+pub(crate) struct ThreadContext {
+ /// Handles to the executor.
+ spawner: Spawner,
+
+ /// Handles to the I/O drivers
+ io_handle: crate::runtime::io::Handle,
+
+ /// Handles to the time drivers
+ time_handle: crate::runtime::time::Handle,
+
+ /// Source of `Instant::now()`
+ clock: Option<crate::runtime::time::Clock>,
+}
+
+impl Default for ThreadContext {
+ fn default() -> Self {
+ ThreadContext {
+ spawner: Spawner::Shell,
+ #[cfg(all(feature = "io-driver", not(loom)))]
+ io_handle: None,
+ #[cfg(any(not(feature = "io-driver"), loom))]
+ io_handle: (),
+ #[cfg(all(feature = "time", not(loom)))]
+ time_handle: None,
+ #[cfg(any(not(feature = "time"), loom))]
+ time_handle: (),
+ clock: None,
+ }
+ }
+}
+
+impl ThreadContext {
+ /// Construct a new [`ThreadContext`]
+ ///
+ /// [`ThreadContext`]: struct.ThreadContext.html
+ pub(crate) fn new(
+ spawner: Spawner,
+ io_handle: crate::runtime::io::Handle,
+ time_handle: crate::runtime::time::Handle,
+ clock: Option<crate::runtime::time::Clock>,
+ ) -> Self {
+ ThreadContext {
+ spawner,
+ #[cfg(all(feature = "io-driver", not(loom)))]
+ io_handle,
+ #[cfg(any(not(feature = "io-driver"), loom))]
+ io_handle,
+ #[cfg(all(feature = "time", not(loom)))]
+ time_handle,
+ #[cfg(any(not(feature = "time"), loom))]
+ time_handle,
+ clock,
+ }
+ }
+
+ /// Clone the current [`ThreadContext`] if one is set, otherwise construct a new [`ThreadContext`].
+ ///
+ /// [`ThreadContext`]: struct.ThreadContext.html
+ #[allow(dead_code)]
+ pub(crate) fn clone_current() -> Self {
+ CONTEXT.with(|ctx| ctx.borrow().clone().unwrap_or_else(Default::default))
+ }
+
+ /// Set this [`ThreadContext`] as the current active [`ThreadContext`].
+ ///
+ /// [`ThreadContext`]: struct.ThreadContext.html
+ pub(crate) fn enter(self) -> ThreadContextDropGuard {
+ CONTEXT.with(|ctx| {
+ let previous = ctx.borrow_mut().replace(self);
+ ThreadContextDropGuard { previous }
+ })
+ }
+
+ #[cfg(all(feature = "test-util", feature = "time", test))]
+ pub(crate) fn with_time_handle(mut self, handle: crate::runtime::time::Handle) -> Self {
+ self.time_handle = handle;
+ self
+ }
+
+ #[cfg(all(feature = "test-util", feature = "time", test))]
+ pub(crate) fn with_clock(mut self, clock: crate::runtime::time::Clock) -> Self {
+ self.clock.replace(clock);
+ self
+ }
+
+ #[cfg(all(feature = "io-driver", not(loom)))]
+ pub(crate) fn io_handle() -> crate::runtime::io::Handle {
+ CONTEXT.with(|ctx| match *ctx.borrow() {
+ Some(ref ctx) => ctx.io_handle.clone(),
+ None => None,
+ })
+ }
+
+ #[cfg(all(feature = "time", not(loom)))]
+ pub(crate) fn time_handle() -> crate::runtime::time::Handle {
+ CONTEXT.with(|ctx| match *ctx.borrow() {
+ Some(ref ctx) => ctx.time_handle.clone(),
+ None => None,
+ })
+ }
+
+ #[cfg(feature = "rt-core")]
+ pub(crate) fn spawn_handle() -> Option<Spawner> {
+ CONTEXT.with(|ctx| match *ctx.borrow() {
+ Some(ref ctx) => Some(ctx.spawner.clone()),
+ None => None,
+ })
+ }
+
+ #[cfg(all(feature = "test-util", feature = "time"))]
+ pub(crate) fn clock() -> Option<crate::runtime::time::Clock> {
+ CONTEXT.with(
+ |ctx| match ctx.borrow().as_ref().map(|ctx| ctx.clock.clone()) {
+ Some(Some(clock)) => Some(clock),
+ _ => None,
+ },
+ )
+ }
+}
+
+/// [`ThreadContextDropGuard`] will replace the `previous` thread context on drop.
+///
+/// [`ThreadContextDropGuard`]: struct.ThreadContextDropGuard.html
+#[derive(Debug)]
+pub(crate) struct ThreadContextDropGuard {
+ previous: Option<ThreadContext>,
+}
+
+impl Drop for ThreadContextDropGuard {
+ fn drop(&mut self) {
+ CONTEXT.with(|ctx| match self.previous.clone() {
+ Some(prev) => ctx.borrow_mut().replace(prev),
+ None => ctx.borrow_mut().take(),
+ });
+ }
+}
diff --git a/tokio/src/runtime/global.rs b/tokio/src/runtime/global.rs
deleted file mode 100644
index c84f348b..00000000
--- a/tokio/src/runtime/global.rs
+++ /dev/null
@@ -1,102 +0,0 @@
-use crate::runtime::basic_scheduler;
-use crate::task::JoinHandle;
-
-use std::cell::Cell;
-use std::future::Future;
-
-#[derive(Clone, Copy)]
-enum State {
- // default executor not defined
- Empty,
-
- // Basic scheduler (runs on the current-thread)
- Basic(*const basic_scheduler::SchedulerPriv),
-
- // default executor is a thread pool instance.
- #[cfg(feature = "rt-threaded")]
- ThreadPool(*const thread_pool::Spawner),
-}
-
-thread_local! {
- /// Thread-local tracking the current executor
- static EXECUTOR: Cell<State> = Cell::new(State::Empty)
-}
-
-// ===== global spawn fns =====
-
-/// Spawns a future on the default executor.
-pub(crate) fn spawn<T>(future: T) -> JoinHandle<T::Output>
-where
- T: Future + Send + 'static,
- T::Output: Send + 'static,
-{
- EXECUTOR.with(|current_executor| match current_executor.get() {
- #[cfg(feature = "rt-threaded")]
- State::ThreadPool(thread_pool_ptr) => {
- let thread_pool = unsafe { &*thread_pool_ptr };
- thread_pool.spawn(future)
- }
- State::Basic(basic_scheduler_ptr) => {
- let basic_scheduler = unsafe { &*basic_scheduler_ptr };
-
- // Safety: The `BasicScheduler` value set the thread-local (same
- // thread).
- unsafe { basic_scheduler.spawn(future) }
- }
- State::Empty => {
- // Explicit drop of `future` silences the warning that `future` is
- // not used when neither rt-* feature flags are enabled.
- drop(future);
- panic!("must be called from the context of Tokio runtime configured with either `basic_scheduler` or `threaded_scheduler`");
- }
- })
-}
-
-pub(super) fn with_basic_scheduler<F, R>(
- basic_scheduler: &basic_scheduler::SchedulerPriv,
- f: F,
-) -> R
-where
- F: FnOnce() -> R,
-{
- with_state(
- State::Basic(basic_scheduler as *const basic_scheduler::SchedulerPriv),
- f,
- )
-}
-
-cfg_rt_threaded! {
- use crate::runtime::thread_pool;
-
- pub(super) fn with_thread_pool<F, R>(thread_pool: &thread_pool::Spawner, f: F) -> R
- where
- F: FnOnce() -> R,
- {
- with_state(State::ThreadPool(thread_pool as *const _), f)
- }
-}
-
-fn with_state<F, R>(state: State, f: F) -> R
-where
- F: FnOnce() -> R,
-{
- EXECUTOR.with(|cell| {
- let was = cell.replace(State::Empty);
-
- // Ensure that the executor is removed from the thread-local context
- // when leaving the scope. This handles cases that involve panicking.
- struct Reset<'a>(&'a Cell<State>, State);
-
- impl Drop for Reset<'_> {
- fn drop(&mut self) {
- self.0.set(self.1);
- }
- }
-
- let _reset = Reset(cell, was);
-
- cell.set(state);
-
- f()
- })
-}
diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs
index 562a33ce..ed3348df 100644
--- a/tokio/src/runtime/handle.rs
+++ b/tokio/src/runtime/handle.rs
@@ -1,4 +1,4 @@
-use crate::runtime::{blocking, io, time, Spawner};
+use crate::runtime::{blocking, context, io, time, Spawner};
cfg_rt_core! {
use crate::task::JoinHandle;
@@ -30,11 +30,14 @@ impl Handle {
where
F: FnOnce() -> R,
{
- self.blocking_spawner.enter(|| {
- let _io = io::set_default(&self.io_handle);
-
- time::with_default(&self.time_handle, &self.clock, || self.spawner.enter(f))
- })
+ let _e = context::ThreadContext::new(
+ self.spawner.clone(),
+ self.io_handle.clone(),
+ self.time_handle.clone(),
+ Some(self.clock.clone()),
+ )
+ .enter();
+ self.blocking_spawner.enter(|| f())
}
}
diff --git a/tokio/src/runtime/io.rs b/tokio/src/runtime/io.rs
index e912f6f7..dca7310f 100644
--- a/tokio/src/runtime/io.rs
+++ b/tokio/src/runtime/io.rs
@@ -38,10 +38,6 @@ mod variant {
Ok((Either::B(driver), None))
}
}
-
- pub(crate) fn set_default(handle: &Handle) -> Option<driver::DefaultGuard<'_>> {
- handle.as_ref().map(|handle| driver::set_default(handle))
- }
}
#[cfg(any(not(feature = "io-driver"), loom))]
@@ -61,6 +57,4 @@ mod variant {
Ok((driver, ()))
}
-
- pub(crate) fn set_default(_handle: &Handle) {}
}
diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs
index 800df861..54524a91 100644
--- a/tokio/src/runtime/mod.rs
+++ b/tokio/src/runtime/mod.rs
@@ -187,6 +187,7 @@
#[cfg(test)]
#[macro_use]
mod tests;
+pub(crate) mod context;
cfg_rt_core! {
mod basic_scheduler;
@@ -206,11 +207,6 @@ pub use self::builder::Builder;
pub(crate) mod enter;
use self::enter::enter;
-cfg_rt_core! {
- mod global;
- pub(crate) use global::spawn;
-}
-
mod handle;
pub use self::handle::Handle;
diff --git a/tokio/src/runtime/spawner.rs b/tokio/src/runtime/spawner.rs
index cc2fbb53..d136945c 100644
--- a/tokio/src/runtime/spawner.rs
+++ b/tokio/src/runtime/spawner.rs
@@ -18,22 +18,6 @@ pub(crate) enum Spawner {
ThreadPool(thread_pool::Spawner),
}
-impl Spawner {
- /// Enter the scheduler context
- pub(crate) fn enter<F, R>(&self, f: F) -> R
- where
- F: FnOnce() -> R,
- {
- match self {
- Spawner::Shell => f(),
- #[cfg(feature = "rt-core")]
- Spawner::Basic(spawner) => spawner.enter(f),
- #[cfg(feature = "rt-threaded")]
- Spawner::ThreadPool(spawner) => spawner.enter(f),
- }
- }
-}
-
cfg_rt_core! {
impl Spawner {
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
diff --git a/tokio/src/runtime/thread_pool/mod.rs b/tokio/src/runtime/thread_pool/mod.rs
index 6a50fe9c..c22ce8b9 100644
--- a/tokio/src/runtime/thread_pool/mod.rs
+++ b/tokio/src/runtime/thread_pool/mod.rs
@@ -89,10 +89,8 @@ impl ThreadPool {
where
F: Future,
{
- self.spawner.enter(|| {
- let mut enter = crate::runtime::enter();
- enter.block_on(future).expect("failed to park thread")
- })
+ let mut enter = crate::runtime::enter();
+ enter.block_on(future).expect("failed to park thread")
}
}
diff --git a/tokio/src/runtime/thread_pool/spawner.rs b/tokio/src/runtime/thread_pool/spawner.rs
index 4fccad96..976fd32d 100644
--- a/tokio/src/runtime/thread_pool/spawner.rs
+++ b/tokio/src/runtime/thread_pool/spawner.rs
@@ -36,14 +36,6 @@ impl Spawner {
self.workers.spawn_typed(future)
}
- /// Enter the executor context
- pub(crate) fn enter<F, R>(&self, f: F) -> R
- where
- F: FnOnce() -> R,
- {
- crate::runtime::global::with_thread_pool(self, f)
- }
-
/// Reference to the worker set. Used by `ThreadPool` to initiate shutdown.
pub(super) fn workers(&self) -> &slice::Set {
&*self.workers
diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs
index 18c0db1f..fbf7a1fc 100644
--- a/tokio/src/runtime/thread_pool/worker.rs
+++ b/tokio/src/runtime/thread_pool/worker.rs
@@ -2,7 +2,7 @@ use crate::loom::cell::CausalCell;
use crate::loom::sync::Arc;
use crate::park::Park;
use crate::runtime::park::Parker;
-use crate::runtime::thread_pool::{current, slice, Owned, Shared, Spawner};
+use crate::runtime::thread_pool::{current, slice, Owned, Shared};
use crate::runtime::{self, blocking};
use crate::task::Task;
@@ -126,45 +126,41 @@ impl Worker {
None => return,
};
- let spawner = Spawner::new(self.slices.clone());
-
// Track the current worker
current::set(&self.slices, self.index, || {
// Enter a runtime context
let _enter = crate::runtime::enter();
- crate::runtime::global::with_thread_pool(&spawner, || {
- blocking_pool.enter(|| {
- ON_BLOCK.with(|ob| {
- // Ensure that the ON_BLOCK is removed from the thread-local context
- // when leaving the scope. This handles cases that involve panicking.
- struct Reset<'a>(&'a Cell<Option<*const dyn Fn()>>);
-
- impl<'a> Drop for Reset<'a> {
- fn drop(&mut self) {
- self.0.set(None);
- }
+ blocking_pool.enter(|| {
+ ON_BLOCK.with(|ob| {
+ // Ensure that the ON_BLOCK is removed from the thread-local context
+ // when leaving the scope. This handles cases that involve panicking.
+ struct Reset<'a>(&'a Cell<Option<*const dyn Fn()>>);
+
+ impl<'a> Drop for Reset<'a> {
+ fn drop(&mut self) {
+ self.0.set(None);
}
+ }
- let _reset = Reset(ob);
+ let _reset = Reset(ob);
- let allow_blocking: &dyn Fn() = &|| self.block_in_place(&blocking_pool);
+ let allow_blocking: &dyn Fn() = &|| self.block_in_place(&blocking_pool);
- ob.set(Some(unsafe {
- // NOTE: We cannot use a safe cast to raw pointer here, since we are
- // _also_ erasing the lifetime of these pointers. That is safe here,
- // because we know that ob will set back to None before allow_blocking
- // is dropped.
- #[allow(clippy::useless_transmute)]
- std::mem::transmute::<_, *const dyn Fn()>(allow_blocking)
- }));
+ ob.set(Some(unsafe {
+ // NOTE: We cannot use a safe cast to raw pointer here, since we are
+ // _also_ erasing the lifetime of these pointers. That is safe here,
+ // because we know that ob will set back to None before allow_blocking
+ // is dropped.
+ #[allow(clippy::useless_transmute)]
+ std::mem::transmute::<_, *const dyn Fn()>(allow_blocking)
+ }));
- let _ = guard.run();
+ let _ = guard.run();
- // Ensure that we reset ob before allow_blocking is dropped.
- drop(_reset);
- });
- })
+ // Ensure that we reset ob before allow_blocking is dropped.
+ drop(_reset);
+ });
})
});
diff --git a/tokio/src/runtime/time.rs b/tokio/src/runtime/time.rs
index 1cd58cd6..6259c87a 100644
--- a/tokio/src/runtime/time.rs
+++ b/tokio/src/runtime/time.rs
@@ -34,14 +34,6 @@ mod variant {
(Either::B(io_driver), None)
}
}
-
- pub(crate) fn with_default<F, R>(handle: &Handle, clock: &Clock, f: F) -> R
- where
- F: FnOnce() -> R,
- {
- let _time = handle.as_ref().map(|handle| driver::set_default(handle));
- clock.enter(f)
- }
}
#[cfg(any(not(feature = "time"), loom))]
@@ -64,11 +56,4 @@ mod variant {
) -> (Driver, Handle) {
(io_driver, ())
}
-
- pub(crate) fn with_default<F, R>(_handler: &Handle, _clock: &Clock, f: F) -> R
- where
- F: FnOnce() -> R,
- {
- f()
- }
}
diff --git a/tokio/src/task/spawn.rs b/tokio/src/task/spawn.rs
index 4d18ee23..c4589308 100644
--- a/tokio/src/task/spawn.rs
+++ b/tokio/src/task/spawn.rs
@@ -123,6 +123,8 @@ doc_rt_core! {
T: Future + Send + 'static,
T::Output: Send + 'static,
{
- runtime::spawn(task)
+ let spawn_handle = runtime::context::ThreadContext::spawn_handle()
+ .expect("must be called from the context of Tokio runtime configured with either `basic_scheduler` or `threaded_scheduler`");
+ spawn_handle.spawn(task)
}
}
diff --git a/tokio/src/time/clock.rs b/tokio/src/time/clock.rs
index e44f4925..5ece7f5a 100644
--- a/tokio/src/time/clock.rs
+++ b/tokio/src/time/clock.rs
@@ -22,21 +22,13 @@ cfg_not_test_util! {
pub(crate) fn now(&self) -> Instant {
now()
}
-
- pub(crate) fn enter<F, R>(&self, f: F) -> R
- where
- F: FnOnce() -> R,
- {
- f()
- }
}
}
cfg_test_util! {
use crate::time::{Duration, Instant};
-
- use std::cell::Cell;
use std::sync::{Arc, Mutex};
+ use crate::runtime::context;
/// A handle to a source of time.
#[derive(Debug, Clone)]
@@ -53,11 +45,6 @@ cfg_test_util! {
frozen: Mutex<Option<Duration>>,
}
- thread_local! {
- /// Thread-local tracking the current clock
- static CLOCK: Cell<Option<*const Clock>> = Cell::new(None)
- }
-
/// Pause time
///
/// The current value of `Instant::now()` is saved and all subsequent calls
@@ -69,21 +56