diff options
author | Carl Lerche <me@carllerche.com> | 2019-11-21 23:28:39 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-11-21 23:28:39 -0800 |
commit | 8546ff826db8dba1e39b4119ad909fb6cab2492a (patch) | |
tree | 0c1cdd36aaf9d732079a4ff7a71e5c6b138e7d42 /tokio/src/runtime/thread_pool/tests/loom_pool.rs | |
parent | 6866fe426cfab0e4da3e88c673f7bef141259bb6 (diff) |
runtime: cleanup and add config options (#1807)
* runtime: cleanup and add config options
This patch finishes the cleanup as part of the transition to Tokio 0.2.
A number of changes were made to take advantage of having all Tokio
types in a single crate. Also, fixes using Tokio types from
`spawn_blocking`.
* Many threads, one resource driver
Previously, in the threaded scheduler, a resource driver (mio::Poll /
timer combo) was created per thread. This was more or less fine, except
it required balancing across the available drivers. When using a
resource driver from **outside** of the thread pool, balancing is
tricky. The change was original done to avoid having a dedicated driver
thread.
Now, instead of creating many resource drivers, a single resource driver
is used. Each scheduler thread will attempt to "lock" the resource
driver before parking on it. If the resource driver is already locked,
the thread uses a condition variable to park. Contention should remain
low as, under load, the scheduler avoids using the drivers.
* Add configuration options to enable I/O / time
New configuration options are added to `runtime::Builder` to allow
enabling I/O and time drivers on a runtime instance basis. This is
useful when wanting to create lightweight runtime instances to execute
compute only tasks.
* Bug fixes
The condition variable parker is updated to the same algorithm used in
`std`. This is motivated by some potential deadlock cases discovered by
`loom`.
The basic scheduler is fixed to fairly schedule tasks. `push_front` was
accidentally used instead of `push_back`.
I/O, time, and spawning now work from within `spawn_blocking` closures.
* Misc cleanup
The threaded scheduler is no longer generic over `P :Park`. Instead, it
is hard coded to a specific parker. Tests, including loom tests, are
updated to use `Runtime` directly. This provides greater coverage.
The `blocking` module is moved back into `runtime` as all usage is
within `runtime` itself.
Diffstat (limited to 'tokio/src/runtime/thread_pool/tests/loom_pool.rs')
-rw-r--r-- | tokio/src/runtime/thread_pool/tests/loom_pool.rs | 101 |
1 files changed, 12 insertions, 89 deletions
diff --git a/tokio/src/runtime/thread_pool/tests/loom_pool.rs b/tokio/src/runtime/thread_pool/tests/loom_pool.rs index b982e24e..81e292d6 100644 --- a/tokio/src/runtime/thread_pool/tests/loom_pool.rs +++ b/tokio/src/runtime/thread_pool/tests/loom_pool.rs @@ -1,14 +1,12 @@ +use crate::runtime::{self, Runtime}; use crate::runtime::tests::loom_oneshot as oneshot; -use crate::runtime::thread_pool::ThreadPool; -use crate::runtime::{Park, Unpark}; use crate::spawn; use loom::sync::atomic::{AtomicBool, AtomicUsize}; -use loom::sync::{Arc, Mutex, Notify}; +use loom::sync::{Arc, Mutex}; use std::future::Future; use std::sync::atomic::Ordering::{Acquire, Relaxed, Release}; -use std::time::Duration; #[test] fn pool_multi_spawn() { @@ -46,7 +44,7 @@ fn pool_multi_spawn() { #[test] fn only_blocking() { loom::model(|| { - let mut pool = mk_pool(1); + let pool = mk_pool(1); let (block_tx, block_rx) = oneshot::channel(); pool.spawn(async move { @@ -56,7 +54,7 @@ fn only_blocking() { }); block_rx.recv(); - pool.shutdown_now(); + drop(pool); }); } @@ -64,7 +62,7 @@ fn only_blocking() { fn blocking_and_regular() { const NUM: usize = 3; loom::model(|| { - let mut pool = mk_pool(1); + let pool = mk_pool(1); let cnt = Arc::new(AtomicUsize::new(0)); let (block_tx, block_rx) = oneshot::channel(); @@ -91,7 +89,7 @@ fn blocking_and_regular() { done_rx.recv(); block_rx.recv(); - pool.shutdown_now(); + drop(pool); }); } @@ -153,7 +151,7 @@ fn complete_block_on_under_load() { use futures::FutureExt; loom::model(|| { - let pool = mk_pool(2); + let mut pool = mk_pool(2); pool.block_on({ futures::future::lazy(|_| ()).then(|_| { @@ -171,20 +169,11 @@ fn complete_block_on_under_load() { } fn mk_pool(num_threads: usize) -> Runtime { - use crate::blocking::BlockingPool; - - let blocking_pool = BlockingPool::new("test".into(), None); - let executor = ThreadPool::new( - num_threads, - blocking_pool.spawner().clone(), - Arc::new(Box::new(|_, next| next())), - move |_| LoomPark::new(), - ); - - Runtime { - executor, - blocking_pool, - } + runtime::Builder::new() + .threaded_scheduler() + .num_threads(num_threads) + .build() + .unwrap() } use futures::future::poll_fn; @@ -244,69 +233,3 @@ fn gated2(thread: bool) -> impl Future<Output = &'static str> { } }) } - -/// Fake runtime -struct Runtime { - executor: ThreadPool, - #[allow(dead_code)] - blocking_pool: crate::blocking::BlockingPool, -} - -use std::ops; - -impl ops::Deref for Runtime { - type Target = ThreadPool; - - fn deref(&self) -> &ThreadPool { - &self.executor - } -} - -impl ops::DerefMut for Runtime { - fn deref_mut(&mut self) -> &mut ThreadPool { - &mut self.executor - } -} - -struct LoomPark { - notify: Arc<Notify>, -} - -struct LoomUnpark { - notify: Arc<Notify>, -} - -impl LoomPark { - fn new() -> LoomPark { - LoomPark { - notify: Arc::new(Notify::new()), - } - } -} - -impl Park for LoomPark { - type Unpark = LoomUnpark; - - type Error = (); - - fn unpark(&self) -> LoomUnpark { - let notify = self.notify.clone(); - LoomUnpark { notify } - } - - fn park(&mut self) -> Result<(), Self::Error> { - self.notify.wait(); - Ok(()) - } - - fn park_timeout(&mut self, _duration: Duration) -> Result<(), Self::Error> { - self.notify.wait(); - Ok(()) - } -} - -impl Unpark for LoomUnpark { - fn unpark(&self) { - self.notify.notify(); - } -} |