diff options
author | Carl Lerche <me@carllerche.com> | 2019-11-06 21:29:10 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-11-06 21:29:10 -0800 |
commit | 4dbe6af0a1a1c8e579b92ec8ffc1d419244e0944 (patch) | |
tree | 4ee02032f615464af50aa6858440c5cecccec4cf | |
parent | 9bec094150e869caae5105d7080f0ae54757b2d9 (diff) |
runtime: misc pool cleanup (#1743)
- Remove builders for internal types
- Avoid duplicating the blocking pool when using the concurrent
scheduler.
- misc smaller cleanup
-rw-r--r-- | tokio/src/runtime/blocking/builder.rs | 58 | ||||
-rw-r--r-- | tokio/src/runtime/blocking/mod.rs | 65 | ||||
-rw-r--r-- | tokio/src/runtime/builder.rs | 104 | ||||
-rw-r--r-- | tokio/src/runtime/mod.rs | 13 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/builder.rs | 208 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/mod.rs | 190 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/pool.rs | 84 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/tests/loom_pool.rs | 26 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/tests/pool.rs | 38 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/tests/worker.rs | 3 |
10 files changed, 324 insertions, 465 deletions
diff --git a/tokio/src/runtime/blocking/builder.rs b/tokio/src/runtime/blocking/builder.rs deleted file mode 100644 index 59178f25..00000000 --- a/tokio/src/runtime/blocking/builder.rs +++ /dev/null @@ -1,58 +0,0 @@ -use crate::loom::thread; -use crate::runtime::blocking::Pool; - -use std::usize; - -/// Builds a blocking thread pool with custom configuration values. -pub(crate) struct Builder { - /// Thread name - name: String, - - /// Thread stack size - stack_size: Option<usize>, -} - -impl Default for Builder { - fn default() -> Self { - Builder { - name: "tokio-blocking-thread".to_string(), - stack_size: None, - } - } -} - -impl Builder { - /// Set name of threads spawned by the pool - /// - /// If this configuration is not set, then the thread will use the system - /// default naming scheme. - pub(crate) fn name<S: Into<String>>(&mut self, val: S) -> &mut Self { - self.name = val.into(); - self - } - - /// Set the stack size (in bytes) for worker threads. - /// - /// The actual stack size may be greater than this value if the platform - /// specifies minimal stack size. - /// - /// The default stack size for spawned threads is 2 MiB, though this - /// particular stack size is subject to change in the future. - pub(crate) fn stack_size(&mut self, val: usize) -> &mut Self { - self.stack_size = Some(val); - self - } - - pub(crate) fn build(self) -> Pool { - let mut p = Pool::default(); - let Builder { stack_size, name } = self; - p.new_thread = Box::new(move || { - let mut b = thread::Builder::new().name(name.clone()); - if let Some(stack_size) = stack_size { - b = b.stack_size(stack_size); - } - b - }); - p - } -} diff --git a/tokio/src/runtime/blocking/mod.rs b/tokio/src/runtime/blocking/mod.rs index e634ea59..941babaa 100644 --- a/tokio/src/runtime/blocking/mod.rs +++ b/tokio/src/runtime/blocking/mod.rs @@ -17,12 +17,6 @@ use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; -#[cfg(feature = "rt-full")] -mod builder; - -#[cfg(feature = "rt-full")] -pub(crate) use builder::Builder; - #[derive(Clone, Copy)] enum State { Empty, @@ -69,11 +63,22 @@ where } pub(crate) struct Pool { + /// State shared between worker threads shared: Mutex<Shared>, + + /// Pool threads wait on this. condvar: Condvar, - new_thread: Box<dyn Fn() -> thread::Builder + Send + Sync + 'static>, + + /// Spawned threads use this name + thread_name: String, + + /// Spawned thread stack size + stack_size: Option<usize>, } +#[derive(Debug)] +pub(crate) struct PoolWaiter(Arc<Pool>); + impl fmt::Debug for Pool { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("Pool").finish() @@ -99,6 +104,21 @@ pub struct Blocking<T> { } impl Pool { + pub(crate) fn new(thread_name: String, stack_size: Option<usize>) -> Arc<Pool> { + Arc::new(Pool { + shared: Mutex::new(Shared { + queue: VecDeque::new(), + num_th: 0, + num_idle: 0, + num_notify: 0, + shutdown: false, + }), + condvar: Condvar::new(), + thread_name, + stack_size, + }) + } + /// Run the provided function on an executor dedicated to blocking operations. pub(crate) fn spawn(this: &Arc<Self>, f: Box<dyn FnOnce() + Send + 'static>) { let should_spawn = { @@ -135,12 +155,18 @@ impl Pool { }; if should_spawn { - Pool::spawn_thread(Arc::clone(this), (this.new_thread)()); + Pool::spawn_thread(Arc::clone(this)); } } // NOTE: we cannot use self here w/o arbitrary_self_types since Arc is loom::Arc - fn spawn_thread(this: Arc<Self>, builder: thread::Builder) { + fn spawn_thread(this: Arc<Self>) { + let mut builder = thread::Builder::new().name(this.thread_name.clone()); + + if let Some(stack_size) = this.stack_size { + builder = builder.stack_size(stack_size); + } + builder .spawn(move || { let mut shared = this.shared.lock().unwrap(); @@ -221,9 +247,6 @@ impl Pool { } } -#[derive(Debug)] -pub(crate) struct PoolWaiter(Arc<Pool>); - impl From<Pool> for PoolWaiter { fn from(p: Pool) -> Self { Self::from(Arc::new(p)) @@ -341,21 +364,3 @@ fn run_task(f: Box<dyn FnOnce() + Send>) { let _ = catch_unwind(AssertUnwindSafe(|| f())); } - -impl Default for Pool { - fn default() -> Self { - Pool { - shared: Mutex::new(Shared { - queue: VecDeque::new(), - num_th: 0, - num_idle: 0, - num_notify: 0, - shutdown: false, - }), - condvar: Condvar::new(), - new_thread: Box::new(|| { - thread::Builder::new().name("tokio-blocking-driver".to_string()) - }), - } - } -} diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 92fdfa1a..6081c10e 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -1,13 +1,9 @@ +use crate::loom::sync::Arc; #[cfg(feature = "blocking")] -use crate::runtime::blocking::{Pool, PoolWaiter}; -#[cfg(feature = "rt-current-thread")] -use crate::runtime::current_thread::CurrentThread; -#[cfg(feature = "rt-full")] -use crate::runtime::thread_pool; +use crate::runtime::blocking; use crate::runtime::{io, timer, Runtime}; use std::fmt; -use std::sync::Arc; /// Builds Tokio Runtime with custom configuration values. /// @@ -57,10 +53,10 @@ pub struct Builder { thread_stack_size: Option<usize>, /// Callback to run after each thread starts. - after_start: Option<Arc<dyn Fn() + Send + Sync>>, + after_start: Option<Callback>, /// To run before each worker thread stops - before_stop: Option<Arc<dyn Fn() + Send + Sync>>, + before_stop: Option<Callback>, /// The clock to use clock: timer::Clock, @@ -75,6 +71,12 @@ enum Kind { ThreadPool, } +#[cfg(not(loom))] +type Callback = Arc<dyn Fn() + Send + Sync>; + +#[cfg(loom)] +type Callback = Arc<Box<dyn Fn() + Send + Sync>>; + impl Builder { /// Returns a new runtime builder initialized with default configuration /// values. @@ -206,6 +208,7 @@ impl Builder { /// .build(); /// # } /// ``` + #[cfg(not(loom))] pub fn after_start<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static, @@ -231,6 +234,7 @@ impl Builder { /// .build(); /// # } /// ``` + #[cfg(not(loom))] pub fn before_stop<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static, @@ -285,13 +289,13 @@ impl Builder { net_handles, timer_handles, #[cfg(feature = "blocking")] - blocking_pool: PoolWaiter::from(Pool::default()), + blocking_pool: self.build_blocking_pool().into(), }) } #[cfg(feature = "rt-current-thread")] fn build_current_thread(&mut self) -> io::Result<Runtime> { - use crate::runtime::Kind; + use crate::runtime::{CurrentThread, Kind}; // Create network driver let (net, handle) = io::create()?; @@ -307,19 +311,19 @@ impl Builder { let executor = CurrentThread::new(timer); // Blocking pool - let blocking_pool = PoolWaiter::from(Pool::default()); + let blocking_pool = self.build_blocking_pool(); Ok(Runtime { kind: Kind::CurrentThread(executor), net_handles, timer_handles, - blocking_pool, + blocking_pool: blocking_pool.into(), }) } #[cfg(feature = "rt-full")] fn build_threadpool(&mut self) -> io::Result<Runtime> { - use crate::runtime::Kind; + use crate::runtime::{Kind, ThreadPool}; use crate::timer::clock; use std::sync::Mutex; @@ -341,8 +345,8 @@ impl Builder { // Get a handle to the clock for the runtime. let clock = self.clock.clone(); - // Blocking pool - let blocking_pool = PoolWaiter::from(Pool::default()); + // Create the blocking pool + let blocking_pool = self.build_blocking_pool(); let pool = { let net_handles = net_handles.clone(); @@ -351,48 +355,52 @@ impl Builder { let after_start = self.after_start.clone(); let before_stop = self.before_stop.clone(); - let mut builder = thread_pool::Builder::new(); - builder.num_threads(self.num_threads); - builder.name(&self.thread_name); - - if let Some(stack_size) = self.thread_stack_size { - builder.stack_size(stack_size); - } - - builder - .around_worker(move |index, next| { - // Configure the network driver - let _net = io::set_default(&net_handles[index]); - - // Configure the clock - clock::with_default(&clock, || { - // Configure the timer - let _timer = timer::set_default(&timer_handles[index]); - - // Call the start callback - if let Some(after_start) = after_start.as_ref() { - after_start(); - } - - // Run the worker - next(); - - // Call the after call back - if let Some(before_stop) = before_stop.as_ref() { - before_stop(); - } - }) + let around_worker = Arc::new(Box::new(move |index, next: &mut dyn FnMut()| { + // Configure the network driver + let _net = io::set_default(&net_handles[index]); + + // Configure the clock + clock::with_default(&clock, || { + // Configure the timer + let _timer = timer::set_default(&timer_handles[index]); + + // Call the start callback + if let Some(after_start) = after_start.as_ref() { + after_start(); + } + + // Run the worker + next(); + + // Call the after call back + if let Some(before_stop) = before_stop.as_ref() { + before_stop(); + } }) - .build(move |index| timers[index].lock().unwrap().take().unwrap()) + }) + as Box<dyn Fn(usize, &mut dyn FnMut()) + Send + Sync>); + + ThreadPool::new( + self.num_threads, + blocking_pool.clone(), + around_worker, + move |index| timers[index].lock().unwrap().take().unwrap(), + ) }; Ok(Runtime { kind: Kind::ThreadPool(pool), net_handles, timer_handles, - blocking_pool, + blocking_pool: blocking_pool.into(), }) } + + #[cfg(feature = "blocking")] + fn build_blocking_pool(&self) -> Arc<blocking::Pool> { + // Create the blocking pool + blocking::Pool::new(self.thread_name.clone(), self.thread_stack_size) + } } impl Default for Builder { diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index 9dbc857a..c8ff71c8 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -137,17 +137,21 @@ mod tests; mod blocking; #[cfg(feature = "blocking")] pub mod blocking; +#[cfg(feature = "blocking")] +use crate::runtime::blocking::PoolWaiter; mod builder; pub use self::builder::Builder; #[cfg(feature = "rt-current-thread")] mod current_thread; +#[cfg(feature = "rt-current-thread")] +use self::current_thread::CurrentThread; #[cfg(feature = "blocking")] mod enter; #[cfg(feature = "blocking")] -pub(crate) use self::enter::enter; +use self::enter::enter; mod global; pub use self::global::spawn; @@ -171,13 +175,8 @@ mod timer; #[cfg(feature = "rt-full")] pub(crate) mod thread_pool; - -#[cfg(feature = "blocking")] -use crate::runtime::blocking::PoolWaiter; -#[cfg(feature = "rt-current-thread")] -use crate::runtime::current_thread::CurrentThread; #[cfg(feature = "rt-full")] -use crate::runtime::thread_pool::ThreadPool; +use self::thread_pool::ThreadPool; #[cfg(feature = "blocking")] use std::future::Future; diff --git a/tokio/src/runtime/thread_pool/builder.rs b/tokio/src/runtime/thread_pool/builder.rs deleted file mode 100644 index 04b5fa23..00000000 --- a/tokio/src/runtime/thread_pool/builder.rs +++ /dev/null @@ -1,208 +0,0 @@ -use crate::loom::sync::Arc; -use crate::loom::sys::num_cpus; -use crate::runtime::park::Park; -use crate::runtime::thread_pool::{shutdown, Spawner, ThreadPool, Worker}; - -use std::{fmt, usize}; - -/// Builds a thread pool with custom configuration values. -pub(crate) struct Builder { - /// Number of worker threads to spawn - pool_size: usize, - - /// Thread name - name: String, - - /// Thread stack size - stack_size: Option<usize>, - - /// Around worker callback - around_worker: Option<Callback>, -} - -// The Arc<Box<_>> is needed because loom doesn't support Arc<T> where T: !Sized -// loom doesn't support that because it requires CoerceUnsized, which is unstable -type Callback = Arc<Box<dyn Fn(usize, &mut dyn FnMut()) + Send + Sync>>; - -impl Builder { - /// Returns a new thread pool builder initialized with default configuration - /// values. - pub(crate) fn new() -> Builder { - Builder { - pool_size: num_cpus(), - name: "tokio-runtime-worker".to_string(), - stack_size: None, - around_worker: None, - } - } - - /// Set the number of threads running async tasks. - /// - /// This must be a number between 1 and 2,048 though it is advised to keep - /// this value on the smaller side. - /// - /// The default value is the number of cores available to the system. - pub(crate) fn num_threads(&mut self, value: usize) -> &mut Self { - self.pool_size = value; - self - } - - /// Set name of threads spawned by the scheduler - /// - /// If this configuration is not set, then the thread will use the system - /// default naming scheme. - pub(crate) fn name<S: Into<String>>(&mut self, val: S) -> &mut Self { - self.name = val.into(); - self - } - - /// Set the stack size (in bytes) for worker threads. - /// - /// The actual stack size may be greater than this value if the platform - /// specifies minimal stack size. - /// - /// The default stack size for spawned threads is 2 MiB, though this - /// particular stack size is subject to change in the future. - pub(crate) fn stack_size(&mut self, val: usize) -> &mut Self { - self.stack_size = Some(val); - self - } - - /// Execute function `f` on each worker thread. - /// - /// This function is provided a function that executes the worker and is - /// expected to call it, otherwise the worker thread will shutdown without - /// doing any work. - pub(crate) fn around_worker<F>(&mut self, f: F) -> &mut Self - where - F: Fn(usize, &mut dyn FnMut()) + Send + Sync + 'static, - { - self.around_worker = Some(Arc::new(Box::new(f))); - self - } - - /// Create the configured `ThreadPool` with a custom `park` instances. - /// - /// The provided closure `build_park` is called once per worker and returns - /// a `Park` instance that is used by the worker to put itself to sleep. - pub(crate) fn build<F, P>(&self, mut build_park: F) -> ThreadPool - where - F: FnMut(usize) -> P, - P: Park + Send + 'static, - { - use crate::runtime::thread_pool::worker; - - let (shutdown_tx, shutdown_rx) = shutdown::channel(); - - let around_worker = self.around_worker.as_ref().map(Arc::clone); - let launch_worker = Arc::new(Box::new(move |worker: Worker<BoxedPark<P>>| { - // NOTE: It might seem like the shutdown_tx that's moved into this Arc is never - // dropped, and that shutdown_rx will therefore never see EOF, but that is not actually - // the case. Only `build_with_park` and each worker hold onto a copy of this Arc. - // `build_with_park` drops it immediately, and the workers drop theirs when their `run` - // method returns (and their copy of the Arc are dropped). In fact, we don't actually - // _need_ a copy of `shutdown_tx` for each worker thread; having them all hold onto - // this Arc, which in turn holds the last `shutdown_tx` would have been sufficient. - let shutdown_tx = shutdown_tx.clone(); - let around_worker = around_worker.as_ref().map(Arc::clone); - Box::new(move || { - struct AbortOnPanic; - - impl Drop for AbortOnPanic { - fn drop(&mut self) { - if std::thread::panicking() { - eprintln!("[ERROR] unhandled panic in Tokio scheduler. This is a bug and should be reported."); - std::process::abort(); - } - } - } - - let _abort_on_panic = AbortOnPanic; - if let Some(cb) = around_worker.as_ref() { - let idx = worker.id(); - let mut f = Some(move || worker.run()); - cb(idx, &mut || { - (f.take() - .expect("around_thread callback called closure twice"))( - ) - }) - } else { - worker.run() - } - - // Dropping the handle must happen __after__ the callback - drop(shutdown_tx); - }) as Box<dyn FnOnce() + Send + 'static> - }) - as Box<dyn Fn(Worker<BoxedPark<P>>) -> Box<dyn FnOnce() + Send> + Send + Sync>); - - let mut blocking = crate::runtime::blocking::Builder::default(); - blocking.name(self.name.clone()); - if let Some(ss) = self.stack_size { - blocking.stack_size(ss); - } - let blocking = Arc::new(blocking.build()); - - let (pool, workers) = worker::create_set::<_, BoxedPark<P>>( - self.pool_size, - |i| Box::new(BoxedPark::new(build_park(i))), - Arc::clone(&launch_worker), - blocking.clone(), - ); - - // Spawn threads for each worker - for worker in workers { - crate::runtime::blocking::Pool::spawn(&blocking, launch_worker(worker)) - } - - let spawner = Spawner::new(pool); - let blocking = crate::runtime::blocking::PoolWaiter::from(blocking); - ThreadPool::from_parts(spawner, shutdown_rx, blocking) - } -} - -impl Default for Builder { - fn default() -> Builder { - Builder::new() - } -} - -impl fmt::Debug for Builder { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Builder") - .field("pool_size", &self.pool_size) - .field("name", &self.name) - .field("stack_size", &self.stack_size) - .finish() - } -} - -pub(crate) struct BoxedPark<P> { - inner: P, -} - -impl<P> BoxedPark<P> { - pub(crate) fn new(inner: P) -> Self { - BoxedPark { inner } - } -} - -impl<P> Park for BoxedPark<P> -where - P: Park, -{ - type Unpark = Box<dyn crate::runtime::park::Unpark>; - type Error = P::Error; - - fn unpark(&self) -> Self::Unpark { - Box::new(self.inner.unpark()) - } - - fn park(&mut self) -> Result<(), Self::Error> { - self.inner.park() - } - - fn park_timeout(&mut self, duration: std::time::Duration) -> Result<(), Self::Error> { - self.inner.park_timeout(duration) - } -} diff --git a/tokio/src/runtime/thread_pool/mod.rs b/tokio/src/runtime/thread_pool/mod.rs index 5dce2a10..314942d3 100644 --- a/tokio/src/runtime/thread_pool/mod.rs +++ b/tokio/src/runtime/thread_pool/mod.rs @@ -1,8 +1,5 @@ //! Threadpool -mod builder; -pub(crate) use self::builder::Builder; - mod current; mod idle; @@ -11,9 +8,6 @@ use self::idle::Idle; mod owned; use self::owned::Owned; -mod pool; -pub(crate) use self::pool::ThreadPool; - mod queue; mod spawner; @@ -44,3 +38,187 @@ const LOCAL_QUEUE_CAPACITY: usize = 256; // time. #[cfg(loom)] const LOCAL_QUEUE_CAPACITY: usize = 2; + +use crate::loom::sync::Arc; +use crate::runtime::blocking::{self, PoolWaiter}; +use crate::runtime::task::JoinHandle; +use crate::runtime::Park; + +use std::fmt; +use std::future::Future; + +/// Work-stealing based thread pool for executing futures. +pub(crate) struct ThreadPool { + spawner: Spawner, + + /// Shutdown waiter + shutdown_rx: shutdown::Receiver, + + /// Shutdown valve for Pool + blocking: PoolWaiter, +} + +// The Arc<Box<_>> is needed because loom doesn't support Arc<T> where T: !Sized +// loom doesn't support that because it requires CoerceUnsized, which is +// unstable +type Callback = Arc<Box<dyn Fn(usize, &mut dyn FnMut()) + Send + Sync>>; + +impl ThreadPool { + pub(crate) fn new<F, P>( + pool_size: usize, + blocking_pool: Arc<blocking::Pool>, + around_worker: Callback, + mut build_park: F, + ) -> ThreadPool + where + F: FnMut(usize) -> P, + P: Park + Send + 'static, + { + let (shutdown_tx, shutdown_rx) = shutdown::channel(); + + let launch_worker = Arc::new(Box::new(move |worker: Worker<BoxedPark<P>>| { + // NOTE: It might seem like the shutdown_tx that's moved into this Arc is never + // dropped, and that shutdown_rx will therefore never see EOF, but that is not actually + // the case. Only `build_with_park` and each worker hold onto a copy of this Arc. + // `build_with_park` drops it immediately, and the workers drop theirs when their `run` + // method returns (and their copy of the Arc are dropped). In fact, we don't actually + // _need_ a copy of `shutdown_tx` for each worker thread; having them all hold onto + // this Arc, which in turn holds the last `shutdown_tx` would have been sufficient. + let shutdown_tx = shutdown_tx.clone(); + let around_worker = around_worker.clone(); + + Box::new(move || { + struct AbortOnPanic; + + impl Drop for AbortOnPanic { + fn drop(&mut self) { + if std::thread::panicking() { + eprintln!("[ERROR] unhandled panic in Tokio scheduler. This is a bug and should be reported."); + std::process::abort(); + } + } + } + + let _abort_on_panic = AbortOnPanic; + + let idx = worker.id(); + let mut f = Some(move || worker.run()); + around_worker(idx, &mut || { + (f.take() + .expect("around_thread callback called closure twice"))( + ) + }); + + // Dropping the handle must happen __after__ the callback + drop(shutdown_tx); + }) as Box<dyn FnOnce() + Send + 'static> + }) + as Box<dyn Fn(Worker<BoxedPark<P>>) -> Box<dyn FnOnce() + Send> + Send + Sync>); + + let (pool, workers) = worker::create_set::<_, BoxedPark<P>>( + pool_size, + |i| Box::new(BoxedPark::new(build_park(i))), + Arc::clone(&launch_worker), + blocking_pool.clone(), + ); + + // Spawn threads for each worker + for worker in workers { + crate::runtime::blocking::Pool::spawn(&blocking_pool, launch_worker(worker)) + } + + let spawner = Spawner::new(pool); + let blocking = crate::runtime::blocking::PoolWaiter::from(blocking_pool); + + // ThreadPool::from_parts(spawner, shutdown_rx, blocking) + ThreadPool { + spawner, + shutdown_rx, + blocking, + } + } + + /// Returns reference to `Spawner`. + /// + /// The `Spawner` handle can be cloned and enables spawning tasks from other + /// threads. + pub(crate) fn spawner(&self) -> &Spawner { + &self.spawner + } + + /// Spawn a task + pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + self.spawner.spawn(future) + } + + /// Block the current thread waiting for the future to complete. + /// + /// The future will execute on the current thread, but all spawned tasks + /// will be executed on the thread pool. + pub(crate) fn block_on<F>(&self, future: F) -> F::Output + where + F: Future, + { + crate::runtime::global::with_thread_pool(self.spawner(), || { + let mut enter = crate::runtime::enter(); + crate::runtime::blocking::with_pool(self.spawner.blocking_pool(), || { + enter.block_on(future) + }) + }) + } + + /// Shutdown the thread pool. + pub(crate) fn shutdown_now(&mut self) { + if self.spawner.workers().close() { + self.shutdown_rx.wait(); + } + self.blocking.shutdown(); + } +} + +impl fmt::Debug for ThreadPool { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("ThreadPool").finish() + } +} + +impl Drop for ThreadPool { + fn drop(&mut self) { + self.shutdown_now(); + } +} + +// TODO: delete? +pub(crate) struct BoxedPark<P> { + inner: P, +} + +impl<P> BoxedPark<P> { + pub(crate) fn new(inner: P) -> Self { + BoxedPark { inner } + } +} + +impl<P> Park for BoxedPark<P> +where + P: Park, +{ + type Unpark = Box<dyn crate::runtime::park::Unpark>; + type Error = P::Error; + + fn unpark(&self) -> Self::Unpark { + Box::new(self.inner.unpark()) + } + + fn park(&mut self) -> Result<(), Self::Error> { + self.inner.park() + } + + fn park_timeout(&mut self, duration: std::time::Duration) -> Result<(), Self::Error> { + self.inner.park_timeout(duration) + } +} diff --git a/tokio/src/runtime/thread_pool/pool.rs b/tokio/src/runtime/thread_pool/pool.rs deleted file mode 100644 index a6ef4346..00000000 --- a/tokio/src/runtime/thread_pool/pool.rs +++ /dev/null @@ -1,84 +0,0 @@ -use crate::runtime::blocking::PoolWaiter; -use crate::runtime::task::JoinHandle; -use crate::runtime::thread_pool::{shutdown, Spawner}; - -use std::fmt; -use std::future::Future; - -/// Work-stealing based thread pool for executing futures. -pub(crate) struct ThreadPool { - spawner: Spawner, - - /// Shutdown waiter - shutdown_rx: shutdown::Receiver, - - /// Shutdown valve for Pool - blocking: PoolWaiter, -} - -impl ThreadPool { - pub(super) fn from_parts( - spawner: Spawner, - shutdown_rx: shutdown::Receiver, - blocking: PoolWaiter, - ) -> ThreadPool { - ThreadPool { - spawner, - shutdown_rx, - blocking, - } - } - - /// Returns reference to `Spawner`. - /// - /// The `Spawner` handle can be cloned and enables spawning tasks from other - /// threads. - pub(crate) fn spawner(&self) -> &Spawner { - &self.spawner - } - - /// Spawn a task - pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - self.spawner.spawn(future) - } - - /// Block the current thread waiting for the future to complete. - /// - /// The future will execute on the current thread, but all spawned tasks - /// will be executed on the thread pool. - pub(crate) fn block_on<F>(&self, future: F) -> F::Output - where - F: Future, - { - crate::runtime::global::with_thread_pool(self.spawner(), || { - let mut enter = crate::runtime::enter(); - crate::runtime::blocking::with_pool(self.spawner.blocking_pool(), || { - enter.block_on(future) - }) - }) - } - - /// Shutdown the thread pool. - pub(crate) fn shutdown_now(&mut self) { - if self.spawner.workers().close() { - self.shutdown_rx.wait(); - } - self.blocking.shutdown(); - } -} - -impl fmt::Debug for ThreadPool { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("ThreadPool").finish() - } -} - -impl Drop for ThreadPool { - fn drop(&mut self) { - self.shutdown_now(); - } -} diff --git a/tokio/src/runtime/thread_pool/tests/loom_pool.rs b/tokio/src/runtime/thread_pool/tests/loom_pool.rs index b7be932c..5eb166ce 100644 --- a/tokio/src/runtime/thread_pool/tests/loom_pool.rs +++ b/tokio/src/runtime/thread_pool/tests/loom_pool.rs @@ -1,5 +1,5 @@ use crate::runtime::tests::loom_oneshot as oneshot; -use crate::runtime::thread_pool::{self, Builder}; +use crate::runtime::thread_pool::{self, ThreadPool}; use crate::runtime::{Park, Unpark}; use crate::spawn; @@ -13,8 +13,7 @@ use std::time::Duration; #[test] fn pool_multi_spawn() { loom::model(|| { - let pool = Builder::new().build(|_| LoomPark::new()); - + let pool = mk_pool(2); let c1 = Arc::new(AtomicUsize::new(0)); let (tx, rx) = oneshot::channel(); @@ -47,7 +46,7 @@ fn pool_multi_spawn() { #[test] fn only_blocking() { loom::model(|| { - let mut pool = Builder::new().num_threads(1).build(|_| LoomPark::new()); + let mut pool = mk_pool(1); let (block_tx, block_rx) = oneshot::channel(); pool.spawn(async move { @@ -65,7 +64,7 @@ fn only_blocking() { fn blocking_and_regular() { const NUM: usize = 3; loom::model(|| { - let mut pool = Builder::new().num_threads(1).build(|_| LoomPark::new()); + let mut pool = mk_pool(1); let cnt = Arc::new(AtomicUsize::new(0)); let (block_tx, block_rx) = oneshot::channel(); @@ -99,7 +98,7 @@ fn blocking_and_regular() { #[test] fn pool_multi_notify() { loom::model(|| { - let pool = Builder::new().build(|_| LoomPark::new()); + let pool = mk_pool(2); let c1 = Arc::new(AtomicUsize::new(0)); @@ -135,7 +134,7 @@ fn pool_multi_notify() { #[test] fn pool_shutdown() { loom::model(|| { - let pool = Builder::new().build(|_| LoomPark::new()); + let pool = mk_pool(2); pool.spawn(async move { gated2(true).await; @@ -152,7 +151,7 @@ fn pool_shutdown() { #[test] fn complete_block_on_under_load() { loom::model(|| { - let pool = Builder::new().build(|_| LoomPark::new()); + let pool = mk_pool(2); pool.block_on(async { // Spin hard @@ -167,6 +166,17 @@ fn complete_block_on_under_load() { }); |