diff options
author | Gardner Vickers <gardner@vickers.me> | 2019-12-24 18:34:47 -0500 |
---|---|---|
committer | Carl Lerche <me@carllerche.com> | 2019-12-24 15:34:47 -0800 |
commit | 67bf9c36f347031ca05872d102a7f9abc8b465f0 (patch) | |
tree | 22c0f472b594382a1bb6fa186b7f40fe4c17013d | |
parent | 101f770af33ae65820e1cc0e9b89d998b3c1317a (diff) |
rt: coalesce thread-locals used by the runtime (#1925)
Previously, thread-locals used by the various drivers were situated
with the driver code. This resulted in state being spread out and many
thread-locals being required to run a runtime.
This PR coalesces the thread-locals into a single struct.
-rw-r--r-- | tokio/Cargo.toml | 5 | ||||
-rw-r--r-- | tokio/benches/spawn.rs | 70 | ||||
-rw-r--r-- | tokio/src/io/driver/mod.rs | 54 | ||||
-rw-r--r-- | tokio/src/runtime/basic_scheduler.rs | 71 | ||||
-rw-r--r-- | tokio/src/runtime/blocking/pool.rs | 18 | ||||
-rw-r--r-- | tokio/src/runtime/context.rs | 146 | ||||
-rw-r--r-- | tokio/src/runtime/global.rs | 102 | ||||
-rw-r--r-- | tokio/src/runtime/handle.rs | 15 | ||||
-rw-r--r-- | tokio/src/runtime/io.rs | 6 | ||||
-rw-r--r-- | tokio/src/runtime/mod.rs | 6 | ||||
-rw-r--r-- | tokio/src/runtime/spawner.rs | 16 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/mod.rs | 6 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/spawner.rs | 8 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/worker.rs | 54 | ||||
-rw-r--r-- | tokio/src/runtime/time.rs | 15 | ||||
-rw-r--r-- | tokio/src/task/spawn.rs | 4 | ||||
-rw-r--r-- | tokio/src/time/clock.rs | 120 | ||||
-rw-r--r-- | tokio/src/time/driver/handle.rs | 48 | ||||
-rw-r--r-- | tokio/src/time/driver/mod.rs | 2 | ||||
-rw-r--r-- | tokio/src/time/tests/mock_clock.rs | 27 |
20 files changed, 345 insertions, 448 deletions
diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 48b76c74..6f0591ce 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -127,6 +127,7 @@ futures = { version = "0.3.0", features = ["async-await"] } loom = { version = "0.2.13", features = ["futures", "checkpoint"] } proptest = "0.9.4" tempfile = "3.1.0" +bencher = "0.1.5" [package.metadata.docs.rs] all-features = true @@ -134,3 +135,7 @@ rustdoc-args = ["--cfg", "docsrs"] [package.metadata.playground] features = ["full"] + +[[bench]] +name = "spawn" +harness = false
\ No newline at end of file diff --git a/tokio/benches/spawn.rs b/tokio/benches/spawn.rs new file mode 100644 index 00000000..78e0b784 --- /dev/null +++ b/tokio/benches/spawn.rs @@ -0,0 +1,70 @@ +//! Benchmark spawning a task onto the basic and threaded Tokio executors. +//! This essentially measure the time to enqueue a task in the local and remote +//! case. + +use bencher::{black_box, Bencher}; + +async fn work() -> usize { + let val = 1 + 1; + black_box(val) +} + +fn basic_scheduler_local_spawn(bench: &mut Bencher) { + let mut runtime = tokio::runtime::Builder::new() + .basic_scheduler() + .build() + .unwrap(); + runtime.block_on(async { + bench.iter(|| { + let h = tokio::spawn(work()); + black_box(h); + }) + }); +} + +fn threaded_scheduler_local_spawn(bench: &mut Bencher) { + let mut runtime = tokio::runtime::Builder::new() + .threaded_scheduler() + .build() + .unwrap(); + runtime.block_on(async { + bench.iter(|| { + let h = tokio::spawn(work()); + black_box(h); + }) + }); +} + +fn basic_scheduler_remote_spawn(bench: &mut Bencher) { + let runtime = tokio::runtime::Builder::new() + .basic_scheduler() + .build() + .unwrap(); + let handle = runtime.handle(); + bench.iter(|| { + let h = handle.spawn(work()); + black_box(h); + }); +} + +fn threaded_scheduler_remote_spawn(bench: &mut Bencher) { + let runtime = tokio::runtime::Builder::new() + .threaded_scheduler() + .build() + .unwrap(); + let handle = runtime.handle(); + bench.iter(|| { + let h = handle.spawn(work()); + black_box(h); + }); +} + +bencher::benchmark_group!( + benches, + basic_scheduler_local_spawn, + threaded_scheduler_local_spawn, + basic_scheduler_remote_spawn, + threaded_scheduler_remote_spawn +); + +bencher::benchmark_main!(benches); diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs index 58ce5124..dfb741be 100644 --- a/tokio/src/io/driver/mod.rs +++ b/tokio/src/io/driver/mod.rs @@ -5,13 +5,13 @@ pub(crate) use scheduled_io::ScheduledIo; // pub(crate) for tests use crate::loom::sync::atomic::AtomicUsize; use crate::park::{Park, Unpark}; +#[cfg(all(feature = "io-driver", not(loom)))] +use crate::runtime::context; use crate::util::slab::{Address, Slab}; use mio::event::Evented; -use std::cell::RefCell; use std::fmt; use std::io; -use std::marker::PhantomData; use std::sync::atomic::Ordering::SeqCst; use std::sync::{Arc, Weak}; use std::task::Waker; @@ -54,11 +54,6 @@ pub(super) enum Direction { Write, } -thread_local! { - /// Tracks the reactor for the current execution context. - static CURRENT_REACTOR: RefCell<Option<Handle>> = RefCell::new(None) -} - const TOKEN_WAKEUP: mio::Token = mio::Token(Address::NULL); fn _assert_kinds() { @@ -69,40 +64,6 @@ fn _assert_kinds() { // ===== impl Driver ===== -#[derive(Debug)] -/// Guard that resets current reactor on drop. -pub(crate) struct DefaultGuard<'a> { - _lifetime: PhantomData<&'a u8>, -} - -impl Drop for DefaultGuard<'_> { - fn drop(&mut self) { - CURRENT_REACTOR.with(|current| { - let mut current = current.borrow_mut(); - *current = None; - }); - } -} - -/// Sets handle for a default reactor, returning guard that unsets it on drop. -pub(crate) fn set_default(handle: &Handle) -> DefaultGuard<'_> { - CURRENT_REACTOR.with(|current| { - let mut current = current.borrow_mut(); - - assert!( - current.is_none(), - "default Tokio reactor already set \ - for execution context" - ); - - *current = Some(handle.clone()); - }); - - DefaultGuard { - _lifetime: PhantomData, - } -} - impl Driver { /// Creates a new event loop, returning any error that happened during the /// creation. @@ -237,11 +198,14 @@ impl Handle { /// # Panics /// /// This function panics if there is no current reactor set. + #[cfg(all(feature = "io-driver", not(loom)))] pub(super) fn current() -> Self { - CURRENT_REACTOR.with(|current| match *current.borrow() { - Some(ref handle) => handle.clone(), - None => panic!("no current reactor"), - }) + context::ThreadContext::io_handle().expect("no current reactor") + } + + #[cfg(any(not(feature = "io-driver"), loom))] + pub(super) fn current() -> Self { + panic!("no current reactor") } /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index 53b8bcc9..f809db41 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -119,37 +119,35 @@ where guard }); - runtime::global::with_basic_scheduler(scheduler, || { - let mut _enter = runtime::enter(); + let mut _enter = runtime::enter(); - let raw_waker = RawWaker::new( - scheduler as *const SchedulerPriv as *const (), - &RawWakerVTable::new(sched_clone_waker, sched_noop, sched_wake_by_ref, sched_noop), - ); + let raw_waker = RawWaker::new( + scheduler as *const SchedulerPriv as *const (), + &RawWakerVTable::new(sched_clone_waker, sched_noop, sched_wake_by_ref, sched_noop), + ); - let waker = ManuallyDrop::new(unsafe { Waker::from_raw(raw_waker) }); - let mut cx = Context::from_waker(&waker); + let waker = ManuallyDrop::new(unsafe { Waker::from_raw(raw_waker) }); + let mut cx = Context::from_waker(&waker); - // `block_on` takes ownership of `f`. Once it is pinned here, the - // original `f` binding can no longer be accessed, making the - // pinning safe. - let mut future = unsafe { Pin::new_unchecked(&mut future) }; + // `block_on` takes ownership of `f`. Once it is pinned here, the + // original `f` binding can no longer be accessed, making the + // pinning safe. + let mut future = unsafe { Pin::new_unchecked(&mut future) }; - loop { - if let Ready(v) = future.as_mut().poll(&mut cx) { - return v; - } + loop { + if let Ready(v) = future.as_mut().poll(&mut cx) { + return v; + } - scheduler.tick(local); + scheduler.tick(local); - // Maintenance work - unsafe { - // safety: this function is safe to call only from the - // thread the basic scheduler is running on (which we are). - scheduler.queues.drain_pending_drop(); - } + // Maintenance work + unsafe { + // safety: this function is safe to call only from the + // thread the basic scheduler is running on (which we are). + scheduler.queues.drain_pending_drop(); } - }) + } } } @@ -164,15 +162,6 @@ impl Spawner { self.scheduler.schedule(task, true); handle } - - /// Enter the executor context - pub(crate) fn enter<F, R>(&self, f: F) -> R - where - F: FnOnce() -> R, - { - use crate::runtime::global; - global::with_basic_scheduler(&*self.scheduler, f) - } } // === impl SchedulerPriv === @@ -217,20 +206,10 @@ impl SchedulerPriv { .expect("failed to park"); } - /// # Safety + /// Schedule the provided task on the scheduler. /// - /// Must be called from the same thread that holds the `BasicScheduler` - /// value. - pub(super) unsafe fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - let (task, handle) = task::joinable(future); - self.queues.push_local(task); - handle - } - + /// If this scheduler is the `ACTIVE` scheduler, enqueue this task on the local queue, otherwise + /// the task is enqueued on the remote queue. fn schedule(&self, task: Task<Self>, spawn: bool) { let is_current = ACTIVE.with(|cell| cell.get() == self as *const SchedulerPriv); diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index 43f94a48..2aa4c8bf 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -5,7 +5,7 @@ use crate::loom::thread; use crate::runtime::blocking::schedule::NoopSchedule; use crate::runtime::blocking::shutdown; use crate::runtime::blocking::task::BlockingTask; -use crate::runtime::{self, io, time, Builder, Callback}; +use crate::runtime::{self, context::ThreadContext, io, time, Builder, Callback}; use crate::task::{self, JoinHandle}; use std::cell::Cell; @@ -243,13 +243,17 @@ impl Spawner { if let Some(stack_size) = self.inner.stack_size { builder = builder.stack_size(stack_size); } - + let thread_context = ThreadContext::new( + self.inner.spawner.clone(), + self.inner.io_handle.clone(), + self.inner.time_handle.clone(), + Some(self.inner.clock.clone()), + ); let spawner = self.clone(); - builder .spawn(move || { + let _e = thread_context.enter(); run_thread(spawner); - drop(shutdown_tx); }) .unwrap(); @@ -259,11 +263,7 @@ impl Spawner { fn run_thread(spawner: Spawner) { spawner.enter(|| { let inner = &*spawner.inner; - let _io = io::set_default(&inner.io_handle); - - time::with_default(&inner.time_handle, &inner.clock, || { - inner.spawner.enter(|| inner.run()); - }); + inner.run() }); } diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs new file mode 100644 index 00000000..6ac60b07 --- /dev/null +++ b/tokio/src/runtime/context.rs @@ -0,0 +1,146 @@ +//! Thread local runtime context +use crate::runtime::Spawner; +use std::cell::RefCell; + +thread_local! { + static CONTEXT: RefCell<Option<ThreadContext>> = RefCell::new(None) +} + +/// ThreadContext makes Runtime context accessible to each Runtime thread. +#[derive(Debug, Clone)] +pub(crate) struct ThreadContext { + /// Handles to the executor. + spawner: Spawner, + + /// Handles to the I/O drivers + io_handle: crate::runtime::io::Handle, + + /// Handles to the time drivers + time_handle: crate::runtime::time::Handle, + + /// Source of `Instant::now()` + clock: Option<crate::runtime::time::Clock>, +} + +impl Default for ThreadContext { + fn default() -> Self { + ThreadContext { + spawner: Spawner::Shell, + #[cfg(all(feature = "io-driver", not(loom)))] + io_handle: None, + #[cfg(any(not(feature = "io-driver"), loom))] + io_handle: (), + #[cfg(all(feature = "time", not(loom)))] + time_handle: None, + #[cfg(any(not(feature = "time"), loom))] + time_handle: (), + clock: None, + } + } +} + +impl ThreadContext { + /// Construct a new [`ThreadContext`] + /// + /// [`ThreadContext`]: struct.ThreadContext.html + pub(crate) fn new( + spawner: Spawner, + io_handle: crate::runtime::io::Handle, + time_handle: crate::runtime::time::Handle, + clock: Option<crate::runtime::time::Clock>, + ) -> Self { + ThreadContext { + spawner, + #[cfg(all(feature = "io-driver", not(loom)))] + io_handle, + #[cfg(any(not(feature = "io-driver"), loom))] + io_handle, + #[cfg(all(feature = "time", not(loom)))] + time_handle, + #[cfg(any(not(feature = "time"), loom))] + time_handle, + clock, + } + } + + /// Clone the current [`ThreadContext`] if one is set, otherwise construct a new [`ThreadContext`]. + /// + /// [`ThreadContext`]: struct.ThreadContext.html + #[allow(dead_code)] + pub(crate) fn clone_current() -> Self { + CONTEXT.with(|ctx| ctx.borrow().clone().unwrap_or_else(Default::default)) + } + + /// Set this [`ThreadContext`] as the current active [`ThreadContext`]. + /// + /// [`ThreadContext`]: struct.ThreadContext.html + pub(crate) fn enter(self) -> ThreadContextDropGuard { + CONTEXT.with(|ctx| { + let previous = ctx.borrow_mut().replace(self); + ThreadContextDropGuard { previous } + }) + } + + #[cfg(all(feature = "test-util", feature = "time", test))] + pub(crate) fn with_time_handle(mut self, handle: crate::runtime::time::Handle) -> Self { + self.time_handle = handle; + self + } + + #[cfg(all(feature = "test-util", feature = "time", test))] + pub(crate) fn with_clock(mut self, clock: crate::runtime::time::Clock) -> Self { + self.clock.replace(clock); + self + } + + #[cfg(all(feature = "io-driver", not(loom)))] + pub(crate) fn io_handle() -> crate::runtime::io::Handle { + CONTEXT.with(|ctx| match *ctx.borrow() { + Some(ref ctx) => ctx.io_handle.clone(), + None => None, + }) + } + + #[cfg(all(feature = "time", not(loom)))] + pub(crate) fn time_handle() -> crate::runtime::time::Handle { + CONTEXT.with(|ctx| match *ctx.borrow() { + Some(ref ctx) => ctx.time_handle.clone(), + None => None, + }) + } + + #[cfg(feature = "rt-core")] + pub(crate) fn spawn_handle() -> Option<Spawner> { + CONTEXT.with(|ctx| match *ctx.borrow() { + Some(ref ctx) => Some(ctx.spawner.clone()), + None => None, + }) + } + + #[cfg(all(feature = "test-util", feature = "time"))] + pub(crate) fn clock() -> Option<crate::runtime::time::Clock> { + CONTEXT.with( + |ctx| match ctx.borrow().as_ref().map(|ctx| ctx.clock.clone()) { + Some(Some(clock)) => Some(clock), + _ => None, + }, + ) + } +} + +/// [`ThreadContextDropGuard`] will replace the `previous` thread context on drop. +/// +/// [`ThreadContextDropGuard`]: struct.ThreadContextDropGuard.html +#[derive(Debug)] +pub(crate) struct ThreadContextDropGuard { + previous: Option<ThreadContext>, +} + +impl Drop for ThreadContextDropGuard { + fn drop(&mut self) { + CONTEXT.with(|ctx| match self.previous.clone() { + Some(prev) => ctx.borrow_mut().replace(prev), + None => ctx.borrow_mut().take(), + }); + } +} diff --git a/tokio/src/runtime/global.rs b/tokio/src/runtime/global.rs deleted file mode 100644 index c84f348b..00000000 --- a/tokio/src/runtime/global.rs +++ /dev/null @@ -1,102 +0,0 @@ -use crate::runtime::basic_scheduler; -use crate::task::JoinHandle; - -use std::cell::Cell; -use std::future::Future; - -#[derive(Clone, Copy)] -enum State { - // default executor not defined - Empty, - - // Basic scheduler (runs on the current-thread) - Basic(*const basic_scheduler::SchedulerPriv), - - // default executor is a thread pool instance. - #[cfg(feature = "rt-threaded")] - ThreadPool(*const thread_pool::Spawner), -} - -thread_local! { - /// Thread-local tracking the current executor - static EXECUTOR: Cell<State> = Cell::new(State::Empty) -} - -// ===== global spawn fns ===== - -/// Spawns a future on the default executor. -pub(crate) fn spawn<T>(future: T) -> JoinHandle<T::Output> -where - T: Future + Send + 'static, - T::Output: Send + 'static, -{ - EXECUTOR.with(|current_executor| match current_executor.get() { - #[cfg(feature = "rt-threaded")] - State::ThreadPool(thread_pool_ptr) => { - let thread_pool = unsafe { &*thread_pool_ptr }; - thread_pool.spawn(future) - } - State::Basic(basic_scheduler_ptr) => { - let basic_scheduler = unsafe { &*basic_scheduler_ptr }; - - // Safety: The `BasicScheduler` value set the thread-local (same - // thread). - unsafe { basic_scheduler.spawn(future) } - } - State::Empty => { - // Explicit drop of `future` silences the warning that `future` is - // not used when neither rt-* feature flags are enabled. - drop(future); - panic!("must be called from the context of Tokio runtime configured with either `basic_scheduler` or `threaded_scheduler`"); - } - }) -} - -pub(super) fn with_basic_scheduler<F, R>( - basic_scheduler: &basic_scheduler::SchedulerPriv, - f: F, -) -> R -where - F: FnOnce() -> R, -{ - with_state( - State::Basic(basic_scheduler as *const basic_scheduler::SchedulerPriv), - f, - ) -} - -cfg_rt_threaded! { - use crate::runtime::thread_pool; - - pub(super) fn with_thread_pool<F, R>(thread_pool: &thread_pool::Spawner, f: F) -> R - where - F: FnOnce() -> R, - { - with_state(State::ThreadPool(thread_pool as *const _), f) - } -} - -fn with_state<F, R>(state: State, f: F) -> R -where - F: FnOnce() -> R, -{ - EXECUTOR.with(|cell| { - let was = cell.replace(State::Empty); - - // Ensure that the executor is removed from the thread-local context - // when leaving the scope. This handles cases that involve panicking. - struct Reset<'a>(&'a Cell<State>, State); - - impl Drop for Reset<'_> { - fn drop(&mut self) { - self.0.set(self.1); - } - } - - let _reset = Reset(cell, was); - - cell.set(state); - - f() - }) -} diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 562a33ce..ed3348df 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -1,4 +1,4 @@ -use crate::runtime::{blocking, io, time, Spawner}; +use crate::runtime::{blocking, context, io, time, Spawner}; cfg_rt_core! { use crate::task::JoinHandle; @@ -30,11 +30,14 @@ impl Handle { where F: FnOnce() -> R, { - self.blocking_spawner.enter(|| { - let _io = io::set_default(&self.io_handle); - - time::with_default(&self.time_handle, &self.clock, || self.spawner.enter(f)) - }) + let _e = context::ThreadContext::new( + self.spawner.clone(), + self.io_handle.clone(), + self.time_handle.clone(), + Some(self.clock.clone()), + ) + .enter(); + self.blocking_spawner.enter(|| f()) } } diff --git a/tokio/src/runtime/io.rs b/tokio/src/runtime/io.rs index e912f6f7..dca7310f 100644 --- a/tokio/src/runtime/io.rs +++ b/tokio/src/runtime/io.rs @@ -38,10 +38,6 @@ mod variant { Ok((Either::B(driver), None)) } } - - pub(crate) fn set_default(handle: &Handle) -> Option<driver::DefaultGuard<'_>> { - handle.as_ref().map(|handle| driver::set_default(handle)) - } } #[cfg(any(not(feature = "io-driver"), loom))] @@ -61,6 +57,4 @@ mod variant { Ok((driver, ())) } - - pub(crate) fn set_default(_handle: &Handle) {} } diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index 800df861..54524a91 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -187,6 +187,7 @@ #[cfg(test)] #[macro_use] mod tests; +pub(crate) mod context; cfg_rt_core! { mod basic_scheduler; @@ -206,11 +207,6 @@ pub use self::builder::Builder; pub(crate) mod enter; use self::enter::enter; -cfg_rt_core! { - mod global; - pub(crate) use global::spawn; -} - mod handle; pub use self::handle::Handle; diff --git a/tokio/src/runtime/spawner.rs b/tokio/src/runtime/spawner.rs index cc2fbb53..d136945c 100644 --- a/tokio/src/runtime/spawner.rs +++ b/tokio/src/runtime/spawner.rs @@ -18,22 +18,6 @@ pub(crate) enum Spawner { ThreadPool(thread_pool::Spawner), } -impl Spawner { - /// Enter the scheduler context - pub(crate) fn enter<F, R>(&self, f: F) -> R - where - F: FnOnce() -> R, - { - match self { - Spawner::Shell => f(), - #[cfg(feature = "rt-core")] - Spawner::Basic(spawner) => spawner.enter(f), - #[cfg(feature = "rt-threaded")] - Spawner::ThreadPool(spawner) => spawner.enter(f), - } - } -} - cfg_rt_core! { impl Spawner { pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> diff --git a/tokio/src/runtime/thread_pool/mod.rs b/tokio/src/runtime/thread_pool/mod.rs index 6a50fe9c..c22ce8b9 100644 --- a/tokio/src/runtime/thread_pool/mod.rs +++ b/tokio/src/runtime/thread_pool/mod.rs @@ -89,10 +89,8 @@ impl ThreadPool { where F: Future, { - self.spawner.enter(|| { - let mut enter = crate::runtime::enter(); - enter.block_on(future).expect("failed to park thread") - }) + let mut enter = crate::runtime::enter(); + enter.block_on(future).expect("failed to park thread") } } diff --git a/tokio/src/runtime/thread_pool/spawner.rs b/tokio/src/runtime/thread_pool/spawner.rs index 4fccad96..976fd32d 100644 --- a/tokio/src/runtime/thread_pool/spawner.rs +++ b/tokio/src/runtime/thread_pool/spawner.rs @@ -36,14 +36,6 @@ impl Spawner { self.workers.spawn_typed(future) } - /// Enter the executor context - pub(crate) fn enter<F, R>(&self, f: F) -> R - where - F: FnOnce() -> R, - { - crate::runtime::global::with_thread_pool(self, f) - } - /// Reference to the worker set. Used by `ThreadPool` to initiate shutdown. pub(super) fn workers(&self) -> &slice::Set { &*self.workers diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs index 18c0db1f..fbf7a1fc 100644 --- a/tokio/src/runtime/thread_pool/worker.rs +++ b/tokio/src/runtime/thread_pool/worker.rs @@ -2,7 +2,7 @@ use crate::loom::cell::CausalCell; use crate::loom::sync::Arc; use crate::park::Park; use crate::runtime::park::Parker; -use crate::runtime::thread_pool::{current, slice, Owned, Shared, Spawner}; +use crate::runtime::thread_pool::{current, slice, Owned, Shared}; use crate::runtime::{self, blocking}; use crate::task::Task; @@ -126,45 +126,41 @@ impl Worker { None => return, }; - let spawner = Spawner::new(self.slices.clone()); - // Track the current worker current::set(&self.slices, self.index, || { // Enter a runtime context let _enter = crate::runtime::enter(); - crate::runtime::global::with_thread_pool(&spawner, || { - blocking_pool.enter(|| { - ON_BLOCK.with(|ob| { - // Ensure that the ON_BLOCK is removed from the thread-local context - // when leaving the scope. This handles cases that involve panicking. - struct Reset<'a>(&'a Cell<Option<*const dyn Fn()>>); - - impl<'a> Drop for Reset<'a> { - fn drop(&mut self) { - self.0.set(None); - } + blocking_pool.enter(|| { + ON_BLOCK.with(|ob| { + // Ensure that the ON_BLOCK is removed from the thread-local context + // when leaving the scope. This handles cases that involve panicking. + struct Reset<'a>(&'a Cell<Option<*const dyn Fn()>>); + + impl<'a> Drop for Reset<'a> { + fn drop(&mut self) { + self.0.set(None); } + } - let _reset = Reset(ob); + let _reset = Reset(ob); - let allow_blocking: &dyn Fn() = &|| self.block_in_place(&blocking_pool); + let allow_blocking: &dyn Fn() = &|| self.block_in_place(&blocking_pool); - ob.set(Some(unsafe { - // NOTE: We cannot use a safe cast to raw pointer here, since we are - // _also_ erasing the lifetime of these pointers. That is safe here, - // because we know that ob will set back to None before allow_blocking - // is dropped. - #[allow(clippy::useless_transmute)] - std::mem::transmute::<_, *const dyn Fn()>(allow_blocking) - })); + ob.set(Some(unsafe { + // NOTE: We cannot use a safe cast to raw pointer here, since we are + // _also_ erasing the lifetime of these pointers. That is safe here, + // because we know that ob will set back to None before allow_blocking + // is dropped. + #[allow(clippy::useless_transmute)] + std::mem::transmute::<_, *const dyn Fn()>(allow_blocking) + })); - let _ = guard.run(); + let _ = guard.run(); - // Ensure that we reset ob before allow_blocking is dropped. - drop(_reset); - }); - }) + // Ensure that we reset ob before allow_blocking is dropped. + drop(_reset); + }); }) }); diff --git a/tokio/src/runtime/time.rs b/tokio/src/runtime/time.rs index 1cd58cd6..6259c87a 100644 --- a/tokio/src/runtime/time.rs +++ b/tokio/src/runtime/time.rs @@ -34,14 +34,6 @@ mod variant { (Either::B(io_driver), None) } } - - pub(crate) fn with_default<F, R>(handle: &Handle, clock: &Clock, f: F) -> R - where - F: FnOnce() -> R, - { - let _time = handle.as_ref().map(|handle| driver::set_default(handle)); - clock.enter(f) - } } #[cfg(any(not(feature = "time"), loom))] @@ -64,11 +56,4 @@ mod variant { ) -> (Driver, Handle) { (io_driver, ()) } - - pub(crate) fn with_default<F, R>(_handler: &Handle, _clock: &Clock, f: F) -> R - where - F: FnOnce() -> R, - { - f() - } } diff --git a/tokio/src/task/spawn.rs b/tokio/src/task/spawn.rs index 4d18ee23..c4589308 100644 --- a/tokio/src/task/spawn.rs +++ b/tokio/src/task/spawn.rs @@ -123,6 +123,8 @@ doc_rt_core! { T: Future + Send + 'static, T::Output: Send + 'static, { - runtime::spawn(task) + let spawn_handle = runtime::context::ThreadContext::spawn_handle() + .expect("must be called from the context of Tokio runtime configured with either `basic_scheduler` or `threaded_scheduler`"); + spawn_handle.spawn(task) } } diff --git a/tokio/src/time/clock.rs b/tokio/src/time/clock.rs index e44f4925..5ece7f5a 100644 --- a/tokio/src/time/clock.rs +++ b/tokio/src/time/clock.rs @@ -22,21 +22,13 @@ cfg_not_test_util! { pub(crate) fn now(&self) -> Instant { now() } - - pub(crate) fn enter<F, R>(&self, f: F) -> R - where - F: FnOnce() -> R, - { - f() - } } } cfg_test_util! { use crate::time::{Duration, Instant}; - - use std::cell::Cell; use std::sync::{Arc, Mutex}; + use crate::runtime::context; /// A handle to a source of time. #[derive(Debug, Clone)] @@ -53,11 +45,6 @@ cfg_test_util! { frozen: Mutex<Option<Duration>>, } - thread_local! { - /// Thread-local tracking the current clock - static CLOCK: Cell<Option<*const Clock>> = Cell::new(None) - } - /// Pause time /// /// The current value of `Instant::now()` is saved and all subsequent calls @@ -69,21 +56 |