summaryrefslogtreecommitdiffstats
path: root/tokio/src/runtime/thread_pool/tests/loom_pool.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-11-21 23:28:39 -0800
committerGitHub <noreply@github.com>2019-11-21 23:28:39 -0800
commit8546ff826db8dba1e39b4119ad909fb6cab2492a (patch)
tree0c1cdd36aaf9d732079a4ff7a71e5c6b138e7d42 /tokio/src/runtime/thread_pool/tests/loom_pool.rs
parent6866fe426cfab0e4da3e88c673f7bef141259bb6 (diff)
runtime: cleanup and add config options (#1807)
* runtime: cleanup and add config options This patch finishes the cleanup as part of the transition to Tokio 0.2. A number of changes were made to take advantage of having all Tokio types in a single crate. Also, fixes using Tokio types from `spawn_blocking`. * Many threads, one resource driver Previously, in the threaded scheduler, a resource driver (mio::Poll / timer combo) was created per thread. This was more or less fine, except it required balancing across the available drivers. When using a resource driver from **outside** of the thread pool, balancing is tricky. The change was original done to avoid having a dedicated driver thread. Now, instead of creating many resource drivers, a single resource driver is used. Each scheduler thread will attempt to "lock" the resource driver before parking on it. If the resource driver is already locked, the thread uses a condition variable to park. Contention should remain low as, under load, the scheduler avoids using the drivers. * Add configuration options to enable I/O / time New configuration options are added to `runtime::Builder` to allow enabling I/O and time drivers on a runtime instance basis. This is useful when wanting to create lightweight runtime instances to execute compute only tasks. * Bug fixes The condition variable parker is updated to the same algorithm used in `std`. This is motivated by some potential deadlock cases discovered by `loom`. The basic scheduler is fixed to fairly schedule tasks. `push_front` was accidentally used instead of `push_back`. I/O, time, and spawning now work from within `spawn_blocking` closures. * Misc cleanup The threaded scheduler is no longer generic over `P :Park`. Instead, it is hard coded to a specific parker. Tests, including loom tests, are updated to use `Runtime` directly. This provides greater coverage. The `blocking` module is moved back into `runtime` as all usage is within `runtime` itself.
Diffstat (limited to 'tokio/src/runtime/thread_pool/tests/loom_pool.rs')
-rw-r--r--tokio/src/runtime/thread_pool/tests/loom_pool.rs101
1 files changed, 12 insertions, 89 deletions
diff --git a/tokio/src/runtime/thread_pool/tests/loom_pool.rs b/tokio/src/runtime/thread_pool/tests/loom_pool.rs
index b982e24e..81e292d6 100644
--- a/tokio/src/runtime/thread_pool/tests/loom_pool.rs
+++ b/tokio/src/runtime/thread_pool/tests/loom_pool.rs
@@ -1,14 +1,12 @@
+use crate::runtime::{self, Runtime};
use crate::runtime::tests::loom_oneshot as oneshot;
-use crate::runtime::thread_pool::ThreadPool;
-use crate::runtime::{Park, Unpark};
use crate::spawn;
use loom::sync::atomic::{AtomicBool, AtomicUsize};
-use loom::sync::{Arc, Mutex, Notify};
+use loom::sync::{Arc, Mutex};
use std::future::Future;
use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
-use std::time::Duration;
#[test]
fn pool_multi_spawn() {
@@ -46,7 +44,7 @@ fn pool_multi_spawn() {
#[test]
fn only_blocking() {
loom::model(|| {
- let mut pool = mk_pool(1);
+ let pool = mk_pool(1);
let (block_tx, block_rx) = oneshot::channel();
pool.spawn(async move {
@@ -56,7 +54,7 @@ fn only_blocking() {
});
block_rx.recv();
- pool.shutdown_now();
+ drop(pool);
});
}
@@ -64,7 +62,7 @@ fn only_blocking() {
fn blocking_and_regular() {
const NUM: usize = 3;
loom::model(|| {
- let mut pool = mk_pool(1);
+ let pool = mk_pool(1);
let cnt = Arc::new(AtomicUsize::new(0));
let (block_tx, block_rx) = oneshot::channel();
@@ -91,7 +89,7 @@ fn blocking_and_regular() {
done_rx.recv();
block_rx.recv();
- pool.shutdown_now();
+ drop(pool);
});
}
@@ -153,7 +151,7 @@ fn complete_block_on_under_load() {
use futures::FutureExt;
loom::model(|| {
- let pool = mk_pool(2);
+ let mut pool = mk_pool(2);
pool.block_on({
futures::future::lazy(|_| ()).then(|_| {
@@ -171,20 +169,11 @@ fn complete_block_on_under_load() {
}
fn mk_pool(num_threads: usize) -> Runtime {
- use crate::blocking::BlockingPool;
-
- let blocking_pool = BlockingPool::new("test".into(), None);
- let executor = ThreadPool::new(
- num_threads,
- blocking_pool.spawner().clone(),
- Arc::new(Box::new(|_, next| next())),
- move |_| LoomPark::new(),
- );
-
- Runtime {
- executor,
- blocking_pool,
- }
+ runtime::Builder::new()
+ .threaded_scheduler()
+ .num_threads(num_threads)
+ .build()
+ .unwrap()
}
use futures::future::poll_fn;
@@ -244,69 +233,3 @@ fn gated2(thread: bool) -> impl Future<Output = &'static str> {
}
})
}
-
-/// Fake runtime
-struct Runtime {
- executor: ThreadPool,
- #[allow(dead_code)]
- blocking_pool: crate::blocking::BlockingPool,
-}
-
-use std::ops;
-
-impl ops::Deref for Runtime {
- type Target = ThreadPool;
-
- fn deref(&self) -> &ThreadPool {
- &self.executor
- }
-}
-
-impl ops::DerefMut for Runtime {
- fn deref_mut(&mut self) -> &mut ThreadPool {
- &mut self.executor
- }
-}
-
-struct LoomPark {
- notify: Arc<Notify>,
-}
-
-struct LoomUnpark {
- notify: Arc<Notify>,
-}
-
-impl LoomPark {
- fn new() -> LoomPark {
- LoomPark {
- notify: Arc::new(Notify::new()),
- }
- }
-}
-
-impl Park for LoomPark {
- type Unpark = LoomUnpark;
-
- type Error = ();
-
- fn unpark(&self) -> LoomUnpark {
- let notify = self.notify.clone();
- LoomUnpark { notify }
- }
-
- fn park(&mut self) -> Result<(), Self::Error> {
- self.notify.wait();
- Ok(())
- }
-
- fn park_timeout(&mut self, _duration: Duration) -> Result<(), Self::Error> {
- self.notify.wait();
- Ok(())
- }
-}
-
-impl Unpark for LoomUnpark {
- fn unpark(&self) {
- self.notify.notify();
- }
-}