summaryrefslogtreecommitdiffstats
path: root/tokio/src/runtime/blocking
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-01-07 07:53:40 -0800
committerGitHub <noreply@github.com>2020-01-07 07:53:40 -0800
commit45da5f3510a61599c89dc458ecc859f13a81e255 (patch)
treec85b31879c37f06d3c58cd7d6db99c7e2ce2cf89 /tokio/src/runtime/blocking
parent855d39f849cc16d3c68df5abf0bbb28e3351cdf0 (diff)
rt: cleanup runtime::context (#2063)
Tweak context to remove more fns and usage of `Option`. Remove `ThreadContext` struct as it is reduced to just `Handle`. Avoid passing around individual driver handles and instead limit to the `runtime::Handle` struct.
Diffstat (limited to 'tokio/src/runtime/blocking')
-rw-r--r--tokio/src/runtime/blocking/mod.rs43
-rw-r--r--tokio/src/runtime/blocking/pool.rs81
2 files changed, 21 insertions, 103 deletions
diff --git a/tokio/src/runtime/blocking/mod.rs b/tokio/src/runtime/blocking/mod.rs
index 70832d46..be56e8f8 100644
--- a/tokio/src/runtime/blocking/mod.rs
+++ b/tokio/src/runtime/blocking/mod.rs
@@ -11,43 +11,23 @@ cfg_blocking_impl! {
mod shutdown;
mod task;
- use crate::runtime::{self, Builder, io, time};
+ use crate::runtime::Builder;
- pub(crate) fn create_blocking_pool(
- builder: &Builder,
- spawner: &runtime::Spawner,
- io: &io::Handle,
- time: &time::Handle,
- clock: &time::Clock,
- thread_cap: usize,
- ) -> BlockingPool {
- BlockingPool::new(
- builder,
- spawner,
- io,
- time,
- clock,
- thread_cap)
+ pub(crate) fn create_blocking_pool(builder: &Builder, thread_cap: usize) -> BlockingPool {
+ BlockingPool::new(builder, thread_cap)
}
}
cfg_not_blocking_impl! {
- use crate::runtime::{self, io, time, Builder};
+ use crate::runtime::Builder;
#[derive(Debug, Clone)]
pub(crate) struct BlockingPool {}
pub(crate) use BlockingPool as Spawner;
- pub(crate) fn create_blocking_pool(
- _builder: &Builder,
- _spawner: &runtime::Spawner,
- _io: &io::Handle,
- _time: &time::Handle,
- _clock: &time::Clock,
- _thread_cap: usize,
- ) -> BlockingPool {
+ pub(crate) fn create_blocking_pool(_builder: &Builder, _thread_cap: usize) -> BlockingPool {
BlockingPool {}
}
@@ -55,18 +35,5 @@ cfg_not_blocking_impl! {
pub(crate) fn spawner(&self) -> &BlockingPool {
self
}
-
- #[cfg(any(
- feature = "blocking",
- feature = "dns",
- feature = "fs",
- feature = "io-std",
- ))]
- pub(crate) fn enter<F, R>(&self, f: F) -> R
- where
- F: FnOnce() -> R,
- {
- f()
- }
}
}
diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs
index 320bd7f2..2a618ff5 100644
--- a/tokio/src/runtime/blocking/pool.rs
+++ b/tokio/src/runtime/blocking/pool.rs
@@ -5,7 +5,7 @@ use crate::loom::thread;
use crate::runtime::blocking::schedule::NoopSchedule;
use crate::runtime::blocking::shutdown;
use crate::runtime::blocking::task::BlockingTask;
-use crate::runtime::{self, context::ThreadContext, io, time, Builder, Callback};
+use crate::runtime::{Builder, Callback, Handle};
use crate::task::{self, JoinHandle};
use std::collections::VecDeque;
@@ -41,18 +41,6 @@ struct Inner {
/// Call before a thread stops
before_stop: Option<Callback>,
- /// Spawns async tasks
- spawner: runtime::Spawner,
-
- /// Runtime I/O driver handle
- io_handle: io::Handle,
-
- /// Runtime time driver handle
- time_handle: time::Handle,
-
- /// Source of `Instant::now()`
- clock: time::Clock,
-
thread_cap: usize,
}
@@ -74,27 +62,17 @@ pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
{
- use crate::runtime::context::ThreadContext;
-
- let schedule =
- ThreadContext::blocking_spawner().expect("not currently running on the Tokio runtime.");
+ let rt = Handle::current();
let (task, handle) = task::joinable(BlockingTask::new(func));
- schedule.schedule(task);
+ rt.blocking_spawner.spawn(task, &rt);
handle
}
// ===== impl BlockingPool =====
impl BlockingPool {
- pub(crate) fn new(
- builder: &Builder,
- spawner: &runtime::Spawner,
- io: &io::Handle,
- time: &time::Handle,
- clock: &time::Clock,
- thread_cap: usize,
- ) -> BlockingPool {
+ pub(crate) fn new(builder: &Builder, thread_cap: usize) -> BlockingPool {
let (shutdown_tx, shutdown_rx) = shutdown::channel();
BlockingPool {
@@ -113,10 +91,6 @@ impl BlockingPool {
stack_size: builder.thread_stack_size,
after_start: builder.after_start.clone(),
before_stop: builder.before_stop.clone(),
- spawner: spawner.clone(),
- io_handle: io.clone(),
- time_handle: time.clone(),
- clock: clock.clone(),
thread_cap,
}),
},
@@ -152,21 +126,7 @@ impl fmt::Debug for BlockingPool {
// ===== impl Spawner =====
impl Spawner {
- /// Set the blocking pool for the duration of the closure
- ///
- /// If a blocking pool is already set, it will be restored when the closure
- /// returns or if it panics.
- pub(crate) fn enter<F, R>(&self, f: F) -> R
- where
- F: FnOnce() -> R,
- {
- let ctx = crate::runtime::context::ThreadContext::clone_current();
- let _e = ctx.with_blocking_spawner(self.clone()).enter();
-
- f()
- }
-
- fn schedule(&self, task: Task) {
+ fn spawn(&self, task: Task, rt: &Handle) {
let shutdown_tx = {
let mut shared = self.inner.shared.lock().unwrap();
@@ -205,41 +165,32 @@ impl Spawner {
};
if let Some(shutdown_tx) = shutdown_tx {
- self.spawn_thread(shutdown_tx);
+ self.spawn_thread(shutdown_tx, rt);
}
}
- fn spawn_thread(&self, shutdown_tx: shutdown::Sender) {
+ fn spawn_thread(&self, shutdown_tx: shutdown::Sender, rt: &Handle) {
let mut builder = thread::Builder::new().name(self.inner.thread_name.clone());
if let Some(stack_size) = self.inner.stack_size {
builder = builder.stack_size(stack_size);
}
- let thread_context = ThreadContext::new(
- self.inner.spawner.clone(),
- self.inner.io_handle.clone(),
- self.inner.time_handle.clone(),
- Some(self.inner.clock.clone()),
- Some(self.clone()),
- );
- let spawner = self.clone();
+
+ let rt = rt.clone();
+
builder
.spawn(move || {
- let _e = thread_context.enter();
- run_thread(spawner);
- drop(shutdown_tx);
+ // Only the reference should be moved into the closure
+ let rt = &rt;
+ rt.enter(move || {
+ rt.blocking_spawner.inner.run();
+ drop(shutdown_tx);
+ })
})
.unwrap();
}
}
-fn run_thread(spawner: Spawner) {
- spawner.enter(|| {
- let inner = &*spawner.inner;
- inner.run()
- });
-}
-
impl Inner {
fn run(&self) {
if let Some(f) = &self.after_start {