diff options
author | Carl Lerche <me@carllerche.com> | 2019-11-06 21:29:10 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-11-06 21:29:10 -0800 |
commit | 4dbe6af0a1a1c8e579b92ec8ffc1d419244e0944 (patch) | |
tree | 4ee02032f615464af50aa6858440c5cecccec4cf /tokio/src/runtime/thread_pool | |
parent | 9bec094150e869caae5105d7080f0ae54757b2d9 (diff) |
runtime: misc pool cleanup (#1743)
- Remove builders for internal types
- Avoid duplicating the blocking pool when using the concurrent
scheduler.
- misc smaller cleanup
Diffstat (limited to 'tokio/src/runtime/thread_pool')
-rw-r--r-- | tokio/src/runtime/thread_pool/builder.rs | 208 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/mod.rs | 190 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/pool.rs | 84 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/tests/loom_pool.rs | 26 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/tests/pool.rs | 38 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/tests/worker.rs | 3 |
6 files changed, 227 insertions, 322 deletions
diff --git a/tokio/src/runtime/thread_pool/builder.rs b/tokio/src/runtime/thread_pool/builder.rs deleted file mode 100644 index 04b5fa23..00000000 --- a/tokio/src/runtime/thread_pool/builder.rs +++ /dev/null @@ -1,208 +0,0 @@ -use crate::loom::sync::Arc; -use crate::loom::sys::num_cpus; -use crate::runtime::park::Park; -use crate::runtime::thread_pool::{shutdown, Spawner, ThreadPool, Worker}; - -use std::{fmt, usize}; - -/// Builds a thread pool with custom configuration values. -pub(crate) struct Builder { - /// Number of worker threads to spawn - pool_size: usize, - - /// Thread name - name: String, - - /// Thread stack size - stack_size: Option<usize>, - - /// Around worker callback - around_worker: Option<Callback>, -} - -// The Arc<Box<_>> is needed because loom doesn't support Arc<T> where T: !Sized -// loom doesn't support that because it requires CoerceUnsized, which is unstable -type Callback = Arc<Box<dyn Fn(usize, &mut dyn FnMut()) + Send + Sync>>; - -impl Builder { - /// Returns a new thread pool builder initialized with default configuration - /// values. - pub(crate) fn new() -> Builder { - Builder { - pool_size: num_cpus(), - name: "tokio-runtime-worker".to_string(), - stack_size: None, - around_worker: None, - } - } - - /// Set the number of threads running async tasks. - /// - /// This must be a number between 1 and 2,048 though it is advised to keep - /// this value on the smaller side. - /// - /// The default value is the number of cores available to the system. - pub(crate) fn num_threads(&mut self, value: usize) -> &mut Self { - self.pool_size = value; - self - } - - /// Set name of threads spawned by the scheduler - /// - /// If this configuration is not set, then the thread will use the system - /// default naming scheme. - pub(crate) fn name<S: Into<String>>(&mut self, val: S) -> &mut Self { - self.name = val.into(); - self - } - - /// Set the stack size (in bytes) for worker threads. - /// - /// The actual stack size may be greater than this value if the platform - /// specifies minimal stack size. - /// - /// The default stack size for spawned threads is 2 MiB, though this - /// particular stack size is subject to change in the future. - pub(crate) fn stack_size(&mut self, val: usize) -> &mut Self { - self.stack_size = Some(val); - self - } - - /// Execute function `f` on each worker thread. - /// - /// This function is provided a function that executes the worker and is - /// expected to call it, otherwise the worker thread will shutdown without - /// doing any work. - pub(crate) fn around_worker<F>(&mut self, f: F) -> &mut Self - where - F: Fn(usize, &mut dyn FnMut()) + Send + Sync + 'static, - { - self.around_worker = Some(Arc::new(Box::new(f))); - self - } - - /// Create the configured `ThreadPool` with a custom `park` instances. - /// - /// The provided closure `build_park` is called once per worker and returns - /// a `Park` instance that is used by the worker to put itself to sleep. - pub(crate) fn build<F, P>(&self, mut build_park: F) -> ThreadPool - where - F: FnMut(usize) -> P, - P: Park + Send + 'static, - { - use crate::runtime::thread_pool::worker; - - let (shutdown_tx, shutdown_rx) = shutdown::channel(); - - let around_worker = self.around_worker.as_ref().map(Arc::clone); - let launch_worker = Arc::new(Box::new(move |worker: Worker<BoxedPark<P>>| { - // NOTE: It might seem like the shutdown_tx that's moved into this Arc is never - // dropped, and that shutdown_rx will therefore never see EOF, but that is not actually - // the case. Only `build_with_park` and each worker hold onto a copy of this Arc. - // `build_with_park` drops it immediately, and the workers drop theirs when their `run` - // method returns (and their copy of the Arc are dropped). In fact, we don't actually - // _need_ a copy of `shutdown_tx` for each worker thread; having them all hold onto - // this Arc, which in turn holds the last `shutdown_tx` would have been sufficient. - let shutdown_tx = shutdown_tx.clone(); - let around_worker = around_worker.as_ref().map(Arc::clone); - Box::new(move || { - struct AbortOnPanic; - - impl Drop for AbortOnPanic { - fn drop(&mut self) { - if std::thread::panicking() { - eprintln!("[ERROR] unhandled panic in Tokio scheduler. This is a bug and should be reported."); - std::process::abort(); - } - } - } - - let _abort_on_panic = AbortOnPanic; - if let Some(cb) = around_worker.as_ref() { - let idx = worker.id(); - let mut f = Some(move || worker.run()); - cb(idx, &mut || { - (f.take() - .expect("around_thread callback called closure twice"))( - ) - }) - } else { - worker.run() - } - - // Dropping the handle must happen __after__ the callback - drop(shutdown_tx); - }) as Box<dyn FnOnce() + Send + 'static> - }) - as Box<dyn Fn(Worker<BoxedPark<P>>) -> Box<dyn FnOnce() + Send> + Send + Sync>); - - let mut blocking = crate::runtime::blocking::Builder::default(); - blocking.name(self.name.clone()); - if let Some(ss) = self.stack_size { - blocking.stack_size(ss); - } - let blocking = Arc::new(blocking.build()); - - let (pool, workers) = worker::create_set::<_, BoxedPark<P>>( - self.pool_size, - |i| Box::new(BoxedPark::new(build_park(i))), - Arc::clone(&launch_worker), - blocking.clone(), - ); - - // Spawn threads for each worker - for worker in workers { - crate::runtime::blocking::Pool::spawn(&blocking, launch_worker(worker)) - } - - let spawner = Spawner::new(pool); - let blocking = crate::runtime::blocking::PoolWaiter::from(blocking); - ThreadPool::from_parts(spawner, shutdown_rx, blocking) - } -} - -impl Default for Builder { - fn default() -> Builder { - Builder::new() - } -} - -impl fmt::Debug for Builder { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Builder") - .field("pool_size", &self.pool_size) - .field("name", &self.name) - .field("stack_size", &self.stack_size) - .finish() - } -} - -pub(crate) struct BoxedPark<P> { - inner: P, -} - -impl<P> BoxedPark<P> { - pub(crate) fn new(inner: P) -> Self { - BoxedPark { inner } - } -} - -impl<P> Park for BoxedPark<P> -where - P: Park, -{ - type Unpark = Box<dyn crate::runtime::park::Unpark>; - type Error = P::Error; - - fn unpark(&self) -> Self::Unpark { - Box::new(self.inner.unpark()) - } - - fn park(&mut self) -> Result<(), Self::Error> { - self.inner.park() - } - - fn park_timeout(&mut self, duration: std::time::Duration) -> Result<(), Self::Error> { - self.inner.park_timeout(duration) - } -} diff --git a/tokio/src/runtime/thread_pool/mod.rs b/tokio/src/runtime/thread_pool/mod.rs index 5dce2a10..314942d3 100644 --- a/tokio/src/runtime/thread_pool/mod.rs +++ b/tokio/src/runtime/thread_pool/mod.rs @@ -1,8 +1,5 @@ //! Threadpool -mod builder; -pub(crate) use self::builder::Builder; - mod current; mod idle; @@ -11,9 +8,6 @@ use self::idle::Idle; mod owned; use self::owned::Owned; -mod pool; -pub(crate) use self::pool::ThreadPool; - mod queue; mod spawner; @@ -44,3 +38,187 @@ const LOCAL_QUEUE_CAPACITY: usize = 256; // time. #[cfg(loom)] const LOCAL_QUEUE_CAPACITY: usize = 2; + +use crate::loom::sync::Arc; +use crate::runtime::blocking::{self, PoolWaiter}; +use crate::runtime::task::JoinHandle; +use crate::runtime::Park; + +use std::fmt; +use std::future::Future; + +/// Work-stealing based thread pool for executing futures. +pub(crate) struct ThreadPool { + spawner: Spawner, + + /// Shutdown waiter + shutdown_rx: shutdown::Receiver, + + /// Shutdown valve for Pool + blocking: PoolWaiter, +} + +// The Arc<Box<_>> is needed because loom doesn't support Arc<T> where T: !Sized +// loom doesn't support that because it requires CoerceUnsized, which is +// unstable +type Callback = Arc<Box<dyn Fn(usize, &mut dyn FnMut()) + Send + Sync>>; + +impl ThreadPool { + pub(crate) fn new<F, P>( + pool_size: usize, + blocking_pool: Arc<blocking::Pool>, + around_worker: Callback, + mut build_park: F, + ) -> ThreadPool + where + F: FnMut(usize) -> P, + P: Park + Send + 'static, + { + let (shutdown_tx, shutdown_rx) = shutdown::channel(); + + let launch_worker = Arc::new(Box::new(move |worker: Worker<BoxedPark<P>>| { + // NOTE: It might seem like the shutdown_tx that's moved into this Arc is never + // dropped, and that shutdown_rx will therefore never see EOF, but that is not actually + // the case. Only `build_with_park` and each worker hold onto a copy of this Arc. + // `build_with_park` drops it immediately, and the workers drop theirs when their `run` + // method returns (and their copy of the Arc are dropped). In fact, we don't actually + // _need_ a copy of `shutdown_tx` for each worker thread; having them all hold onto + // this Arc, which in turn holds the last `shutdown_tx` would have been sufficient. + let shutdown_tx = shutdown_tx.clone(); + let around_worker = around_worker.clone(); + + Box::new(move || { + struct AbortOnPanic; + + impl Drop for AbortOnPanic { + fn drop(&mut self) { + if std::thread::panicking() { + eprintln!("[ERROR] unhandled panic in Tokio scheduler. This is a bug and should be reported."); + std::process::abort(); + } + } + } + + let _abort_on_panic = AbortOnPanic; + + let idx = worker.id(); + let mut f = Some(move || worker.run()); + around_worker(idx, &mut || { + (f.take() + .expect("around_thread callback called closure twice"))( + ) + }); + + // Dropping the handle must happen __after__ the callback + drop(shutdown_tx); + }) as Box<dyn FnOnce() + Send + 'static> + }) + as Box<dyn Fn(Worker<BoxedPark<P>>) -> Box<dyn FnOnce() + Send> + Send + Sync>); + + let (pool, workers) = worker::create_set::<_, BoxedPark<P>>( + pool_size, + |i| Box::new(BoxedPark::new(build_park(i))), + Arc::clone(&launch_worker), + blocking_pool.clone(), + ); + + // Spawn threads for each worker + for worker in workers { + crate::runtime::blocking::Pool::spawn(&blocking_pool, launch_worker(worker)) + } + + let spawner = Spawner::new(pool); + let blocking = crate::runtime::blocking::PoolWaiter::from(blocking_pool); + + // ThreadPool::from_parts(spawner, shutdown_rx, blocking) + ThreadPool { + spawner, + shutdown_rx, + blocking, + } + } + + /// Returns reference to `Spawner`. + /// + /// The `Spawner` handle can be cloned and enables spawning tasks from other + /// threads. + pub(crate) fn spawner(&self) -> &Spawner { + &self.spawner + } + + /// Spawn a task + pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + self.spawner.spawn(future) + } + + /// Block the current thread waiting for the future to complete. + /// + /// The future will execute on the current thread, but all spawned tasks + /// will be executed on the thread pool. + pub(crate) fn block_on<F>(&self, future: F) -> F::Output + where + F: Future, + { + crate::runtime::global::with_thread_pool(self.spawner(), || { + let mut enter = crate::runtime::enter(); + crate::runtime::blocking::with_pool(self.spawner.blocking_pool(), || { + enter.block_on(future) + }) + }) + } + + /// Shutdown the thread pool. + pub(crate) fn shutdown_now(&mut self) { + if self.spawner.workers().close() { + self.shutdown_rx.wait(); + } + self.blocking.shutdown(); + } +} + +impl fmt::Debug for ThreadPool { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("ThreadPool").finish() + } +} + +impl Drop for ThreadPool { + fn drop(&mut self) { + self.shutdown_now(); + } +} + +// TODO: delete? +pub(crate) struct BoxedPark<P> { + inner: P, +} + +impl<P> BoxedPark<P> { + pub(crate) fn new(inner: P) -> Self { + BoxedPark { inner } + } +} + +impl<P> Park for BoxedPark<P> +where + P: Park, +{ + type Unpark = Box<dyn crate::runtime::park::Unpark>; + type Error = P::Error; + + fn unpark(&self) -> Self::Unpark { + Box::new(self.inner.unpark()) + } + + fn park(&mut self) -> Result<(), Self::Error> { + self.inner.park() + } + + fn park_timeout(&mut self, duration: std::time::Duration) -> Result<(), Self::Error> { + self.inner.park_timeout(duration) + } +} diff --git a/tokio/src/runtime/thread_pool/pool.rs b/tokio/src/runtime/thread_pool/pool.rs deleted file mode 100644 index a6ef4346..00000000 --- a/tokio/src/runtime/thread_pool/pool.rs +++ /dev/null @@ -1,84 +0,0 @@ -use crate::runtime::blocking::PoolWaiter; -use crate::runtime::task::JoinHandle; -use crate::runtime::thread_pool::{shutdown, Spawner}; - -use std::fmt; -use std::future::Future; - -/// Work-stealing based thread pool for executing futures. -pub(crate) struct ThreadPool { - spawner: Spawner, - - /// Shutdown waiter - shutdown_rx: shutdown::Receiver, - - /// Shutdown valve for Pool - blocking: PoolWaiter, -} - -impl ThreadPool { - pub(super) fn from_parts( - spawner: Spawner, - shutdown_rx: shutdown::Receiver, - blocking: PoolWaiter, - ) -> ThreadPool { - ThreadPool { - spawner, - shutdown_rx, - blocking, - } - } - - /// Returns reference to `Spawner`. - /// - /// The `Spawner` handle can be cloned and enables spawning tasks from other - /// threads. - pub(crate) fn spawner(&self) -> &Spawner { - &self.spawner - } - - /// Spawn a task - pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - self.spawner.spawn(future) - } - - /// Block the current thread waiting for the future to complete. - /// - /// The future will execute on the current thread, but all spawned tasks - /// will be executed on the thread pool. - pub(crate) fn block_on<F>(&self, future: F) -> F::Output - where - F: Future, - { - crate::runtime::global::with_thread_pool(self.spawner(), || { - let mut enter = crate::runtime::enter(); - crate::runtime::blocking::with_pool(self.spawner.blocking_pool(), || { - enter.block_on(future) - }) - }) - } - - /// Shutdown the thread pool. - pub(crate) fn shutdown_now(&mut self) { - if self.spawner.workers().close() { - self.shutdown_rx.wait(); - } - self.blocking.shutdown(); - } -} - -impl fmt::Debug for ThreadPool { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("ThreadPool").finish() - } -} - -impl Drop for ThreadPool { - fn drop(&mut self) { - self.shutdown_now(); - } -} diff --git a/tokio/src/runtime/thread_pool/tests/loom_pool.rs b/tokio/src/runtime/thread_pool/tests/loom_pool.rs index b7be932c..5eb166ce 100644 --- a/tokio/src/runtime/thread_pool/tests/loom_pool.rs +++ b/tokio/src/runtime/thread_pool/tests/loom_pool.rs @@ -1,5 +1,5 @@ use crate::runtime::tests::loom_oneshot as oneshot; -use crate::runtime::thread_pool::{self, Builder}; +use crate::runtime::thread_pool::{self, ThreadPool}; use crate::runtime::{Park, Unpark}; use crate::spawn; @@ -13,8 +13,7 @@ use std::time::Duration; #[test] fn pool_multi_spawn() { loom::model(|| { - let pool = Builder::new().build(|_| LoomPark::new()); - + let pool = mk_pool(2); let c1 = Arc::new(AtomicUsize::new(0)); let (tx, rx) = oneshot::channel(); @@ -47,7 +46,7 @@ fn pool_multi_spawn() { #[test] fn only_blocking() { loom::model(|| { - let mut pool = Builder::new().num_threads(1).build(|_| LoomPark::new()); + let mut pool = mk_pool(1); let (block_tx, block_rx) = oneshot::channel(); pool.spawn(async move { @@ -65,7 +64,7 @@ fn only_blocking() { fn blocking_and_regular() { const NUM: usize = 3; loom::model(|| { - let mut pool = Builder::new().num_threads(1).build(|_| LoomPark::new()); + let mut pool = mk_pool(1); let cnt = Arc::new(AtomicUsize::new(0)); let (block_tx, block_rx) = oneshot::channel(); @@ -99,7 +98,7 @@ fn blocking_and_regular() { #[test] fn pool_multi_notify() { loom::model(|| { - let pool = Builder::new().build(|_| LoomPark::new()); + let pool = mk_pool(2); let c1 = Arc::new(AtomicUsize::new(0)); @@ -135,7 +134,7 @@ fn pool_multi_notify() { #[test] fn pool_shutdown() { loom::model(|| { - let pool = Builder::new().build(|_| LoomPark::new()); + let pool = mk_pool(2); pool.spawn(async move { gated2(true).await; @@ -152,7 +151,7 @@ fn pool_shutdown() { #[test] fn complete_block_on_under_load() { loom::model(|| { - let pool = Builder::new().build(|_| LoomPark::new()); + let pool = mk_pool(2); pool.block_on(async { // Spin hard @@ -167,6 +166,17 @@ fn complete_block_on_under_load() { }); } +fn mk_pool(num_threads: usize) -> ThreadPool { + use crate::runtime::blocking; + + ThreadPool::new( + num_threads, + blocking::Pool::new("test".into(), None), + Arc::new(Box::new(|_, next| next())), + move |_| LoomPark::new(), + ) +} + use futures::future::poll_fn; use std::task::Poll; async fn yield_once() { diff --git a/tokio/src/runtime/thread_pool/tests/pool.rs b/tokio/src/runtime/thread_pool/tests/pool.rs index 050ffc7d..6e753b35 100644 --- a/tokio/src/runtime/thread_pool/tests/pool.rs +++ b/tokio/src/runtime/thread_pool/tests/pool.rs @@ -1,6 +1,7 @@ #![warn(rust_2018_idioms)] -use crate::runtime::{thread_pool, Park, Unpark}; +use crate::runtime::thread_pool::ThreadPool; +use crate::runtime::{blocking, Park, Unpark}; use futures_util::future::poll_fn; use std::future::Future; @@ -62,15 +63,20 @@ fn eagerly_drops_futures() { let (park_tx, park_rx) = mpsc::sync_channel(0); let (unpark_tx, unpark_rx) = mpsc::sync_channel(0); - let pool = thread_pool::Builder::new().num_threads(4).build(move |_| { - let (tx, rx) = mpsc::channel(); - MyPark { - tx: Mutex::new(tx), - rx, - park_tx: park_tx.clone(), - unpark_tx: unpark_tx.clone(), - } - }); + let pool = ThreadPool::new( + 4, + blocking::Pool::new("test".into(), None), + Arc::new(Box::new(|_, next| next())), + move |_| { + let (tx, rx) = mpsc::channel(); + MyPark { + tx: Mutex::new(tx), + rx, + park_tx: park_tx.clone(), + unpark_tx: unpark_tx.clone(), + } + }, + ); struct MyTask { task_tx: Option<mpsc::Sender<Waker>>, @@ -160,15 +166,17 @@ fn park_called_at_interval() { let (done_tx, done_rx) = mpsc::channel(); - // Use 1 thread to ensure the worker stays busy. - let pool = thread_pool::Builder::new() - .num_threads(1) - .build(move |idx| { + let pool = ThreadPool::new( + 1, + blocking::Pool::new("test".into(), None), + Arc::new(Box::new(|_, next| next())), + move |idx| { assert_eq!(idx, 0); MyPark { park_light: park_light_2.clone(), } - }); + }, + ); let mut cnt = 0; diff --git a/tokio/src/runtime/thread_pool/tests/worker.rs b/tokio/src/runtime/thread_pool/tests/worker.rs index edb97160..91ec5804 100644 --- a/tokio/src/runtime/thread_pool/tests/worker.rs +++ b/tokio/src/runtime/thread_pool/tests/worker.rs @@ -1,3 +1,4 @@ +use crate::runtime::blocking; use crate::runtime::tests::track_drop::track_drop; use crate::runtime::thread_pool; @@ -12,7 +13,7 @@ macro_rules! pool { }}; (! $n:expr) => {{ let mut mock_park = crate::runtime::tests::mock_park::MockPark::new(); - let blocking = std::sync::Arc::new(crate::runtime::blocking::Pool::default()); + let blocking = blocking::Pool::new("test".into(), None); let (pool, workers) = thread_pool::worker::create_set( $n, |index| Box::new(mock_park.mk_park(index)), |