summaryrefslogtreecommitdiffstats
path: root/tokio/src/runtime/blocking
diff options
context:
space:
mode:
authorBenjamin Fry <benjaminfry@me.com>2020-01-06 11:32:21 -0800
committerCarl Lerche <me@carllerche.com>2020-01-06 11:32:21 -0800
commit0193df3a593cb69d23414109118784de2948024c (patch)
tree2b0ced32fb708b5a64b0411ad745bf0b98ce9ed0 /tokio/src/runtime/blocking
parent5930acef736d45733dc182e420a2417a164c71ba (diff)
rt: add a Handle::current() (#2040)
Adds `Handle::current()` for accessing a handle to the runtime associated with the current thread. This handle can then be passed to other threads in order to spawn or perform other runtime related tasks.
Diffstat (limited to 'tokio/src/runtime/blocking')
-rw-r--r--tokio/src/runtime/blocking/mod.rs6
-rw-r--r--tokio/src/runtime/blocking/pool.rs49
2 files changed, 17 insertions, 38 deletions
diff --git a/tokio/src/runtime/blocking/mod.rs b/tokio/src/runtime/blocking/mod.rs
index fa4c77a1..70832d46 100644
--- a/tokio/src/runtime/blocking/mod.rs
+++ b/tokio/src/runtime/blocking/mod.rs
@@ -56,6 +56,12 @@ cfg_not_blocking_impl! {
self
}
+ #[cfg(any(
+ feature = "blocking",
+ feature = "dns",
+ feature = "fs",
+ feature = "io-std",
+ ))]
pub(crate) fn enter<F, R>(&self, f: F) -> R
where
F: FnOnce() -> R,
diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs
index 2aa4c8bf..320bd7f2 100644
--- a/tokio/src/runtime/blocking/pool.rs
+++ b/tokio/src/runtime/blocking/pool.rs
@@ -8,7 +8,6 @@ use crate::runtime::blocking::task::BlockingTask;
use crate::runtime::{self, context::ThreadContext, io, time, Builder, Callback};
use crate::task::{self, JoinHandle};
-use std::cell::Cell;
use std::collections::VecDeque;
use std::fmt;
use std::time::Duration;
@@ -68,11 +67,6 @@ struct Shared {
type Task = task::Task<NoopSchedule>;
-thread_local! {
- /// Thread-local tracking the current executor
- static BLOCKING: Cell<Option<*const Spawner>> = Cell::new(None)
-}
-
const KEEP_ALIVE: Duration = Duration::from_secs(10);
/// Run the provided function on an executor dedicated to blocking operations.
@@ -80,16 +74,14 @@ pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
{
- BLOCKING.with(|cell| {
- let schedule = match cell.get() {
- Some(ptr) => unsafe { &*ptr },
- None => panic!("not currently running on the Tokio runtime."),
- };
+ use crate::runtime::context::ThreadContext;
+
+ let schedule =
+ ThreadContext::blocking_spawner().expect("not currently running on the Tokio runtime.");
- let (task, handle) = task::joinable(BlockingTask::new(func));
- schedule.schedule(task);
- handle
- })
+ let (task, handle) = task::joinable(BlockingTask::new(func));
+ schedule.schedule(task);
+ handle
}
// ===== impl BlockingPool =====
@@ -168,30 +160,10 @@ impl Spawner {
where
F: FnOnce() -> R,
{
- // While scary, this is safe. The function takes a `&BlockingPool`,
- // which guarantees that the reference lives for the duration of
- // `with_pool`.
- //
- // Because we are always clearing the TLS value at the end of the
- // function, we can cast the reference to 'static which thread-local
- // cells require.
- BLOCKING.with(|cell| {
- let was = cell.replace(None);
-
- // Ensure that the pool is removed from the thread-local context
- // when leaving the scope. This handles cases that involve panicking.
- struct Reset<'a>(&'a Cell<Option<*const Spawner>>, Option<*const Spawner>);
-
- impl Drop for Reset<'_> {
- fn drop(&mut self) {
- self.0.set(self.1);
- }
- }
+ let ctx = crate::runtime::context::ThreadContext::clone_current();
+ let _e = ctx.with_blocking_spawner(self.clone()).enter();
- let _reset = Reset(cell, was);
- cell.set(Some(self as *const Spawner));
- f()
- })
+ f()
}
fn schedule(&self, task: Task) {
@@ -248,6 +220,7 @@ impl Spawner {
self.inner.io_handle.clone(),
self.inner.time_handle.clone(),
Some(self.inner.clock.clone()),
+ Some(self.clone()),
);
let spawner = self.clone();
builder