diff options
author | Carl Lerche <me@carllerche.com> | 2020-01-07 07:53:40 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-01-07 07:53:40 -0800 |
commit | 45da5f3510a61599c89dc458ecc859f13a81e255 (patch) | |
tree | c85b31879c37f06d3c58cd7d6db99c7e2ce2cf89 /tokio | |
parent | 855d39f849cc16d3c68df5abf0bbb28e3351cdf0 (diff) |
rt: cleanup runtime::context (#2063)
Tweak context to remove more fns and usage of `Option`. Remove
`ThreadContext` struct as it is reduced to just `Handle`. Avoid passing
around individual driver handles and instead limit to the
`runtime::Handle` struct.
Diffstat (limited to 'tokio')
-rw-r--r-- | tokio/src/io/driver/mod.rs | 9 | ||||
-rw-r--r-- | tokio/src/macros/cfg.rs | 1 | ||||
-rw-r--r-- | tokio/src/runtime/blocking/mod.rs | 43 | ||||
-rw-r--r-- | tokio/src/runtime/blocking/pool.rs | 81 | ||||
-rw-r--r-- | tokio/src/runtime/builder.rs | 32 | ||||
-rw-r--r-- | tokio/src/runtime/context.rs | 166 | ||||
-rw-r--r-- | tokio/src/runtime/handle.rs | 23 | ||||
-rw-r--r-- | tokio/src/runtime/io.rs | 7 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/mod.rs | 9 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/worker.rs | 55 | ||||
-rw-r--r-- | tokio/src/runtime/time.rs | 4 | ||||
-rw-r--r-- | tokio/src/task/spawn.rs | 2 | ||||
-rw-r--r-- | tokio/src/time/clock.rs | 8 | ||||
-rw-r--r-- | tokio/src/time/driver/handle.rs | 2 | ||||
-rw-r--r-- | tokio/src/time/mod.rs | 2 | ||||
-rw-r--r-- | tokio/src/time/tests/mock_clock.rs | 212 |
16 files changed, 120 insertions, 536 deletions
diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs index dfb741be..a36a40fa 100644 --- a/tokio/src/io/driver/mod.rs +++ b/tokio/src/io/driver/mod.rs @@ -5,7 +5,6 @@ pub(crate) use scheduled_io::ScheduledIo; // pub(crate) for tests use crate::loom::sync::atomic::AtomicUsize; use crate::park::{Park, Unpark}; -#[cfg(all(feature = "io-driver", not(loom)))] use crate::runtime::context; use crate::util::slab::{Address, Slab}; @@ -198,14 +197,8 @@ impl Handle { /// # Panics /// /// This function panics if there is no current reactor set. - #[cfg(all(feature = "io-driver", not(loom)))] pub(super) fn current() -> Self { - context::ThreadContext::io_handle().expect("no current reactor") - } - - #[cfg(any(not(feature = "io-driver"), loom))] - pub(super) fn current() -> Self { - panic!("no current reactor") + context::io_handle().expect("no current reactor") } /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index cc93cc8a..1f168255 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -4,7 +4,6 @@ macro_rules! cfg_resource_drivers { ($($item:item)*) => { $( #[cfg(any(feature = "io-driver", feature = "time"))] - #[cfg(not(loom))] $item )* } diff --git a/tokio/src/runtime/blocking/mod.rs b/tokio/src/runtime/blocking/mod.rs index 70832d46..be56e8f8 100644 --- a/tokio/src/runtime/blocking/mod.rs +++ b/tokio/src/runtime/blocking/mod.rs @@ -11,43 +11,23 @@ cfg_blocking_impl! { mod shutdown; mod task; - use crate::runtime::{self, Builder, io, time}; + use crate::runtime::Builder; - pub(crate) fn create_blocking_pool( - builder: &Builder, - spawner: &runtime::Spawner, - io: &io::Handle, - time: &time::Handle, - clock: &time::Clock, - thread_cap: usize, - ) -> BlockingPool { - BlockingPool::new( - builder, - spawner, - io, - time, - clock, - thread_cap) + pub(crate) fn create_blocking_pool(builder: &Builder, thread_cap: usize) -> BlockingPool { + BlockingPool::new(builder, thread_cap) } } cfg_not_blocking_impl! { - use crate::runtime::{self, io, time, Builder}; + use crate::runtime::Builder; #[derive(Debug, Clone)] pub(crate) struct BlockingPool {} pub(crate) use BlockingPool as Spawner; - pub(crate) fn create_blocking_pool( - _builder: &Builder, - _spawner: &runtime::Spawner, - _io: &io::Handle, - _time: &time::Handle, - _clock: &time::Clock, - _thread_cap: usize, - ) -> BlockingPool { + pub(crate) fn create_blocking_pool(_builder: &Builder, _thread_cap: usize) -> BlockingPool { BlockingPool {} } @@ -55,18 +35,5 @@ cfg_not_blocking_impl! { pub(crate) fn spawner(&self) -> &BlockingPool { self } - - #[cfg(any( - feature = "blocking", - feature = "dns", - feature = "fs", - feature = "io-std", - ))] - pub(crate) fn enter<F, R>(&self, f: F) -> R - where - F: FnOnce() -> R, - { - f() - } } } diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index 320bd7f2..2a618ff5 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -5,7 +5,7 @@ use crate::loom::thread; use crate::runtime::blocking::schedule::NoopSchedule; use crate::runtime::blocking::shutdown; use crate::runtime::blocking::task::BlockingTask; -use crate::runtime::{self, context::ThreadContext, io, time, Builder, Callback}; +use crate::runtime::{Builder, Callback, Handle}; use crate::task::{self, JoinHandle}; use std::collections::VecDeque; @@ -41,18 +41,6 @@ struct Inner { /// Call before a thread stops before_stop: Option<Callback>, - /// Spawns async tasks - spawner: runtime::Spawner, - - /// Runtime I/O driver handle - io_handle: io::Handle, - - /// Runtime time driver handle - time_handle: time::Handle, - - /// Source of `Instant::now()` - clock: time::Clock, - thread_cap: usize, } @@ -74,27 +62,17 @@ pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R> where F: FnOnce() -> R + Send + 'static, { - use crate::runtime::context::ThreadContext; - - let schedule = - ThreadContext::blocking_spawner().expect("not currently running on the Tokio runtime."); + let rt = Handle::current(); let (task, handle) = task::joinable(BlockingTask::new(func)); - schedule.schedule(task); + rt.blocking_spawner.spawn(task, &rt); handle } // ===== impl BlockingPool ===== impl BlockingPool { - pub(crate) fn new( - builder: &Builder, - spawner: &runtime::Spawner, - io: &io::Handle, - time: &time::Handle, - clock: &time::Clock, - thread_cap: usize, - ) -> BlockingPool { + pub(crate) fn new(builder: &Builder, thread_cap: usize) -> BlockingPool { let (shutdown_tx, shutdown_rx) = shutdown::channel(); BlockingPool { @@ -113,10 +91,6 @@ impl BlockingPool { stack_size: builder.thread_stack_size, after_start: builder.after_start.clone(), before_stop: builder.before_stop.clone(), - spawner: spawner.clone(), - io_handle: io.clone(), - time_handle: time.clone(), - clock: clock.clone(), thread_cap, }), }, @@ -152,21 +126,7 @@ impl fmt::Debug for BlockingPool { // ===== impl Spawner ===== impl Spawner { - /// Set the blocking pool for the duration of the closure - /// - /// If a blocking pool is already set, it will be restored when the closure - /// returns or if it panics. - pub(crate) fn enter<F, R>(&self, f: F) -> R - where - F: FnOnce() -> R, - { - let ctx = crate::runtime::context::ThreadContext::clone_current(); - let _e = ctx.with_blocking_spawner(self.clone()).enter(); - - f() - } - - fn schedule(&self, task: Task) { + fn spawn(&self, task: Task, rt: &Handle) { let shutdown_tx = { let mut shared = self.inner.shared.lock().unwrap(); @@ -205,41 +165,32 @@ impl Spawner { }; if let Some(shutdown_tx) = shutdown_tx { - self.spawn_thread(shutdown_tx); + self.spawn_thread(shutdown_tx, rt); } } - fn spawn_thread(&self, shutdown_tx: shutdown::Sender) { + fn spawn_thread(&self, shutdown_tx: shutdown::Sender, rt: &Handle) { let mut builder = thread::Builder::new().name(self.inner.thread_name.clone()); if let Some(stack_size) = self.inner.stack_size { builder = builder.stack_size(stack_size); } - let thread_context = ThreadContext::new( - self.inner.spawner.clone(), - self.inner.io_handle.clone(), - self.inner.time_handle.clone(), - Some(self.inner.clock.clone()), - Some(self.clone()), - ); - let spawner = self.clone(); + + let rt = rt.clone(); + builder .spawn(move || { - let _e = thread_context.enter(); - run_thread(spawner); - drop(shutdown_tx); + // Only the reference should be moved into the closure + let rt = &rt; + rt.enter(move || { + rt.blocking_spawner.inner.run(); + drop(shutdown_tx); + }) }) .unwrap(); } } -fn run_thread(spawner: Spawner) { - spawner.enter(|| { - let inner = &*spawner.inner; - inner.run() - }); -} - impl Inner { fn run(&self) { if let Some(f) = &self.after_start { diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index fd047a84..a5d80f51 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -325,14 +325,7 @@ impl Builder { let spawner = Spawner::Shell; - let blocking_pool = blocking::create_blocking_pool( - self, - &spawner, - &io_handle, - &time_handle, - &clock, - self.max_threads, - ); + let blocking_pool = blocking::create_blocking_pool(self, self.max_threads); let blocking_spawner = blocking_pool.spawner().clone(); Ok(Runtime { @@ -425,7 +418,7 @@ cfg_rt_core! { let spawner = Spawner::Basic(scheduler.spawner()); // Blocking pool - let blocking_pool = blocking::create_blocking_pool(self, &spawner, &io_handle, &time_handle, &clock, self.max_threads); + let blocking_pool = blocking::create_blocking_pool(self, self.max_threads); let blocking_spawner = blocking_pool.spawner().clone(); Ok(Runtime { @@ -465,21 +458,24 @@ cfg_rt_threaded! { let spawner = Spawner::ThreadPool(scheduler.spawner().clone()); // Create the blocking pool - let blocking_pool = blocking::create_blocking_pool(self, &spawner, &io_handle, &time_handle, &clock, self.max_threads); + let blocking_pool = blocking::create_blocking_pool(self, self.max_threads); let blocking_spawner = blocking_pool.spawner().clone(); + // Create the runtime handle + let handle = Handle { + spawner, + io_handle, + time_handle, + clock, + blocking_spawner, + }; + // Spawn the thread pool workers - workers.spawn(&blocking_spawner); + workers.spawn(&handle); Ok(Runtime { kind: Kind::ThreadPool(scheduler), - handle: Handle { - spawner, - io_handle, - time_handle, - clock, - blocking_spawner, - }, + handle, blocking_pool, }) } diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs index 07c311ed..cfc51def 100644 --- a/tokio/src/runtime/context.rs +++ b/tokio/src/runtime/context.rs @@ -1,99 +1,26 @@ //! Thread local runtime context -use crate::runtime::Spawner; +use crate::runtime::Handle; + use std::cell::RefCell; thread_local! { - static CONTEXT: RefCell<Option<ThreadContext>> = RefCell::new(None) -} - -/// ThreadContext makes Runtime context accessible to each Runtime thread. -#[derive(Debug, Clone)] -pub(crate) struct ThreadContext { - /// Handles to the executor. - spawner: Spawner, - - /// Handles to the I/O drivers - io_handle: crate::runtime::io::Handle, - - /// Handles to the time drivers - time_handle: crate::runtime::time::Handle, - - /// Source of `Instant::now()` - clock: Option<crate::runtime::time::Clock>, - - /// Blocking pool spawner - blocking_spawner: Option<crate::runtime::blocking::Spawner>, + static CONTEXT: RefCell<Option<Handle>> = RefCell::new(None) } -impl Default for ThreadContext { - fn default() -> Self { - ThreadContext { - spawner: Spawner::Shell, - #[cfg(all(feature = "io-driver", not(loom)))] - io_handle: None, - #[cfg(any(not(feature = "io-driver"), loom))] - io_handle: (), - #[cfg(all(feature = "time", not(loom)))] - time_handle: None, - #[cfg(any(not(feature = "time"), loom))] - time_handle: (), - clock: None, - blocking_spawner: None, - } - } +pub(crate) fn current() -> Option<Handle> { + CONTEXT.with(|ctx| ctx.borrow().clone()) } -impl ThreadContext { - /// Construct a new [`ThreadContext`] - /// - /// [`ThreadContext`]: struct.ThreadContext.html - pub(crate) fn new( - spawner: Spawner, - io_handle: crate::runtime::io::Handle, - time_handle: crate::runtime::time::Handle, - clock: Option<crate::runtime::time::Clock>, - blocking_spawner: Option<crate::runtime::blocking::Spawner>, - ) -> Self { - ThreadContext { - spawner, - #[cfg(all(feature = "io-driver", not(loom)))] - io_handle, - #[cfg(any(not(feature = "io-driver"), loom))] - io_handle, - #[cfg(all(feature = "time", not(loom)))] - time_handle, - #[cfg(any(not(feature = "time"), loom))] - time_handle, - clock, - blocking_spawner, - } - } - - /// Clone the current [`ThreadContext`] if one is set, otherwise construct a new [`ThreadContext`]. - /// - /// [`ThreadContext`]: struct.ThreadContext.html - #[allow(dead_code)] - pub(crate) fn clone_current() -> Self { - CONTEXT.with(|ctx| ctx.borrow().clone().unwrap_or_else(Default::default)) - } - - /// Set this [`ThreadContext`] as the current active [`ThreadContext`]. - /// - /// [`ThreadContext`]: struct.ThreadContext.html - pub(crate) fn enter(self) -> ThreadContextDropGuard { - CONTEXT.with(|ctx| { - let previous = ctx.borrow_mut().replace(self); - ThreadContextDropGuard { previous } - }) - } - +cfg_io_driver! { pub(crate) fn io_handle() -> crate::runtime::io::Handle { CONTEXT.with(|ctx| match *ctx.borrow() { Some(ref ctx) => ctx.io_handle.clone(), None => Default::default(), }) } +} +cfg_time! { pub(crate) fn time_handle() -> crate::runtime::time::Handle { CONTEXT.with(|ctx| match *ctx.borrow() { Some(ref ctx) => ctx.time_handle.clone(), @@ -101,61 +28,46 @@ impl ThreadContext { }) } - pub(crate) fn spawn_handle() -> Option<Spawner> { + cfg_test_util! { + pub(crate) fn clock() -> Option<crate::runtime::time::Clock> { + CONTEXT.with(|ctx| match *ctx.borrow() { + Some(ref ctx) => Some(ctx.clock.clone()), + None => None, + }) + } + } +} + +cfg_rt_core! { + pub(crate) fn spawn_handle() -> Option<crate::runtime::Spawner> { CONTEXT.with(|ctx| match *ctx.borrow() { Some(ref ctx) => Some(ctx.spawner.clone()), None => None, }) } - - pub(crate) fn clock() -> Option<crate::runtime::time::Clock> { - CONTEXT.with( - |ctx| match ctx.borrow().as_ref().map(|ctx| ctx.clock.clone()) { - Some(Some(clock)) => Some(clock), - _ => None, - }, - ) - } - - pub(crate) fn blocking_spawner() -> Option<crate::runtime::blocking::Spawner> { - CONTEXT.with(|ctx| { - match ctx - .borrow() - .as_ref() - .map(|ctx| ctx.blocking_spawner.clone()) - { - Some(Some(blocking_spawner)) => Some(blocking_spawner), - _ => None, - } - }) - } } -cfg_blocking_impl! { - impl ThreadContext { - pub(crate) fn with_blocking_spawner( - mut self, - blocking_spawner: crate::runtime::blocking::Spawner, - ) -> Self { - self.blocking_spawner.replace(blocking_spawner); - self +/// Set this [`ThreadContext`] as the current active [`ThreadContext`]. +/// +/// [`ThreadContext`]: struct.ThreadContext.html +pub(crate) fn enter<F, R>(new: Handle, f: F) -> R +where + F: FnOnce() -> R, +{ + struct DropGuard(Option<Handle>); + + impl Drop for DropGuard { + fn drop(&mut self) { + CONTEXT.with(|ctx| { + *ctx.borrow_mut() = self.0.take(); + }); } } -} -/// [`ThreadContextDropGuard`] will replace the `previous` thread context on drop. -/// -/// [`ThreadContextDropGuard`]: struct.ThreadContextDropGuard.html -#[derive(Debug)] -pub(crate) struct ThreadContextDropGuard { - previous: Option<ThreadContext>, -} + let _guard = CONTEXT.with(|ctx| { + let old = ctx.borrow_mut().replace(new); + DropGuard(old) + }); -impl Drop for ThreadContextDropGuard { - fn drop(&mut self) { - CONTEXT.with(|ctx| match self.previous.clone() { - Some(prev) => ctx.borrow_mut().replace(prev), - None => ctx.borrow_mut().take(), - }); - } + f() } diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 72365025..ae7b416e 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -30,16 +30,7 @@ impl Handle { where F: FnOnce() -> R, { - let _e = context::ThreadContext::new( - self.spawner.clone(), - self.io_handle.clone(), - self.time_handle.clone(), - Some(self.clock.clone()), - Some(self.blocking_spawner.clone()), - ) - .enter(); - - f() + context::enter(self.clone(), f) } /// Returns a Handle view over the currently running Runtime @@ -68,17 +59,7 @@ impl Handle { /// # } /// ``` pub fn current() -> Self { - use crate::runtime::context::ThreadContext; - - Handle { - spawner: ThreadContext::spawn_handle() - .expect("not currently running on the Tokio runtime."), - io_handle: ThreadContext::io_handle(), - time_handle: ThreadContext::time_handle(), - clock: ThreadContext::clock().expect("not currently running on the Tokio runtime."), - blocking_spawner: ThreadContext::blocking_spawner() - .expect("not currently running on the Tokio runtime."), - } + context::current().expect("not currently running on the Tokio runtime.") } } diff --git a/tokio/src/runtime/io.rs b/tokio/src/runtime/io.rs index dca7310f..6a0953af 100644 --- a/tokio/src/runtime/io.rs +++ b/tokio/src/runtime/io.rs @@ -8,7 +8,7 @@ pub(crate) use std::io::Result; pub(crate) use variant::*; -#[cfg(all(feature = "io-driver", not(loom)))] +#[cfg(feature = "io-driver")] mod variant { use crate::io::driver; use crate::park::{Either, ParkThread}; @@ -28,6 +28,9 @@ mod variant { pub(crate) type Handle = Option<driver::Handle>; pub(crate) fn create_driver(enable: bool) -> io::Result<(Driver, Handle)> { + #[cfg(loom)] + assert!(!enable); + if enable { let driver = driver::Driver::new()?; let handle = driver.handle(); @@ -40,7 +43,7 @@ mod variant { } } -#[cfg(any(not(feature = "io-driver"), loom))] +#[cfg(not(feature = "io-driver"))] mod variant { use crate::park::ParkThread; diff --git a/tokio/src/runtime/thread_pool/mod.rs b/tokio/src/runtime/thread_pool/mod.rs index c22ce8b9..fd38c013 100644 --- a/tokio/src/runtime/thread_pool/mod.rs +++ b/tokio/src/runtime/thread_pool/mod.rs @@ -38,7 +38,7 @@ const LOCAL_QUEUE_CAPACITY: usize = 256; #[cfg(loom)] const LOCAL_QUEUE_CAPACITY: usize = 2; -use crate::runtime::{self, blocking, Parker}; +use crate::runtime::{self, Parker}; use crate::task::JoinHandle; use std::fmt; @@ -107,11 +107,10 @@ impl Drop for ThreadPool { } impl Workers { - pub(crate) fn spawn(self, blocking_pool: &blocking::Spawner) { - blocking_pool.enter(|| { + pub(crate) fn spawn(self, rt: &runtime::Handle) { + rt.enter(|| { for worker in self.workers { - let b = blocking_pool.clone(); - runtime::spawn_blocking(move || worker.run(b)); + runtime::spawn_blocking(move || worker.run()); } }); } diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs index fbf7a1fc..44df8b74 100644 --- a/tokio/src/runtime/thread_pool/worker.rs +++ b/tokio/src/runtime/thread_pool/worker.rs @@ -1,9 +1,9 @@ use crate::loom::cell::CausalCell; use crate::loom::sync::Arc; use crate::park::Park; +use crate::runtime; use crate::runtime::park::Parker; use crate::runtime::thread_pool::{current, slice, Owned, Shared}; -use crate::runtime::{self, blocking}; use crate::task::Task; use std::cell::Cell; @@ -119,7 +119,7 @@ impl Worker { } } - pub(super) fn run(self, blocking_pool: blocking::Spawner) { + pub(super) fn run(self) { // First, acquire a lock on the worker. let guard = match self.acquire_lock() { Some(guard) => guard, @@ -131,37 +131,35 @@ impl Worker { // Enter a runtime context let _enter = crate::runtime::enter(); - blocking_pool.enter(|| { - ON_BLOCK.with(|ob| { - // Ensure that the ON_BLOCK is removed from the thread-local context - // when leaving the scope. This handles cases that involve panicking. - struct Reset<'a>(&'a Cell<Option<*const dyn Fn()>>); + ON_BLOCK.with(|ob| { + // Ensure that the ON_BLOCK is removed from the thread-local context + // when leaving the scope. This handles cases that involve panicking. + struct Reset<'a>(&'a Cell<Option<*const dyn Fn()>>); - impl<'a> Drop for Reset<'a> { - fn drop(&mut self) { - self.0.set(None); - } + impl<'a> Drop for Reset<'a> { + fn drop(&mut self) { + self.0.set(None); } + } - let _reset = Reset(ob); + let _reset = Reset(ob); - let allow_blocking: &dyn Fn() = &|| self.block_in_place(&blocking_pool); + let allow_blocking: &dyn Fn() = &|| self.block_in_place(); - ob.set(Some(unsafe { - // NOTE: We cannot use a safe cast to raw pointer here, since we are - // _also_ erasing the lifetime of these pointers. That is safe here, - // because we know that ob will set back to None before allow_blocking - // is dropped. - #[allow(clippy::useless_transmute)] - std::mem::transmute::<_, *const dyn Fn()>(allow_blocking) - })); + ob.set(Some(unsafe { + // NOTE: We cannot use a safe cast to raw pointer here, since we are + // _also_ erasing the lifetime of these pointers. That is safe here, + // because we know that ob will set back to None before allow_blocking + // is dropped. + #[allow(clippy::useless_transmute)] + std::mem::transmute::<_, *const dyn Fn()>(allow_blocking) + })); - let _ = guard.run(); + let _ = guard.run(); - // Ensure that we reset ob before allow_blocking is dropped. - drop(_reset); - }); - }) + // Ensure that we reset ob before allow_blocking is dropped. + drop(_reset); + }); }); if self.gone.get() { @@ -206,7 +204,7 @@ impl Worker { } /// Enter an in-place blocking section - fn block_in_place(&self, blocking_pool: &blocking::Spawner) { + fn block_in_place(&self) { // If our Worker has already been given away, then blocking is fine! if self.gone.get() { return; @@ -259,8 +257,7 @@ impl Worker { }; // Give away the worker - let b = blocking_pool.clone(); - runtime::spawn_blocking(move || worker.run(b)); + runtime::spawn_blocking(move || worker.run()); } } diff --git a/tokio/src/runtime/time.rs b/tokio/src/runtime/time.rs index 6259c87a..c623d964 100644 --- a/tokio/src/runtime/time.rs +++ b/tokio/src/runtime/time.rs @@ -5,7 +5,7 @@ pub(crate) use variant::*; -#[cfg(all(feature = "time", not(loom)))] +#[cfg(feature = "time")] mod variant { use crate::park::Either; use crate::runtime::io; @@ -36,7 +36,7 @@ mod variant { } } -#[cfg(any(not(feature = "time"), loom))] +#[cfg(not(feature = "time"))] mod variant { use crate::runtime::io; diff --git a/tokio/src/task/spawn.rs b/tokio/src/task/spawn.rs index c4589308..4e19f559 100644 --- a/tokio/src/task/spawn.rs +++ b/tokio/src/task/spawn.rs @@ -123,7 +123,7 @@ doc_rt_core! { T: Future + Send + 'static, T::Output: Send + 'static, { - let spawn_handle = runtime::context::ThreadContext::spawn_handle() + let spawn_handle = runtime::context::spawn_handle() .expect("must be called from the context of Tokio runtime configured with either `basic_scheduler` or `threaded_scheduler`"); spawn_handle.spawn(task) } diff --git a/tokio/src/time/clock.rs b/tokio/src/time/clock.rs index ae75740c..bd3045a9 100644 --- a/tokio/src/time/clock.rs +++ b/tokio/src/time/clock.rs @@ -64,7 +64,7 @@ cfg_test_util! { /// Panics if time is already frozen or if called from outside of the Tokio /// runtime. pub fn pause() { - let clock = context::ThreadContext::clock().expect("time cannot be frozen from outside the Tokio runtime"); + let clock = context::clock().expect("time cannot be frozen from outside the Tokio runtime"); let mut frozen = clock.inner.frozen.lock().unwrap(); if frozen.is_some() { panic!("time is already frozen"); @@ -82,7 +82,7 @@ cfg_test_util! { /// Panics if time is not frozen or if called from outside of the Tokio /// runtime. pub fn resume() { - let clock = context::ThreadContext::clock().expect("time cannot be frozen from outside the Tokio runtime"); + let clock = context::clock().expect("time cannot be frozen from outside the Tokio runtime"); let mut frozen = clock.inner.frozen.lock().unwrap(); if frozen.is_none() { @@ -102,14 +102,14 @@ cfg_test_util! { /// Panics if time is not frozen or if called from outside of the Tokio /// runtime. pub async fn advance(duration: Duration) { - let clock = context::ThreadContext::clock().expect("time cannot be frozen from outside the Tokio runtime"); + let clock = context::clock().expect("time cannot be frozen from outside the Tokio runtime"); clock.advance(duration); crate::task::yield_now().await; } /// Return the current instant, factoring in frozen time. pub(crate) fn now() -> Instant { - if let Some(clock) = context::ThreadContext::clock() { + if let Some(clock) = context::clock() { if let Some(frozen) = *clock.inner.frozen.lock().unwrap() { Instant::from_std(clock.inner.start + frozen) } else { diff --git a/tokio/src/time/driver/handle.rs b/tokio/src/time/driver/handle.rs index 6a8fe2bb..f24eaeb6 100644 --- a/tokio/src/time/driver/handle.rs +++ b/tokio/src/time/driver/handle.rs @@ -21,7 +21,7 @@ impl Handle { /// /// This function panics if there is no current timer set. pub(crate) fn current() -> Self { - context::ThreadContext::time_handle().expect("no current timer") + context::time_handle().expect("no current timer") } /// Try to return a strong ref to the inner diff --git a/tokio/src/time/mod.rs b/tokio/src/time/mod.rs index 24aae11e..7070d6b2 100644 --- a/tokio/src/time/mod.rs +++ b/tokio/src/time/mod.rs @@ -1,5 +1,3 @@ -#![cfg(not(loom))] - //! Utilities for tracking time. //! //! This module provides a number of types for executing code after a set period diff --git a/tokio/src/time/tests/mock_clock.rs b/tokio/src/time/tests/mock_clock.rs deleted file mode 100644 index ac509e3f..00000000 --- a/tokio/src/time/tests/mock_clock.rs +++ /dev/null @@ -1,212 +0,0 @@ -use crate::park::{Park, Unpark}; -use crate::runtime::context; -use crate::time::driver::Driver; -use crate::time::{Clock, Duration, Instant}; - -use std::marker::PhantomData; -use std::rc::Rc; -use std::sync::{Arc, Mutex}; - -/// Run the provided closure with a `MockClock` that starts at the current time. -pub(crate) fn mock<F, R>(f: F) -> R -where - F: FnOnce(&mut Handle) -> R, -{ - let mut mock = MockClock::new(); - mock.enter(f) -} - -/// Mock clock for use with `tokio-timer` futures. -/// -/// A mock timer that is able to advance and wake after a -/// certain duration. -#[derive(Debug)] -pub(crate) struct MockClock { - time: MockTime, - clock: Clock, -} - -/// A handle to the `MockClock`. -#[derive(Debug)] -pub(crate) struct Handle { - timer: Driver<MockPark>, - time: MockTime, - clock: Clock, -} - -type Inner = Arc<Mutex<State>>; - -#[derive(Debug, Clone)] -struct MockTime { - inner: Inner, - _pd: PhantomData<Rc<()>>, -} - -#[derive(Debug)] -struct MockNow { - inner: Inner, -} - -#[derive(Debug)] -struct MockPark { - inner: Inner, - _pd: PhantomData<Rc<()>>, -} - -#[derive(Debug)] -struct MockUnpark { - inner: Inner, -} - -#[derive(Debug)] -struct State { - clock: Clock, - unparked: bool, - park_for: Option<Duration>, -} - -impl MockClock { - /// Create a new `MockClock` with the current time. - pub(crate) fn new() -> Self { - let clock = Clock::new_frozen(); - let time = MockTime::new(clock.clone()); - - MockClock { time, clock } - } - - /// Enter the `MockClock` context. - pub( |