summaryrefslogtreecommitdiffstats
path: root/tokio
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-01-07 07:53:40 -0800
committerGitHub <noreply@github.com>2020-01-07 07:53:40 -0800
commit45da5f3510a61599c89dc458ecc859f13a81e255 (patch)
treec85b31879c37f06d3c58cd7d6db99c7e2ce2cf89 /tokio
parent855d39f849cc16d3c68df5abf0bbb28e3351cdf0 (diff)
rt: cleanup runtime::context (#2063)
Tweak context to remove more fns and usage of `Option`. Remove `ThreadContext` struct as it is reduced to just `Handle`. Avoid passing around individual driver handles and instead limit to the `runtime::Handle` struct.
Diffstat (limited to 'tokio')
-rw-r--r--tokio/src/io/driver/mod.rs9
-rw-r--r--tokio/src/macros/cfg.rs1
-rw-r--r--tokio/src/runtime/blocking/mod.rs43
-rw-r--r--tokio/src/runtime/blocking/pool.rs81
-rw-r--r--tokio/src/runtime/builder.rs32
-rw-r--r--tokio/src/runtime/context.rs166
-rw-r--r--tokio/src/runtime/handle.rs23
-rw-r--r--tokio/src/runtime/io.rs7
-rw-r--r--tokio/src/runtime/thread_pool/mod.rs9
-rw-r--r--tokio/src/runtime/thread_pool/worker.rs55
-rw-r--r--tokio/src/runtime/time.rs4
-rw-r--r--tokio/src/task/spawn.rs2
-rw-r--r--tokio/src/time/clock.rs8
-rw-r--r--tokio/src/time/driver/handle.rs2
-rw-r--r--tokio/src/time/mod.rs2
-rw-r--r--tokio/src/time/tests/mock_clock.rs212
16 files changed, 120 insertions, 536 deletions
diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs
index dfb741be..a36a40fa 100644
--- a/tokio/src/io/driver/mod.rs
+++ b/tokio/src/io/driver/mod.rs
@@ -5,7 +5,6 @@ pub(crate) use scheduled_io::ScheduledIo; // pub(crate) for tests
use crate::loom::sync::atomic::AtomicUsize;
use crate::park::{Park, Unpark};
-#[cfg(all(feature = "io-driver", not(loom)))]
use crate::runtime::context;
use crate::util::slab::{Address, Slab};
@@ -198,14 +197,8 @@ impl Handle {
/// # Panics
///
/// This function panics if there is no current reactor set.
- #[cfg(all(feature = "io-driver", not(loom)))]
pub(super) fn current() -> Self {
- context::ThreadContext::io_handle().expect("no current reactor")
- }
-
- #[cfg(any(not(feature = "io-driver"), loom))]
- pub(super) fn current() -> Self {
- panic!("no current reactor")
+ context::io_handle().expect("no current reactor")
}
/// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise
diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs
index cc93cc8a..1f168255 100644
--- a/tokio/src/macros/cfg.rs
+++ b/tokio/src/macros/cfg.rs
@@ -4,7 +4,6 @@ macro_rules! cfg_resource_drivers {
($($item:item)*) => {
$(
#[cfg(any(feature = "io-driver", feature = "time"))]
- #[cfg(not(loom))]
$item
)*
}
diff --git a/tokio/src/runtime/blocking/mod.rs b/tokio/src/runtime/blocking/mod.rs
index 70832d46..be56e8f8 100644
--- a/tokio/src/runtime/blocking/mod.rs
+++ b/tokio/src/runtime/blocking/mod.rs
@@ -11,43 +11,23 @@ cfg_blocking_impl! {
mod shutdown;
mod task;
- use crate::runtime::{self, Builder, io, time};
+ use crate::runtime::Builder;
- pub(crate) fn create_blocking_pool(
- builder: &Builder,
- spawner: &runtime::Spawner,
- io: &io::Handle,
- time: &time::Handle,
- clock: &time::Clock,
- thread_cap: usize,
- ) -> BlockingPool {
- BlockingPool::new(
- builder,
- spawner,
- io,
- time,
- clock,
- thread_cap)
+ pub(crate) fn create_blocking_pool(builder: &Builder, thread_cap: usize) -> BlockingPool {
+ BlockingPool::new(builder, thread_cap)
}
}
cfg_not_blocking_impl! {
- use crate::runtime::{self, io, time, Builder};
+ use crate::runtime::Builder;
#[derive(Debug, Clone)]
pub(crate) struct BlockingPool {}
pub(crate) use BlockingPool as Spawner;
- pub(crate) fn create_blocking_pool(
- _builder: &Builder,
- _spawner: &runtime::Spawner,
- _io: &io::Handle,
- _time: &time::Handle,
- _clock: &time::Clock,
- _thread_cap: usize,
- ) -> BlockingPool {
+ pub(crate) fn create_blocking_pool(_builder: &Builder, _thread_cap: usize) -> BlockingPool {
BlockingPool {}
}
@@ -55,18 +35,5 @@ cfg_not_blocking_impl! {
pub(crate) fn spawner(&self) -> &BlockingPool {
self
}
-
- #[cfg(any(
- feature = "blocking",
- feature = "dns",
- feature = "fs",
- feature = "io-std",
- ))]
- pub(crate) fn enter<F, R>(&self, f: F) -> R
- where
- F: FnOnce() -> R,
- {
- f()
- }
}
}
diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs
index 320bd7f2..2a618ff5 100644
--- a/tokio/src/runtime/blocking/pool.rs
+++ b/tokio/src/runtime/blocking/pool.rs
@@ -5,7 +5,7 @@ use crate::loom::thread;
use crate::runtime::blocking::schedule::NoopSchedule;
use crate::runtime::blocking::shutdown;
use crate::runtime::blocking::task::BlockingTask;
-use crate::runtime::{self, context::ThreadContext, io, time, Builder, Callback};
+use crate::runtime::{Builder, Callback, Handle};
use crate::task::{self, JoinHandle};
use std::collections::VecDeque;
@@ -41,18 +41,6 @@ struct Inner {
/// Call before a thread stops
before_stop: Option<Callback>,
- /// Spawns async tasks
- spawner: runtime::Spawner,
-
- /// Runtime I/O driver handle
- io_handle: io::Handle,
-
- /// Runtime time driver handle
- time_handle: time::Handle,
-
- /// Source of `Instant::now()`
- clock: time::Clock,
-
thread_cap: usize,
}
@@ -74,27 +62,17 @@ pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
{
- use crate::runtime::context::ThreadContext;
-
- let schedule =
- ThreadContext::blocking_spawner().expect("not currently running on the Tokio runtime.");
+ let rt = Handle::current();
let (task, handle) = task::joinable(BlockingTask::new(func));
- schedule.schedule(task);
+ rt.blocking_spawner.spawn(task, &rt);
handle
}
// ===== impl BlockingPool =====
impl BlockingPool {
- pub(crate) fn new(
- builder: &Builder,
- spawner: &runtime::Spawner,
- io: &io::Handle,
- time: &time::Handle,
- clock: &time::Clock,
- thread_cap: usize,
- ) -> BlockingPool {
+ pub(crate) fn new(builder: &Builder, thread_cap: usize) -> BlockingPool {
let (shutdown_tx, shutdown_rx) = shutdown::channel();
BlockingPool {
@@ -113,10 +91,6 @@ impl BlockingPool {
stack_size: builder.thread_stack_size,
after_start: builder.after_start.clone(),
before_stop: builder.before_stop.clone(),
- spawner: spawner.clone(),
- io_handle: io.clone(),
- time_handle: time.clone(),
- clock: clock.clone(),
thread_cap,
}),
},
@@ -152,21 +126,7 @@ impl fmt::Debug for BlockingPool {
// ===== impl Spawner =====
impl Spawner {
- /// Set the blocking pool for the duration of the closure
- ///
- /// If a blocking pool is already set, it will be restored when the closure
- /// returns or if it panics.
- pub(crate) fn enter<F, R>(&self, f: F) -> R
- where
- F: FnOnce() -> R,
- {
- let ctx = crate::runtime::context::ThreadContext::clone_current();
- let _e = ctx.with_blocking_spawner(self.clone()).enter();
-
- f()
- }
-
- fn schedule(&self, task: Task) {
+ fn spawn(&self, task: Task, rt: &Handle) {
let shutdown_tx = {
let mut shared = self.inner.shared.lock().unwrap();
@@ -205,41 +165,32 @@ impl Spawner {
};
if let Some(shutdown_tx) = shutdown_tx {
- self.spawn_thread(shutdown_tx);
+ self.spawn_thread(shutdown_tx, rt);
}
}
- fn spawn_thread(&self, shutdown_tx: shutdown::Sender) {
+ fn spawn_thread(&self, shutdown_tx: shutdown::Sender, rt: &Handle) {
let mut builder = thread::Builder::new().name(self.inner.thread_name.clone());
if let Some(stack_size) = self.inner.stack_size {
builder = builder.stack_size(stack_size);
}
- let thread_context = ThreadContext::new(
- self.inner.spawner.clone(),
- self.inner.io_handle.clone(),
- self.inner.time_handle.clone(),
- Some(self.inner.clock.clone()),
- Some(self.clone()),
- );
- let spawner = self.clone();
+
+ let rt = rt.clone();
+
builder
.spawn(move || {
- let _e = thread_context.enter();
- run_thread(spawner);
- drop(shutdown_tx);
+ // Only the reference should be moved into the closure
+ let rt = &rt;
+ rt.enter(move || {
+ rt.blocking_spawner.inner.run();
+ drop(shutdown_tx);
+ })
})
.unwrap();
}
}
-fn run_thread(spawner: Spawner) {
- spawner.enter(|| {
- let inner = &*spawner.inner;
- inner.run()
- });
-}
-
impl Inner {
fn run(&self) {
if let Some(f) = &self.after_start {
diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs
index fd047a84..a5d80f51 100644
--- a/tokio/src/runtime/builder.rs
+++ b/tokio/src/runtime/builder.rs
@@ -325,14 +325,7 @@ impl Builder {
let spawner = Spawner::Shell;
- let blocking_pool = blocking::create_blocking_pool(
- self,
- &spawner,
- &io_handle,
- &time_handle,
- &clock,
- self.max_threads,
- );
+ let blocking_pool = blocking::create_blocking_pool(self, self.max_threads);
let blocking_spawner = blocking_pool.spawner().clone();
Ok(Runtime {
@@ -425,7 +418,7 @@ cfg_rt_core! {
let spawner = Spawner::Basic(scheduler.spawner());
// Blocking pool
- let blocking_pool = blocking::create_blocking_pool(self, &spawner, &io_handle, &time_handle, &clock, self.max_threads);
+ let blocking_pool = blocking::create_blocking_pool(self, self.max_threads);
let blocking_spawner = blocking_pool.spawner().clone();
Ok(Runtime {
@@ -465,21 +458,24 @@ cfg_rt_threaded! {
let spawner = Spawner::ThreadPool(scheduler.spawner().clone());
// Create the blocking pool
- let blocking_pool = blocking::create_blocking_pool(self, &spawner, &io_handle, &time_handle, &clock, self.max_threads);
+ let blocking_pool = blocking::create_blocking_pool(self, self.max_threads);
let blocking_spawner = blocking_pool.spawner().clone();
+ // Create the runtime handle
+ let handle = Handle {
+ spawner,
+ io_handle,
+ time_handle,
+ clock,
+ blocking_spawner,
+ };
+
// Spawn the thread pool workers
- workers.spawn(&blocking_spawner);
+ workers.spawn(&handle);
Ok(Runtime {
kind: Kind::ThreadPool(scheduler),
- handle: Handle {
- spawner,
- io_handle,
- time_handle,
- clock,
- blocking_spawner,
- },
+ handle,
blocking_pool,
})
}
diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs
index 07c311ed..cfc51def 100644
--- a/tokio/src/runtime/context.rs
+++ b/tokio/src/runtime/context.rs
@@ -1,99 +1,26 @@
//! Thread local runtime context
-use crate::runtime::Spawner;
+use crate::runtime::Handle;
+
use std::cell::RefCell;
thread_local! {
- static CONTEXT: RefCell<Option<ThreadContext>> = RefCell::new(None)
-}
-
-/// ThreadContext makes Runtime context accessible to each Runtime thread.
-#[derive(Debug, Clone)]
-pub(crate) struct ThreadContext {
- /// Handles to the executor.
- spawner: Spawner,
-
- /// Handles to the I/O drivers
- io_handle: crate::runtime::io::Handle,
-
- /// Handles to the time drivers
- time_handle: crate::runtime::time::Handle,
-
- /// Source of `Instant::now()`
- clock: Option<crate::runtime::time::Clock>,
-
- /// Blocking pool spawner
- blocking_spawner: Option<crate::runtime::blocking::Spawner>,
+ static CONTEXT: RefCell<Option<Handle>> = RefCell::new(None)
}
-impl Default for ThreadContext {
- fn default() -> Self {
- ThreadContext {
- spawner: Spawner::Shell,
- #[cfg(all(feature = "io-driver", not(loom)))]
- io_handle: None,
- #[cfg(any(not(feature = "io-driver"), loom))]
- io_handle: (),
- #[cfg(all(feature = "time", not(loom)))]
- time_handle: None,
- #[cfg(any(not(feature = "time"), loom))]
- time_handle: (),
- clock: None,
- blocking_spawner: None,
- }
- }
+pub(crate) fn current() -> Option<Handle> {
+ CONTEXT.with(|ctx| ctx.borrow().clone())
}
-impl ThreadContext {
- /// Construct a new [`ThreadContext`]
- ///
- /// [`ThreadContext`]: struct.ThreadContext.html
- pub(crate) fn new(
- spawner: Spawner,
- io_handle: crate::runtime::io::Handle,
- time_handle: crate::runtime::time::Handle,
- clock: Option<crate::runtime::time::Clock>,
- blocking_spawner: Option<crate::runtime::blocking::Spawner>,
- ) -> Self {
- ThreadContext {
- spawner,
- #[cfg(all(feature = "io-driver", not(loom)))]
- io_handle,
- #[cfg(any(not(feature = "io-driver"), loom))]
- io_handle,
- #[cfg(all(feature = "time", not(loom)))]
- time_handle,
- #[cfg(any(not(feature = "time"), loom))]
- time_handle,
- clock,
- blocking_spawner,
- }
- }
-
- /// Clone the current [`ThreadContext`] if one is set, otherwise construct a new [`ThreadContext`].
- ///
- /// [`ThreadContext`]: struct.ThreadContext.html
- #[allow(dead_code)]
- pub(crate) fn clone_current() -> Self {
- CONTEXT.with(|ctx| ctx.borrow().clone().unwrap_or_else(Default::default))
- }
-
- /// Set this [`ThreadContext`] as the current active [`ThreadContext`].
- ///
- /// [`ThreadContext`]: struct.ThreadContext.html
- pub(crate) fn enter(self) -> ThreadContextDropGuard {
- CONTEXT.with(|ctx| {
- let previous = ctx.borrow_mut().replace(self);
- ThreadContextDropGuard { previous }
- })
- }
-
+cfg_io_driver! {
pub(crate) fn io_handle() -> crate::runtime::io::Handle {
CONTEXT.with(|ctx| match *ctx.borrow() {
Some(ref ctx) => ctx.io_handle.clone(),
None => Default::default(),
})
}
+}
+cfg_time! {
pub(crate) fn time_handle() -> crate::runtime::time::Handle {
CONTEXT.with(|ctx| match *ctx.borrow() {
Some(ref ctx) => ctx.time_handle.clone(),
@@ -101,61 +28,46 @@ impl ThreadContext {
})
}
- pub(crate) fn spawn_handle() -> Option<Spawner> {
+ cfg_test_util! {
+ pub(crate) fn clock() -> Option<crate::runtime::time::Clock> {
+ CONTEXT.with(|ctx| match *ctx.borrow() {
+ Some(ref ctx) => Some(ctx.clock.clone()),
+ None => None,
+ })
+ }
+ }
+}
+
+cfg_rt_core! {
+ pub(crate) fn spawn_handle() -> Option<crate::runtime::Spawner> {
CONTEXT.with(|ctx| match *ctx.borrow() {
Some(ref ctx) => Some(ctx.spawner.clone()),
None => None,
})
}
-
- pub(crate) fn clock() -> Option<crate::runtime::time::Clock> {
- CONTEXT.with(
- |ctx| match ctx.borrow().as_ref().map(|ctx| ctx.clock.clone()) {
- Some(Some(clock)) => Some(clock),
- _ => None,
- },
- )
- }
-
- pub(crate) fn blocking_spawner() -> Option<crate::runtime::blocking::Spawner> {
- CONTEXT.with(|ctx| {
- match ctx
- .borrow()
- .as_ref()
- .map(|ctx| ctx.blocking_spawner.clone())
- {
- Some(Some(blocking_spawner)) => Some(blocking_spawner),
- _ => None,
- }
- })
- }
}
-cfg_blocking_impl! {
- impl ThreadContext {
- pub(crate) fn with_blocking_spawner(
- mut self,
- blocking_spawner: crate::runtime::blocking::Spawner,
- ) -> Self {
- self.blocking_spawner.replace(blocking_spawner);
- self
+/// Set this [`ThreadContext`] as the current active [`ThreadContext`].
+///
+/// [`ThreadContext`]: struct.ThreadContext.html
+pub(crate) fn enter<F, R>(new: Handle, f: F) -> R
+where
+ F: FnOnce() -> R,
+{
+ struct DropGuard(Option<Handle>);
+
+ impl Drop for DropGuard {
+ fn drop(&mut self) {
+ CONTEXT.with(|ctx| {
+ *ctx.borrow_mut() = self.0.take();
+ });
}
}
-}
-/// [`ThreadContextDropGuard`] will replace the `previous` thread context on drop.
-///
-/// [`ThreadContextDropGuard`]: struct.ThreadContextDropGuard.html
-#[derive(Debug)]
-pub(crate) struct ThreadContextDropGuard {
- previous: Option<ThreadContext>,
-}
+ let _guard = CONTEXT.with(|ctx| {
+ let old = ctx.borrow_mut().replace(new);
+ DropGuard(old)
+ });
-impl Drop for ThreadContextDropGuard {
- fn drop(&mut self) {
- CONTEXT.with(|ctx| match self.previous.clone() {
- Some(prev) => ctx.borrow_mut().replace(prev),
- None => ctx.borrow_mut().take(),
- });
- }
+ f()
}
diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs
index 72365025..ae7b416e 100644
--- a/tokio/src/runtime/handle.rs
+++ b/tokio/src/runtime/handle.rs
@@ -30,16 +30,7 @@ impl Handle {
where
F: FnOnce() -> R,
{
- let _e = context::ThreadContext::new(
- self.spawner.clone(),
- self.io_handle.clone(),
- self.time_handle.clone(),
- Some(self.clock.clone()),
- Some(self.blocking_spawner.clone()),
- )
- .enter();
-
- f()
+ context::enter(self.clone(), f)
}
/// Returns a Handle view over the currently running Runtime
@@ -68,17 +59,7 @@ impl Handle {
/// # }
/// ```
pub fn current() -> Self {
- use crate::runtime::context::ThreadContext;
-
- Handle {
- spawner: ThreadContext::spawn_handle()
- .expect("not currently running on the Tokio runtime."),
- io_handle: ThreadContext::io_handle(),
- time_handle: ThreadContext::time_handle(),
- clock: ThreadContext::clock().expect("not currently running on the Tokio runtime."),
- blocking_spawner: ThreadContext::blocking_spawner()
- .expect("not currently running on the Tokio runtime."),
- }
+ context::current().expect("not currently running on the Tokio runtime.")
}
}
diff --git a/tokio/src/runtime/io.rs b/tokio/src/runtime/io.rs
index dca7310f..6a0953af 100644
--- a/tokio/src/runtime/io.rs
+++ b/tokio/src/runtime/io.rs
@@ -8,7 +8,7 @@ pub(crate) use std::io::Result;
pub(crate) use variant::*;
-#[cfg(all(feature = "io-driver", not(loom)))]
+#[cfg(feature = "io-driver")]
mod variant {
use crate::io::driver;
use crate::park::{Either, ParkThread};
@@ -28,6 +28,9 @@ mod variant {
pub(crate) type Handle = Option<driver::Handle>;
pub(crate) fn create_driver(enable: bool) -> io::Result<(Driver, Handle)> {
+ #[cfg(loom)]
+ assert!(!enable);
+
if enable {
let driver = driver::Driver::new()?;
let handle = driver.handle();
@@ -40,7 +43,7 @@ mod variant {
}
}
-#[cfg(any(not(feature = "io-driver"), loom))]
+#[cfg(not(feature = "io-driver"))]
mod variant {
use crate::park::ParkThread;
diff --git a/tokio/src/runtime/thread_pool/mod.rs b/tokio/src/runtime/thread_pool/mod.rs
index c22ce8b9..fd38c013 100644
--- a/tokio/src/runtime/thread_pool/mod.rs
+++ b/tokio/src/runtime/thread_pool/mod.rs
@@ -38,7 +38,7 @@ const LOCAL_QUEUE_CAPACITY: usize = 256;
#[cfg(loom)]
const LOCAL_QUEUE_CAPACITY: usize = 2;
-use crate::runtime::{self, blocking, Parker};
+use crate::runtime::{self, Parker};
use crate::task::JoinHandle;
use std::fmt;
@@ -107,11 +107,10 @@ impl Drop for ThreadPool {
}
impl Workers {
- pub(crate) fn spawn(self, blocking_pool: &blocking::Spawner) {
- blocking_pool.enter(|| {
+ pub(crate) fn spawn(self, rt: &runtime::Handle) {
+ rt.enter(|| {
for worker in self.workers {
- let b = blocking_pool.clone();
- runtime::spawn_blocking(move || worker.run(b));
+ runtime::spawn_blocking(move || worker.run());
}
});
}
diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs
index fbf7a1fc..44df8b74 100644
--- a/tokio/src/runtime/thread_pool/worker.rs
+++ b/tokio/src/runtime/thread_pool/worker.rs
@@ -1,9 +1,9 @@
use crate::loom::cell::CausalCell;
use crate::loom::sync::Arc;
use crate::park::Park;
+use crate::runtime;
use crate::runtime::park::Parker;
use crate::runtime::thread_pool::{current, slice, Owned, Shared};
-use crate::runtime::{self, blocking};
use crate::task::Task;
use std::cell::Cell;
@@ -119,7 +119,7 @@ impl Worker {
}
}
- pub(super) fn run(self, blocking_pool: blocking::Spawner) {
+ pub(super) fn run(self) {
// First, acquire a lock on the worker.
let guard = match self.acquire_lock() {
Some(guard) => guard,
@@ -131,37 +131,35 @@ impl Worker {
// Enter a runtime context
let _enter = crate::runtime::enter();
- blocking_pool.enter(|| {
- ON_BLOCK.with(|ob| {
- // Ensure that the ON_BLOCK is removed from the thread-local context
- // when leaving the scope. This handles cases that involve panicking.
- struct Reset<'a>(&'a Cell<Option<*const dyn Fn()>>);
+ ON_BLOCK.with(|ob| {
+ // Ensure that the ON_BLOCK is removed from the thread-local context
+ // when leaving the scope. This handles cases that involve panicking.
+ struct Reset<'a>(&'a Cell<Option<*const dyn Fn()>>);
- impl<'a> Drop for Reset<'a> {
- fn drop(&mut self) {
- self.0.set(None);
- }
+ impl<'a> Drop for Reset<'a> {
+ fn drop(&mut self) {
+ self.0.set(None);
}
+ }
- let _reset = Reset(ob);
+ let _reset = Reset(ob);
- let allow_blocking: &dyn Fn() = &|| self.block_in_place(&blocking_pool);
+ let allow_blocking: &dyn Fn() = &|| self.block_in_place();
- ob.set(Some(unsafe {
- // NOTE: We cannot use a safe cast to raw pointer here, since we are
- // _also_ erasing the lifetime of these pointers. That is safe here,
- // because we know that ob will set back to None before allow_blocking
- // is dropped.
- #[allow(clippy::useless_transmute)]
- std::mem::transmute::<_, *const dyn Fn()>(allow_blocking)
- }));
+ ob.set(Some(unsafe {
+ // NOTE: We cannot use a safe cast to raw pointer here, since we are
+ // _also_ erasing the lifetime of these pointers. That is safe here,
+ // because we know that ob will set back to None before allow_blocking
+ // is dropped.
+ #[allow(clippy::useless_transmute)]
+ std::mem::transmute::<_, *const dyn Fn()>(allow_blocking)
+ }));
- let _ = guard.run();
+ let _ = guard.run();
- // Ensure that we reset ob before allow_blocking is dropped.
- drop(_reset);
- });
- })
+ // Ensure that we reset ob before allow_blocking is dropped.
+ drop(_reset);
+ });
});
if self.gone.get() {
@@ -206,7 +204,7 @@ impl Worker {
}
/// Enter an in-place blocking section
- fn block_in_place(&self, blocking_pool: &blocking::Spawner) {
+ fn block_in_place(&self) {
// If our Worker has already been given away, then blocking is fine!
if self.gone.get() {
return;
@@ -259,8 +257,7 @@ impl Worker {
};
// Give away the worker
- let b = blocking_pool.clone();
- runtime::spawn_blocking(move || worker.run(b));
+ runtime::spawn_blocking(move || worker.run());
}
}
diff --git a/tokio/src/runtime/time.rs b/tokio/src/runtime/time.rs
index 6259c87a..c623d964 100644
--- a/tokio/src/runtime/time.rs
+++ b/tokio/src/runtime/time.rs
@@ -5,7 +5,7 @@
pub(crate) use variant::*;
-#[cfg(all(feature = "time", not(loom)))]
+#[cfg(feature = "time")]
mod variant {
use crate::park::Either;
use crate::runtime::io;
@@ -36,7 +36,7 @@ mod variant {
}
}
-#[cfg(any(not(feature = "time"), loom))]
+#[cfg(not(feature = "time"))]
mod variant {
use crate::runtime::io;
diff --git a/tokio/src/task/spawn.rs b/tokio/src/task/spawn.rs
index c4589308..4e19f559 100644
--- a/tokio/src/task/spawn.rs
+++ b/tokio/src/task/spawn.rs
@@ -123,7 +123,7 @@ doc_rt_core! {
T: Future + Send + 'static,
T::Output: Send + 'static,
{
- let spawn_handle = runtime::context::ThreadContext::spawn_handle()
+ let spawn_handle = runtime::context::spawn_handle()
.expect("must be called from the context of Tokio runtime configured with either `basic_scheduler` or `threaded_scheduler`");
spawn_handle.spawn(task)
}
diff --git a/tokio/src/time/clock.rs b/tokio/src/time/clock.rs
index ae75740c..bd3045a9 100644
--- a/tokio/src/time/clock.rs
+++ b/tokio/src/time/clock.rs
@@ -64,7 +64,7 @@ cfg_test_util! {
/// Panics if time is already frozen or if called from outside of the Tokio
/// runtime.
pub fn pause() {
- let clock = context::ThreadContext::clock().expect("time cannot be frozen from outside the Tokio runtime");
+ let clock = context::clock().expect("time cannot be frozen from outside the Tokio runtime");
let mut frozen = clock.inner.frozen.lock().unwrap();
if frozen.is_some() {
panic!("time is already frozen");
@@ -82,7 +82,7 @@ cfg_test_util! {
/// Panics if time is not frozen or if called from outside of the Tokio
/// runtime.
pub fn resume() {
- let clock = context::ThreadContext::clock().expect("time cannot be frozen from outside the Tokio runtime");
+ let clock = context::clock().expect("time cannot be frozen from outside the Tokio runtime");
let mut frozen = clock.inner.frozen.lock().unwrap();
if frozen.is_none() {
@@ -102,14 +102,14 @@ cfg_test_util! {
/// Panics if time is not frozen or if called from outside of the Tokio
/// runtime.
pub async fn advance(duration: Duration) {
- let clock = context::ThreadContext::clock().expect("time cannot be frozen from outside the Tokio runtime");
+ let clock = context::clock().expect("time cannot be frozen from outside the Tokio runtime");
clock.advance(duration);
crate::task::yield_now().await;
}
/// Return the current instant, factoring in frozen time.
pub(crate) fn now() -> Instant {
- if let Some(clock) = context::ThreadContext::clock() {
+ if let Some(clock) = context::clock() {
if let Some(frozen) = *clock.inner.frozen.lock().unwrap() {
Instant::from_std(clock.inner.start + frozen)
} else {
diff --git a/tokio/src/time/driver/handle.rs b/tokio/src/time/driver/handle.rs
index 6a8fe2bb..f24eaeb6 100644
--- a/tokio/src/time/driver/handle.rs
+++ b/tokio/src/time/driver/handle.rs
@@ -21,7 +21,7 @@ impl Handle {
///
/// This function panics if there is no current timer set.
pub(crate) fn current() -> Self {
- context::ThreadContext::time_handle().expect("no current timer")
+ context::time_handle().expect("no current timer")
}
/// Try to return a strong ref to the inner
diff --git a/tokio/src/time/mod.rs b/tokio/src/time/mod.rs
index 24aae11e..7070d6b2 100644
--- a/tokio/src/time/mod.rs
+++ b/tokio/src/time/mod.rs
@@ -1,5 +1,3 @@
-#![cfg(not(loom))]
-
//! Utilities for tracking time.
//!
//! This module provides a number of types for executing code after a set period
diff --git a/tokio/src/time/tests/mock_clock.rs b/tokio/src/time/tests/mock_clock.rs
deleted file mode 100644
index ac509e3f..00000000
--- a/tokio/src/time/tests/mock_clock.rs
+++ /dev/null
@@ -1,212 +0,0 @@
-use crate::park::{Park, Unpark};
-use crate::runtime::context;
-use crate::time::driver::Driver;
-use crate::time::{Clock, Duration, Instant};
-
-use std::marker::PhantomData;
-use std::rc::Rc;
-use std::sync::{Arc, Mutex};
-
-/// Run the provided closure with a `MockClock` that starts at the current time.
-pub(crate) fn mock<F, R>(f: F) -> R
-where
- F: FnOnce(&mut Handle) -> R,
-{
- let mut mock = MockClock::new();
- mock.enter(f)
-}
-
-/// Mock clock for use with `tokio-timer` futures.
-///
-/// A mock timer that is able to advance and wake after a
-/// certain duration.
-#[derive(Debug)]
-pub(crate) struct MockClock {
- time: MockTime,
- clock: Clock,
-}
-
-/// A handle to the `MockClock`.
-#[derive(Debug)]
-pub(crate) struct Handle {
- timer: Driver<MockPark>,
- time: MockTime,
- clock: Clock,
-}
-
-type Inner = Arc<Mutex<State>>;
-
-#[derive(Debug, Clone)]
-struct MockTime {
- inner: Inner,
- _pd: PhantomData<Rc<()>>,
-}
-
-#[derive(Debug)]
-struct MockNow {
- inner: Inner,
-}
-
-#[derive(Debug)]
-struct MockPark {
- inner: Inner,
- _pd: PhantomData<Rc<()>>,
-}
-
-#[derive(Debug)]
-struct MockUnpark {
- inner: Inner,
-}
-
-#[derive(Debug)]
-struct State {
- clock: Clock,
- unparked: bool,
- park_for: Option<Duration>,
-}
-
-impl MockClock {
- /// Create a new `MockClock` with the current time.
- pub(crate) fn new() -> Self {
- let clock = Clock::new_frozen();
- let time = MockTime::new(clock.clone());
-
- MockClock { time, clock }
- }
-
- /// Enter the `MockClock` context.
- pub(