summaryrefslogtreecommitdiffstats
path: root/tokio/src/runtime/thread_pool/mod.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-11-21 23:28:39 -0800
committerGitHub <noreply@github.com>2019-11-21 23:28:39 -0800
commit8546ff826db8dba1e39b4119ad909fb6cab2492a (patch)
tree0c1cdd36aaf9d732079a4ff7a71e5c6b138e7d42 /tokio/src/runtime/thread_pool/mod.rs
parent6866fe426cfab0e4da3e88c673f7bef141259bb6 (diff)
runtime: cleanup and add config options (#1807)
* runtime: cleanup and add config options This patch finishes the cleanup as part of the transition to Tokio 0.2. A number of changes were made to take advantage of having all Tokio types in a single crate. Also, fixes using Tokio types from `spawn_blocking`. * Many threads, one resource driver Previously, in the threaded scheduler, a resource driver (mio::Poll / timer combo) was created per thread. This was more or less fine, except it required balancing across the available drivers. When using a resource driver from **outside** of the thread pool, balancing is tricky. The change was original done to avoid having a dedicated driver thread. Now, instead of creating many resource drivers, a single resource driver is used. Each scheduler thread will attempt to "lock" the resource driver before parking on it. If the resource driver is already locked, the thread uses a condition variable to park. Contention should remain low as, under load, the scheduler avoids using the drivers. * Add configuration options to enable I/O / time New configuration options are added to `runtime::Builder` to allow enabling I/O and time drivers on a runtime instance basis. This is useful when wanting to create lightweight runtime instances to execute compute only tasks. * Bug fixes The condition variable parker is updated to the same algorithm used in `std`. This is motivated by some potential deadlock cases discovered by `loom`. The basic scheduler is fixed to fairly schedule tasks. `push_front` was accidentally used instead of `push_back`. I/O, time, and spawning now work from within `spawn_blocking` closures. * Misc cleanup The threaded scheduler is no longer generic over `P :Park`. Instead, it is hard coded to a specific parker. Tests, including loom tests, are updated to use `Runtime` directly. This provides greater coverage. The `blocking` module is moved back into `runtime` as all usage is within `runtime` itself.
Diffstat (limited to 'tokio/src/runtime/thread_pool/mod.rs')
-rw-r--r--tokio/src/runtime/thread_pool/mod.rs95
1 files changed, 23 insertions, 72 deletions
diff --git a/tokio/src/runtime/thread_pool/mod.rs b/tokio/src/runtime/thread_pool/mod.rs
index 4b23c3b9..3d795fa4 100644
--- a/tokio/src/runtime/thread_pool/mod.rs
+++ b/tokio/src/runtime/thread_pool/mod.rs
@@ -18,9 +18,8 @@ mod slice;
mod shared;
use self::shared::Shared;
-mod shutdown;
-
mod worker;
+use worker::Worker;
cfg_blocking! {
pub(crate) use worker::block_in_place;
@@ -39,9 +38,7 @@ const LOCAL_QUEUE_CAPACITY: usize = 256;
#[cfg(loom)]
const LOCAL_QUEUE_CAPACITY: usize = 2;
-use crate::blocking;
-use crate::loom::sync::Arc;
-use crate::runtime::Park;
+use crate::runtime::{self, blocking, Parker};
use crate::task::JoinHandle;
use std::fmt;
@@ -50,48 +47,29 @@ use std::future::Future;
/// Work-stealing based thread pool for executing futures.
pub(crate) struct ThreadPool {
spawner: Spawner,
-
- /// Shutdown waiter
- shutdown_rx: shutdown::Receiver,
}
-// The Arc<Box<_>> is needed because loom doesn't support Arc<T> where T: !Sized
-// loom doesn't support that because it requires CoerceUnsized, which is
-// unstable
-type Callback = Arc<Box<dyn Fn(usize, &mut dyn FnMut()) + Send + Sync>>;
+pub(crate) struct Workers {
+ workers: Vec<Worker>,
+}
impl ThreadPool {
- pub(crate) fn new<F, P>(
+ pub(crate) fn new(
pool_size: usize,
- blocking_pool: blocking::Spawner,
- around_worker: Callback,
- mut build_park: F,
- ) -> ThreadPool
- where
- F: FnMut(usize) -> P,
- P: Park + Send + 'static,
- {
- let (shutdown_tx, shutdown_rx) = shutdown::channel();
-
- let (pool, workers) = worker::create_set::<_, BoxedPark<P>>(
+ parker: Parker,
+ ) -> (ThreadPool, Workers) {
+ let (pool, workers) = worker::create_set(
pool_size,
- |i| BoxedPark::new(build_park(i)),
- blocking_pool.clone(),
- around_worker,
- shutdown_tx,
+ parker,
);
- // Spawn threads for each worker
- for worker in workers {
- blocking_pool.spawn_background(|| worker.run());
- }
-
let spawner = Spawner::new(pool);
- ThreadPool {
+ let pool = ThreadPool {
spawner,
- shutdown_rx,
- }
+ };
+
+ (pool, Workers { workers })
}
/// Returns reference to `Spawner`.
@@ -124,13 +102,6 @@ impl ThreadPool {
enter.block_on(future)
})
}
-
- /// Shutdown the thread pool.
- pub(crate) fn shutdown_now(&mut self) {
- if self.spawner.workers().close() {
- self.shutdown_rx.wait();
- }
- }
}
impl fmt::Debug for ThreadPool {
@@ -141,37 +112,17 @@ impl fmt::Debug for ThreadPool {
impl Drop for ThreadPool {
fn drop(&mut self) {
- self.shutdown_now();
+ self.spawner.workers().close();
}
}
-// TODO: delete?
-pub(crate) struct BoxedPark<P> {
- inner: P,
-}
-
-impl<P> BoxedPark<P> {
- pub(crate) fn new(inner: P) -> Self {
- BoxedPark { inner }
- }
-}
-
-impl<P> Park for BoxedPark<P>
-where
- P: Park,
-{
- type Unpark = Box<dyn crate::runtime::park::Unpark>;
- type Error = P::Error;
-
- fn unpark(&self) -> Self::Unpark {
- Box::new(self.inner.unpark())
- }
-
- fn park(&mut self) -> Result<(), Self::Error> {
- self.inner.park()
- }
-
- fn park_timeout(&mut self, duration: std::time::Duration) -> Result<(), Self::Error> {
- self.inner.park_timeout(duration)
+impl Workers {
+ pub(crate) fn spawn(self, blocking_pool: &blocking::Spawner) {
+ blocking_pool.enter(|| {
+ for worker in self.workers {
+ let b = blocking_pool.clone();
+ runtime::spawn_blocking(move || worker.run(b));
+ }
+ });
}
}