From a78b1c65ccfb9692ca5d3ed8ddde934f40091d83 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 5 Mar 2020 10:31:37 -0800 Subject: rt: cleanup and simplify scheduler (scheduler v2.5) (#2273) A refactor of the scheduler internals focusing on simplifying and reducing unsafety. There are no fundamental logic changes. * The state transitions of the core task component are refined and reduced. * `basic_scheduler` has most unsafety removed. * `local_set` has most unsafety removed. * `threaded_scheduler` limits most unsafety to its queue implementation. --- azure-pipelines.yml | 18 +- benches/Cargo.toml | 5 + benches/scheduler.rs | 152 +++ ci/azure-check-features.yml | 2 +- ci/azure-install-rust.yml | 7 + ci/azure-loom.yml | 25 +- ci/azure-miri.yml | 23 + ci/azure-test-integration.yml | 2 +- ci/azure-test-stable.yml | 2 +- tokio/src/lib.rs | 4 - tokio/src/loom/std/alloc.rs | 18 - tokio/src/loom/std/causal_cell.rs | 9 - tokio/src/loom/std/mod.rs | 2 - tokio/src/macros/assert.rs | 18 - tokio/src/macros/mod.rs | 8 +- tokio/src/macros/scoped_tls.rs | 80 ++ tokio/src/park/thread.rs | 4 + tokio/src/runtime/basic_scheduler.rs | 461 +++++---- tokio/src/runtime/blocking/pool.rs | 12 +- tokio/src/runtime/blocking/schedule.rs | 22 +- tokio/src/runtime/builder.rs | 6 +- tokio/src/runtime/mod.rs | 7 +- tokio/src/runtime/task/core.rs | 280 ++++++ tokio/src/runtime/task/error.rs | 163 ++++ tokio/src/runtime/task/harness.rs | 369 ++++++++ tokio/src/runtime/task/join.rs | 149 +++ tokio/src/runtime/task/mod.rs | 219 +++++ tokio/src/runtime/task/raw.rs | 131 +++ tokio/src/runtime/task/stack.rs | 81 ++ tokio/src/runtime/task/state.rs | 447 +++++++++ tokio/src/runtime/task/waker.rs | 101 ++ tokio/src/runtime/tests/loom_pool.rs | 381 ++++++++ tokio/src/runtime/tests/mod.rs | 13 +- tokio/src/runtime/tests/task.rs | 159 ++++ tokio/src/runtime/thread_pool/atomic_cell.rs | 52 ++ tokio/src/runtime/thread_pool/current.rs | 84 -- tokio/src/runtime/thread_pool/mod.rs | 92 +- tokio/src/runtime/thread_pool/owned.rs | 83 -- tokio/src/runtime/thread_pool/queue.rs | 568 +++++++++++ tokio/src/runtime/thread_pool/queue/global.rs | 209 ----- tokio/src/runtime/thread_pool/queue/inject.rs | 41 - tokio/src/runtime/thread_pool/queue/local.rs | 298 ------ tokio/src/runtime/thread_pool/queue/mod.rs | 41 - tokio/src/runtime/thread_pool/queue/worker.rs | 127 --- tokio/src/runtime/thread_pool/shared.rs | 94 -- tokio/src/runtime/thread_pool/shutdown.rs | 44 - tokio/src/runtime/thread_pool/slice.rs | 172 ---- tokio/src/runtime/thread_pool/spawner.rs | 49 - tokio/src/runtime/thread_pool/tests/loom_pool.rs | 308 ------ tokio/src/runtime/thread_pool/tests/loom_queue.rs | 69 -- tokio/src/runtime/thread_pool/tests/mod.rs | 8 - tokio/src/runtime/thread_pool/tests/queue.rs | 277 ------ tokio/src/runtime/thread_pool/worker.rs | 1033 +++++++++++---------- tokio/src/sync/notify.rs | 19 +- tokio/src/task/core.rs | 156 ---- tokio/src/task/error.rs | 163 ---- tokio/src/task/harness.rs | 558 ----------- tokio/src/task/join.rs | 157 ---- tokio/src/task/list.rs | 96 -- tokio/src/task/local.rs | 393 ++++---- tokio/src/task/mod.rs | 171 +--- tokio/src/task/queue.rs | 338 ------- tokio/src/task/raw.rs | 197 ---- tokio/src/task/stack.rs | 88 -- tokio/src/task/state.rs | 497 ---------- tokio/src/task/tests/loom.rs | 277 ------ tokio/src/task/tests/mod.rs | 5 - tokio/src/task/tests/task.rs | 661 ------------- tokio/src/task/waker.rs | 97 -- tokio/src/tests/backoff.rs | 32 - tokio/src/tests/loom_schedule.rs | 53 -- tokio/src/tests/mock_schedule.rs | 134 --- tokio/src/tests/mod.rs | 10 - tokio/src/tests/track_drop.rs | 57 -- tokio/src/util/linked_list.rs | 105 ++- tokio/src/util/mod.rs | 13 +- tokio/src/util/try_lock.rs | 21 +- tokio/src/util/wake.rs | 83 ++ tokio/tests/rt_common.rs | 116 ++- tokio/tests/task_local_set.rs | 33 +- 80 files changed, 4766 insertions(+), 6793 deletions(-) create mode 100644 benches/scheduler.rs create mode 100644 ci/azure-miri.yml delete mode 100644 tokio/src/loom/std/alloc.rs delete mode 100644 tokio/src/macros/assert.rs create mode 100644 tokio/src/macros/scoped_tls.rs create mode 100644 tokio/src/runtime/task/core.rs create mode 100644 tokio/src/runtime/task/error.rs create mode 100644 tokio/src/runtime/task/harness.rs create mode 100644 tokio/src/runtime/task/join.rs create mode 100644 tokio/src/runtime/task/mod.rs create mode 100644 tokio/src/runtime/task/raw.rs create mode 100644 tokio/src/runtime/task/stack.rs create mode 100644 tokio/src/runtime/task/state.rs create mode 100644 tokio/src/runtime/task/waker.rs create mode 100644 tokio/src/runtime/tests/loom_pool.rs create mode 100644 tokio/src/runtime/tests/task.rs create mode 100644 tokio/src/runtime/thread_pool/atomic_cell.rs delete mode 100644 tokio/src/runtime/thread_pool/current.rs delete mode 100644 tokio/src/runtime/thread_pool/owned.rs create mode 100644 tokio/src/runtime/thread_pool/queue.rs delete mode 100644 tokio/src/runtime/thread_pool/queue/global.rs delete mode 100644 tokio/src/runtime/thread_pool/queue/inject.rs delete mode 100644 tokio/src/runtime/thread_pool/queue/local.rs delete mode 100644 tokio/src/runtime/thread_pool/queue/mod.rs delete mode 100644 tokio/src/runtime/thread_pool/queue/worker.rs delete mode 100644 tokio/src/runtime/thread_pool/shared.rs delete mode 100644 tokio/src/runtime/thread_pool/shutdown.rs delete mode 100644 tokio/src/runtime/thread_pool/slice.rs delete mode 100644 tokio/src/runtime/thread_pool/spawner.rs delete mode 100644 tokio/src/runtime/thread_pool/tests/loom_pool.rs delete mode 100644 tokio/src/runtime/thread_pool/tests/loom_queue.rs delete mode 100644 tokio/src/runtime/thread_pool/tests/mod.rs delete mode 100644 tokio/src/runtime/thread_pool/tests/queue.rs delete mode 100644 tokio/src/task/core.rs delete mode 100644 tokio/src/task/error.rs delete mode 100644 tokio/src/task/harness.rs delete mode 100644 tokio/src/task/join.rs delete mode 100644 tokio/src/task/list.rs delete mode 100644 tokio/src/task/queue.rs delete mode 100644 tokio/src/task/raw.rs delete mode 100644 tokio/src/task/stack.rs delete mode 100644 tokio/src/task/state.rs delete mode 100644 tokio/src/task/tests/loom.rs delete mode 100644 tokio/src/task/tests/mod.rs delete mode 100644 tokio/src/task/tests/task.rs delete mode 100644 tokio/src/task/waker.rs delete mode 100644 tokio/src/tests/backoff.rs delete mode 100644 tokio/src/tests/loom_schedule.rs delete mode 100644 tokio/src/tests/mock_schedule.rs delete mode 100644 tokio/src/tests/mod.rs delete mode 100644 tokio/src/tests/track_drop.rs create mode 100644 tokio/src/util/wake.rs diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 4743a66a..cc50f3c8 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -44,13 +44,10 @@ jobs: displayName: Test build permutations rust: stable -# Run loom tests -- template: ci/azure-loom.yml +# Run miri tests +- template: ci/azure-miri.yml parameters: - name: loom - rust: stable - crates: - - tokio + name: miri # Try cross compiling - template: ci/azure-cross-compile.yml @@ -99,16 +96,25 @@ jobs: # name: tsan # rust: stable +# Run loom tests +- template: ci/azure-loom.yml + parameters: + name: loom + rust: stable + - template: ci/azure-deploy-docs.yml parameters: rust: stable dependsOn: - rustfmt + - docs - clippy - test_tokio - test_linux + - test_integration - test_build - loom + - miri - cross - minrust - check_features diff --git a/benches/Cargo.toml b/benches/Cargo.toml index 8c9ad1d3..f4a1d8fb 100644 --- a/benches/Cargo.toml +++ b/benches/Cargo.toml @@ -17,3 +17,8 @@ harness = false name = "mpsc" path = "mpsc.rs" harness = false + +[[bench]] +name = "scheduler" +path = "scheduler.rs" +harness = false diff --git a/benches/scheduler.rs b/benches/scheduler.rs new file mode 100644 index 00000000..0562a120 --- /dev/null +++ b/benches/scheduler.rs @@ -0,0 +1,152 @@ +//! Benchmark implementation details of the theaded scheduler. These benches are +//! intended to be used as a form of regression testing and not as a general +//! purpose benchmark demonstrating real-world performance. + +use tokio::runtime::{self, Runtime}; +use tokio::sync::oneshot; + +use bencher::{benchmark_group, benchmark_main, Bencher}; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::Relaxed; +use std::sync::{mpsc, Arc}; + +fn spawn_many(b: &mut Bencher) { + const NUM_SPAWN: usize = 10_000; + + let mut rt = rt(); + + let (tx, rx) = mpsc::sync_channel(1000); + let rem = Arc::new(AtomicUsize::new(0)); + + b.iter(|| { + rem.store(NUM_SPAWN, Relaxed); + + rt.block_on(async { + for _ in 0..NUM_SPAWN { + let tx = tx.clone(); + let rem = rem.clone(); + + tokio::spawn(async move { + if 1 == rem.fetch_sub(1, Relaxed) { + tx.send(()).unwrap(); + } + }); + } + + let _ = rx.recv().unwrap(); + }); + }); +} + +fn yield_many(b: &mut Bencher) { + const NUM_YIELD: usize = 1_000; + const TASKS: usize = 200; + + let rt = rt(); + + let (tx, rx) = mpsc::sync_channel(TASKS); + + b.iter(move || { + for _ in 0..TASKS { + let tx = tx.clone(); + + rt.spawn(async move { + for _ in 0..NUM_YIELD { + tokio::task::yield_now().await; + } + + tx.send(()).unwrap(); + }); + } + + for _ in 0..TASKS { + let _ = rx.recv().unwrap(); + } + }); +} + +fn ping_pong(b: &mut Bencher) { + const NUM_PINGS: usize = 1_000; + + let mut rt = rt(); + + let (done_tx, done_rx) = mpsc::sync_channel(1000); + let rem = Arc::new(AtomicUsize::new(0)); + + b.iter(|| { + let done_tx = done_tx.clone(); + let rem = rem.clone(); + rem.store(NUM_PINGS, Relaxed); + + rt.block_on(async { + tokio::spawn(async move { + for _ in 0..NUM_PINGS { + let rem = rem.clone(); + let done_tx = done_tx.clone(); + + tokio::spawn(async move { + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + + tokio::spawn(async move { + rx1.await.unwrap(); + tx2.send(()).unwrap(); + }); + + tx1.send(()).unwrap(); + rx2.await.unwrap(); + + if 1 == rem.fetch_sub(1, Relaxed) { + done_tx.send(()).unwrap(); + } + }); + } + }); + + done_rx.recv().unwrap(); + }); + }); +} + +fn chained_spawn(b: &mut Bencher) { + const ITER: usize = 1_000; + + let mut rt = rt(); + + fn iter(done_tx: mpsc::SyncSender<()>, n: usize) { + if n == 0 { + done_tx.send(()).unwrap(); + } else { + tokio::spawn(async move { + iter(done_tx, n - 1); + }); + } + } + + let (done_tx, done_rx) = mpsc::sync_channel(1000); + + b.iter(move || { + let done_tx = done_tx.clone(); + + rt.block_on(async { + tokio::spawn(async move { + iter(done_tx, ITER); + }); + + done_rx.recv().unwrap(); + }); + }); +} + +fn rt() -> Runtime { + runtime::Builder::new() + .threaded_scheduler() + .core_threads(4) + .enable_all() + .build() + .unwrap() +} + +benchmark_group!(scheduler, spawn_many, ping_pong, yield_many, chained_spawn,); + +benchmark_main!(scheduler); diff --git a/ci/azure-check-features.yml b/ci/azure-check-features.yml index a80af0d3..f5985843 100644 --- a/ci/azure-check-features.yml +++ b/ci/azure-check-features.yml @@ -6,7 +6,7 @@ jobs: Linux: vmImage: ubuntu-16.04 MacOS: - vmImage: macOS-10.13 + vmImage: macos-latest Windows: vmImage: vs2017-win2016 pool: diff --git a/ci/azure-install-rust.yml b/ci/azure-install-rust.yml index 2b4feb2a..4cf5ca3f 100644 --- a/ci/azure-install-rust.yml +++ b/ci/azure-install-rust.yml @@ -2,6 +2,13 @@ steps: # Linux and macOS. - script: | set -e + + if [ "$RUSTUP_TOOLCHAIN" == "nightly" ]; then + echo "++ getting latest miri version" + export RUSTUP_TOOLCHAIN="nightly-$(curl -s https://rust-lang.github.io/rustup-components-history/x86_64-unknown-linux-gnu/miri)" + echo "$RUSTUP_TOOLCHAIN" + fi + curl https://sh.rustup.rs -sSf | sh -s -- -y --profile minimal --default-toolchain none export PATH=$PATH:$HOME/.cargo/bin rustup toolchain install $RUSTUP_TOOLCHAIN diff --git a/ci/azure-loom.yml b/ci/azure-loom.yml index fdfb8670..001aedec 100644 --- a/ci/azure-loom.yml +++ b/ci/azure-loom.yml @@ -1,6 +1,18 @@ jobs: - job: ${{ parameters.name }} displayName: Loom tests + strategy: + matrix: + rest: + scope: --skip loom_pool + pool_group_a: + scope: loom_pool::group_a + pool_group_b: + scope: loom_pool::group_b + pool_group_c: + scope: loom_pool::group_c + pool_group_d: + scope: loom_pool::group_d pool: vmImage: ubuntu-16.04 @@ -9,10 +21,9 @@ jobs: parameters: rust_version: ${{ parameters.rust }} - - ${{ each crate in parameters.crates }}: - - script: RUSTFLAGS="--cfg loom" cargo test --lib --release --features "full" -- --test-threads=1 --nocapture - env: - LOOM_MAX_PREEMPTIONS: 1 - CI: 'True' - displayName: test ${{ crate }} - workingDirectory: $(Build.SourcesDirectory)/${{ crate }} + - script: RUSTFLAGS="--cfg loom" cargo test --lib --release --features "full" -- --nocapture $(scope) + env: + LOOM_MAX_PREEMPTIONS: 2 + CI: 'True' + displayName: $(scope) + workingDirectory: $(Build.SourcesDirectory)/tokio diff --git a/ci/azure-miri.yml b/ci/azure-miri.yml new file mode 100644 index 00000000..fb886edc --- /dev/null +++ b/ci/azure-miri.yml @@ -0,0 +1,23 @@ +jobs: +- job: ${{ parameters.name }} + displayName: Miri + pool: + vmImage: ubuntu-16.04 + + steps: + - template: azure-install-rust.yml + parameters: + rust_version: nightly + + - script: | + rustup component add miri + cargo miri setup + rm -rf $(Build.SourcesDirectory)/tokio/tests + displayName: Install miri + + # TODO: enable all tests once they pass + - script: cargo miri test --features rt-core,rt-threaded,rt-util,sync -- -- task + env: + CI: 'True' + displayName: cargo miri test + workingDirectory: $(Build.SourcesDirectory)/tokio diff --git a/ci/azure-test-integration.yml b/ci/azure-test-integration.yml index fc45429a..f498a649 100644 --- a/ci/azure-test-integration.yml +++ b/ci/azure-test-integration.yml @@ -6,7 +6,7 @@ jobs: Linux: vmImage: ubuntu-16.04 MacOS: - vmImage: macOS-10.13 + vmImage: macos-latest Windows: vmImage: vs2017-win2016 pool: diff --git a/ci/azure-test-stable.yml b/ci/azure-test-stable.yml index a924c928..bc93febd 100644 --- a/ci/azure-test-stable.yml +++ b/ci/azure-test-stable.yml @@ -8,7 +8,7 @@ jobs: ${{ if parameters.cross }}: MacOS: - vmImage: macOS-10.13 + vmImage: macos-latest Windows: vmImage: vs2017-win2016 pool: diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index 843a1e64..1498c2b5 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -382,10 +382,6 @@ cfg_macros! { } } -// Tests -#[cfg(test)] -mod tests; - // TODO: rm #[cfg(feature = "io-util")] #[cfg(test)] diff --git a/tokio/src/loom/std/alloc.rs b/tokio/src/loom/std/alloc.rs deleted file mode 100644 index 25b199b1..00000000 --- a/tokio/src/loom/std/alloc.rs +++ /dev/null @@ -1,18 +0,0 @@ -#[derive(Debug)] -pub(crate) struct Track { - value: T, -} - -impl Track { - pub(crate) fn new(value: T) -> Track { - Track { value } - } - - pub(crate) fn get_mut(&mut self) -> &mut T { - &mut self.value - } - - pub(crate) fn into_inner(self) -> T { - self.value - } -} diff --git a/tokio/src/loom/std/causal_cell.rs b/tokio/src/loom/std/causal_cell.rs index c4917e5f..8300437a 100644 --- a/tokio/src/loom/std/causal_cell.rs +++ b/tokio/src/loom/std/causal_cell.rs @@ -18,15 +18,6 @@ impl CausalCell { f(self.0.get()) } - pub(crate) fn with_unchecked(&self, f: F) -> R - where - F: FnOnce(*const T) -> R, - { - f(self.0.get()) - } - - pub(crate) fn check(&self) {} - pub(crate) fn with_deferred(&self, f: F) -> (R, CausalCheck) where F: FnOnce(*const T) -> R, diff --git a/tokio/src/loom/std/mod.rs b/tokio/src/loom/std/mod.rs index e4bae357..a56d778a 100644 --- a/tokio/src/loom/std/mod.rs +++ b/tokio/src/loom/std/mod.rs @@ -5,8 +5,6 @@ mod atomic_u64; mod atomic_usize; mod causal_cell; -pub(crate) mod alloc; - pub(crate) mod cell { pub(crate) use super::causal_cell::{CausalCell, CausalCheck}; } diff --git a/tokio/src/macros/assert.rs b/tokio/src/macros/assert.rs deleted file mode 100644 index 4b1cf272..00000000 --- a/tokio/src/macros/assert.rs +++ /dev/null @@ -1,18 +0,0 @@ -/// Asserts option is some -macro_rules! assert_some { - ($e:expr) => {{ - match $e { - Some(v) => v, - _ => panic!("expected some, was none"), - } - }}; -} - -/// Asserts option is none -macro_rules! assert_none { - ($e:expr) => {{ - if let Some(v) = $e { - panic!("expected none, was {:?}", v); - } - }}; -} diff --git a/tokio/src/macros/mod.rs b/tokio/src/macros/mod.rs index a37b3e49..2643c360 100644 --- a/tokio/src/macros/mod.rs +++ b/tokio/src/macros/mod.rs @@ -1,9 +1,5 @@ #![cfg_attr(not(feature = "full"), allow(unused_macros))] -#[macro_use] -#[cfg(test)] -mod assert; - #[macro_use] mod cfg; @@ -19,6 +15,10 @@ mod ready; #[macro_use] mod thread_local; +#[macro_use] +#[cfg(feature = "rt-core")] +pub(crate) mod scoped_tls; + cfg_macros! { #[macro_use] mod select; diff --git a/tokio/src/macros/scoped_tls.rs b/tokio/src/macros/scoped_tls.rs new file mode 100644 index 00000000..666f382b --- /dev/null +++ b/tokio/src/macros/scoped_tls.rs @@ -0,0 +1,80 @@ +use crate::loom::thread::LocalKey; + +use std::cell::Cell; +use std::marker; + +/// Set a reference as a thread-local +#[macro_export] +macro_rules! scoped_thread_local { + ($(#[$attrs:meta])* $vis:vis static $name:ident: $ty:ty) => ( + $(#[$attrs])* + $vis static $name: $crate::macros::scoped_tls::ScopedKey<$ty> + = $crate::macros::scoped_tls::ScopedKey { + inner: { + thread_local!(static FOO: ::std::cell::Cell<*const ()> = { + std::cell::Cell::new(::std::ptr::null()) + }); + &FOO + }, + _marker: ::std::marker::PhantomData, + }; + ) +} + +/// Type representing a thread local storage key corresponding to a reference +/// to the type parameter `T`. +pub(crate) struct ScopedKey { + #[doc(hidden)] + pub(crate) inner: &'static LocalKey>, + #[doc(hidden)] + pub(crate) _marker: marker::PhantomData, +} + +unsafe impl Sync for ScopedKey {} + +impl ScopedKey { + /// Inserts a value into this scoped thread local storage slot for a + /// duration of a closure. + pub(crate) fn set(&'static self, t: &T, f: F) -> R + where + F: FnOnce() -> R, + { + struct Reset { + key: &'static LocalKey>, + val: *const (), + } + + impl Drop for Reset { + fn drop(&mut self) { + self.key.with(|c| c.set(self.val)); + } + } + + let prev = self.inner.with(|c| { + let prev = c.get(); + c.set(t as *const _ as *const ()); + prev + }); + + let _reset = Reset { + key: self.inner, + val: prev, + }; + + f() + } + + /// Gets a value out of this scoped variable. + pub(crate) fn with(&'static self, f: F) -> R + where + F: FnOnce(Option<&T>) -> R, + { + let val = self.inner.with(|c| c.get()); + + if val.is_null() { + f(None) + } else { + unsafe { f(Some(&*(val as *const T))) } + } + } +} diff --git a/tokio/src/park/thread.rs b/tokio/src/park/thread.rs index 12ef9717..a8cdf143 100644 --- a/tokio/src/park/thread.rs +++ b/tokio/src/park/thread.rs @@ -129,6 +129,10 @@ impl Inner { return; } + if dur == Duration::from_millis(0) { + return; + } + let m = self.mutex.lock().unwrap(); match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) { diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index f625920d..a494f9e3 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -1,80 +1,110 @@ use crate::park::{Park, Unpark}; -use crate::task::{self, queue::MpscQueues, JoinHandle, Schedule, ScheduleSendOnly, Task}; +use crate::runtime; +use crate::runtime::task::{self, JoinHandle, Schedule, Task}; +use crate::util::linked_list::LinkedList; +use crate::util::{waker_ref, Wake}; -use std::cell::Cell; +use std::cell::RefCell; +use std::collections::VecDeque; use std::fmt; use std::future::Future; -use std::mem::ManuallyDrop; -use std::ptr; -use std::sync::Arc; -use std::task::{RawWaker, RawWakerVTable, Waker}; +use std::sync::{Arc, Mutex}; +use std::task::Poll::Ready; use std::time::Duration; /// Executes tasks on the current thread -#[derive(Debug)] pub(crate) struct BasicScheduler

where P: Park, { - /// Scheduler component - scheduler: Arc, + /// Scheduler run queue + /// + /// When the scheduler is executed, the queue is removed from `self` and + /// moved into `Context`. + /// + /// This indirection is to allow `BasicScheduler` to be `Send`. + tasks: Option, - /// Local state - local: LocalState

, + /// Sendable task spawner + spawner: Spawner, + + /// Current tick + tick: u8, + + /// Thread park handle + park: P, } -#[derive(Debug, Clone)] +#[derive(Clone)] pub(crate) struct Spawner { - scheduler: Arc, + shared: Arc, +} + +struct Tasks { + /// Collection of all active tasks spawned onto this executor. + owned: LinkedList>>, + + /// Local run queue. + /// + /// Tasks notified from the current thread are pushed into this queue. + queue: VecDeque>>, } -/// The scheduler component. -pub(super) struct SchedulerPriv { - queues: MpscQueues, +/// Scheduler state shared between threads. +struct Shared { + /// Remote run queue + queue: Mutex>>>, + /// Unpark the blocked thread unpark: Box, } -unsafe impl Send for SchedulerPriv {} -unsafe impl Sync for SchedulerPriv {} +/// Thread-local context +struct Context { + /// Shared scheduler state + shared: Arc, -/// Local state -#[derive(Debug)] -struct LocalState

{ - /// Current tick - tick: u8, - - /// Thread park handle - park: P, + /// Local queue + tasks: RefCell, } +/// Initial queue capacity +const INITIAL_CAPACITY: usize = 64; + /// Max number of tasks to poll per tick. const MAX_TASKS_PER_TICK: usize = 61; -thread_local! { - static ACTIVE: Cell<*const SchedulerPriv> = Cell::new(ptr::null()) -} +/// How often ot check the remote queue first +const REMOTE_FIRST_INTERVAL: u8 = 31; + +// Tracks the current BasicScheduler +scoped_thread_local!(static CURRENT: Context); impl

BasicScheduler

where P: Park, { pub(crate) fn new(park: P) -> BasicScheduler

{ - let unpark = park.unpark(); + let unpark = Box::new(park.unpark()); BasicScheduler { - scheduler: Arc::new(SchedulerPriv { - queues: MpscQueues::new(), - unpark: Box::new(unpark), + tasks: Some(Tasks { + owned: LinkedList::new(), + queue: VecDeque::with_capacity(INITIAL_CAPACITY), }), - local: LocalState { tick: 0, park }, + spawner: Spawner { + shared: Arc::new(Shared { + queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)), + unpark: unpark as Box, + }), + }, + tick: 0, + park, } } - pub(crate) fn spawner(&self) -> Spawner { - Spawner { - scheduler: self.scheduler.clone(), - } + pub(crate) fn spawner(&self) -> &Spawner { + &self.spawner } /// Spawns a future onto the thread pool @@ -83,74 +113,146 @@ where F: Future + Send + 'static, F::Output: Send + 'static, { - let (task, handle) = task::joinable(future); - self.scheduler.schedule(task, true); - handle + self.spawner.spawn(future) } - pub(crate) fn block_on(&mut self, mut future: F) -> F::Output + pub(crate) fn block_on(&mut self, future: F) -> F::Output where F: Future, { - use crate::runtime; - use std::pin::Pin; - use std::task::Context; - use std::task::Poll::Ready; + enter(self, |scheduler, context| { + let _enter = runtime::enter(); + let waker = waker_ref(&scheduler.spawner.shared); + let mut cx = std::task::Context::from_waker(&waker); - let local = &mut self.local; - let scheduler = &*self.scheduler; + pin!(future); - struct Guard { - old: *const SchedulerPriv, - } + 'outer: loop { + if let Ready(v) = future.as_mut().poll(&mut cx) { + return v; + } - impl Drop for Guard { - fn drop(&mut self) { - ACTIVE.with(|cell| cell.set(self.old)); - } - } + for _ in 0..MAX_TASKS_PER_TICK { + // Get and increment the current tick + let tick = scheduler.tick; + scheduler.tick = scheduler.tick.wrapping_add(1); + + let next = if tick % REMOTE_FIRST_INTERVAL == 0 { + scheduler + .spawner + .pop() + .or_else(|| context.tasks.borrow_mut().queue.pop_front()) + } else { + context + .tasks + .borrow_mut() + .queue + .pop_front() + .or_else(|| scheduler.spawner.pop()) + }; + + match next { + Some(task) => task.run(), + None => { + // Park until the thread is signaled + scheduler.park.park().ok().expect("failed to park"); + + // Try polling the `block_on` future next + continue 'outer; + } + } + } - // Track the current scheduler - let _guard = ACTIVE.with(|cell| { - let guard = Guard { old: cell.get() }; + // Yield to the park, this drives the timer and pulls any pending + // I/O events. + scheduler + .park + .park_timeout(Duration::from_millis(0)) + .ok() + .expect("failed to park"); + } + }) + } +} - cell.set(scheduler as *const SchedulerPriv); +/// Enter the scheduler context. This sets the queue and other necessary +/// scheduler state in the thread-local +fn enter(scheduler: &mut BasicScheduler

, f: F) -> R +where + F: FnOnce(&mut BasicScheduler

, &Context) -> R, + P: Park, +{ + // Ensures the run queue is placed back in the `BasicScheduler` instance + // once `block_on` returns.` + struct Guard<'a, P: Park> { + context: Option, + scheduler: &'a mut BasicScheduler

, + } - guard - }); + impl Drop for Guard<'_, P> { + fn drop(&mut self) { + let Context { tasks, .. } = self.context.take().expect("context missing"); + self.scheduler.tasks = Some(tasks.into_inner()); + } + } - let mut _enter = runtime::enter(); + // Remove `tasks` from `self` and place it in a `Context`. + let tasks = scheduler.tasks.take().expect("invalid state"); - let raw_waker = RawWaker::new( - scheduler as *const SchedulerPriv as *const (), - &RawWakerVTable::new(sched_clone_waker, sched_noop, sched_wake_by_ref, sched_noop), - ); + let guard = Guard { + context: Some(Context { + shared: scheduler.spawner.shared.clone(), + tasks: RefCell::new(tasks), + }), + scheduler, + }; - let waker = ManuallyDrop::new(unsafe { Waker::from_raw(raw_waker) }); - let mut cx = Context::from_waker(&waker); + let context = guard.context.as_ref().unwrap(); + let scheduler = &mut *guard.scheduler; - // `block_on` takes ownership of `f`. Once it is pinned here, the - // original `f` binding can no longer be accessed, making the - // pinning safe. - let mut future = unsafe { Pin::new_unchecked(&mut future) }; + CURRENT.set(context, || f(scheduler, context)) +} - loop { - if let Ready(v) = future.as_mut().poll(&mut cx) { - return v; +impl

Drop for BasicScheduler

+where + P: Park, +{ + fn drop(&mut self) { + enter(self, |scheduler, context| { + // Loop required here to ensure borrow is dropped between iterations + #[allow(clippy::while_let_loop)] + loop { + let task = match context.tasks.borrow_mut().owned.pop_back() { + Some(task) => task, + None => break, + }; + + task.shutdown(); } - scheduler.tick(local); + // Drain local queue + for task in context.tasks.borrow_mut().queue.drain(..) { + task.shutdown(); + } - // Maintenance work - unsafe { - // safety: this function is safe to call only from the - // thread the basic scheduler is running on (which we are). - scheduler.queues.drain_pending_drop(); + // Drain remote queue + for task in scheduler.spawner.shared.queue.lock().unwrap().drain(..) { + task.shutdown(); } - } + + assert!(context.tasks.borrow().owned.is_empty()); + }); + } +} + +impl fmt::Debug for BasicScheduler

{ + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("BasicScheduler").finish() } } +// ===== impl Spawner ===== + impl Spawner { /// Spawns a future onto the thread pool pub(crate) fn spawn(&self, future: F) -> JoinHandle @@ -159,177 +261,66 @@ impl Spawner { F::Output: Send + 'static, { let (task, handle) = task::joinable(future); - self.scheduler.schedule(task, true); + self.shared.schedule(task); handle } -} - -// === impl SchedulerPriv === - -impl SchedulerPriv { - fn tick(&self, local: &mut LocalState) { - for _ in 0..MAX_TASKS_PER_TICK { - // Get the current tick - let tick = local.tick; - - // Increment the tick - local.tick = tick.wrapping_add(1); - let next = unsafe { - // safety: this function is safe to call only from the - // thread the basic scheduler is running on. The `LocalState` - // parameter to this method implies that we are on that thread. - self.queues.next_task(tick) - }; - - let task = match next { - Some(task) => task, - None => { - local.park.park().ok().expect("failed to park"); - return; - } - }; - - if let Some(task) = task.run(&mut || Some(self.into())) { - unsafe { - // safety: this function is safe to call only from the - // thread the basic scheduler is running on. The `LocalState` - // parameter to this method implies that we are on that thread. - self.queues.push_local(task); - } - } - } - - local - .park - .park_timeout(Duration::from_millis(0)) - .ok() - .expect("failed to park"); - } - - /// Schedule the provided task on the scheduler. - /// - /// If this scheduler is the `ACTIVE` scheduler, enqueue this task on the local queue, otherwise - /// the task is enqueued on the remote queue. - fn schedule(&self, task: Task, spawn: bool) { - let is_current = ACTIVE.with(|cell| cell.get() == self as *const SchedulerPriv); - if is_current { - unsafe { - // safety: this function is safe to call only from the - // thread the basic scheduler is running on. If `is_current` is - // then we are on that thread. - self.queues.push_local(task) - }; - } else { - let mut lock = self.queues.remote(); - lock.schedule(task, spawn); - - // while locked, call unpark - self.unpark.unpark(); - - drop(lock); - } + fn pop(&self) -> Option>> { + self.shared.queue.lock().unwrap().pop_front() } } -impl Schedule for SchedulerPriv { - fn bind(&self, task: &Task) { - unsafe { - // safety: `Queues::add_task` is only safe to call from the thread - // that owns the queues (the thread the scheduler is running on). - // `Scheduler::bind` is called when polling a task that - // doesn't have a scheduler set. We will only poll new tasks from - // the thread that the scheduler is running on. Therefore, this is - // safe to call. - self.queues.add_task(task); - } +impl fmt::Debug for Spawner { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Spawner").finish() } +} - fn release(&self, task: Task) { - self.queues.release_remote(task); - } +// ===== impl Shared ===== - fn release_local(&self, task: &Task) { - unsafe { - // safety: `Scheduler::release_local` is only called from the - // thread that the scheduler is running on. The `Schedule` trait's - // contract is that releasing a task from another thread should call - // `release` rather than `release_local`. - self.queues.release_local(task); - } +impl Schedule for Arc { + fn bind(task: Task) -> Arc { + CURRENT.with(|maybe_cx| { + let cx = maybe_cx.expect("scheduler context missing"); + cx.tasks.borrow_mut().owned.push_front(task); + cx.shared.clone() + }) } - fn schedule(&self, task: Task) { - SchedulerPriv::schedule(self, task, false); - } -} + fn release(&self, task: &Task) -> Option> { + use std::ptr::NonNull; -impl ScheduleSendOnly for SchedulerPriv {} + CURRENT.with(|maybe_cx| { + let cx = maybe_cx.expect("scheduler context missing"); -impl

Drop for BasicScheduler

-where - P: Park, -{ - fn drop(&mut self) { - unsafe { - // safety: the `Drop` impl owns the scheduler's queues. these fields - // will only be accessed when running the scheduler, and it can no - // longer be run, since we are in the process of dropping it. - - // Shut down the task queues. - self.scheduler.queues.shutdown(); - } - - // Wait until all tasks have been released. - loop { + // safety: the task is inserted in the list in `bind`. unsafe { - self.scheduler.queues.drain_pending_drop(); - self.scheduler.queues.drain_queues(); - - if !self.scheduler.queues.has_tasks_remaining() { - break; - } - - self.local.park.park().ok().expect("park failed"); + let ptr = NonNull::from(task.header()); + cx.tasks.borrow_mut().owned.remove(ptr) } - } + }) } -} -impl fmt::Debug for SchedulerPriv { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Scheduler") - .field("queues", &self.queues) - .finish() + fn schedule(&self, task: task::Notified) { + CURRENT.with(|maybe_cx| match maybe_cx { + Some(cx) if Arc::ptr_eq(self, &cx.shared) => { + cx.tasks.borrow_mut().queue.push_back(task); + } + _ => { + self.queue.lock().unwrap().push_back(task); + self.unpark.unpark(); + } + }); } } -unsafe fn sched_clone_waker(ptr: *const ()) -> RawWaker { - let s1 = ManuallyDrop::new(Arc::from_raw(ptr as *const SchedulerPriv)); - - #[allow(clippy::redundant_clone)] - let s2 = s1.clone(); - - RawWaker::new( - &**s2 as *const SchedulerPriv as *const (), - &RawWakerVTable::new(sched_clone_waker, sched_wake, sched_wake_by_ref, sched_drop), - ) -} - -unsafe fn sched_wake(ptr: *const ()) { - let scheduler = Arc::from_raw(ptr as *const SchedulerPriv); - scheduler.unpark.unpark(); -} - -unsafe fn sched_wake_by_ref(ptr: *const ()) { - let scheduler = ManuallyDrop::new(Arc::from_raw(ptr as *const SchedulerPriv)); - scheduler.unpark.unpark(); -} - -unsafe fn sched_drop(ptr: *const ()) { - let _ = Arc::from_raw(ptr as *const SchedulerPriv); -} +impl Wake for Shared { + fn wake(self: Arc) { + Wake::wake_by_ref(&self) + } -unsafe fn sched_noop(_ptr: *const ()) { - unreachable!(); + /// Wake by reference + fn wake_by_ref(arc_self: &Arc) { + arc_self.unpark.unpark(); + } } diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index 0b9d2209..a3b208d1 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -5,8 +5,8 @@ use crate::loom::thread; use crate::runtime::blocking::schedule::NoopSchedule; use crate::runtime::blocking::shutdown; use crate::runtime::blocking::task::BlockingTask; +use crate::runtime::task::{self, JoinHandle}; use crate::runtime::{Builder, Callback, Handle}; -use crate::task::{self, JoinHandle}; use std::collections::VecDeque; use std::fmt; @@ -53,7 +53,7 @@ struct Shared { shutdown_tx: Option, } -type Task = task::Task; +type Task = task::Notified; const KEEP_ALIVE: Duration = Duration::from_secs(10); @@ -227,7 +227,7 @@ impl Inner { // BUSY while let Some(task) = shared.queue.pop_front() { drop(shared); - run_task(task); + task.run(); shared = self.shared.lock().unwrap(); } @@ -305,9 +305,3 @@ impl fmt::Debug for Spawner { fmt.debug_struct("blocking::Spawner").finish() } } - -fn run_task(f: Task) { - let scheduler: &'static NoopSchedule = &NoopSchedule; - let res = f.run(|| Some(scheduler.into())); - assert!(res.is_none()); -} diff --git a/tokio/src/runtime/blocking/schedule.rs b/tokio/src/runtime/blocking/schedule.rs index 5d2cd5f5..e10778d5 100644 --- a/tokio/src/runtime/blocking/schedule.rs +++ b/tokio/src/runtime/blocking/schedule.rs @@ -1,20 +1,24 @@ -use crate::task::{Schedule, ScheduleSendOnly, Task}; +use crate::runtime::task::{self, Task}; /// `task::Schedule` implementation that does nothing. This is unique to the /// blocking scheduler as tasks scheduled are not really futures but blocking /// operations. +/// +/// We avoid storing the task by forgetting it in `bind` and re-materializing it +/// in `release. pub(super) struct NoopSchedule; -impl Schedule for NoopSchedule { - fn bind(&self, _task: &Task) {} - - fn release(&self, _task: Task) {} +impl task::Schedule for NoopSchedule { + fn bind(_task: Task) -> NoopSchedule { + // Do nothing w/ the task + NoopSchedule + } - fn release_local(&self, _task: &Task) {} + fn release(&self, _task: &Task) -> Option> { + None + } - fn schedule(&self, _task: Task) { + fn schedule(&self, _task: task::Notified) { unreachable!(); } } - -impl ScheduleSendOnly for NoopSchedule {} diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index dce265c0..cfde9982 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -425,7 +425,7 @@ cfg_rt_core! { // the reactor to generate some new stimuli for the futures to continue // in their life. let scheduler = BasicScheduler::new(driver); - let spawner = Spawner::Basic(scheduler.spawner()); + let spawner = Spawner::Basic(scheduler.spawner().clone()); // Blocking pool let blocking_pool = blocking::create_blocking_pool(self, self.max_threads); @@ -470,7 +470,7 @@ cfg_rt_threaded! { let (io_driver, io_handle) = io::create_driver(self.enable_io)?; let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone()); - let (scheduler, workers) = ThreadPool::new(core_threads, Parker::new(driver)); + let (scheduler, launch) = ThreadPool::new(core_threads, Parker::new(driver)); let spawner = Spawner::ThreadPool(scheduler.spawner().clone()); // Create the blocking pool @@ -487,7 +487,7 @@ cfg_rt_threaded! { }; // Spawn the thread pool workers - workers.spawn(&handle); + handle.enter(|| launch.launch()); Ok(Runtime { kind: Kind::ThreadPool(scheduler), diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index 6922ef59..3aafc406 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -187,11 +187,14 @@ #[cfg(test)] #[macro_use] mod tests; + pub(crate) mod context; cfg_rt_core! { mod basic_scheduler; use basic_scheduler::BasicScheduler; + + pub(crate) mod task; } mod blocking; @@ -215,7 +218,7 @@ mod io; cfg_rt_threaded! { mod park; - use park::{Parker, Unparker}; + use park::Parker; } mod shell; @@ -334,7 +337,7 @@ impl Runtime { /// [threaded scheduler]: index.html#threaded-scheduler /// [basic scheduler]: index.html#basic-scheduler /// [runtime builder]: crate::runtime::Builder - pub fn new() -> io::Result { + pub fn new() -> io::Result { #[cfg(feature = "rt-threaded")] let ret = Builder::new().threaded_scheduler().enable_all().build(); diff --git a/tokio/src/runtime/task/core.rs b/tokio/src/runtime/task/core.rs new file mode 100644 index 00000000..43e6b471 --- /dev/null +++ b/tokio/src/runtime/task/core.rs @@ -0,0 +1,280 @@ +use crate::loom::cell::CausalCell; +use crate::runtime::task::raw::{self, Vtable}; +use crate::runtime::task::state::State; +use crate::runtime::task::waker::waker_ref; +use crate::runtime::task::{Notified, Schedule, Task}; +use crate::util::linked_list; + +use std::cell::UnsafeCell; +use std::future::Future; +use std::pin::Pin; +use std::ptr::NonNull; +use std::task::{Context, Poll, Waker}; + +/// The task cell. Contains the components of the task. +/// +/// It is critical for `Header` to be the first field as the task structure will +/// be referenced by both *mut Cell and *mut Header. +#[repr(C)] +pub(super) struct Cell { + /// Hot task state data + pub(super) header: Header, + + /// Either the future or output, depending on the execution stage. + pub(super) core: Core, + + /// Cold data + pub(super) trailer: Trailer, +} + +/// The core of the task. +/// +/// Holds the future or output, depending on the stage of execution. +pub(super) struct Core { + /// Scheduler used to drive this future + pub(super) scheduler: CausalCell>, + + /// Either the future or the output + pub(super) stage: CausalCell>, +} + +/// Crate public as this is also needed by the pool. +#[repr(C)] +pub(crate) struct Header { + /// Task state + pub(super) state: State, + + pub(crate) owned: UnsafeCell>, + + /// Pointer to next task, used with the injection queue + pub(crate) queue_next: UnsafeCell>>, + + /// Pointer to the next task in the transfer stack + pub(super) stack_next: UnsafeCell>>, + + /// Table of function pointers for executing actions on the task. + pub(super) vtable: &'static Vtable, +} + +unsafe impl Send for Header {} +unsafe impl Sync for Header {} + +/// Cold data is stored after the future. +pub(super) struct Trailer { + /// Consumer task waiting on completion of this task. + pub(super) waker: CausalCell>, +} + +/// Either the future or the output. +pub(super) enum Stage { + Running(T), + Finished(super::Result), + Consumed, +} + +impl Cell { + /// Allocates a new task cell, containing the header, trailer, and core + /// structures. + pub(super) fn new(future: T, state: State) -> Box> { + Box::new(Cell { + header: Header { + state, + owned: UnsafeCell::new(linked_list::Pointers::new()), + queue_next: UnsafeCell::new(None), + stack_next: UnsafeCell::new(None), + vtable: raw::vtable::(), + }, + core: Core { + scheduler: CausalCell::new(None), + stage: CausalCell::new(Stage::Running(future)), + }, + trailer: Trailer { + waker: CausalCell::new(None), + }, + }) + } +} + +impl Core { + /// If needed, bind a scheduler to the task. + /// + /// This only happens on the first poll. + pub(super) fn bind_scheduler(&self, task: Task) { + use std::mem::ManuallyDrop; + + // TODO: it would be nice to not have to wrap with a ManuallyDrop + let task = ManuallyDrop::new(task); + + // This function may be called concurrently, but the __first__ time it + // is called, the caller has unique access to this field. All subsequent + // concurrent calls will be via the `Waker`, which will "happens after" + // the first poll. + // + // In other words, it is always safe to read the field and it is safe to + // write to the field when it is `None`. + if self.is_bound() { + return; + } + + // Bind the task to the scheduler + let scheduler = S::bind(ManuallyDrop::into_inner(task)); + + // Safety: As `scheduler` is not set, this is the first poll + self.scheduler.with_mut(|ptr| unsafe { + *ptr = Some(scheduler); + }); + } + + /// Returns true if the task is bound to a scheduler. + pub(super) fn is_bound(&self) -> bool { + // Safety: never called concurrently w/ a mutation. + self.scheduler.with(|ptr| unsafe { (*ptr).is_some() }) + } + + /// Poll the future + /// + /// # Safety + /// + /// The caller must ensure it is safe to mutate the `state` field. This + /// requires ensuring mutal exclusion between any concurrent thread that + /// might modify the future or output field. + /// + /// The mutual exclusion is implemented by `Harness` and the `Lifecycle` + /// component of the task state. + /// + /// `self` must also be pinned. This is handled by storing the task on the + /// heap. + pub(super) fn poll(&self, header: &Header) -> Poll { + let res = { + self.stage.with_mut(|ptr| { + // Safety: The caller ensures mutual exclusion to the field. + let future = match unsafe { &mut *ptr } { + Stage::Running(future) => future, + _ => unreachable!("unexpected stage"), + }; + + // Safety: The caller ensures the future is pinned. + let future = unsafe { Pin::new_unchecked(future) }; + + // The waker passed into the `poll` function does not require a ref + // count increment. + let waker_ref = waker_ref::(header); + let mut cx = Context::from_waker(&*waker_ref); + + future.poll(&mut cx) + }) + }; + + if res.is_ready() { + self.drop_future_or_output(); + } + + res + } + + /// Drop the future + /// + /// # Safety + /// + /// The caller must ensure it is safe to mutate the `stage` field. + pub(super) fn drop_future_or_output(&self) { + self.stage.with_mut(|ptr| { + // Safety: The caller ensures mutal exclusion to the field. + unsafe { *ptr = Stage::Consumed }; + }); + } + + /// Store the task output + /// + /// # Safety + /// + /// The caller must ensure it is safe to mutate the `stage` field. + pub(super) fn store_output(&self, output: super::Result) { + self.stage.with_mut(|ptr| { + // Safety: the caller ensures mutual exclusion to the field. + unsafe { *ptr = Stage::Finished(output) }; + }); + } + + /// Take the task output + /// + /// # Safety + /// + /// The caller must ensure it is safe to mutate the `stage` field. + pub(super) fn take_output(&self) -> super::Result { + use std::mem; + + self.stage.with_mut(|ptr| { + // Safety:: the caller ensures mutal exclusion to the field. + match mem::replace(unsafe { &mut *ptr }, Stage::Consumed) { + Stage::Finished(output) => output, + _ => panic!("unexpected task state"), + } + }) + } + + /// Schedule the future for execution + pub(super) fn schedule(&self, task: Notified) { + self.scheduler.with(|ptr| { + // Safety: Can only be called after initial `poll`, which is the + // only time the field is mutated. + match unsafe { &*ptr } { + Some(scheduler) => scheduler.schedule(task), + None => panic!("no scheduler set"), + } + }); + } + + /// Schedule the future for execution in the near future, yielding the + /// thread to other tasks. + pub(super) fn yield_now(&self, task: Notified) { + self.scheduler.with(|ptr| { + // Safety: Can only be called after initial `poll`, which is the + // only time the field is mutated. + match unsafe { &*ptr } { + Some(scheduler) => scheduler.yield_now(task), + None => panic!("no scheduler set"), + } + }); + } + + /// Release the task + /// + /// If the `Scheduler` implementation is able to, it returns the `Task` + /// handle immediately. The caller of this function will batch a ref-dec + /// with a state change. + pub(super) fn release(&self, task: Task) -> Option> { + use std::mem::ManuallyDrop; + + let task = ManuallyDrop::new(task); + + self.scheduler.with(|ptr| { + // Safety: Can only be called after initial `poll`, which is the + // only time the field is mutated. + match unsafe { &*ptr } { + Some(scheduler) => scheduler.release(&*task), + // Task was never polled + None => None, + } + }) + } +} + +cfg_rt_threaded! { + impl Header { + pub(crate) fn shutdown(&self) { + use crate::runtime::task::RawTask; + + let task = unsafe { RawTask::from_raw(self.into()) }; + task.shutdown(); + } + } +} + +#[test] +#[cfg(not(loom))] +fn header_lte_cache_line() { + use std::mem::size_of; + + assert!(size_of::

() <= 8 * size_of::<*const ()>()); +} diff --git a/tokio/src/runtime/task/error.rs b/tokio/src/runtime/task/error.rs new file mode 100644 index 00000000..d5f65a49 --- /dev/null +++ b/tokio/src/runtime/task/error.rs @@ -0,0 +1,163 @@ +use std::any::Any; +use std::fmt; +use std::io; +use std::sync::Mutex; + +doc_rt_core! { + /// Task failed to execute to completion. + pub struct JoinError { + repr: Repr, + } +} + +enum Repr { + Cancelled, + Panic(Mutex>), +} + +impl JoinError { + #[doc(hidden)] + #[deprecated] + pub fn cancelled() -> JoinError { + Self::cancelled2() + } + + pub(crate) fn cancelled2() -> JoinError { + JoinError { + repr: Repr::Cancelled, + } + } + + #[doc(hidden)] + #[deprecated] + pub fn panic(err: Box) -> JoinError { + Self::panic2(err) + } + + pub(crate) fn panic2(err: Box) -> JoinError { + JoinError { + repr: Repr::Panic(Mutex::new(err)), + } + } + + /// Returns true if the error was caused by the task being cancelled + pub fn is_cancelled(&self) -> bool { + match &self.repr { + Repr::Cancelled => true, + _ => false, + } + } + + /// Returns true if the error was caused by the task panicking + /// + /// # Examples + /// + /// ``` + /// use std::panic; + /// + /// #[tokio::main] + /// async fn main() { + /// let err = tokio::spawn(async { + /// panic!("boom"); + /// }).await.unwrap_err(); + /// + /// assert!(err.is_panic()); + /// } + /// ``` + pub fn is_panic(&self) -> bool { + match &self.repr { + Repr::Panic(_) => true, + _ => false, + } + } + + /// Consumes the join error, returning the object with which the task panicked. + /// + /// # Panics + /// + /// `into_panic()` panics if the `Error` does not represent the underlying + /// task terminating with a panic. Use `is_panic` to check the error reason + /// or `try_into_panic` for a variant that does not panic. + /// + /// # Examples + /// + /// ```should_panic + /// use std::panic; + /// + /// #[tokio::main] + /// async fn main() { + /// let err = tokio::spawn(async { + /// panic!("boom"); + /// }).await.unwrap_err(); + /// + /// if err.is_panic() { + /// // Resume the panic on the main task + /// panic::resume_unwind(err.into_panic()); + /// } + /// } + /// ``` + pub fn into_panic(self) -> Box { + self.try_into_panic() + .expect("`JoinError` reason is not a panic.") + } + + /// Consumes the join error, returning the object with which the task + /// panicked if the task terminated due to a panic. Otherwise, `self` is + /// returned. + /// + /// # Examples + /// + /// ```should_panic + /// use std::panic; + /// + /// #[tokio::main] + /// async fn main() { + /// let err = tokio::spawn(async { + /// panic!("boom"); + /// }).await.unwrap_err(); + /// + /// if let Ok(reason) = err.try_into_panic() { + /// // Resume the panic on the main task + /// panic::resume_unwind(reason); + /// } + /// } + /// ``` + pub fn try_into_panic(self) -> Result, JoinError> { + match self.repr { + Repr::Panic(p) => Ok(p.into_inner().expect("Extracting panic from mutex")), + _ => Err(self), + } + } +} + +impl fmt::Display for JoinError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + match &self.repr { + Repr::Cancelled => write!(fmt, "cancelled"), + Repr::Panic(_) => write!(fmt, "panic"), + } + } +} + +impl fmt::Debug for JoinError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + match &self.repr { + Repr::Cancelled => write!(fmt, "JoinError::Cancelled"), + Repr::Panic(_) => write!(fmt, "JoinError::Panic(...)"), + } + } +} + +impl std::error::Error for JoinError {} + +impl From for io::Error { + fn from(src: JoinError) -> io::Error { + io::Error::new( + io::ErrorKind::Other, + match src.repr { + Repr::Cancelled => "task was cancelled", + Repr::Panic(_) => "task panicked", + }, + ) + } +} diff --git a/tokio/src/runtime/task/harness.rs b/tokio/src/runtime/task/harness.rs new file mode 100644 index 00000000..f9cf5e75 --- /dev/null +++ b/tokio/src/runtime/task/harness.rs @@ -0,0 +1,369 @@ +use crate::runtime::task::core::{Cell, Core, Header, Trailer}; +use crate::runtime::task::state::Snapshot; +use crate::runtime::task::{JoinError, Notified, Schedule, Task}; + +use std::future::Future; +use std::mem; +use std::panic; +use std::ptr::NonNull; +use std::task::{Poll, Waker}; + +/// Typed raw task handle +pub(super) struct Harness { + cell: NonNull>, +} + +impl Harness +where + T: Future, + S: 'static, +{ + pub(super) unsafe fn from_raw(ptr: NonNull
) -> Harness { + Harness { + cell: ptr.cast::>(), + } + } + + fn header(&self) -> &Header { + unsafe { &self.cell.as_ref().header } + } + + fn trailer(&self) -> &Trailer { + unsafe { &self.cell.as_ref().trailer } + } + + fn core(&self) -> &Core { + unsafe { &self.cell.as_ref().core } + } +} + +impl Harness +where + T: Future, + S: Schedule, +{ + /// Polls the inner future. + /// + /// All necessary state checks and transitions are performed. + /// + /// Panics raised while polling the future are handled. + pub(super) fn poll(self) { + // If this is the first time the task is polled, the task will be bound + // to the scheduler, in which case the task ref count must be + // incremented. + let ref_inc = !self.core().is_bound(); + + // Transition the task to the running state. + // + // A failure to transition here indicates the task has been cancelled + // while in the run queue pending execution. + let snapshot = match self.header().state.transition_to_running(ref_inc) { + Ok(snapshot) => snapshot, + Err(_) => { + // The task was shutdown while in the run queue. At this point, + // we just hold a ref counted reference. Drop it here. + self.drop_reference(); + return; + } + }; + + // Ensure the task is bound to a scheduler instance. If this is the + // first time polling the task, a scheduler instance is pulled from the + // local context and assigned to the task. + // + // The scheduler maintains ownership of the task and responds to `wake` + // calls. + // + // The task reference count has been incremented. + self.core().bind_scheduler(self.to_task()); + + // The transition to `Running` done above ensures that a lock on the + // future has been obtained. This also ensures the `*mut T` pointer + // contains the future (as opposed to the output) and is initialized. + + let res = panic::catch_unwind(panic::AssertUnwindSafe(|| { + struct Guard<'a, T: Future, S: Schedule> { + core: &'a Core, + polled: bool, + } + + impl Drop for Guard<'_, T, S> { + fn drop(&mut self) { + if !self.polled { + self.core.drop_future_or_output(); + } + } + } + + let mut guard = Guard { + core: self.core(), + polled: false, + }; + + // If the task is cancelled, avoid polling it, instead signalling it + // is complete. + if snapshot.is_cancelled() { + Poll::Ready(Err(JoinError::cancelled2())) + } else { + let res = guard.core.poll(self.header()); + + // prevent the guard from dropping the future + guard.polled = true; + + res.map(Ok) + } + })); + + match res { + Ok(Poll::Ready(out)) => { + self.complete(out, snapshot.is_join_interested()); + } + Ok(Poll::Pending) => { + match self.header().state.transition_to_idle() { + Ok(snapshot) => { + if snapshot.is_notified() { + // Signal yield + self.core().yield_now(Notified(self.to_task())); + } + } + Err(_) => self.cancel_task(), + } + } + Err(err) => { + self.complete(Err(JoinError::panic2(err)), snapshot.is_join_interested()); + } + } + } + + pub(super) fn dealloc(self) { + // Release the join waker, if there is one. + self.trailer().waker.with_mut(|_| ()); + + // Check causality + self.core().stage.with_mut(|_| {}); + self.core().scheduler.with_mut(|_|