summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-03-05 10:31:37 -0800
committerGitHub <noreply@github.com>2020-03-05 10:31:37 -0800
commita78b1c65ccfb9692ca5d3ed8ddde934f40091d83 (patch)
treec88e547d6913b204f590aea54dc03328ee3cb094
parent5ede2e4d6b2f732e83e33f9693682dffc6c9f5b0 (diff)
rt: cleanup and simplify scheduler (scheduler v2.5) (#2273)
A refactor of the scheduler internals focusing on simplifying and reducing unsafety. There are no fundamental logic changes. * The state transitions of the core task component are refined and reduced. * `basic_scheduler` has most unsafety removed. * `local_set` has most unsafety removed. * `threaded_scheduler` limits most unsafety to its queue implementation.
-rw-r--r--azure-pipelines.yml18
-rw-r--r--benches/Cargo.toml5
-rw-r--r--benches/scheduler.rs152
-rw-r--r--ci/azure-check-features.yml2
-rw-r--r--ci/azure-install-rust.yml7
-rw-r--r--ci/azure-loom.yml25
-rw-r--r--ci/azure-miri.yml23
-rw-r--r--ci/azure-test-integration.yml2
-rw-r--r--ci/azure-test-stable.yml2
-rw-r--r--tokio/src/lib.rs4
-rw-r--r--tokio/src/loom/std/alloc.rs18
-rw-r--r--tokio/src/loom/std/causal_cell.rs9
-rw-r--r--tokio/src/loom/std/mod.rs2
-rw-r--r--tokio/src/macros/assert.rs18
-rw-r--r--tokio/src/macros/mod.rs8
-rw-r--r--tokio/src/macros/scoped_tls.rs80
-rw-r--r--tokio/src/park/thread.rs4
-rw-r--r--tokio/src/runtime/basic_scheduler.rs461
-rw-r--r--tokio/src/runtime/blocking/pool.rs12
-rw-r--r--tokio/src/runtime/blocking/schedule.rs22
-rw-r--r--tokio/src/runtime/builder.rs6
-rw-r--r--tokio/src/runtime/mod.rs7
-rw-r--r--tokio/src/runtime/task/core.rs280
-rw-r--r--tokio/src/runtime/task/error.rs (renamed from tokio/src/task/error.rs)0
-rw-r--r--tokio/src/runtime/task/harness.rs369
-rw-r--r--tokio/src/runtime/task/join.rs (renamed from tokio/src/task/join.rs)60
-rw-r--r--tokio/src/runtime/task/mod.rs219
-rw-r--r--tokio/src/runtime/task/raw.rs131
-rw-r--r--tokio/src/runtime/task/stack.rs81
-rw-r--r--tokio/src/runtime/task/state.rs447
-rw-r--r--tokio/src/runtime/task/waker.rs (renamed from tokio/src/task/waker.rs)30
-rw-r--r--tokio/src/runtime/tests/loom_pool.rs381
-rw-r--r--tokio/src/runtime/tests/mod.rs13
-rw-r--r--tokio/src/runtime/tests/task.rs159
-rw-r--r--tokio/src/runtime/thread_pool/atomic_cell.rs52
-rw-r--r--tokio/src/runtime/thread_pool/current.rs84
-rw-r--r--tokio/src/runtime/thread_pool/mod.rs92
-rw-r--r--tokio/src/runtime/thread_pool/owned.rs83
-rw-r--r--tokio/src/runtime/thread_pool/queue.rs568
-rw-r--r--tokio/src/runtime/thread_pool/queue/global.rs209
-rw-r--r--tokio/src/runtime/thread_pool/queue/inject.rs41
-rw-r--r--tokio/src/runtime/thread_pool/queue/local.rs298
-rw-r--r--tokio/src/runtime/thread_pool/queue/mod.rs41
-rw-r--r--tokio/src/runtime/thread_pool/queue/worker.rs127
-rw-r--r--tokio/src/runtime/thread_pool/shared.rs94
-rw-r--r--tokio/src/runtime/thread_pool/shutdown.rs44
-rw-r--r--tokio/src/runtime/thread_pool/slice.rs172
-rw-r--r--tokio/src/runtime/thread_pool/spawner.rs49
-rw-r--r--tokio/src/runtime/thread_pool/tests/loom_pool.rs308
-rw-r--r--tokio/src/runtime/thread_pool/tests/loom_queue.rs69
-rw-r--r--tokio/src/runtime/thread_pool/tests/mod.rs8
-rw-r--r--tokio/src/runtime/thread_pool/tests/queue.rs277
-rw-r--r--tokio/src/runtime/thread_pool/worker.rs1033
-rw-r--r--tokio/src/sync/notify.rs19
-rw-r--r--tokio/src/task/core.rs156
-rw-r--r--tokio/src/task/harness.rs558
-rw-r--r--tokio/src/task/list.rs96
-rw-r--r--tokio/src/task/local.rs393
-rw-r--r--tokio/src/task/mod.rs171
-rw-r--r--tokio/src/task/queue.rs338
-rw-r--r--tokio/src/task/raw.rs197
-rw-r--r--tokio/src/task/stack.rs88
-rw-r--r--tokio/src/task/state.rs497
-rw-r--r--tokio/src/task/tests/loom.rs277
-rw-r--r--tokio/src/task/tests/mod.rs5
-rw-r--r--tokio/src/task/tests/task.rs661
-rw-r--r--tokio/src/tests/backoff.rs32
-rw-r--r--tokio/src/tests/loom_schedule.rs53
-rw-r--r--tokio/src/tests/mock_schedule.rs134
-rw-r--r--tokio/src/tests/mod.rs10
-rw-r--r--tokio/src/tests/track_drop.rs57
-rw-r--r--tokio/src/util/linked_list.rs105
-rw-r--r--tokio/src/util/mod.rs13
-rw-r--r--tokio/src/util/try_lock.rs21
-rw-r--r--tokio/src/util/wake.rs83
-rw-r--r--tokio/tests/rt_common.rs116
-rw-r--r--tokio/tests/task_local_set.rs33
77 files changed, 4396 insertions, 6423 deletions
diff --git a/azure-pipelines.yml b/azure-pipelines.yml
index 4743a66a..cc50f3c8 100644
--- a/azure-pipelines.yml
+++ b/azure-pipelines.yml
@@ -44,13 +44,10 @@ jobs:
displayName: Test build permutations
rust: stable
-# Run loom tests
-- template: ci/azure-loom.yml
+# Run miri tests
+- template: ci/azure-miri.yml
parameters:
- name: loom
- rust: stable
- crates:
- - tokio
+ name: miri
# Try cross compiling
- template: ci/azure-cross-compile.yml
@@ -99,16 +96,25 @@ jobs:
# name: tsan
# rust: stable
+# Run loom tests
+- template: ci/azure-loom.yml
+ parameters:
+ name: loom
+ rust: stable
+
- template: ci/azure-deploy-docs.yml
parameters:
rust: stable
dependsOn:
- rustfmt
+ - docs
- clippy
- test_tokio
- test_linux
+ - test_integration
- test_build
- loom
+ - miri
- cross
- minrust
- check_features
diff --git a/benches/Cargo.toml b/benches/Cargo.toml
index 8c9ad1d3..f4a1d8fb 100644
--- a/benches/Cargo.toml
+++ b/benches/Cargo.toml
@@ -17,3 +17,8 @@ harness = false
name = "mpsc"
path = "mpsc.rs"
harness = false
+
+[[bench]]
+name = "scheduler"
+path = "scheduler.rs"
+harness = false
diff --git a/benches/scheduler.rs b/benches/scheduler.rs
new file mode 100644
index 00000000..0562a120
--- /dev/null
+++ b/benches/scheduler.rs
@@ -0,0 +1,152 @@
+//! Benchmark implementation details of the theaded scheduler. These benches are
+//! intended to be used as a form of regression testing and not as a general
+//! purpose benchmark demonstrating real-world performance.
+
+use tokio::runtime::{self, Runtime};
+use tokio::sync::oneshot;
+
+use bencher::{benchmark_group, benchmark_main, Bencher};
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::Relaxed;
+use std::sync::{mpsc, Arc};
+
+fn spawn_many(b: &mut Bencher) {
+ const NUM_SPAWN: usize = 10_000;
+
+ let mut rt = rt();
+
+ let (tx, rx) = mpsc::sync_channel(1000);
+ let rem = Arc::new(AtomicUsize::new(0));
+
+ b.iter(|| {
+ rem.store(NUM_SPAWN, Relaxed);
+
+ rt.block_on(async {
+ for _ in 0..NUM_SPAWN {
+ let tx = tx.clone();
+ let rem = rem.clone();
+
+ tokio::spawn(async move {
+ if 1 == rem.fetch_sub(1, Relaxed) {
+ tx.send(()).unwrap();
+ }
+ });
+ }
+
+ let _ = rx.recv().unwrap();
+ });
+ });
+}
+
+fn yield_many(b: &mut Bencher) {
+ const NUM_YIELD: usize = 1_000;
+ const TASKS: usize = 200;
+
+ let rt = rt();
+
+ let (tx, rx) = mpsc::sync_channel(TASKS);
+
+ b.iter(move || {
+ for _ in 0..TASKS {
+ let tx = tx.clone();
+
+ rt.spawn(async move {
+ for _ in 0..NUM_YIELD {
+ tokio::task::yield_now().await;
+ }
+
+ tx.send(()).unwrap();
+ });
+ }
+
+ for _ in 0..TASKS {
+ let _ = rx.recv().unwrap();
+ }
+ });
+}
+
+fn ping_pong(b: &mut Bencher) {
+ const NUM_PINGS: usize = 1_000;
+
+ let mut rt = rt();
+
+ let (done_tx, done_rx) = mpsc::sync_channel(1000);
+ let rem = Arc::new(AtomicUsize::new(0));
+
+ b.iter(|| {
+ let done_tx = done_tx.clone();
+ let rem = rem.clone();
+ rem.store(NUM_PINGS, Relaxed);
+
+ rt.block_on(async {
+ tokio::spawn(async move {
+ for _ in 0..NUM_PINGS {
+ let rem = rem.clone();
+ let done_tx = done_tx.clone();
+
+ tokio::spawn(async move {
+ let (tx1, rx1) = oneshot::channel();
+ let (tx2, rx2) = oneshot::channel();
+
+ tokio::spawn(async move {
+ rx1.await.unwrap();
+ tx2.send(()).unwrap();
+ });
+
+ tx1.send(()).unwrap();
+ rx2.await.unwrap();
+
+ if 1 == rem.fetch_sub(1, Relaxed) {
+ done_tx.send(()).unwrap();
+ }
+ });
+ }
+ });
+
+ done_rx.recv().unwrap();
+ });
+ });
+}
+
+fn chained_spawn(b: &mut Bencher) {
+ const ITER: usize = 1_000;
+
+ let mut rt = rt();
+
+ fn iter(done_tx: mpsc::SyncSender<()>, n: usize) {
+ if n == 0 {
+ done_tx.send(()).unwrap();
+ } else {
+ tokio::spawn(async move {
+ iter(done_tx, n - 1);
+ });
+ }
+ }
+
+ let (done_tx, done_rx) = mpsc::sync_channel(1000);
+
+ b.iter(move || {
+ let done_tx = done_tx.clone();
+
+ rt.block_on(async {
+ tokio::spawn(async move {
+ iter(done_tx, ITER);
+ });
+
+ done_rx.recv().unwrap();
+ });
+ });
+}
+
+fn rt() -> Runtime {
+ runtime::Builder::new()
+ .threaded_scheduler()
+ .core_threads(4)
+ .enable_all()
+ .build()
+ .unwrap()
+}
+
+benchmark_group!(scheduler, spawn_many, ping_pong, yield_many, chained_spawn,);
+
+benchmark_main!(scheduler);
diff --git a/ci/azure-check-features.yml b/ci/azure-check-features.yml
index a80af0d3..f5985843 100644
--- a/ci/azure-check-features.yml
+++ b/ci/azure-check-features.yml
@@ -6,7 +6,7 @@ jobs:
Linux:
vmImage: ubuntu-16.04
MacOS:
- vmImage: macOS-10.13
+ vmImage: macos-latest
Windows:
vmImage: vs2017-win2016
pool:
diff --git a/ci/azure-install-rust.yml b/ci/azure-install-rust.yml
index 2b4feb2a..4cf5ca3f 100644
--- a/ci/azure-install-rust.yml
+++ b/ci/azure-install-rust.yml
@@ -2,6 +2,13 @@ steps:
# Linux and macOS.
- script: |
set -e
+
+ if [ "$RUSTUP_TOOLCHAIN" == "nightly" ]; then
+ echo "++ getting latest miri version"
+ export RUSTUP_TOOLCHAIN="nightly-$(curl -s https://rust-lang.github.io/rustup-components-history/x86_64-unknown-linux-gnu/miri)"
+ echo "$RUSTUP_TOOLCHAIN"
+ fi
+
curl https://sh.rustup.rs -sSf | sh -s -- -y --profile minimal --default-toolchain none
export PATH=$PATH:$HOME/.cargo/bin
rustup toolchain install $RUSTUP_TOOLCHAIN
diff --git a/ci/azure-loom.yml b/ci/azure-loom.yml
index fdfb8670..001aedec 100644
--- a/ci/azure-loom.yml
+++ b/ci/azure-loom.yml
@@ -1,6 +1,18 @@
jobs:
- job: ${{ parameters.name }}
displayName: Loom tests
+ strategy:
+ matrix:
+ rest:
+ scope: --skip loom_pool
+ pool_group_a:
+ scope: loom_pool::group_a
+ pool_group_b:
+ scope: loom_pool::group_b
+ pool_group_c:
+ scope: loom_pool::group_c
+ pool_group_d:
+ scope: loom_pool::group_d
pool:
vmImage: ubuntu-16.04
@@ -9,10 +21,9 @@ jobs:
parameters:
rust_version: ${{ parameters.rust }}
- - ${{ each crate in parameters.crates }}:
- - script: RUSTFLAGS="--cfg loom" cargo test --lib --release --features "full" -- --test-threads=1 --nocapture
- env:
- LOOM_MAX_PREEMPTIONS: 1
- CI: 'True'
- displayName: test ${{ crate }}
- workingDirectory: $(Build.SourcesDirectory)/${{ crate }}
+ - script: RUSTFLAGS="--cfg loom" cargo test --lib --release --features "full" -- --nocapture $(scope)
+ env:
+ LOOM_MAX_PREEMPTIONS: 2
+ CI: 'True'
+ displayName: $(scope)
+ workingDirectory: $(Build.SourcesDirectory)/tokio
diff --git a/ci/azure-miri.yml b/ci/azure-miri.yml
new file mode 100644
index 00000000..fb886edc
--- /dev/null
+++ b/ci/azure-miri.yml
@@ -0,0 +1,23 @@
+jobs:
+- job: ${{ parameters.name }}
+ displayName: Miri
+ pool:
+ vmImage: ubuntu-16.04
+
+ steps:
+ - template: azure-install-rust.yml
+ parameters:
+ rust_version: nightly
+
+ - script: |
+ rustup component add miri
+ cargo miri setup
+ rm -rf $(Build.SourcesDirectory)/tokio/tests
+ displayName: Install miri
+
+ # TODO: enable all tests once they pass
+ - script: cargo miri test --features rt-core,rt-threaded,rt-util,sync -- -- task
+ env:
+ CI: 'True'
+ displayName: cargo miri test
+ workingDirectory: $(Build.SourcesDirectory)/tokio
diff --git a/ci/azure-test-integration.yml b/ci/azure-test-integration.yml
index fc45429a..f498a649 100644
--- a/ci/azure-test-integration.yml
+++ b/ci/azure-test-integration.yml
@@ -6,7 +6,7 @@ jobs:
Linux:
vmImage: ubuntu-16.04
MacOS:
- vmImage: macOS-10.13
+ vmImage: macos-latest
Windows:
vmImage: vs2017-win2016
pool:
diff --git a/ci/azure-test-stable.yml b/ci/azure-test-stable.yml
index a924c928..bc93febd 100644
--- a/ci/azure-test-stable.yml
+++ b/ci/azure-test-stable.yml
@@ -8,7 +8,7 @@ jobs:
${{ if parameters.cross }}:
MacOS:
- vmImage: macOS-10.13
+ vmImage: macos-latest
Windows:
vmImage: vs2017-win2016
pool:
diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs
index 843a1e64..1498c2b5 100644
--- a/tokio/src/lib.rs
+++ b/tokio/src/lib.rs
@@ -382,10 +382,6 @@ cfg_macros! {
}
}
-// Tests
-#[cfg(test)]
-mod tests;
-
// TODO: rm
#[cfg(feature = "io-util")]
#[cfg(test)]
diff --git a/tokio/src/loom/std/alloc.rs b/tokio/src/loom/std/alloc.rs
deleted file mode 100644
index 25b199b1..00000000
--- a/tokio/src/loom/std/alloc.rs
+++ /dev/null
@@ -1,18 +0,0 @@
-#[derive(Debug)]
-pub(crate) struct Track<T> {
- value: T,
-}
-
-impl<T> Track<T> {
- pub(crate) fn new(value: T) -> Track<T> {
- Track { value }
- }
-
- pub(crate) fn get_mut(&mut self) -> &mut T {
- &mut self.value
- }
-
- pub(crate) fn into_inner(self) -> T {
- self.value
- }
-}
diff --git a/tokio/src/loom/std/causal_cell.rs b/tokio/src/loom/std/causal_cell.rs
index c4917e5f..8300437a 100644
--- a/tokio/src/loom/std/causal_cell.rs
+++ b/tokio/src/loom/std/causal_cell.rs
@@ -18,15 +18,6 @@ impl<T> CausalCell<T> {
f(self.0.get())
}
- pub(crate) fn with_unchecked<F, R>(&self, f: F) -> R
- where
- F: FnOnce(*const T) -> R,
- {
- f(self.0.get())
- }
-
- pub(crate) fn check(&self) {}
-
pub(crate) fn with_deferred<F, R>(&self, f: F) -> (R, CausalCheck)
where
F: FnOnce(*const T) -> R,
diff --git a/tokio/src/loom/std/mod.rs b/tokio/src/loom/std/mod.rs
index e4bae357..a56d778a 100644
--- a/tokio/src/loom/std/mod.rs
+++ b/tokio/src/loom/std/mod.rs
@@ -5,8 +5,6 @@ mod atomic_u64;
mod atomic_usize;
mod causal_cell;
-pub(crate) mod alloc;
-
pub(crate) mod cell {
pub(crate) use super::causal_cell::{CausalCell, CausalCheck};
}
diff --git a/tokio/src/macros/assert.rs b/tokio/src/macros/assert.rs
deleted file mode 100644
index 4b1cf272..00000000
--- a/tokio/src/macros/assert.rs
+++ /dev/null
@@ -1,18 +0,0 @@
-/// Asserts option is some
-macro_rules! assert_some {
- ($e:expr) => {{
- match $e {
- Some(v) => v,
- _ => panic!("expected some, was none"),
- }
- }};
-}
-
-/// Asserts option is none
-macro_rules! assert_none {
- ($e:expr) => {{
- if let Some(v) = $e {
- panic!("expected none, was {:?}", v);
- }
- }};
-}
diff --git a/tokio/src/macros/mod.rs b/tokio/src/macros/mod.rs
index a37b3e49..2643c360 100644
--- a/tokio/src/macros/mod.rs
+++ b/tokio/src/macros/mod.rs
@@ -1,10 +1,6 @@
#![cfg_attr(not(feature = "full"), allow(unused_macros))]
#[macro_use]
-#[cfg(test)]
-mod assert;
-
-#[macro_use]
mod cfg;
#[macro_use]
@@ -19,6 +15,10 @@ mod ready;
#[macro_use]
mod thread_local;
+#[macro_use]
+#[cfg(feature = "rt-core")]
+pub(crate) mod scoped_tls;
+
cfg_macros! {
#[macro_use]
mod select;
diff --git a/tokio/src/macros/scoped_tls.rs b/tokio/src/macros/scoped_tls.rs
new file mode 100644
index 00000000..666f382b
--- /dev/null
+++ b/tokio/src/macros/scoped_tls.rs
@@ -0,0 +1,80 @@
+use crate::loom::thread::LocalKey;
+
+use std::cell::Cell;
+use std::marker;
+
+/// Set a reference as a thread-local
+#[macro_export]
+macro_rules! scoped_thread_local {
+ ($(#[$attrs:meta])* $vis:vis static $name:ident: $ty:ty) => (
+ $(#[$attrs])*
+ $vis static $name: $crate::macros::scoped_tls::ScopedKey<$ty>
+ = $crate::macros::scoped_tls::ScopedKey {
+ inner: {
+ thread_local!(static FOO: ::std::cell::Cell<*const ()> = {
+ std::cell::Cell::new(::std::ptr::null())
+ });
+ &FOO
+ },
+ _marker: ::std::marker::PhantomData,
+ };
+ )
+}
+
+/// Type representing a thread local storage key corresponding to a reference
+/// to the type parameter `T`.
+pub(crate) struct ScopedKey<T> {
+ #[doc(hidden)]
+ pub(crate) inner: &'static LocalKey<Cell<*const ()>>,
+ #[doc(hidden)]
+ pub(crate) _marker: marker::PhantomData<T>,
+}
+
+unsafe impl<T> Sync for ScopedKey<T> {}
+
+impl<T> ScopedKey<T> {
+ /// Inserts a value into this scoped thread local storage slot for a
+ /// duration of a closure.
+ pub(crate) fn set<F, R>(&'static self, t: &T, f: F) -> R
+ where
+ F: FnOnce() -> R,
+ {
+ struct Reset {
+ key: &'static LocalKey<Cell<*const ()>>,
+ val: *const (),
+ }
+
+ impl Drop for Reset {
+ fn drop(&mut self) {
+ self.key.with(|c| c.set(self.val));
+ }
+ }
+
+ let prev = self.inner.with(|c| {
+ let prev = c.get();
+ c.set(t as *const _ as *const ());
+ prev
+ });
+
+ let _reset = Reset {