diff options
author | Carl Lerche <me@carllerche.com> | 2020-03-05 10:31:37 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-03-05 10:31:37 -0800 |
commit | a78b1c65ccfb9692ca5d3ed8ddde934f40091d83 (patch) | |
tree | c88e547d6913b204f590aea54dc03328ee3cb094 | |
parent | 5ede2e4d6b2f732e83e33f9693682dffc6c9f5b0 (diff) |
rt: cleanup and simplify scheduler (scheduler v2.5) (#2273)
A refactor of the scheduler internals focusing on simplifying and
reducing unsafety. There are no fundamental logic changes.
* The state transitions of the core task component are refined and
reduced.
* `basic_scheduler` has most unsafety removed.
* `local_set` has most unsafety removed.
* `threaded_scheduler` limits most unsafety to its queue implementation.
77 files changed, 4396 insertions, 6423 deletions
diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 4743a66a..cc50f3c8 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -44,13 +44,10 @@ jobs: displayName: Test build permutations rust: stable -# Run loom tests -- template: ci/azure-loom.yml +# Run miri tests +- template: ci/azure-miri.yml parameters: - name: loom - rust: stable - crates: - - tokio + name: miri # Try cross compiling - template: ci/azure-cross-compile.yml @@ -99,16 +96,25 @@ jobs: # name: tsan # rust: stable +# Run loom tests +- template: ci/azure-loom.yml + parameters: + name: loom + rust: stable + - template: ci/azure-deploy-docs.yml parameters: rust: stable dependsOn: - rustfmt + - docs - clippy - test_tokio - test_linux + - test_integration - test_build - loom + - miri - cross - minrust - check_features diff --git a/benches/Cargo.toml b/benches/Cargo.toml index 8c9ad1d3..f4a1d8fb 100644 --- a/benches/Cargo.toml +++ b/benches/Cargo.toml @@ -17,3 +17,8 @@ harness = false name = "mpsc" path = "mpsc.rs" harness = false + +[[bench]] +name = "scheduler" +path = "scheduler.rs" +harness = false diff --git a/benches/scheduler.rs b/benches/scheduler.rs new file mode 100644 index 00000000..0562a120 --- /dev/null +++ b/benches/scheduler.rs @@ -0,0 +1,152 @@ +//! Benchmark implementation details of the theaded scheduler. These benches are +//! intended to be used as a form of regression testing and not as a general +//! purpose benchmark demonstrating real-world performance. + +use tokio::runtime::{self, Runtime}; +use tokio::sync::oneshot; + +use bencher::{benchmark_group, benchmark_main, Bencher}; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::Relaxed; +use std::sync::{mpsc, Arc}; + +fn spawn_many(b: &mut Bencher) { + const NUM_SPAWN: usize = 10_000; + + let mut rt = rt(); + + let (tx, rx) = mpsc::sync_channel(1000); + let rem = Arc::new(AtomicUsize::new(0)); + + b.iter(|| { + rem.store(NUM_SPAWN, Relaxed); + + rt.block_on(async { + for _ in 0..NUM_SPAWN { + let tx = tx.clone(); + let rem = rem.clone(); + + tokio::spawn(async move { + if 1 == rem.fetch_sub(1, Relaxed) { + tx.send(()).unwrap(); + } + }); + } + + let _ = rx.recv().unwrap(); + }); + }); +} + +fn yield_many(b: &mut Bencher) { + const NUM_YIELD: usize = 1_000; + const TASKS: usize = 200; + + let rt = rt(); + + let (tx, rx) = mpsc::sync_channel(TASKS); + + b.iter(move || { + for _ in 0..TASKS { + let tx = tx.clone(); + + rt.spawn(async move { + for _ in 0..NUM_YIELD { + tokio::task::yield_now().await; + } + + tx.send(()).unwrap(); + }); + } + + for _ in 0..TASKS { + let _ = rx.recv().unwrap(); + } + }); +} + +fn ping_pong(b: &mut Bencher) { + const NUM_PINGS: usize = 1_000; + + let mut rt = rt(); + + let (done_tx, done_rx) = mpsc::sync_channel(1000); + let rem = Arc::new(AtomicUsize::new(0)); + + b.iter(|| { + let done_tx = done_tx.clone(); + let rem = rem.clone(); + rem.store(NUM_PINGS, Relaxed); + + rt.block_on(async { + tokio::spawn(async move { + for _ in 0..NUM_PINGS { + let rem = rem.clone(); + let done_tx = done_tx.clone(); + + tokio::spawn(async move { + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + + tokio::spawn(async move { + rx1.await.unwrap(); + tx2.send(()).unwrap(); + }); + + tx1.send(()).unwrap(); + rx2.await.unwrap(); + + if 1 == rem.fetch_sub(1, Relaxed) { + done_tx.send(()).unwrap(); + } + }); + } + }); + + done_rx.recv().unwrap(); + }); + }); +} + +fn chained_spawn(b: &mut Bencher) { + const ITER: usize = 1_000; + + let mut rt = rt(); + + fn iter(done_tx: mpsc::SyncSender<()>, n: usize) { + if n == 0 { + done_tx.send(()).unwrap(); + } else { + tokio::spawn(async move { + iter(done_tx, n - 1); + }); + } + } + + let (done_tx, done_rx) = mpsc::sync_channel(1000); + + b.iter(move || { + let done_tx = done_tx.clone(); + + rt.block_on(async { + tokio::spawn(async move { + iter(done_tx, ITER); + }); + + done_rx.recv().unwrap(); + }); + }); +} + +fn rt() -> Runtime { + runtime::Builder::new() + .threaded_scheduler() + .core_threads(4) + .enable_all() + .build() + .unwrap() +} + +benchmark_group!(scheduler, spawn_many, ping_pong, yield_many, chained_spawn,); + +benchmark_main!(scheduler); diff --git a/ci/azure-check-features.yml b/ci/azure-check-features.yml index a80af0d3..f5985843 100644 --- a/ci/azure-check-features.yml +++ b/ci/azure-check-features.yml @@ -6,7 +6,7 @@ jobs: Linux: vmImage: ubuntu-16.04 MacOS: - vmImage: macOS-10.13 + vmImage: macos-latest Windows: vmImage: vs2017-win2016 pool: diff --git a/ci/azure-install-rust.yml b/ci/azure-install-rust.yml index 2b4feb2a..4cf5ca3f 100644 --- a/ci/azure-install-rust.yml +++ b/ci/azure-install-rust.yml @@ -2,6 +2,13 @@ steps: # Linux and macOS. - script: | set -e + + if [ "$RUSTUP_TOOLCHAIN" == "nightly" ]; then + echo "++ getting latest miri version" + export RUSTUP_TOOLCHAIN="nightly-$(curl -s https://rust-lang.github.io/rustup-components-history/x86_64-unknown-linux-gnu/miri)" + echo "$RUSTUP_TOOLCHAIN" + fi + curl https://sh.rustup.rs -sSf | sh -s -- -y --profile minimal --default-toolchain none export PATH=$PATH:$HOME/.cargo/bin rustup toolchain install $RUSTUP_TOOLCHAIN diff --git a/ci/azure-loom.yml b/ci/azure-loom.yml index fdfb8670..001aedec 100644 --- a/ci/azure-loom.yml +++ b/ci/azure-loom.yml @@ -1,6 +1,18 @@ jobs: - job: ${{ parameters.name }} displayName: Loom tests + strategy: + matrix: + rest: + scope: --skip loom_pool + pool_group_a: + scope: loom_pool::group_a + pool_group_b: + scope: loom_pool::group_b + pool_group_c: + scope: loom_pool::group_c + pool_group_d: + scope: loom_pool::group_d pool: vmImage: ubuntu-16.04 @@ -9,10 +21,9 @@ jobs: parameters: rust_version: ${{ parameters.rust }} - - ${{ each crate in parameters.crates }}: - - script: RUSTFLAGS="--cfg loom" cargo test --lib --release --features "full" -- --test-threads=1 --nocapture - env: - LOOM_MAX_PREEMPTIONS: 1 - CI: 'True' - displayName: test ${{ crate }} - workingDirectory: $(Build.SourcesDirectory)/${{ crate }} + - script: RUSTFLAGS="--cfg loom" cargo test --lib --release --features "full" -- --nocapture $(scope) + env: + LOOM_MAX_PREEMPTIONS: 2 + CI: 'True' + displayName: $(scope) + workingDirectory: $(Build.SourcesDirectory)/tokio diff --git a/ci/azure-miri.yml b/ci/azure-miri.yml new file mode 100644 index 00000000..fb886edc --- /dev/null +++ b/ci/azure-miri.yml @@ -0,0 +1,23 @@ +jobs: +- job: ${{ parameters.name }} + displayName: Miri + pool: + vmImage: ubuntu-16.04 + + steps: + - template: azure-install-rust.yml + parameters: + rust_version: nightly + + - script: | + rustup component add miri + cargo miri setup + rm -rf $(Build.SourcesDirectory)/tokio/tests + displayName: Install miri + + # TODO: enable all tests once they pass + - script: cargo miri test --features rt-core,rt-threaded,rt-util,sync -- -- task + env: + CI: 'True' + displayName: cargo miri test + workingDirectory: $(Build.SourcesDirectory)/tokio diff --git a/ci/azure-test-integration.yml b/ci/azure-test-integration.yml index fc45429a..f498a649 100644 --- a/ci/azure-test-integration.yml +++ b/ci/azure-test-integration.yml @@ -6,7 +6,7 @@ jobs: Linux: vmImage: ubuntu-16.04 MacOS: - vmImage: macOS-10.13 + vmImage: macos-latest Windows: vmImage: vs2017-win2016 pool: diff --git a/ci/azure-test-stable.yml b/ci/azure-test-stable.yml index a924c928..bc93febd 100644 --- a/ci/azure-test-stable.yml +++ b/ci/azure-test-stable.yml @@ -8,7 +8,7 @@ jobs: ${{ if parameters.cross }}: MacOS: - vmImage: macOS-10.13 + vmImage: macos-latest Windows: vmImage: vs2017-win2016 pool: diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index 843a1e64..1498c2b5 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -382,10 +382,6 @@ cfg_macros! { } } -// Tests -#[cfg(test)] -mod tests; - // TODO: rm #[cfg(feature = "io-util")] #[cfg(test)] diff --git a/tokio/src/loom/std/alloc.rs b/tokio/src/loom/std/alloc.rs deleted file mode 100644 index 25b199b1..00000000 --- a/tokio/src/loom/std/alloc.rs +++ /dev/null @@ -1,18 +0,0 @@ -#[derive(Debug)] -pub(crate) struct Track<T> { - value: T, -} - -impl<T> Track<T> { - pub(crate) fn new(value: T) -> Track<T> { - Track { value } - } - - pub(crate) fn get_mut(&mut self) -> &mut T { - &mut self.value - } - - pub(crate) fn into_inner(self) -> T { - self.value - } -} diff --git a/tokio/src/loom/std/causal_cell.rs b/tokio/src/loom/std/causal_cell.rs index c4917e5f..8300437a 100644 --- a/tokio/src/loom/std/causal_cell.rs +++ b/tokio/src/loom/std/causal_cell.rs @@ -18,15 +18,6 @@ impl<T> CausalCell<T> { f(self.0.get()) } - pub(crate) fn with_unchecked<F, R>(&self, f: F) -> R - where - F: FnOnce(*const T) -> R, - { - f(self.0.get()) - } - - pub(crate) fn check(&self) {} - pub(crate) fn with_deferred<F, R>(&self, f: F) -> (R, CausalCheck) where F: FnOnce(*const T) -> R, diff --git a/tokio/src/loom/std/mod.rs b/tokio/src/loom/std/mod.rs index e4bae357..a56d778a 100644 --- a/tokio/src/loom/std/mod.rs +++ b/tokio/src/loom/std/mod.rs @@ -5,8 +5,6 @@ mod atomic_u64; mod atomic_usize; mod causal_cell; -pub(crate) mod alloc; - pub(crate) mod cell { pub(crate) use super::causal_cell::{CausalCell, CausalCheck}; } diff --git a/tokio/src/macros/assert.rs b/tokio/src/macros/assert.rs deleted file mode 100644 index 4b1cf272..00000000 --- a/tokio/src/macros/assert.rs +++ /dev/null @@ -1,18 +0,0 @@ -/// Asserts option is some -macro_rules! assert_some { - ($e:expr) => {{ - match $e { - Some(v) => v, - _ => panic!("expected some, was none"), - } - }}; -} - -/// Asserts option is none -macro_rules! assert_none { - ($e:expr) => {{ - if let Some(v) = $e { - panic!("expected none, was {:?}", v); - } - }}; -} diff --git a/tokio/src/macros/mod.rs b/tokio/src/macros/mod.rs index a37b3e49..2643c360 100644 --- a/tokio/src/macros/mod.rs +++ b/tokio/src/macros/mod.rs @@ -1,10 +1,6 @@ #![cfg_attr(not(feature = "full"), allow(unused_macros))] #[macro_use] -#[cfg(test)] -mod assert; - -#[macro_use] mod cfg; #[macro_use] @@ -19,6 +15,10 @@ mod ready; #[macro_use] mod thread_local; +#[macro_use] +#[cfg(feature = "rt-core")] +pub(crate) mod scoped_tls; + cfg_macros! { #[macro_use] mod select; diff --git a/tokio/src/macros/scoped_tls.rs b/tokio/src/macros/scoped_tls.rs new file mode 100644 index 00000000..666f382b --- /dev/null +++ b/tokio/src/macros/scoped_tls.rs @@ -0,0 +1,80 @@ +use crate::loom::thread::LocalKey; + +use std::cell::Cell; +use std::marker; + +/// Set a reference as a thread-local +#[macro_export] +macro_rules! scoped_thread_local { + ($(#[$attrs:meta])* $vis:vis static $name:ident: $ty:ty) => ( + $(#[$attrs])* + $vis static $name: $crate::macros::scoped_tls::ScopedKey<$ty> + = $crate::macros::scoped_tls::ScopedKey { + inner: { + thread_local!(static FOO: ::std::cell::Cell<*const ()> = { + std::cell::Cell::new(::std::ptr::null()) + }); + &FOO + }, + _marker: ::std::marker::PhantomData, + }; + ) +} + +/// Type representing a thread local storage key corresponding to a reference +/// to the type parameter `T`. +pub(crate) struct ScopedKey<T> { + #[doc(hidden)] + pub(crate) inner: &'static LocalKey<Cell<*const ()>>, + #[doc(hidden)] + pub(crate) _marker: marker::PhantomData<T>, +} + +unsafe impl<T> Sync for ScopedKey<T> {} + +impl<T> ScopedKey<T> { + /// Inserts a value into this scoped thread local storage slot for a + /// duration of a closure. + pub(crate) fn set<F, R>(&'static self, t: &T, f: F) -> R + where + F: FnOnce() -> R, + { + struct Reset { + key: &'static LocalKey<Cell<*const ()>>, + val: *const (), + } + + impl Drop for Reset { + fn drop(&mut self) { + self.key.with(|c| c.set(self.val)); + } + } + + let prev = self.inner.with(|c| { + let prev = c.get(); + c.set(t as *const _ as *const ()); + prev + }); + + let _reset = Reset { |