summaryrefslogtreecommitdiffstats
path: root/tokio/src/runtime/thread_pool
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-11-06 21:29:10 -0800
committerGitHub <noreply@github.com>2019-11-06 21:29:10 -0800
commit4dbe6af0a1a1c8e579b92ec8ffc1d419244e0944 (patch)
tree4ee02032f615464af50aa6858440c5cecccec4cf /tokio/src/runtime/thread_pool
parent9bec094150e869caae5105d7080f0ae54757b2d9 (diff)
runtime: misc pool cleanup (#1743)
- Remove builders for internal types - Avoid duplicating the blocking pool when using the concurrent scheduler. - misc smaller cleanup
Diffstat (limited to 'tokio/src/runtime/thread_pool')
-rw-r--r--tokio/src/runtime/thread_pool/builder.rs208
-rw-r--r--tokio/src/runtime/thread_pool/mod.rs190
-rw-r--r--tokio/src/runtime/thread_pool/pool.rs84
-rw-r--r--tokio/src/runtime/thread_pool/tests/loom_pool.rs26
-rw-r--r--tokio/src/runtime/thread_pool/tests/pool.rs38
-rw-r--r--tokio/src/runtime/thread_pool/tests/worker.rs3
6 files changed, 227 insertions, 322 deletions
diff --git a/tokio/src/runtime/thread_pool/builder.rs b/tokio/src/runtime/thread_pool/builder.rs
deleted file mode 100644
index 04b5fa23..00000000
--- a/tokio/src/runtime/thread_pool/builder.rs
+++ /dev/null
@@ -1,208 +0,0 @@
-use crate::loom::sync::Arc;
-use crate::loom::sys::num_cpus;
-use crate::runtime::park::Park;
-use crate::runtime::thread_pool::{shutdown, Spawner, ThreadPool, Worker};
-
-use std::{fmt, usize};
-
-/// Builds a thread pool with custom configuration values.
-pub(crate) struct Builder {
- /// Number of worker threads to spawn
- pool_size: usize,
-
- /// Thread name
- name: String,
-
- /// Thread stack size
- stack_size: Option<usize>,
-
- /// Around worker callback
- around_worker: Option<Callback>,
-}
-
-// The Arc<Box<_>> is needed because loom doesn't support Arc<T> where T: !Sized
-// loom doesn't support that because it requires CoerceUnsized, which is unstable
-type Callback = Arc<Box<dyn Fn(usize, &mut dyn FnMut()) + Send + Sync>>;
-
-impl Builder {
- /// Returns a new thread pool builder initialized with default configuration
- /// values.
- pub(crate) fn new() -> Builder {
- Builder {
- pool_size: num_cpus(),
- name: "tokio-runtime-worker".to_string(),
- stack_size: None,
- around_worker: None,
- }
- }
-
- /// Set the number of threads running async tasks.
- ///
- /// This must be a number between 1 and 2,048 though it is advised to keep
- /// this value on the smaller side.
- ///
- /// The default value is the number of cores available to the system.
- pub(crate) fn num_threads(&mut self, value: usize) -> &mut Self {
- self.pool_size = value;
- self
- }
-
- /// Set name of threads spawned by the scheduler
- ///
- /// If this configuration is not set, then the thread will use the system
- /// default naming scheme.
- pub(crate) fn name<S: Into<String>>(&mut self, val: S) -> &mut Self {
- self.name = val.into();
- self
- }
-
- /// Set the stack size (in bytes) for worker threads.
- ///
- /// The actual stack size may be greater than this value if the platform
- /// specifies minimal stack size.
- ///
- /// The default stack size for spawned threads is 2 MiB, though this
- /// particular stack size is subject to change in the future.
- pub(crate) fn stack_size(&mut self, val: usize) -> &mut Self {
- self.stack_size = Some(val);
- self
- }
-
- /// Execute function `f` on each worker thread.
- ///
- /// This function is provided a function that executes the worker and is
- /// expected to call it, otherwise the worker thread will shutdown without
- /// doing any work.
- pub(crate) fn around_worker<F>(&mut self, f: F) -> &mut Self
- where
- F: Fn(usize, &mut dyn FnMut()) + Send + Sync + 'static,
- {
- self.around_worker = Some(Arc::new(Box::new(f)));
- self
- }
-
- /// Create the configured `ThreadPool` with a custom `park` instances.
- ///
- /// The provided closure `build_park` is called once per worker and returns
- /// a `Park` instance that is used by the worker to put itself to sleep.
- pub(crate) fn build<F, P>(&self, mut build_park: F) -> ThreadPool
- where
- F: FnMut(usize) -> P,
- P: Park + Send + 'static,
- {
- use crate::runtime::thread_pool::worker;
-
- let (shutdown_tx, shutdown_rx) = shutdown::channel();
-
- let around_worker = self.around_worker.as_ref().map(Arc::clone);
- let launch_worker = Arc::new(Box::new(move |worker: Worker<BoxedPark<P>>| {
- // NOTE: It might seem like the shutdown_tx that's moved into this Arc is never
- // dropped, and that shutdown_rx will therefore never see EOF, but that is not actually
- // the case. Only `build_with_park` and each worker hold onto a copy of this Arc.
- // `build_with_park` drops it immediately, and the workers drop theirs when their `run`
- // method returns (and their copy of the Arc are dropped). In fact, we don't actually
- // _need_ a copy of `shutdown_tx` for each worker thread; having them all hold onto
- // this Arc, which in turn holds the last `shutdown_tx` would have been sufficient.
- let shutdown_tx = shutdown_tx.clone();
- let around_worker = around_worker.as_ref().map(Arc::clone);
- Box::new(move || {
- struct AbortOnPanic;
-
- impl Drop for AbortOnPanic {
- fn drop(&mut self) {
- if std::thread::panicking() {
- eprintln!("[ERROR] unhandled panic in Tokio scheduler. This is a bug and should be reported.");
- std::process::abort();
- }
- }
- }
-
- let _abort_on_panic = AbortOnPanic;
- if let Some(cb) = around_worker.as_ref() {
- let idx = worker.id();
- let mut f = Some(move || worker.run());
- cb(idx, &mut || {
- (f.take()
- .expect("around_thread callback called closure twice"))(
- )
- })
- } else {
- worker.run()
- }
-
- // Dropping the handle must happen __after__ the callback
- drop(shutdown_tx);
- }) as Box<dyn FnOnce() + Send + 'static>
- })
- as Box<dyn Fn(Worker<BoxedPark<P>>) -> Box<dyn FnOnce() + Send> + Send + Sync>);
-
- let mut blocking = crate::runtime::blocking::Builder::default();
- blocking.name(self.name.clone());
- if let Some(ss) = self.stack_size {
- blocking.stack_size(ss);
- }
- let blocking = Arc::new(blocking.build());
-
- let (pool, workers) = worker::create_set::<_, BoxedPark<P>>(
- self.pool_size,
- |i| Box::new(BoxedPark::new(build_park(i))),
- Arc::clone(&launch_worker),
- blocking.clone(),
- );
-
- // Spawn threads for each worker
- for worker in workers {
- crate::runtime::blocking::Pool::spawn(&blocking, launch_worker(worker))
- }
-
- let spawner = Spawner::new(pool);
- let blocking = crate::runtime::blocking::PoolWaiter::from(blocking);
- ThreadPool::from_parts(spawner, shutdown_rx, blocking)
- }
-}
-
-impl Default for Builder {
- fn default() -> Builder {
- Builder::new()
- }
-}
-
-impl fmt::Debug for Builder {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- fmt.debug_struct("Builder")
- .field("pool_size", &self.pool_size)
- .field("name", &self.name)
- .field("stack_size", &self.stack_size)
- .finish()
- }
-}
-
-pub(crate) struct BoxedPark<P> {
- inner: P,
-}
-
-impl<P> BoxedPark<P> {
- pub(crate) fn new(inner: P) -> Self {
- BoxedPark { inner }
- }
-}
-
-impl<P> Park for BoxedPark<P>
-where
- P: Park,
-{
- type Unpark = Box<dyn crate::runtime::park::Unpark>;
- type Error = P::Error;
-
- fn unpark(&self) -> Self::Unpark {
- Box::new(self.inner.unpark())
- }
-
- fn park(&mut self) -> Result<(), Self::Error> {
- self.inner.park()
- }
-
- fn park_timeout(&mut self, duration: std::time::Duration) -> Result<(), Self::Error> {
- self.inner.park_timeout(duration)
- }
-}
diff --git a/tokio/src/runtime/thread_pool/mod.rs b/tokio/src/runtime/thread_pool/mod.rs
index 5dce2a10..314942d3 100644
--- a/tokio/src/runtime/thread_pool/mod.rs
+++ b/tokio/src/runtime/thread_pool/mod.rs
@@ -1,8 +1,5 @@
//! Threadpool
-mod builder;
-pub(crate) use self::builder::Builder;
-
mod current;
mod idle;
@@ -11,9 +8,6 @@ use self::idle::Idle;
mod owned;
use self::owned::Owned;
-mod pool;
-pub(crate) use self::pool::ThreadPool;
-
mod queue;
mod spawner;
@@ -44,3 +38,187 @@ const LOCAL_QUEUE_CAPACITY: usize = 256;
// time.
#[cfg(loom)]
const LOCAL_QUEUE_CAPACITY: usize = 2;
+
+use crate::loom::sync::Arc;
+use crate::runtime::blocking::{self, PoolWaiter};
+use crate::runtime::task::JoinHandle;
+use crate::runtime::Park;
+
+use std::fmt;
+use std::future::Future;
+
+/// Work-stealing based thread pool for executing futures.
+pub(crate) struct ThreadPool {
+ spawner: Spawner,
+
+ /// Shutdown waiter
+ shutdown_rx: shutdown::Receiver,
+
+ /// Shutdown valve for Pool
+ blocking: PoolWaiter,
+}
+
+// The Arc<Box<_>> is needed because loom doesn't support Arc<T> where T: !Sized
+// loom doesn't support that because it requires CoerceUnsized, which is
+// unstable
+type Callback = Arc<Box<dyn Fn(usize, &mut dyn FnMut()) + Send + Sync>>;
+
+impl ThreadPool {
+ pub(crate) fn new<F, P>(
+ pool_size: usize,
+ blocking_pool: Arc<blocking::Pool>,
+ around_worker: Callback,
+ mut build_park: F,
+ ) -> ThreadPool
+ where
+ F: FnMut(usize) -> P,
+ P: Park + Send + 'static,
+ {
+ let (shutdown_tx, shutdown_rx) = shutdown::channel();
+
+ let launch_worker = Arc::new(Box::new(move |worker: Worker<BoxedPark<P>>| {
+ // NOTE: It might seem like the shutdown_tx that's moved into this Arc is never
+ // dropped, and that shutdown_rx will therefore never see EOF, but that is not actually
+ // the case. Only `build_with_park` and each worker hold onto a copy of this Arc.
+ // `build_with_park` drops it immediately, and the workers drop theirs when their `run`
+ // method returns (and their copy of the Arc are dropped). In fact, we don't actually
+ // _need_ a copy of `shutdown_tx` for each worker thread; having them all hold onto
+ // this Arc, which in turn holds the last `shutdown_tx` would have been sufficient.
+ let shutdown_tx = shutdown_tx.clone();
+ let around_worker = around_worker.clone();
+
+ Box::new(move || {
+ struct AbortOnPanic;
+
+ impl Drop for AbortOnPanic {
+ fn drop(&mut self) {
+ if std::thread::panicking() {
+ eprintln!("[ERROR] unhandled panic in Tokio scheduler. This is a bug and should be reported.");
+ std::process::abort();
+ }
+ }
+ }
+
+ let _abort_on_panic = AbortOnPanic;
+
+ let idx = worker.id();
+ let mut f = Some(move || worker.run());
+ around_worker(idx, &mut || {
+ (f.take()
+ .expect("around_thread callback called closure twice"))(
+ )
+ });
+
+ // Dropping the handle must happen __after__ the callback
+ drop(shutdown_tx);
+ }) as Box<dyn FnOnce() + Send + 'static>
+ })
+ as Box<dyn Fn(Worker<BoxedPark<P>>) -> Box<dyn FnOnce() + Send> + Send + Sync>);
+
+ let (pool, workers) = worker::create_set::<_, BoxedPark<P>>(
+ pool_size,
+ |i| Box::new(BoxedPark::new(build_park(i))),
+ Arc::clone(&launch_worker),
+ blocking_pool.clone(),
+ );
+
+ // Spawn threads for each worker
+ for worker in workers {
+ crate::runtime::blocking::Pool::spawn(&blocking_pool, launch_worker(worker))
+ }
+
+ let spawner = Spawner::new(pool);
+ let blocking = crate::runtime::blocking::PoolWaiter::from(blocking_pool);
+
+ // ThreadPool::from_parts(spawner, shutdown_rx, blocking)
+ ThreadPool {
+ spawner,
+ shutdown_rx,
+ blocking,
+ }
+ }
+
+ /// Returns reference to `Spawner`.
+ ///
+ /// The `Spawner` handle can be cloned and enables spawning tasks from other
+ /// threads.
+ pub(crate) fn spawner(&self) -> &Spawner {
+ &self.spawner
+ }
+
+ /// Spawn a task
+ pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
+ where
+ F: Future + Send + 'static,
+ F::Output: Send + 'static,
+ {
+ self.spawner.spawn(future)
+ }
+
+ /// Block the current thread waiting for the future to complete.
+ ///
+ /// The future will execute on the current thread, but all spawned tasks
+ /// will be executed on the thread pool.
+ pub(crate) fn block_on<F>(&self, future: F) -> F::Output
+ where
+ F: Future,
+ {
+ crate::runtime::global::with_thread_pool(self.spawner(), || {
+ let mut enter = crate::runtime::enter();
+ crate::runtime::blocking::with_pool(self.spawner.blocking_pool(), || {
+ enter.block_on(future)
+ })
+ })
+ }
+
+ /// Shutdown the thread pool.
+ pub(crate) fn shutdown_now(&mut self) {
+ if self.spawner.workers().close() {
+ self.shutdown_rx.wait();
+ }
+ self.blocking.shutdown();
+ }
+}
+
+impl fmt::Debug for ThreadPool {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("ThreadPool").finish()
+ }
+}
+
+impl Drop for ThreadPool {
+ fn drop(&mut self) {
+ self.shutdown_now();
+ }
+}
+
+// TODO: delete?
+pub(crate) struct BoxedPark<P> {
+ inner: P,
+}
+
+impl<P> BoxedPark<P> {
+ pub(crate) fn new(inner: P) -> Self {
+ BoxedPark { inner }
+ }
+}
+
+impl<P> Park for BoxedPark<P>
+where
+ P: Park,
+{
+ type Unpark = Box<dyn crate::runtime::park::Unpark>;
+ type Error = P::Error;
+
+ fn unpark(&self) -> Self::Unpark {
+ Box::new(self.inner.unpark())
+ }
+
+ fn park(&mut self) -> Result<(), Self::Error> {
+ self.inner.park()
+ }
+
+ fn park_timeout(&mut self, duration: std::time::Duration) -> Result<(), Self::Error> {
+ self.inner.park_timeout(duration)
+ }
+}
diff --git a/tokio/src/runtime/thread_pool/pool.rs b/tokio/src/runtime/thread_pool/pool.rs
deleted file mode 100644
index a6ef4346..00000000
--- a/tokio/src/runtime/thread_pool/pool.rs
+++ /dev/null
@@ -1,84 +0,0 @@
-use crate::runtime::blocking::PoolWaiter;
-use crate::runtime::task::JoinHandle;
-use crate::runtime::thread_pool::{shutdown, Spawner};
-
-use std::fmt;
-use std::future::Future;
-
-/// Work-stealing based thread pool for executing futures.
-pub(crate) struct ThreadPool {
- spawner: Spawner,
-
- /// Shutdown waiter
- shutdown_rx: shutdown::Receiver,
-
- /// Shutdown valve for Pool
- blocking: PoolWaiter,
-}
-
-impl ThreadPool {
- pub(super) fn from_parts(
- spawner: Spawner,
- shutdown_rx: shutdown::Receiver,
- blocking: PoolWaiter,
- ) -> ThreadPool {
- ThreadPool {
- spawner,
- shutdown_rx,
- blocking,
- }
- }
-
- /// Returns reference to `Spawner`.
- ///
- /// The `Spawner` handle can be cloned and enables spawning tasks from other
- /// threads.
- pub(crate) fn spawner(&self) -> &Spawner {
- &self.spawner
- }
-
- /// Spawn a task
- pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
- where
- F: Future + Send + 'static,
- F::Output: Send + 'static,
- {
- self.spawner.spawn(future)
- }
-
- /// Block the current thread waiting for the future to complete.
- ///
- /// The future will execute on the current thread, but all spawned tasks
- /// will be executed on the thread pool.
- pub(crate) fn block_on<F>(&self, future: F) -> F::Output
- where
- F: Future,
- {
- crate::runtime::global::with_thread_pool(self.spawner(), || {
- let mut enter = crate::runtime::enter();
- crate::runtime::blocking::with_pool(self.spawner.blocking_pool(), || {
- enter.block_on(future)
- })
- })
- }
-
- /// Shutdown the thread pool.
- pub(crate) fn shutdown_now(&mut self) {
- if self.spawner.workers().close() {
- self.shutdown_rx.wait();
- }
- self.blocking.shutdown();
- }
-}
-
-impl fmt::Debug for ThreadPool {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- fmt.debug_struct("ThreadPool").finish()
- }
-}
-
-impl Drop for ThreadPool {
- fn drop(&mut self) {
- self.shutdown_now();
- }
-}
diff --git a/tokio/src/runtime/thread_pool/tests/loom_pool.rs b/tokio/src/runtime/thread_pool/tests/loom_pool.rs
index b7be932c..5eb166ce 100644
--- a/tokio/src/runtime/thread_pool/tests/loom_pool.rs
+++ b/tokio/src/runtime/thread_pool/tests/loom_pool.rs
@@ -1,5 +1,5 @@
use crate::runtime::tests::loom_oneshot as oneshot;
-use crate::runtime::thread_pool::{self, Builder};
+use crate::runtime::thread_pool::{self, ThreadPool};
use crate::runtime::{Park, Unpark};
use crate::spawn;
@@ -13,8 +13,7 @@ use std::time::Duration;
#[test]
fn pool_multi_spawn() {
loom::model(|| {
- let pool = Builder::new().build(|_| LoomPark::new());
-
+ let pool = mk_pool(2);
let c1 = Arc::new(AtomicUsize::new(0));
let (tx, rx) = oneshot::channel();
@@ -47,7 +46,7 @@ fn pool_multi_spawn() {
#[test]
fn only_blocking() {
loom::model(|| {
- let mut pool = Builder::new().num_threads(1).build(|_| LoomPark::new());
+ let mut pool = mk_pool(1);
let (block_tx, block_rx) = oneshot::channel();
pool.spawn(async move {
@@ -65,7 +64,7 @@ fn only_blocking() {
fn blocking_and_regular() {
const NUM: usize = 3;
loom::model(|| {
- let mut pool = Builder::new().num_threads(1).build(|_| LoomPark::new());
+ let mut pool = mk_pool(1);
let cnt = Arc::new(AtomicUsize::new(0));
let (block_tx, block_rx) = oneshot::channel();
@@ -99,7 +98,7 @@ fn blocking_and_regular() {
#[test]
fn pool_multi_notify() {
loom::model(|| {
- let pool = Builder::new().build(|_| LoomPark::new());
+ let pool = mk_pool(2);
let c1 = Arc::new(AtomicUsize::new(0));
@@ -135,7 +134,7 @@ fn pool_multi_notify() {
#[test]
fn pool_shutdown() {
loom::model(|| {
- let pool = Builder::new().build(|_| LoomPark::new());
+ let pool = mk_pool(2);
pool.spawn(async move {
gated2(true).await;
@@ -152,7 +151,7 @@ fn pool_shutdown() {
#[test]
fn complete_block_on_under_load() {
loom::model(|| {
- let pool = Builder::new().build(|_| LoomPark::new());
+ let pool = mk_pool(2);
pool.block_on(async {
// Spin hard
@@ -167,6 +166,17 @@ fn complete_block_on_under_load() {
});
}
+fn mk_pool(num_threads: usize) -> ThreadPool {
+ use crate::runtime::blocking;
+
+ ThreadPool::new(
+ num_threads,
+ blocking::Pool::new("test".into(), None),
+ Arc::new(Box::new(|_, next| next())),
+ move |_| LoomPark::new(),
+ )
+}
+
use futures::future::poll_fn;
use std::task::Poll;
async fn yield_once() {
diff --git a/tokio/src/runtime/thread_pool/tests/pool.rs b/tokio/src/runtime/thread_pool/tests/pool.rs
index 050ffc7d..6e753b35 100644
--- a/tokio/src/runtime/thread_pool/tests/pool.rs
+++ b/tokio/src/runtime/thread_pool/tests/pool.rs
@@ -1,6 +1,7 @@
#![warn(rust_2018_idioms)]
-use crate::runtime::{thread_pool, Park, Unpark};
+use crate::runtime::thread_pool::ThreadPool;
+use crate::runtime::{blocking, Park, Unpark};
use futures_util::future::poll_fn;
use std::future::Future;
@@ -62,15 +63,20 @@ fn eagerly_drops_futures() {
let (park_tx, park_rx) = mpsc::sync_channel(0);
let (unpark_tx, unpark_rx) = mpsc::sync_channel(0);
- let pool = thread_pool::Builder::new().num_threads(4).build(move |_| {
- let (tx, rx) = mpsc::channel();
- MyPark {
- tx: Mutex::new(tx),
- rx,
- park_tx: park_tx.clone(),
- unpark_tx: unpark_tx.clone(),
- }
- });
+ let pool = ThreadPool::new(
+ 4,
+ blocking::Pool::new("test".into(), None),
+ Arc::new(Box::new(|_, next| next())),
+ move |_| {
+ let (tx, rx) = mpsc::channel();
+ MyPark {
+ tx: Mutex::new(tx),
+ rx,
+ park_tx: park_tx.clone(),
+ unpark_tx: unpark_tx.clone(),
+ }
+ },
+ );
struct MyTask {
task_tx: Option<mpsc::Sender<Waker>>,
@@ -160,15 +166,17 @@ fn park_called_at_interval() {
let (done_tx, done_rx) = mpsc::channel();
- // Use 1 thread to ensure the worker stays busy.
- let pool = thread_pool::Builder::new()
- .num_threads(1)
- .build(move |idx| {
+ let pool = ThreadPool::new(
+ 1,
+ blocking::Pool::new("test".into(), None),
+ Arc::new(Box::new(|_, next| next())),
+ move |idx| {
assert_eq!(idx, 0);
MyPark {
park_light: park_light_2.clone(),
}
- });
+ },
+ );
let mut cnt = 0;
diff --git a/tokio/src/runtime/thread_pool/tests/worker.rs b/tokio/src/runtime/thread_pool/tests/worker.rs
index edb97160..91ec5804 100644
--- a/tokio/src/runtime/thread_pool/tests/worker.rs
+++ b/tokio/src/runtime/thread_pool/tests/worker.rs
@@ -1,3 +1,4 @@
+use crate::runtime::blocking;
use crate::runtime::tests::track_drop::track_drop;
use crate::runtime::thread_pool;
@@ -12,7 +13,7 @@ macro_rules! pool {
}};
(! $n:expr) => {{
let mut mock_park = crate::runtime::tests::mock_park::MockPark::new();
- let blocking = std::sync::Arc::new(crate::runtime::blocking::Pool::default());
+ let blocking = blocking::Pool::new("test".into(), None);
let (pool, workers) = thread_pool::worker::create_set(
$n,
|index| Box::new(mock_park.mk_park(index)),