diff options
author | Carl Lerche <me@carllerche.com> | 2020-01-07 07:53:40 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-01-07 07:53:40 -0800 |
commit | 45da5f3510a61599c89dc458ecc859f13a81e255 (patch) | |
tree | c85b31879c37f06d3c58cd7d6db99c7e2ce2cf89 /tokio/src/runtime/blocking | |
parent | 855d39f849cc16d3c68df5abf0bbb28e3351cdf0 (diff) |
rt: cleanup runtime::context (#2063)
Tweak context to remove more fns and usage of `Option`. Remove
`ThreadContext` struct as it is reduced to just `Handle`. Avoid passing
around individual driver handles and instead limit to the
`runtime::Handle` struct.
Diffstat (limited to 'tokio/src/runtime/blocking')
-rw-r--r-- | tokio/src/runtime/blocking/mod.rs | 43 | ||||
-rw-r--r-- | tokio/src/runtime/blocking/pool.rs | 81 |
2 files changed, 21 insertions, 103 deletions
diff --git a/tokio/src/runtime/blocking/mod.rs b/tokio/src/runtime/blocking/mod.rs index 70832d46..be56e8f8 100644 --- a/tokio/src/runtime/blocking/mod.rs +++ b/tokio/src/runtime/blocking/mod.rs @@ -11,43 +11,23 @@ cfg_blocking_impl! { mod shutdown; mod task; - use crate::runtime::{self, Builder, io, time}; + use crate::runtime::Builder; - pub(crate) fn create_blocking_pool( - builder: &Builder, - spawner: &runtime::Spawner, - io: &io::Handle, - time: &time::Handle, - clock: &time::Clock, - thread_cap: usize, - ) -> BlockingPool { - BlockingPool::new( - builder, - spawner, - io, - time, - clock, - thread_cap) + pub(crate) fn create_blocking_pool(builder: &Builder, thread_cap: usize) -> BlockingPool { + BlockingPool::new(builder, thread_cap) } } cfg_not_blocking_impl! { - use crate::runtime::{self, io, time, Builder}; + use crate::runtime::Builder; #[derive(Debug, Clone)] pub(crate) struct BlockingPool {} pub(crate) use BlockingPool as Spawner; - pub(crate) fn create_blocking_pool( - _builder: &Builder, - _spawner: &runtime::Spawner, - _io: &io::Handle, - _time: &time::Handle, - _clock: &time::Clock, - _thread_cap: usize, - ) -> BlockingPool { + pub(crate) fn create_blocking_pool(_builder: &Builder, _thread_cap: usize) -> BlockingPool { BlockingPool {} } @@ -55,18 +35,5 @@ cfg_not_blocking_impl! { pub(crate) fn spawner(&self) -> &BlockingPool { self } - - #[cfg(any( - feature = "blocking", - feature = "dns", - feature = "fs", - feature = "io-std", - ))] - pub(crate) fn enter<F, R>(&self, f: F) -> R - where - F: FnOnce() -> R, - { - f() - } } } diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index 320bd7f2..2a618ff5 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -5,7 +5,7 @@ use crate::loom::thread; use crate::runtime::blocking::schedule::NoopSchedule; use crate::runtime::blocking::shutdown; use crate::runtime::blocking::task::BlockingTask; -use crate::runtime::{self, context::ThreadContext, io, time, Builder, Callback}; +use crate::runtime::{Builder, Callback, Handle}; use crate::task::{self, JoinHandle}; use std::collections::VecDeque; @@ -41,18 +41,6 @@ struct Inner { /// Call before a thread stops before_stop: Option<Callback>, - /// Spawns async tasks - spawner: runtime::Spawner, - - /// Runtime I/O driver handle - io_handle: io::Handle, - - /// Runtime time driver handle - time_handle: time::Handle, - - /// Source of `Instant::now()` - clock: time::Clock, - thread_cap: usize, } @@ -74,27 +62,17 @@ pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R> where F: FnOnce() -> R + Send + 'static, { - use crate::runtime::context::ThreadContext; - - let schedule = - ThreadContext::blocking_spawner().expect("not currently running on the Tokio runtime."); + let rt = Handle::current(); let (task, handle) = task::joinable(BlockingTask::new(func)); - schedule.schedule(task); + rt.blocking_spawner.spawn(task, &rt); handle } // ===== impl BlockingPool ===== impl BlockingPool { - pub(crate) fn new( - builder: &Builder, - spawner: &runtime::Spawner, - io: &io::Handle, - time: &time::Handle, - clock: &time::Clock, - thread_cap: usize, - ) -> BlockingPool { + pub(crate) fn new(builder: &Builder, thread_cap: usize) -> BlockingPool { let (shutdown_tx, shutdown_rx) = shutdown::channel(); BlockingPool { @@ -113,10 +91,6 @@ impl BlockingPool { stack_size: builder.thread_stack_size, after_start: builder.after_start.clone(), before_stop: builder.before_stop.clone(), - spawner: spawner.clone(), - io_handle: io.clone(), - time_handle: time.clone(), - clock: clock.clone(), thread_cap, }), }, @@ -152,21 +126,7 @@ impl fmt::Debug for BlockingPool { // ===== impl Spawner ===== impl Spawner { - /// Set the blocking pool for the duration of the closure - /// - /// If a blocking pool is already set, it will be restored when the closure - /// returns or if it panics. - pub(crate) fn enter<F, R>(&self, f: F) -> R - where - F: FnOnce() -> R, - { - let ctx = crate::runtime::context::ThreadContext::clone_current(); - let _e = ctx.with_blocking_spawner(self.clone()).enter(); - - f() - } - - fn schedule(&self, task: Task) { + fn spawn(&self, task: Task, rt: &Handle) { let shutdown_tx = { let mut shared = self.inner.shared.lock().unwrap(); @@ -205,41 +165,32 @@ impl Spawner { }; if let Some(shutdown_tx) = shutdown_tx { - self.spawn_thread(shutdown_tx); + self.spawn_thread(shutdown_tx, rt); } } - fn spawn_thread(&self, shutdown_tx: shutdown::Sender) { + fn spawn_thread(&self, shutdown_tx: shutdown::Sender, rt: &Handle) { let mut builder = thread::Builder::new().name(self.inner.thread_name.clone()); if let Some(stack_size) = self.inner.stack_size { builder = builder.stack_size(stack_size); } - let thread_context = ThreadContext::new( - self.inner.spawner.clone(), - self.inner.io_handle.clone(), - self.inner.time_handle.clone(), - Some(self.inner.clock.clone()), - Some(self.clone()), - ); - let spawner = self.clone(); + + let rt = rt.clone(); + builder .spawn(move || { - let _e = thread_context.enter(); - run_thread(spawner); - drop(shutdown_tx); + // Only the reference should be moved into the closure + let rt = &rt; + rt.enter(move || { + rt.blocking_spawner.inner.run(); + drop(shutdown_tx); + }) }) .unwrap(); } } -fn run_thread(spawner: Spawner) { - spawner.enter(|| { - let inner = &*spawner.inner; - inner.run() - }); -} - impl Inner { fn run(&self) { if let Some(f) = &self.after_start { |