diff options
author | Carl Lerche <me@carllerche.com> | 2019-11-21 23:28:39 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-11-21 23:28:39 -0800 |
commit | 8546ff826db8dba1e39b4119ad909fb6cab2492a (patch) | |
tree | 0c1cdd36aaf9d732079a4ff7a71e5c6b138e7d42 /tokio/src/runtime/thread_pool/mod.rs | |
parent | 6866fe426cfab0e4da3e88c673f7bef141259bb6 (diff) |
runtime: cleanup and add config options (#1807)
* runtime: cleanup and add config options
This patch finishes the cleanup as part of the transition to Tokio 0.2.
A number of changes were made to take advantage of having all Tokio
types in a single crate. Also, fixes using Tokio types from
`spawn_blocking`.
* Many threads, one resource driver
Previously, in the threaded scheduler, a resource driver (mio::Poll /
timer combo) was created per thread. This was more or less fine, except
it required balancing across the available drivers. When using a
resource driver from **outside** of the thread pool, balancing is
tricky. The change was originally done to avoid having a dedicated driver
thread.
Now, instead of creating many resource drivers, a single resource driver
is used. Each scheduler thread will attempt to "lock" the resource
driver before parking on it. If the resource driver is already locked,
the thread uses a condition variable to park. Contention should remain
low as, under load, the scheduler avoids using the drivers.
* Add configuration options to enable I/O / time
New configuration options are added to `runtime::Builder` to allow
enabling I/O and time drivers on a runtime instance basis. This is
useful when wanting to create lightweight runtime instances to execute
compute only tasks.
* Bug fixes
The condition variable parker is updated to the same algorithm used in
`std`. This is motivated by some potential deadlock cases discovered by
`loom`.
The basic scheduler is fixed to fairly schedule tasks. `push_front` was
accidentally used instead of `push_back`.
I/O, time, and spawning now work from within `spawn_blocking` closures.
* Misc cleanup
The threaded scheduler is no longer generic over `P: Park`. Instead, it
is hard coded to a specific parker. Tests, including loom tests, are
updated to use `Runtime` directly. This provides greater coverage.
The `blocking` module is moved back into `runtime` as all usage is
within `runtime` itself.
Diffstat (limited to 'tokio/src/runtime/thread_pool/mod.rs')
-rw-r--r-- | tokio/src/runtime/thread_pool/mod.rs | 95 |
1 file changed, 23 insertions, 72 deletions
diff --git a/tokio/src/runtime/thread_pool/mod.rs b/tokio/src/runtime/thread_pool/mod.rs index 4b23c3b9..3d795fa4 100644 --- a/tokio/src/runtime/thread_pool/mod.rs +++ b/tokio/src/runtime/thread_pool/mod.rs @@ -18,9 +18,8 @@ mod slice; mod shared; use self::shared::Shared; -mod shutdown; - mod worker; +use worker::Worker; cfg_blocking! { pub(crate) use worker::block_in_place; @@ -39,9 +38,7 @@ const LOCAL_QUEUE_CAPACITY: usize = 256; #[cfg(loom)] const LOCAL_QUEUE_CAPACITY: usize = 2; -use crate::blocking; -use crate::loom::sync::Arc; -use crate::runtime::Park; +use crate::runtime::{self, blocking, Parker}; use crate::task::JoinHandle; use std::fmt; @@ -50,48 +47,29 @@ use std::future::Future; /// Work-stealing based thread pool for executing futures. pub(crate) struct ThreadPool { spawner: Spawner, - - /// Shutdown waiter - shutdown_rx: shutdown::Receiver, } -// The Arc<Box<_>> is needed because loom doesn't support Arc<T> where T: !Sized -// loom doesn't support that because it requires CoerceUnsized, which is -// unstable -type Callback = Arc<Box<dyn Fn(usize, &mut dyn FnMut()) + Send + Sync>>; +pub(crate) struct Workers { + workers: Vec<Worker>, +} impl ThreadPool { - pub(crate) fn new<F, P>( + pub(crate) fn new( pool_size: usize, - blocking_pool: blocking::Spawner, - around_worker: Callback, - mut build_park: F, - ) -> ThreadPool - where - F: FnMut(usize) -> P, - P: Park + Send + 'static, - { - let (shutdown_tx, shutdown_rx) = shutdown::channel(); - - let (pool, workers) = worker::create_set::<_, BoxedPark<P>>( + parker: Parker, + ) -> (ThreadPool, Workers) { + let (pool, workers) = worker::create_set( pool_size, - |i| BoxedPark::new(build_park(i)), - blocking_pool.clone(), - around_worker, - shutdown_tx, + parker, ); - // Spawn threads for each worker - for worker in workers { - blocking_pool.spawn_background(|| worker.run()); - } - let spawner = Spawner::new(pool); - ThreadPool { + let pool = ThreadPool { spawner, - shutdown_rx, - } + }; + + (pool, Workers { 
workers }) } /// Returns reference to `Spawner`. @@ -124,13 +102,6 @@ impl ThreadPool { enter.block_on(future) }) } - - /// Shutdown the thread pool. - pub(crate) fn shutdown_now(&mut self) { - if self.spawner.workers().close() { - self.shutdown_rx.wait(); - } - } } impl fmt::Debug for ThreadPool { @@ -141,37 +112,17 @@ impl fmt::Debug for ThreadPool { impl Drop for ThreadPool { fn drop(&mut self) { - self.shutdown_now(); + self.spawner.workers().close(); } } -// TODO: delete? -pub(crate) struct BoxedPark<P> { - inner: P, -} - -impl<P> BoxedPark<P> { - pub(crate) fn new(inner: P) -> Self { - BoxedPark { inner } - } -} - -impl<P> Park for BoxedPark<P> -where - P: Park, -{ - type Unpark = Box<dyn crate::runtime::park::Unpark>; - type Error = P::Error; - - fn unpark(&self) -> Self::Unpark { - Box::new(self.inner.unpark()) - } - - fn park(&mut self) -> Result<(), Self::Error> { - self.inner.park() - } - - fn park_timeout(&mut self, duration: std::time::Duration) -> Result<(), Self::Error> { - self.inner.park_timeout(duration) +impl Workers { + pub(crate) fn spawn(self, blocking_pool: &blocking::Spawner) { + blocking_pool.enter(|| { + for worker in self.workers { + let b = blocking_pool.clone(); + runtime::spawn_blocking(move || worker.run(b)); + } + }); } } |