From c62ef2d232dea1535a8e22484fa2ca083f03e903 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Mon, 28 Oct 2019 21:40:29 -0700 Subject: executor: move into `tokio` crate (#1702) A step towards collapsing Tokio sub crates into a single `tokio` crate (#1318). The executor implementation is now provided by the main `tokio` crate. Functionality can be opted out of by using the various net related feature flags. --- Cargo.toml | 1 - azure-pipelines.yml | 39 +- ci/azure-tsan.yml | 2 - ci/patch.toml | 1 - tests-build/Cargo.toml | 3 +- tokio-executor/CHANGELOG.md | 81 -- tokio-executor/Cargo.toml | 52 -- tokio-executor/LICENSE | 25 - tokio-executor/README.md | 13 - tokio-executor/benches/thread_pool.rs | 161 ---- tokio-executor/src/blocking/builder.rs | 57 -- tokio-executor/src/blocking/mod.rs | 332 -------- tokio-executor/src/current_thread/mod.rs | 873 --------------------- tokio-executor/src/current_thread/scheduler.rs | 808 ------------------- tokio-executor/src/enter.rs | 139 ---- tokio-executor/src/error.rs | 49 -- tokio-executor/src/executor.rs | 180 ----- tokio-executor/src/global.rs | 222 ------ tokio-executor/src/lib.rs | 102 --- tokio-executor/src/loom/mod.rs | 27 - tokio-executor/src/loom/std/atomic_u32.rs | 44 -- tokio-executor/src/loom/std/atomic_usize.rs | 45 -- tokio-executor/src/loom/std/causal_cell.rs | 48 -- tokio-executor/src/loom/std/mod.rs | 77 -- tokio-executor/src/park/mod.rs | 140 ---- tokio-executor/src/park/thread.rs | 263 ------- tokio-executor/src/task/core.rs | 153 ---- tokio-executor/src/task/error.rs | 48 -- tokio-executor/src/task/harness.rs | 546 ------------- tokio-executor/src/task/join.rs | 74 -- tokio-executor/src/task/list.rs | 70 -- tokio-executor/src/task/mod.rs | 130 --- tokio-executor/src/task/raw.rs | 190 ----- tokio-executor/src/task/stack.rs | 85 -- tokio-executor/src/task/state.rs | 502 ------------ tokio-executor/src/task/tests/loom.rs | 277 ------- tokio-executor/src/task/tests/mod.rs | 5 - tokio-executor/src/task/tests/task.rs | 644 --------------- tokio-executor/src/task/waker.rs | 107 --- tokio-executor/src/tests/backoff.rs | 32 - tokio-executor/src/tests/loom_oneshot.rs | 49 -- tokio-executor/src/tests/loom_schedule.rs | 51 -- tokio-executor/src/tests/mock_park.rs | 66 -- tokio-executor/src/tests/mock_schedule.rs | 131 ---- tokio-executor/src/tests/mod.rs | 40 - tokio-executor/src/tests/track_drop.rs | 57 -- tokio-executor/src/thread_pool/builder.rs | 259 ------ tokio-executor/src/thread_pool/current.rs | 85 -- tokio-executor/src/thread_pool/idle.rs | 229 ------ tokio-executor/src/thread_pool/join.rs | 42 - tokio-executor/src/thread_pool/mod.rs | 58 -- tokio-executor/src/thread_pool/owned.rs | 77 -- tokio-executor/src/thread_pool/park.rs | 182 ----- tokio-executor/src/thread_pool/pool.rs | 111 --- tokio-executor/src/thread_pool/queue/global.rs | 195 ----- tokio-executor/src/thread_pool/queue/inject.rs | 36 - tokio-executor/src/thread_pool/queue/local.rs | 298 ------- tokio-executor/src/thread_pool/queue/mod.rs | 41 - tokio-executor/src/thread_pool/queue/worker.rs | 129 --- tokio-executor/src/thread_pool/set.rs | 209 ----- tokio-executor/src/thread_pool/shared.rs | 104 --- tokio-executor/src/thread_pool/shutdown.rs | 48 -- tokio-executor/src/thread_pool/spawner.rs | 61 -- tokio-executor/src/thread_pool/tests/loom_pool.rs | 138 ---- tokio-executor/src/thread_pool/tests/loom_queue.rs | 68 -- tokio-executor/src/thread_pool/tests/mod.rs | 11 - tokio-executor/src/thread_pool/tests/queue.rs | 281 ------- tokio-executor/src/thread_pool/tests/worker.rs | 68 -- tokio-executor/src/thread_pool/worker.rs | 415 ---------- tokio-executor/src/typed.rs | 178 ----- tokio-executor/src/util/mod.rs | 5 - tokio-executor/src/util/pad.rs | 52 -- tokio-executor/src/util/rand.rs | 52 -- tokio-executor/tests/current_thread.rs | 781 ------------------ tokio-executor/tests/enter.rs | 17 - tokio-executor/tests/executor.rs | 24 - tokio-executor/tests/global.rs | 17 - tokio-executor/tests/thread_pool.rs | 478 ----------- tokio-test/Cargo.toml | 1 - tokio-test/src/clock.rs | 2 +- tokio-test/src/task.rs | 2 +- tokio/Cargo.toml | 21 +- tokio/benches/thread_pool.rs | 161 ++++ tokio/src/executor.rs | 104 --- tokio/src/executor/blocking/builder.rs | 58 ++ tokio/src/executor/blocking/mod.rs | 332 ++++++++ tokio/src/executor/current_thread/mod.rs | 873 +++++++++++++++++++++ tokio/src/executor/current_thread/scheduler.rs | 808 +++++++++++++++++++ tokio/src/executor/enter.rs | 139 ++++ tokio/src/executor/error.rs | 49 ++ tokio/src/executor/executor.rs | 181 +++++ tokio/src/executor/global.rs | 233 ++++++ tokio/src/executor/loom/mod.rs | 27 + tokio/src/executor/loom/std/atomic_u32.rs | 44 ++ tokio/src/executor/loom/std/atomic_usize.rs | 45 ++ tokio/src/executor/loom/std/causal_cell.rs | 48 ++ tokio/src/executor/loom/std/mod.rs | 77 ++ tokio/src/executor/mod.rs | 84 ++ tokio/src/executor/park/mod.rs | 140 ++++ tokio/src/executor/park/thread.rs | 263 +++++++ tokio/src/executor/task/core.rs | 153 ++++ tokio/src/executor/task/error.rs | 48 ++ tokio/src/executor/task/harness.rs | 546 +++++++++++++ tokio/src/executor/task/join.rs | 74 ++ tokio/src/executor/task/list.rs | 70 ++ tokio/src/executor/task/mod.rs | 130 +++ tokio/src/executor/task/raw.rs | 190 +++++ tokio/src/executor/task/stack.rs | 85 ++ tokio/src/executor/task/state.rs | 502 ++++++++++++ tokio/src/executor/task/tests/loom.rs | 277 +++++++ tokio/src/executor/task/tests/mod.rs | 5 + tokio/src/executor/task/tests/task.rs | 643 +++++++++++++++ tokio/src/executor/task/waker.rs | 107 +++ tokio/src/executor/tests/backoff.rs | 32 + tokio/src/executor/tests/loom_oneshot.rs | 49 ++ tokio/src/executor/tests/loom_schedule.rs | 51 ++ tokio/src/executor/tests/mock_park.rs | 66 ++ tokio/src/executor/tests/mock_schedule.rs | 131 ++++ tokio/src/executor/tests/mod.rs | 40 + tokio/src/executor/tests/track_drop.rs | 57 ++ tokio/src/executor/thread_pool/builder.rs | 259 ++++++ tokio/src/executor/thread_pool/current.rs | 85 ++ tokio/src/executor/thread_pool/idle.rs | 229 ++++++ tokio/src/executor/thread_pool/join.rs | 42 + tokio/src/executor/thread_pool/mod.rs | 58 ++ tokio/src/executor/thread_pool/owned.rs | 77 ++ tokio/src/executor/thread_pool/park.rs | 182 +++++ tokio/src/executor/thread_pool/pool.rs | 111 +++ tokio/src/executor/thread_pool/queue/global.rs | 195 +++++ tokio/src/executor/thread_pool/queue/inject.rs | 36 + tokio/src/executor/thread_pool/queue/local.rs | 298 +++++++ tokio/src/executor/thread_pool/queue/mod.rs | 41 + tokio/src/executor/thread_pool/queue/worker.rs | 127 +++ tokio/src/executor/thread_pool/set.rs | 209 +++++ tokio/src/executor/thread_pool/shared.rs | 104 +++ tokio/src/executor/thread_pool/shutdown.rs | 48 ++ tokio/src/executor/thread_pool/spawner.rs | 61 ++ tokio/src/executor/thread_pool/tests/loom_pool.rs | 138 ++++ tokio/src/executor/thread_pool/tests/loom_queue.rs | 68 ++ tokio/src/executor/thread_pool/tests/mod.rs | 11 + tokio/src/executor/thread_pool/tests/queue.rs | 281 +++++++ tokio/src/executor/thread_pool/tests/worker.rs | 68 ++ tokio/src/executor/thread_pool/worker.rs | 415 ++++++++++ tokio/src/executor/typed.rs | 178 +++++ tokio/src/executor/util/mod.rs | 5 + tokio/src/executor/util/pad.rs | 52 ++ tokio/src/executor/util/rand.rs | 52 ++ tokio/src/fs/mod.rs | 7 +- tokio/src/lib.rs | 8 +- tokio/src/net/addr.rs | 4 +- tokio/src/net/driver/mod.rs | 4 +- tokio/src/net/driver/reactor/mod.rs | 2 +- tokio/src/runtime/current_thread/builder.rs | 3 +- tokio/src/runtime/current_thread/mod.rs | 4 +- tokio/src/runtime/current_thread/runtime.rs | 11 +- tokio/src/runtime/mod.rs | 4 +- tokio/src/runtime/threadpool/builder.rs | 3 +- tokio/src/runtime/threadpool/mod.rs | 4 +- tokio/src/runtime/threadpool/spawner.rs | 3 +- tokio/src/timer/timer/mod.rs | 3 +- tokio/tests/current_thread.rs | 781 ++++++++++++++++++ tokio/tests/executor.rs | 24 + tokio/tests/executor_enter.rs | 17 + tokio/tests/executor_global.rs | 17 + tokio/tests/net_driver.rs | 2 +- tokio/tests/runtime_current_thread.rs | 3 +- tokio/tests/runtime_threaded.rs | 5 +- tokio/tests/thread_pool.rs | 478 +++++++++++ tokio/tests/timer_hammer.rs | 6 +- 169 files changed, 11629 insertions(+), 11932 deletions(-) delete mode 100644 tokio-executor/CHANGELOG.md delete mode 100644 tokio-executor/Cargo.toml delete mode 100644 tokio-executor/LICENSE delete mode 100644 tokio-executor/README.md delete mode 100644 tokio-executor/benches/thread_pool.rs delete mode 100644 tokio-executor/src/blocking/builder.rs delete mode 100644 tokio-executor/src/blocking/mod.rs delete mode 100644 tokio-executor/src/current_thread/mod.rs delete mode 100644 tokio-executor/src/current_thread/scheduler.rs delete mode 100644 tokio-executor/src/enter.rs delete mode 100644 tokio-executor/src/error.rs delete mode 100644 tokio-executor/src/executor.rs delete mode 100644 tokio-executor/src/global.rs delete mode 100644 tokio-executor/src/lib.rs delete mode 100644 tokio-executor/src/loom/mod.rs delete mode 100644 tokio-executor/src/loom/std/atomic_u32.rs delete mode 100644 tokio-executor/src/loom/std/atomic_usize.rs delete mode 100644 tokio-executor/src/loom/std/causal_cell.rs delete mode 100644 tokio-executor/src/loom/std/mod.rs delete mode 100644 tokio-executor/src/park/mod.rs delete mode 100644 tokio-executor/src/park/thread.rs delete mode 100644 tokio-executor/src/task/core.rs delete mode 100644 tokio-executor/src/task/error.rs delete mode 100644 tokio-executor/src/task/harness.rs delete mode 100644 tokio-executor/src/task/join.rs delete mode 100644 tokio-executor/src/task/list.rs delete mode 100644 tokio-executor/src/task/mod.rs delete mode 100644 tokio-executor/src/task/raw.rs delete mode 100644 tokio-executor/src/task/stack.rs delete mode 100644 tokio-executor/src/task/state.rs delete mode 100644 tokio-executor/src/task/tests/loom.rs delete mode 100644 tokio-executor/src/task/tests/mod.rs delete mode 100644 tokio-executor/src/task/tests/task.rs delete mode 100644 tokio-executor/src/task/waker.rs delete mode 100644 tokio-executor/src/tests/backoff.rs delete mode 100644 tokio-executor/src/tests/loom_oneshot.rs delete mode 100644 tokio-executor/src/tests/loom_schedule.rs delete mode 100644 tokio-executor/src/tests/mock_park.rs delete mode 100644 tokio-executor/src/tests/mock_schedule.rs delete mode 100644 tokio-executor/src/tests/mod.rs delete mode 100644 tokio-executor/src/tests/track_drop.rs delete mode 100644 tokio-executor/src/thread_pool/builder.rs delete mode 100644 tokio-executor/src/thread_pool/current.rs delete mode 100644 tokio-executor/src/thread_pool/idle.rs delete mode 100644 tokio-executor/src/thread_pool/join.rs delete mode 100644 tokio-executor/src/thread_pool/mod.rs delete mode 100644 tokio-executor/src/thread_pool/owned.rs delete mode 100644 tokio-executor/src/thread_pool/park.rs delete mode 100644 tokio-executor/src/thread_pool/pool.rs delete mode 100644 tokio-executor/src/thread_pool/queue/global.rs delete mode 100644 tokio-executor/src/thread_pool/queue/inject.rs delete mode 100644 tokio-executor/src/thread_pool/queue/local.rs delete mode 100644 tokio-executor/src/thread_pool/queue/mod.rs delete mode 100644 tokio-executor/src/thread_pool/queue/worker.rs delete mode 100644 tokio-executor/src/thread_pool/set.rs delete mode 100644 tokio-executor/src/thread_pool/shared.rs delete mode 100644 tokio-executor/src/thread_pool/shutdown.rs delete mode 100644 tokio-executor/src/thread_pool/spawner.rs delete mode 100644 tokio-executor/src/thread_pool/tests/loom_pool.rs delete mode 100644 tokio-executor/src/thread_pool/tests/loom_queue.rs delete mode 100644 tokio-executor/src/thread_pool/tests/mod.rs delete mode 100644 tokio-executor/src/thread_pool/tests/queue.rs delete mode 100644 tokio-executor/src/thread_pool/tests/worker.rs delete mode 100644 tokio-executor/src/thread_pool/worker.rs delete mode 100644 tokio-executor/src/typed.rs delete mode 100644 tokio-executor/src/util/mod.rs delete mode 100644 tokio-executor/src/util/pad.rs delete mode 100644 tokio-executor/src/util/rand.rs delete mode 100644 tokio-executor/tests/current_thread.rs delete mode 100644 tokio-executor/tests/enter.rs delete mode 100644 tokio-executor/tests/executor.rs delete mode 100644 tokio-executor/tests/global.rs delete mode 100644 tokio-executor/tests/thread_pool.rs create mode 100644 tokio/benches/thread_pool.rs delete mode 100644 tokio/src/executor.rs create mode 100644 tokio/src/executor/blocking/builder.rs create mode 100644 tokio/src/executor/blocking/mod.rs create mode 100644 tokio/src/executor/current_thread/mod.rs create mode 100644 tokio/src/executor/current_thread/scheduler.rs create mode 100644 tokio/src/executor/enter.rs create mode 100644 tokio/src/executor/error.rs create mode 100644 tokio/src/executor/executor.rs create mode 100644 tokio/src/executor/global.rs create mode 100644 tokio/src/executor/loom/mod.rs create mode 100644 tokio/src/executor/loom/std/atomic_u32.rs create mode 100644 tokio/src/executor/loom/std/atomic_usize.rs create mode 100644 tokio/src/executor/loom/std/causal_cell.rs create mode 100644 tokio/src/executor/loom/std/mod.rs create mode 100644 tokio/src/executor/mod.rs create mode 100644 tokio/src/executor/park/mod.rs create mode 100644 tokio/src/executor/park/thread.rs create mode 100644 tokio/src/executor/task/core.rs create mode 100644 tokio/src/executor/task/error.rs create mode 100644 tokio/src/executor/task/harness.rs create mode 100644 tokio/src/executor/task/join.rs create mode 100644 tokio/src/executor/task/list.rs create mode 100644 tokio/src/executor/task/mod.rs create mode 100644 tokio/src/executor/task/raw.rs create mode 100644 tokio/src/executor/task/stack.rs create mode 100644 tokio/src/executor/task/state.rs create mode 100644 tokio/src/executor/task/tests/loom.rs create mode 100644 tokio/src/executor/task/tests/mod.rs create mode 100644 tokio/src/executor/task/tests/task.rs create mode 100644 tokio/src/executor/task/waker.rs create mode 100644 tokio/src/executor/tests/backoff.rs create mode 100644 tokio/src/executor/tests/loom_oneshot.rs create mode 100644 tokio/src/executor/tests/loom_schedule.rs create mode 100644 tokio/src/executor/tests/mock_park.rs create mode 100644 tokio/src/executor/tests/mock_schedule.rs create mode 100644 tokio/src/executor/tests/mod.rs create mode 100644 tokio/src/executor/tests/track_drop.rs create mode 100644 tokio/src/executor/thread_pool/builder.rs create mode 100644 tokio/src/executor/thread_pool/current.rs create mode 100644 tokio/src/executor/thread_pool/idle.rs create mode 100644 tokio/src/executor/thread_pool/join.rs create mode 100644 tokio/src/executor/thread_pool/mod.rs create mode 100644 tokio/src/executor/thread_pool/owned.rs create mode 100644 tokio/src/executor/thread_pool/park.rs create mode 100644 tokio/src/executor/thread_pool/pool.rs create mode 100644 tokio/src/executor/thread_pool/queue/global.rs create mode 100644 tokio/src/executor/thread_pool/queue/inject.rs create mode 100644 tokio/src/executor/thread_pool/queue/local.rs create mode 100644 tokio/src/executor/thread_pool/queue/mod.rs create mode 100644 tokio/src/executor/thread_pool/queue/worker.rs create mode 100644 tokio/src/executor/thread_pool/set.rs create mode 100644 tokio/src/executor/thread_pool/shared.rs create mode 100644 tokio/src/executor/thread_pool/shutdown.rs create mode 100644 tokio/src/executor/thread_pool/spawner.rs create mode 100644 tokio/src/executor/thread_pool/tests/loom_pool.rs create mode 100644 tokio/src/executor/thread_pool/tests/loom_queue.rs create mode 100644 tokio/src/executor/thread_pool/tests/mod.rs create mode 100644 tokio/src/executor/thread_pool/tests/queue.rs create mode 100644 tokio/src/executor/thread_pool/tests/worker.rs create mode 100644 tokio/src/executor/thread_pool/worker.rs create mode 100644 tokio/src/executor/typed.rs create mode 100644 tokio/src/executor/util/mod.rs create mode 100644 tokio/src/executor/util/pad.rs create mode 100644 tokio/src/executor/util/rand.rs create mode 100644 tokio/tests/current_thread.rs create mode 100644 tokio/tests/executor.rs create mode 100644 tokio/tests/executor_enter.rs create mode 100644 tokio/tests/executor_global.rs create mode 100644 tokio/tests/thread_pool.rs diff --git a/Cargo.toml b/Cargo.toml index d6703e16..4a9e88a0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,6 @@ members = [ "tokio", - "tokio-executor", "tokio-macros", "tokio-sync", "tokio-test", diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 5b3188ea..4fb21761 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -47,9 +47,6 @@ jobs: displayName: Test sub crates - rust: beta crates: - tokio-executor: - - current-thread - - thread-pool tokio-sync: - async-traits tokio-macros: [] @@ -58,22 +55,23 @@ jobs: examples: [] # Test compilation failure -- template: ci/azure-test-stable.yml - parameters: - name: test_features - displayName: Test feature flags - rust: beta - crates: - tests-build: - - tokio-executor - - executor-without-current-thread - # - macros-invalid-input - # - net-no-features - # - net-with-tcp - # - net-with-udp - # - net-with-uds - # - tokio-no-features - # - tokio-with-net +# Disable pending: https://github.com/tokio-rs/tokio/pull/1695#issuecomment-547045383 +# - template: ci/azure-test-stable.yml +# parameters: +# name: test_features +# displayName: Test feature flags +# rust: beta +# crates: +# tests-build: +# # - tokio-executor +# # - executor-without-current-thread +# # - macros-invalid-input +# # - net-no-features +# # - net-with-tcp +# # - net-with-udp +# # - net-with-uds +# # - tokio-no-features +# # - tokio-with-net # Run loom tests - template: ci/azure-loom.yml @@ -81,7 +79,6 @@ jobs: name: loom rust: beta crates: - - tokio-executor - tokio # Try cross compiling @@ -115,7 +112,7 @@ jobs: - clippy - test_tokio - test_linux - - test_features + # - test_features - loom # - test_nightly - cross diff --git a/ci/azure-tsan.yml b/ci/azure-tsan.yml index 3ce24ee8..0104697e 100644 --- a/ci/azure-tsan.yml +++ b/ci/azure-tsan.yml @@ -5,8 +5,6 @@ jobs: matrix: Timer: cmd: cargo test -p tokio-timer --test hammer - Threadpool: - cmd: cargo test -p tokio-executor --tests --features threadpool pool: vmImage: ubuntu-16.04 steps: diff --git a/ci/patch.toml b/ci/patch.toml index 6718f231..333c4dff 100644 --- a/ci/patch.toml +++ b/ci/patch.toml @@ -2,7 +2,6 @@ # repository. [patch.crates-io] tokio = { path = "tokio" } -tokio-executor = { path = "tokio-executor" } tokio-macros = { path = "tokio-macros" } tokio-sync = { path = "tokio-sync" } tokio-tls = { path = "tokio-tls" } diff --git a/tests-build/Cargo.toml b/tests-build/Cargo.toml index fb837d03..36dbc883 100644 --- a/tests-build/Cargo.toml +++ b/tests-build/Cargo.toml @@ -6,7 +6,7 @@ edition = "2018" publish = false [features] -executor-without-current-thread = ["tokio-executor"] +# executor-without-current-thread = ["tokio-executor"] # macros-invalid-input = ["tokio/rt-full"] # net-no-features = ["tokio-net"] # net-with-tcp = ["tokio-net/tcp"] @@ -19,7 +19,6 @@ executor-without-current-thread = ["tokio-executor"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -tokio-executor = { path = "../tokio-executor", optional = true } # tokio = { path = "../tokio", optional = true, default-features = false } [dev-dependencies] diff --git a/tokio-executor/CHANGELOG.md b/tokio-executor/CHANGELOG.md deleted file mode 100644 index d2cca0ba..00000000 --- a/tokio-executor/CHANGELOG.md +++ /dev/null @@ -1,81 +0,0 @@ -# 0.2.0-alpha.6 (September 30, 2019) - -- Move to `futures-*-preview 0.3.0-alpha.19` -- Move to `pin-project 0.4` - -# 0.2.0-alpha.5 (September 19, 2019) - -### Fix -- shutdown blocking pool threads when idle (#1562, #1514). - -# 0.2.0-alpha.4 (August 29, 2019) - -- Track tokio release. - -# 0.2.0-alpha.3 (August 28, 2019) - -### Changed -- use `tracing` instead of `log` - -### Added -- thread pool dedicated to blocking operations (#1495). -- `Executor::spawn_with_handle` (#1492). - -# 0.2.0-alpha.2 (August 17, 2019) - -### Fixed -- allow running executor from within blocking clause (#1433). - -### Changed -- Update `futures` dependency to 0.3.0-alpha.18. - -### Added -- Import `current-thread` executor (#1447). -- Import `threadpool` executor (#1152). - -# 0.2.0-alpha.1 (August 8, 2019) - -### Changed -- Switch to `async`, `await`, and `std::future`. - -### Removed -- `Enter::make_permanent` and `Enter::on_exit` (#???) - -# 0.1.7 (March 22, 2019) - -### Added -- `TypedExecutor` for spawning futures of a specific type (#993). - -# 0.1.6 (January 6, 2019) - -* Implement `Unpark` for `Arc` (#802). -* Switch to crossbeam's Parker / Unparker (#528). - -# 0.1.5 (September 26, 2018) - -* Implement `futures::Executor` for `DefaultExecutor` (#563). -* Add `Enter::block_on(future)` (#646) - -# 0.1.4 (August 23, 2018) - -* Implement `std::error::Error` for error types (#511). - -# 0.1.3 (August 6, 2018) - -* Implement `Executor` for `Box` (#420). -* Improve `EnterError` debug message (#410). -* Implement `status`, `Send`, and `Sync` for `DefaultExecutor` (#463, #472). -* Fix race in `ParkThread` (#507). -* Handle recursive calls into `DefaultExecutor` (#473). - -# 0.1.2 (March 30, 2018) - -* Implement `Unpark` for `Box`. - -# 0.1.1 (March 22, 2018) - -* Optionally support futures 0.2. - -# 0.1.0 (March 09, 2018) - -* Initial release diff --git a/tokio-executor/Cargo.toml b/tokio-executor/Cargo.toml deleted file mode 100644 index d1bd6a18..00000000 --- a/tokio-executor/Cargo.toml +++ /dev/null @@ -1,52 +0,0 @@ -[package] -name = "tokio-executor" -# When releasing to crates.io: -# - Remove path dependencies -# - Update html_root_url. -# - Update doc url -# - Cargo.toml -# - Update CHANGELOG.md. -# - Create "v0.2.x" git tag. -version = "0.2.0-alpha.6" -edition = "2018" -documentation = "https://docs.rs/tokio-executor/0.2.0-alpha.6/tokio_executor" -repository = "https://github.com/tokio-rs/tokio" -homepage = "https://github.com/tokio-rs/tokio" -license = "MIT" -authors = ["Tokio Contributors "] -description = """ -Future execution primitives -""" -keywords = ["futures", "tokio"] -categories = ["concurrency", "asynchronous"] - -[features] -blocking = ["lazy_static"] -current-thread = ["crossbeam-channel"] -thread-pool = ["num_cpus"] - -[dependencies] -futures-util-preview = { version = "=0.3.0-alpha.19", features = ["channel"] } -tokio-sync = { version = "=0.2.0-alpha.6", path = "../tokio-sync" } - -# current-thread dependencies -crossbeam-channel = { version = "0.3.8", optional = true } - -# threadpool dependencies -num_cpus = { version = "1.2", optional = true } - -# blocking -futures-core-preview = { version = "=0.3.0-alpha.19", optional = true } -lazy_static = { version = "1", optional = true } - -[dev-dependencies] -tokio = { version = "=0.2.0-alpha.6", path = "../tokio" } -tokio-sync = { version = "=0.2.0-alpha.6", path = "../tokio-sync" } -tokio-test = { version = "=0.2.0-alpha.6", path = "../tokio-test" } - -futures-core-preview = "=0.3.0-alpha.19" -loom = { version = "0.2.11", features = ["futures", "checkpoint"] } -rand = "0.7" - -[package.metadata.docs.rs] -all-features = true diff --git a/tokio-executor/LICENSE b/tokio-executor/LICENSE deleted file mode 100644 index cdb28b4b..00000000 --- a/tokio-executor/LICENSE +++ /dev/null @@ -1,25 +0,0 @@ -Copyright (c) 2019 Tokio Contributors - -Permission is hereby granted, free of charge, to any -person obtaining a copy of this software and associated -documentation files (the "Software"), to deal in the -Software without restriction, including without -limitation the rights to use, copy, modify, merge, -publish, distribute, sublicense, and/or sell copies of -the Software, and to permit persons to whom the Software -is furnished to do so, subject to the following -conditions: - -The above copyright notice and this permission notice -shall be included in all copies or substantial portions -of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF -ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED -TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A -PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT -SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY -CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR -IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -DEALINGS IN THE SOFTWARE. diff --git a/tokio-executor/README.md b/tokio-executor/README.md deleted file mode 100644 index 328b52b6..00000000 --- a/tokio-executor/README.md +++ /dev/null @@ -1,13 +0,0 @@ -# tokio-executor - -Task execution related traits and utilities. - -## License - -This project is licensed under the [MIT license](LICENSE). - -### Contribution - -Unless you explicitly state otherwise, any contribution intentionally submitted -for inclusion in Tokio by you, shall be licensed as MIT, without any additional -terms or conditions. diff --git a/tokio-executor/benches/thread_pool.rs b/tokio-executor/benches/thread_pool.rs deleted file mode 100644 index 003c7ca9..00000000 --- a/tokio-executor/benches/thread_pool.rs +++ /dev/null @@ -1,161 +0,0 @@ -#![feature(test)] - -extern crate test; - -use tokio_executor::thread_pool::{Builder, Spawner, ThreadPool}; -use tokio_sync::oneshot; - -use std::future::Future; -use std::pin::Pin; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering::Relaxed; -use std::sync::{mpsc, Arc}; -use std::task::{Context, Poll}; - -struct Backoff(usize); - -impl Future for Backoff { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { - if self.0 == 0 { - Poll::Ready(()) - } else { - self.0 -= 1; - cx.waker().wake_by_ref(); - Poll::Pending - } - } -} - -const NUM_THREADS: usize = 6; - -#[bench] -fn spawn_many(b: &mut test::Bencher) { - const NUM_SPAWN: usize = 10_000; - - let threadpool = Builder::new().num_threads(NUM_THREADS).build(); - - let (tx, rx) = mpsc::sync_channel(1000); - let rem = Arc::new(AtomicUsize::new(0)); - - b.iter(|| { - rem.store(NUM_SPAWN, Relaxed); - - for _ in 0..NUM_SPAWN { - let tx = tx.clone(); - let rem = rem.clone(); - - threadpool.spawn(async move { - if 1 == rem.fetch_sub(1, Relaxed) { - tx.send(()).unwrap(); - } - }); - } - - let _ = rx.recv().unwrap(); - }); -} - -#[bench] -fn yield_many(b: &mut test::Bencher) { - const NUM_YIELD: usize = 1_000; - const TASKS_PER_CPU: usize = 50; - - let threadpool = Builder::new().num_threads(NUM_THREADS).build(); - - let tasks = TASKS_PER_CPU * num_cpus::get_physical(); - let (tx, rx) = mpsc::sync_channel(tasks); - - b.iter(move || { - for _ in 0..tasks { - let tx = tx.clone(); - - threadpool.spawn(async move { - let backoff = Backoff(NUM_YIELD); - backoff.await; - tx.send(()).unwrap(); - }); - } - - for _ in 0..tasks { - let _ = rx.recv().unwrap(); - } - }); -} - -#[bench] -fn ping_pong(b: &mut test::Bencher) { - const NUM_PINGS: usize = 1_000; - - let threadpool = Builder::new().num_threads(NUM_THREADS).build(); - - let (done_tx, done_rx) = mpsc::sync_channel(1000); - let rem = Arc::new(AtomicUsize::new(0)); - - b.iter(|| { - let done_tx = done_tx.clone(); - let rem = rem.clone(); - rem.store(NUM_PINGS, Relaxed); - - let spawner = threadpool.spawner().clone(); - - threadpool.spawn(async move { - for _ in 0..NUM_PINGS { - let rem = rem.clone(); - let done_tx = done_tx.clone(); - - let spawner2 = spawner.clone(); - - spawner.spawn(async move { - let (tx1, rx1) = oneshot::channel(); - let (tx2, rx2) = oneshot::channel(); - - spawner2.spawn(async move { - rx1.await.unwrap(); - tx2.send(()).unwrap(); - }); - - tx1.send(()).unwrap(); - rx2.await.unwrap(); - - if 1 == rem.fetch_sub(1, Relaxed) { - done_tx.send(()).unwrap(); - } - }); - } - }); - - done_rx.recv().unwrap(); - }); -} - -#[bench] -fn chained_spawn(b: &mut test::Bencher) { - const ITER: usize = 1_000; - - let threadpool = Builder::new().num_threads(NUM_THREADS).build(); - - fn iter(spawner: Spawner, done_tx: mpsc::SyncSender<()>, n: usize) { - if n == 0 { - done_tx.send(()).unwrap(); - } else { - let s2 = spawner.clone(); - spawner.spawn(async move { - iter(s2, done_tx, n - 1); - }); - } - } - - let (done_tx, done_rx) = mpsc::sync_channel(1000); - - b.iter(move || { - let done_tx = done_tx.clone(); - let spawner = threadpool.spawner().clone(); - threadpool.spawn(async move { - iter(spawner, done_tx, ITER); - }); - - done_rx.recv().unwrap(); - }); -} diff --git a/tokio-executor/src/blocking/builder.rs b/tokio-executor/src/blocking/builder.rs deleted file mode 100644 index 3cf1aef8..00000000 --- a/tokio-executor/src/blocking/builder.rs +++ /dev/null @@ -1,57 +0,0 @@ -use super::Pool; -use crate::loom::thread; -use std::usize; - -/// Builds a blocking thread pool with custom configuration values. -pub(crate) struct Builder { - /// Thread name - name: String, - - /// Thread stack size - stack_size: Option, -} - -impl Default for Builder { - fn default() -> Self { - Builder { - name: "tokio-blocking-thread".to_string(), - stack_size: None, - } - } -} - -impl Builder { - /// Set name of threads spawned by the pool - /// - /// If this configuration is not set, then the thread will use the system - /// default naming scheme. - pub(crate) fn name>(&mut self, val: S) -> &mut Self { - self.name = val.into(); - self - } - - /// Set the stack size (in bytes) for worker threads. - /// - /// The actual stack size may be greater than this value if the platform - /// specifies minimal stack size. - /// - /// The default stack size for spawned threads is 2 MiB, though this - /// particular stack size is subject to change in the future. - pub(crate) fn stack_size(&mut self, val: usize) -> &mut Self { - self.stack_size = Some(val); - self - } - - pub(crate) fn build(self) -> Pool { - let mut p = Pool::default(); - let Builder { stack_size, name } = self; - p.new_thread = Box::new(move || { - let mut b = thread::Builder::new().name(name.clone()); - if let Some(stack_size) = stack_size { - b = b.stack_size(stack_size); - } - b - }); - p - } -} diff --git a/tokio-executor/src/blocking/mod.rs b/tokio-executor/src/blocking/mod.rs deleted file mode 100644 index 34c52181..00000000 --- a/tokio-executor/src/blocking/mod.rs +++ /dev/null @@ -1,332 +0,0 @@ -//! Thread pool for blocking operations - -use crate::loom::sync::{Arc, Condvar, Mutex}; -use crate::loom::thread; -#[cfg(feature = "blocking")] -use tokio_sync::oneshot; - -use std::cell::Cell; -use std::collections::VecDeque; -use std::fmt; -#[cfg(feature = "blocking")] -use std::future::Future; -use std::ops::Deref; -#[cfg(feature = "blocking")] -use std::pin::Pin; -#[cfg(feature = "blocking")] -use std::task::{Context, Poll}; -use std::time::Duration; - -#[cfg(feature = "thread-pool")] -mod builder; - -#[cfg(feature = "thread-pool")] -pub(crate) use builder::Builder; - -#[derive(Clone, Copy)] -enum State { - Empty, - Ready(*const Arc), -} - -thread_local! { - /// Thread-local tracking the current executor - static BLOCKING: Cell = Cell::new(State::Empty) -} - -/// Set the blocking pool for the duration of the closure -/// -/// If a blocking pool is already set, it will be restored when the closure returns or if it -/// panics. -#[allow(dead_code)] // we allow dead code since this won't be called if no executors are enabled -pub(crate) fn with_pool(pool: &Arc, f: F) -> R -where - F: FnOnce() -> R, -{ - // While scary, this is safe. The function takes a `&Pool`, which guarantees - // that the reference lives for the duration of `with_pool`. - // - // Because we are always clearing the TLS value at the end of the - // function, we can cast the reference to 'static which thread-local - // cells require. - BLOCKING.with(|cell| { - let was = cell.replace(State::Empty); - - // Ensure that the pool is removed from the thread-local context - // when leaving the scope. This handles cases that involve panicking. - struct Reset<'a>(&'a Cell, State); - - impl Drop for Reset<'_> { - fn drop(&mut self) { - self.0.set(self.1); - } - } - - let _reset = Reset(cell, was); - cell.set(State::Ready(pool as *const _)); - f() - }) -} - -pub(crate) struct Pool { - shared: Mutex, - condvar: Condvar, - new_thread: Box thread::Builder + Send + Sync + 'static>, -} - -impl fmt::Debug for Pool { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Pool").finish() - } -} - -struct Shared { - queue: VecDeque>, - num_th: u32, - num_idle: u32, - num_notify: u32, - shutdown: bool, -} - -const MAX_THREADS: u32 = 1_000; -const KEEP_ALIVE: Duration = Duration::from_secs(10); - -/// Result of a blocking operation running on the blocking thread pool. -#[cfg(feature = "blocking")] -#[derive(Debug)] -pub struct Blocking { - rx: oneshot::Receiver, -} - -impl Pool { - /// Run the provided function on an executor dedicated to blocking operations. - pub(crate) fn spawn(this: &Arc, f: Box) { - let should_spawn = { - let mut shared = this.shared.lock().unwrap(); - - if shared.shutdown { - // no need to even push this task; it would never get picked up - return; - } - - shared.queue.push_back(f); - - if shared.num_idle == 0 { - // No threads are able to process the task. - - if shared.num_th == MAX_THREADS { - // At max number of threads - false - } else { - shared.num_th += 1; - true - } - } else { - // Notify an idle worker thread. The notification counter - // is used to count the needed amount of notifications - // exactly. Thread libraries may generate spurious - // wakeups, this counter is used to keep us in a - // consistent state. - shared.num_idle -= 1; - shared.num_notify += 1; - this.condvar.notify_one(); - false - } - }; - - if should_spawn { - Pool::spawn_thread(Arc::clone(this), (this.new_thread)()); - } - } - - // NOTE: we cannot use self here w/o arbitrary_self_types since Arc is loom::Arc - fn spawn_thread(this: Arc, builder: thread::Builder) { - builder - .spawn(move || { - let mut shared = this.shared.lock().unwrap(); - 'main: loop { - // BUSY - while let Some(task) = shared.queue.pop_front() { - drop(shared); - run_task(task); - shared = this.shared.lock().unwrap(); - if shared.shutdown { - break; // Need to increment idle before we exit - } - } - - // IDLE - shared.num_idle += 1; - - while !shared.shutdown { - let lock_result = this.condvar.wait_timeout(shared, KEEP_ALIVE).unwrap(); - shared = lock_result.0; - let timeout_result = lock_result.1; - - if shared.num_notify != 0 { - // We have received a legitimate wakeup, - // acknowledge it by decrementing the counter - // and transition to the BUSY state. - shared.num_notify -= 1; - break; - } - - if timeout_result.timed_out() { - break 'main; - } - - // Spurious wakeup detected, go back to sleep. - } - - if shared.shutdown { - // Work was produced, and we "took" it (by decrementing num_notify). - // This means that num_idle was decremented once for our wakeup. - // But, since we are exiting, we need to "undo" that, as we'll stay idle. - shared.num_idle += 1; - // NOTE: Technically we should also do num_notify++ and notify again, - // but since we're shutting down anyway, that won't be necessary. - break; - } - } - - // Thread exit - shared.num_th -= 1; - - // num_idle should now be tracked exactly, panic - // with a descriptive message if it is not the - // case. - shared.num_idle = shared - .num_idle - .checked_sub(1) - .expect("num_idle underflowed on thread exit"); - - if shared.shutdown && shared.num_th == 0 { - this.condvar.notify_one(); - } - }) - .unwrap(); - } - - /// Shut down all workers in the pool the next time they are idle. - /// - /// Blocks until all threads have exited. - pub(crate) fn shutdown(&self) { - let mut shared = self.shared.lock().unwrap(); - shared.shutdown = true; - self.condvar.notify_all(); - - while shared.num_th > 0 { - shared = self.condvar.wait(shared).unwrap(); - } - } -} - -pub(crate) struct PoolWaiter(Arc); - -impl From for PoolWaiter { - fn from(p: Pool) -> Self { - Self::from(Arc::new(p)) - } -} - -impl From> for PoolWaiter { - fn from(p: Arc) -> Self { - Self(p) - } -} - -impl Deref for PoolWaiter { - type Target = Arc; - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl Drop for PoolWaiter { - fn drop(&mut self) { - self.0.shutdown(); - } -} - -/// Run the provided closure on a thread where blocking is acceptable. -/// -/// In general, issuing a blocking call or performing a lot of compute in a future without -/// yielding is not okay, as it may prevent the executor from driving other futures forward. -/// A closure that is run through this method will instead be run on a dedicated thread pool for -/// such blocking tasks without holding up the main futures executor. -/// -/// # Examples -/// -/// ``` -/// # async fn docs() { -/// tokio_executor::blocking::run(move || { -/// // do some compute-heavy work or call synchronous code -/// }).await; -/// # } -/// ``` -#[cfg(feature = "blocking")] -pub fn run(f: F) -> Blocking -where - F: FnOnce() -> R + Send + 'static, - R: Send + 'static, -{ - let (tx, rx) = oneshot::channel(); - - BLOCKING.with(|current_pool| match current_pool.get() { - State::Ready(pool) => { - let pool = unsafe { &*pool }; - Pool::spawn( - pool, - Box::new(move || { - // receiver may have gone away - let _ = tx.send(f()); - }), - ); - } - State::Empty => panic!("must be called from the context of Tokio runtime"), - }); - - Blocking { rx } -} - -#[cfg(feature = "blocking")] -impl Future for Blocking { - type Output = T; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - use std::task::Poll::*; - - match Pin::new(&mut self.rx).poll(cx) { - Ready(Ok(v)) => Ready(v), - Ready(Err(_)) => panic!( - "the blocking operation has been dropped before completing. \ - This should not happen and is a bug." - ), - Pending => Pending, - } - } -} - -fn run_task(f: Box) { - use std::panic::{catch_unwind, AssertUnwindSafe}; - - let _ = catch_unwind(AssertUnwindSafe(|| f())); -} - -impl Default for Pool { - fn default() -> Self { - Pool { - shared: Mutex::new(Shared { - queue: VecDeque::new(), - num_th: 0, - num_idle: 0, - num_notify: 0, - shutdown: false, - }), - condvar: Condvar::new(), - new_thread: Box::new(|| { - thread::Builder::new().name("tokio-blocking-driver".to_string()) - }), - } - } -} diff --git a/tokio-executor/src/current_thread/mod.rs b/tokio-executor/src/current_thread/mod.rs deleted file mode 100644 index f302b019..00000000 --- a/tokio-executor/src/current_thread/mod.rs +++ /dev/null @@ -1,873 +0,0 @@ -//! A single-threaded executor which executes tasks on the same thread from which -//! they are spawned. -//! -//! [`CurrentThread`] is the main type of this crate. It executes tasks on the -//! current thread. The easiest way to start a new [`CurrentThread`] executor -//! is to call [`block_on_all`] with an initial task to seed the executor. All -//! tasks that are being managed by a [`CurrentThread`] executor are able to -//! spawn additional tasks by calling [`spawn`]. -//! -//! Application authors will not use this crate directly. Instead, they will use -//! the `tokio` crate. Library authors should only depend on -//! `tokio-current-thread` if they are building a custom task executor. -//! -//! [`CurrentThread`]: struct.CurrentThread.html -//! [`spawn`]: fn.spawn.html -//! [`block_on_all`]: fn.block_on_all.html - -mod scheduler; - -use self::scheduler::{Scheduler, TickArgs}; -#[cfg(feature = "blocking")] -use crate::blocking::{Pool, PoolWaiter}; -use crate::park::{Park, ParkThread, Unpark}; -use crate::{EnterError, Executor, SpawnError, TypedExecutor}; - -use std::cell::Cell; -use std::error::Error; -use std::fmt; -use std::future::Future; -use std::pin::Pin; -use std::rc::Rc; -use std::sync::{atomic, Arc}; -use std::task::{Context, Poll, Waker}; -use std::thread; -use std::time::{Duration, Instant}; - -/// Executes tasks on the current thread -pub struct CurrentThread { - /// Execute futures and receive unpark notifications. - scheduler: Scheduler, - - /// Current number of futures being executed. - /// - /// The LSB is used to indicate that the runtime is preparing to shut down. - /// Thus, to get the actual number of pending futures, `>>1`. - num_futures: Arc, - - /// Thread park handle - park: P, - - /// Handle for spawning new futures from other threads - spawn_handle: Handle, - - /// Receiver for futures spawned from other threads - spawn_receiver: crossbeam_channel::Receiver + Send + 'static>>>, - - /// Handle to pool for handling blocking tasks - #[cfg(feature = "blocking")] - blocking: PoolWaiter, - - /// The thread-local ID assigned to this executor. - id: u64, -} - -/// Executes futures on the current thread. -/// -/// All futures executed using this executor will be executed on the current -/// thread. As such, `run` will wait for these futures to complete before -/// returning. -/// -/// For more details, see the [module level](index.html) documentation. -#[derive(Debug, Clone)] -pub struct TaskExecutor { - // Prevent the handle from moving across threads. - _p: ::std::marker::PhantomData>, -} - -/// Returned by the `turn` function. -#[derive(Debug)] -pub struct Turn { - polled: bool, -} - -impl Turn { - /// `true` if any futures were polled at all and `false` otherwise. - pub fn has_polled(&self) -> bool { - self.polled - } -} - -/// A `CurrentThread` instance bound to a supplied execution context. -pub struct Entered<'a, P: Park> { - executor: &'a mut CurrentThread

, -} - -/// Error returned by the `run` function. -#[derive(Debug)] -pub struct RunError { - _p: (), -} - -impl fmt::Display for RunError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "Run error") - } -} - -impl Error for RunError {} - -/// Error returned by the `run_timeout` function. -#[derive(Debug)] -pub struct RunTimeoutError { - timeout: bool, -} - -impl fmt::Display for RunTimeoutError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - let descr = if self.timeout { - "Run timeout error (timeout)" - } else { - "Run timeout error (not timeout)" - }; - write!(fmt, "{}", descr) - } -} - -impl Error for RunTimeoutError {} - -/// Error returned by the `turn` function. -#[derive(Debug)] -pub struct TurnError { - _p: (), -} - -impl fmt::Display for TurnError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "Turn error") - } -} - -impl Error for TurnError {} - -/// Error returned by the `block_on` function. -#[derive(Debug)] -pub struct BlockError { - inner: Option, -} - -impl fmt::Display for BlockError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "Block error") - } -} - -impl Error for BlockError {} - -/// This is mostly split out to make the borrow checker happy. -struct Borrow<'a, U> { - spawner: BorrowSpawner<'a, U>, - #[cfg(feature = "blocking")] - blocking: &'a PoolWaiter, -} - -/// As is this. -struct BorrowSpawner<'a, U> { - id: u64, - num_futures: &'a atomic::AtomicUsize, - scheduler: &'a mut Scheduler, -} - -trait SpawnLocal { - fn spawn_local(&mut self, future: Pin>>, already_counted: bool); -} - -struct CurrentRunner { - spawn: Cell>, - id: Cell>, -} - -thread_local! { - /// Current thread's task runner. This is set in `TaskRunner::with` - static CURRENT: CurrentRunner = CurrentRunner { - spawn: Cell::new(None), - id: Cell::new(None), - } -} - -thread_local! { - /// Unique ID to assign to each new executor launched on this thread. - /// - /// The unique ID is used to determine if the currently running executor matches the one - /// referred to by a `Handle` so that direct task dispatch can be used. - static EXECUTOR_ID: Cell = Cell::new(0) -} - -/// Run the executor bootstrapping the execution with the provided future. -/// -/// This creates a new [`CurrentThread`] executor, spawns the provided future, -/// and blocks the current thread until the provided future and **all** -/// subsequently spawned futures complete. In other words: -/// -/// * If the provided bootstrap future does **not** spawn any additional tasks, -/// `block_on_all` returns once `future` completes. -/// * If the provided bootstrap future **does** spawn additional tasks, then -/// `block_on_all` returns once **all** spawned futures complete. -/// -/// See [module level][mod] documentation for more details. -/// -/// [`CurrentThread`]: struct.CurrentThread.html -/// [mod]: index.html -pub fn block_on_all(future: F) -> F::Output -where - F: Future, -{ - let mut current_thread = CurrentThread::new(); - - let ret = current_thread.block_on(future); - current_thread.run().unwrap(); - ret -} - -/// Executes a future on the current thread. -/// -/// The provided future must complete or be canceled before `run` will return. -/// -/// Unlike [`tokio::spawn`], this function will always spawn on a -/// `CurrentThread` executor and is able to spawn futures that are not `Send`. -/// -/// # Panics -/// -/// This function can only be invoked from the context of a `run` call; any -/// other use will result in a panic. -/// -/// [`tokio::spawn`]: ../fn.spawn.html -pub fn spawn(future: F) -where - F: Future + 'static, -{ - TaskExecutor::current() - .spawn_local(Box::pin(future)) - .unwrap(); -} - -// ===== impl CurrentThread ===== - -impl CurrentThread { - /// Create a new instance of `CurrentThread`. - pub fn new() -> Self { - CurrentThread::new_with_park(ParkThread::new()) - } -} - -impl CurrentThread

{ - /// Create a new instance of `CurrentThread` backed by the given park - /// handle. - pub fn new_with_park(park: P) -> Self { - let unpark = park.unpark(); - - let (spawn_sender, spawn_receiver) = crossbeam_channel::unbounded(); - let thread = thread::current().id(); - let id = EXECUTOR_ID.with(|idc| { - let id = idc.get(); - idc.set(id + 1); - id - }); - - let scheduler = Scheduler::new(unpark); - let waker = scheduler.waker(); - - let num_futures = Arc::new(atomic::AtomicUsize::new(0)); - - CurrentThread { - scheduler, - num_futures: num_futures.clone(), - park, - id, - spawn_handle: Handle { - sender: spawn_sender, - num_futures, - waker, - thread, - id, - }, - spawn_receiver, - - #[cfg(feature = "blocking")] - blocking: PoolWaiter::from(Pool::default()), - } - } - - /// Returns `true` if the executor is currently idle. - /// - /// An idle executor is defined by not currently having any spawned tasks. - /// - /// Note that this method is inherently racy -- if a future is spawned from a remote `Handle`, - /// this method may return `true` even though there are more futures to be executed. - pub fn is_idle(&self) -> bool { - self.num_futures.load(atomic::Ordering::SeqCst) <= 1 - } - - /// Spawn the future on the executor. - /// - /// This internally queues the future to be executed once `run` is called. - pub fn spawn(&mut self, future: F) -> &mut Self - where - F: Future + 'static, - { - self.borrow().spawner.spawn_local(Box::pin(future), false); - self - } - - /// Synchronously waits for the provided `future` to complete. - /// - /// This function can be used to synchronously block the current thread - /// until the provided `future` has resolved either successfully or with an - /// error. The result of the future is then returned from this function - /// call. - /// - /// Note that this function will **also** execute any spawned futures on the - /// current thread, but will **not** block until these other spawned futures - /// have completed. - /// - /// The caller is responsible for ensuring that other spawned futures - /// complete execution. - pub fn block_on(&mut self, future: F) -> F::Output - where - F: Future, - { - let _enter = crate::enter().expect("failed to start `current_thread::Runtime`"); - self.enter().block_on(future) - } - - /// Run the executor to completion, blocking the thread until **all** - /// spawned futures have completed. - pub fn run(&mut self) -> Result<(), RunError> { - let _enter = crate::enter().expect("failed to start `current_thread::Runtime`"); - self.enter().run() - } - - /// Run the executor to completion, blocking the thread until all - /// spawned futures have completed **or** `duration` time has elapsed. - pub fn run_timeout(&mut self, duration: Duration) -> Result<(), RunTimeoutError> { - let _enter = crate::enter().expect("failed to start `current_thread::Runtime`"); - self.enter().run_timeout(duration) - } - - /// Perform a single iteration of the event loop. - /// - /// This function blocks the current thread even if the executor is idle. - pub fn turn(&mut self, duration: Option) -> Result { - let _enter = crate::enter().expect("failed to start `current_thread::Runtime`"); - self.enter().turn(duration) - } - - /// Bind `CurrentThread` instance with an execution context. - fn enter(&mut self) -> Entered<'_, P> { - Entered { executor: self } - } - - /// Returns a reference to the underlying `Park` instance. - pub fn get_park(&self) -> &P { - &self.park - } - - /// Returns a mutable reference to the underlying `Park` instance. - pub fn get_park_mut(&mut self) -> &mut P { - &mut self.park - } - - fn borrow(&mut self) -> Borrow<'_, P::Unpark> { - Borrow { - spawner: BorrowSpawner { - id: self.id, - scheduler: &mut self.scheduler, - num_futures: &*self.num_futures, - }, - #[cfg(feature = "blocking")] - blocking: &self.blocking, - } - } - - /// Get a new handle to spawn futures on the executor - /// - /// Different to the executor itself, the handle can be sent to different - /// threads and can be used to spawn futures on the executor. - pub fn handle(&self) -> Handle { - self.spawn_handle.clone() - } -} - -impl Drop for CurrentThread

{ - fn drop(&mut self) { - // Signal to Handles that no more futures can be spawned by setting LSB. - // - // NOTE: this isn't technically necessary since the send on the mpsc will fail once the - // receiver is dropped, but it's useful to illustrate how clean shutdown will be - // implemented (e.g., by setting the LSB). - let pending = self.num_futures.fetch_add(1, atomic::Ordering::SeqCst); - - // TODO: We currently ignore any pending futures at the time we shut down. - // - // The "proper" fix for this is to have an explicit shutdown phase (`shutdown_on_idle`) - // which sets LSB (as above) do make Handle::spawn stop working, and then runs until - // num_futures.load() == 1. - let _ = pending; - - // We will wait for any blocking ops by virtue of dropping `blocking`. - } -} - -impl Executor for CurrentThread { - fn spawn( - &mut self, - future: Pin + Send>>, - ) -> Result<(), SpawnError> { - self.borrow().spawner.spawn_local(future, false); - Ok(()) - } -} - -impl TypedExecutor for CurrentThread -where - T: Future + 'static, -{ - fn spawn(&mut self, future: T) -> Result<(), SpawnError> { - self.borrow().spawner.spawn_local(Box::pin(future), false); - Ok(()) - } -} - -impl fmt::Debug for CurrentThread

{ - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("CurrentThread") - .field("scheduler", &self.scheduler) - .field( - "num_futures", - &self.num_futures.load(atomic::Ordering::SeqCst), - ) - .finish() - } -} - -impl Default for CurrentThread

{ - fn default() -> Self { - CurrentThread::new_with_park(P::default()) - } -} - -// ===== impl Entered ===== - -impl Entered<'_, P> { - /// Spawn the future on the executor. - /// - /// This internally queues the future to be executed once `run` is called. - pub fn spawn(&mut self, future: F) -> &mut Self - where - F: Future + 'static, - { - self.executor - .borrow() - .spawner - .spawn_local(Box::pin(future), false); - self - } - - /// Synchronously waits for the provided `future` to complete. - /// - /// This function can be used to synchronously block the current thread - /// until the provided `future` has resolved either successfully or with an - /// error. The result of the future is then returned from this function - /// call. - /// - /// Note that this function will **also** execute any spawned futures on the - /// current thread, but will **not** block until these other spawned futures - /// have completed. - /// - /// The caller is responsible for ensuring that other spawned futures - /// complete execution. - /// - /// # Panics - /// - /// This function will panic if the `Park` call returns an error. - pub fn block_on(&mut self, mut future: F) -> F::Output - where - F: Future, - { - // Safety: we shadow the original `future`, so it will never move - // again. - let mut future = unsafe { Pin::new_unchecked(&mut future) }; - let waker = self.executor.scheduler.waker(); - let mut cx = Context::from_waker(&waker); - - loop { - let res = self - .executor - .borrow() - .enter(|| future.as_mut().poll(&mut cx)); - - match res { - Poll::Ready(e) => return e, - Poll::Pending => {} - } - - self.tick(); - - if self.executor.park.park().is_err() { - panic!("block_on park failed"); - } - } - } - - /// Run the executor to completion, blocking the thread until **all** - /// spawned futures have completed. - pub fn run(&mut self) -> Result<(), RunError> { - self.run_timeout2(None).map_err(|_| RunError { _p: () }) - } - - /// Run the executor to completion, blocking the thread until all - /// spawned futures have completed **or** `duration` time has elapsed. - pub fn run_timeout(&mut self, duration: Duration) -> Result<(), RunTimeoutError> { - self.run_timeout2(Some(duration)) - } - - /// Perform a single iteration of the event loop. - /// - /// This function blocks the current thread even if the executor is idle. - pub fn turn(&mut self, duration: Option) -> Result { - let res = if self.executor.scheduler.has_pending_futures() { - self.executor.park.park_timeout(Duration::from_millis(0)) - } else { - match duration { - Some(duration) => self.executor.park.park_timeout(duration), - None => self.executor.park.park(), - } - }; - - if res.is_err() { - return Err(TurnError { _p: () }); - } - - let polled = self.tick(); - - Ok(Turn { polled }) - } - - /// Returns a reference to the underlying `Park` instance. - pub fn get_park(&self) -> &P { - &self.executor.park - } - - /// Returns a mutable reference to the underlying `Park` instance. - pub fn get_park_mut(&mut self) -> &mut P { - &mut self.executor.park - } - - fn run_timeout2(&mut self, dur: Option) -> Result<(), RunTimeoutError> { - if self.executor.is_idle() { - // Nothing to do - return Ok(()); - } - - let mut time = dur.map(|dur| (Instant::now() + dur, dur)); - - loop { - self.tick(); - - if self.executor.is_idle() { - return Ok(()); - } - - match time { - Some((until, rem)) => { - if self.executor.park.park_timeout(rem).is_err() { - return Err(RunTimeoutError::new(false)); - } - - let now = Instant::now(); - - if now >= until { - return Err(RunTimeoutError::new(true)); - } - - time = Some((until, until - now)); - } - None => { - if self.executor.park.park().is_err() { - return Err(RunTimeoutError::new(false)); - } - } - } - } - } - - /// Returns `true` if any futures were processed - fn tick(&mut self) -> bool { - // Spawn any futures that were spawned from other threads by manually - // looping over the receiver stream - - // FIXME: Slightly ugly but needed to make the borrow checker happy - let (mut borrow, spawn_receiver) = ( - Borrow { - spawner: BorrowSpawner { - id: self.executor.id, - scheduler: &mut self.executor.scheduler, - num_futures: &*self.executor.num_futures, - }, - #[cfg(feature = "blocking")] - blocking: &self.executor.blocking