diff options
author | Benjamin Fry <benjaminfry@me.com> | 2020-01-06 11:32:21 -0800 |
---|---|---|
committer | Carl Lerche <me@carllerche.com> | 2020-01-06 11:32:21 -0800 |
commit | 0193df3a593cb69d23414109118784de2948024c (patch) | |
tree | 2b0ced32fb708b5a64b0411ad745bf0b98ce9ed0 /tokio/src/runtime/blocking | |
parent | 5930acef736d45733dc182e420a2417a164c71ba (diff) |
rt: add a Handle::current() (#2040)
Adds `Handle::current()` for accessing a handle to the runtime
associated with the current thread. This handle can then be
passed to other threads in order to spawn or perform other
runtime related tasks.
Diffstat (limited to 'tokio/src/runtime/blocking')
-rw-r--r-- | tokio/src/runtime/blocking/mod.rs | 6 | ||||
-rw-r--r-- | tokio/src/runtime/blocking/pool.rs | 49 |
2 files changed, 17 insertions, 38 deletions
diff --git a/tokio/src/runtime/blocking/mod.rs b/tokio/src/runtime/blocking/mod.rs index fa4c77a1..70832d46 100644 --- a/tokio/src/runtime/blocking/mod.rs +++ b/tokio/src/runtime/blocking/mod.rs @@ -56,6 +56,12 @@ cfg_not_blocking_impl! { self } + #[cfg(any( + feature = "blocking", + feature = "dns", + feature = "fs", + feature = "io-std", + ))] pub(crate) fn enter<F, R>(&self, f: F) -> R where F: FnOnce() -> R, diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index 2aa4c8bf..320bd7f2 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -8,7 +8,6 @@ use crate::runtime::blocking::task::BlockingTask; use crate::runtime::{self, context::ThreadContext, io, time, Builder, Callback}; use crate::task::{self, JoinHandle}; -use std::cell::Cell; use std::collections::VecDeque; use std::fmt; use std::time::Duration; @@ -68,11 +67,6 @@ struct Shared { type Task = task::Task<NoopSchedule>; -thread_local! { - /// Thread-local tracking the current executor - static BLOCKING: Cell<Option<*const Spawner>> = Cell::new(None) -} - const KEEP_ALIVE: Duration = Duration::from_secs(10); /// Run the provided function on an executor dedicated to blocking operations. @@ -80,16 +74,14 @@ pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R> where F: FnOnce() -> R + Send + 'static, { - BLOCKING.with(|cell| { - let schedule = match cell.get() { - Some(ptr) => unsafe { &*ptr }, - None => panic!("not currently running on the Tokio runtime."), - }; + use crate::runtime::context::ThreadContext; + + let schedule = + ThreadContext::blocking_spawner().expect("not currently running on the Tokio runtime."); - let (task, handle) = task::joinable(BlockingTask::new(func)); - schedule.schedule(task); - handle - }) + let (task, handle) = task::joinable(BlockingTask::new(func)); + schedule.schedule(task); + handle } // ===== impl BlockingPool ===== @@ -168,30 +160,10 @@ impl Spawner { where F: FnOnce() -> R, { - // While scary, this is safe. The function takes a `&BlockingPool`, - // which guarantees that the reference lives for the duration of - // `with_pool`. - // - // Because we are always clearing the TLS value at the end of the - // function, we can cast the reference to 'static which thread-local - // cells require. - BLOCKING.with(|cell| { - let was = cell.replace(None); - - // Ensure that the pool is removed from the thread-local context - // when leaving the scope. This handles cases that involve panicking. - struct Reset<'a>(&'a Cell<Option<*const Spawner>>, Option<*const Spawner>); - - impl Drop for Reset<'_> { - fn drop(&mut self) { - self.0.set(self.1); - } - } + let ctx = crate::runtime::context::ThreadContext::clone_current(); + let _e = ctx.with_blocking_spawner(self.clone()).enter(); - let _reset = Reset(cell, was); - cell.set(Some(self as *const Spawner)); - f() - }) + f() } fn schedule(&self, task: Task) { @@ -248,6 +220,7 @@ impl Spawner { self.inner.io_handle.clone(), self.inner.time_handle.clone(), Some(self.inner.clock.clone()), + Some(self.clone()), ); let spawner = self.clone(); builder |