From 8880222036f37c6204c8466f25e828447f16dacb Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Mon, 12 Oct 2020 13:44:54 -0400 Subject: rt: Remove `threaded_scheduler()` and `basic_scheduler()` (#2876) Co-authored-by: Alice Ryhl Co-authored-by: Carl Lerche --- benches/mpsc.rs | 37 +- benches/scheduler.rs | 5 +- benches/signal.rs | 4 +- benches/spawn.rs | 14 +- benches/sync_rwlock.rs | 21 +- benches/sync_semaphore.rs | 22 +- tests-build/Cargo.toml | 1 + tests-build/tests/fail/macros_core_no_default.rs | 6 + .../tests/fail/macros_core_no_default.stderr | 7 + tests-build/tests/fail/macros_invalid_input.stderr | 4 +- tests-build/tests/macros.rs | 5 +- tests-integration/tests/macros_main.rs | 21 +- tests-integration/tests/rt_shell.rs | 32 - tokio-macros/src/entry.rs | 409 ++++++------- tokio-macros/src/lib.rs | 238 +++----- tokio-test/src/lib.rs | 3 +- tokio-util/Cargo.toml | 1 + tokio-util/src/cfg.rs | 10 + tokio-util/src/codec/bytes_codec.rs | 2 +- tokio-util/src/context.rs | 12 +- tokio-util/src/lib.rs | 4 +- tokio-util/tests/context.rs | 11 +- tokio/Cargo.toml | 8 +- tokio/src/blocking.rs | 48 ++ tokio/src/coop.rs | 17 +- tokio/src/fs/mod.rs | 4 +- tokio/src/future/block_on.rs | 15 + tokio/src/future/mod.rs | 23 +- tokio/src/future/poll_fn.rs | 2 + tokio/src/future/try_join.rs | 2 +- tokio/src/io/driver/mod.rs | 40 +- tokio/src/io/mod.rs | 4 +- tokio/src/io/stdio_common.rs | 6 +- tokio/src/lib.rs | 28 +- tokio/src/loom/mod.rs | 2 + tokio/src/macros/cfg.rs | 90 +-- tokio/src/macros/mod.rs | 2 +- tokio/src/macros/support.rs | 3 +- tokio/src/net/addr.rs | 45 +- tokio/src/park/either.rs | 2 + tokio/src/park/mod.rs | 11 +- tokio/src/park/thread.rs | 25 +- tokio/src/process/unix/driver.rs | 2 + tokio/src/runtime/basic_scheduler.rs | 2 +- tokio/src/runtime/blocking/mod.rs | 20 +- tokio/src/runtime/blocking/pool.rs | 3 - tokio/src/runtime/builder.rs | 262 ++++----- tokio/src/runtime/context.rs | 6 +- tokio/src/runtime/driver.rs | 20 +- tokio/src/runtime/enter.rs | 214 ++++--- tokio/src/runtime/mod.rs | 648 ++++++++++----------- tokio/src/runtime/spawner.rs | 2 - tokio/src/runtime/task/error.rs | 2 +- tokio/src/runtime/task/join.rs | 2 +- tokio/src/runtime/task/mod.rs | 34 +- tokio/src/runtime/tests/loom_blocking.rs | 5 +- tokio/src/runtime/tests/loom_pool.rs | 5 +- tokio/src/runtime/thread_pool/atomic_cell.rs | 1 - tokio/src/runtime/thread_pool/mod.rs | 4 +- tokio/src/runtime/thread_pool/worker.rs | 161 +++-- tokio/src/signal/registry.rs | 2 +- tokio/src/signal/unix/driver.rs | 45 +- tokio/src/signal/windows.rs | 3 +- tokio/src/stream/mod.rs | 2 +- tokio/src/sync/mod.rs | 6 +- tokio/src/sync/mpsc/bounded.rs | 8 +- tokio/src/sync/mpsc/chan.rs | 2 +- tokio/src/task/blocking.rs | 153 +++-- tokio/src/task/local.rs | 2 + tokio/src/task/mod.rs | 19 +- tokio/src/task/yield_now.rs | 2 +- tokio/src/time/clock.rs | 38 +- tokio/src/time/driver/handle.rs | 72 ++- tokio/src/time/driver/mod.rs | 2 + tokio/src/util/linked_list.rs | 2 + tokio/src/util/mod.rs | 16 +- tokio/src/util/slab.rs | 2 + tokio/src/util/trace.rs | 4 +- tokio/tests/io_driver.rs | 3 +- tokio/tests/io_driver_drop.rs | 3 +- tokio/tests/rt_basic.rs | 3 +- tokio/tests/rt_common.rs | 15 +- tokio/tests/rt_threaded.rs | 17 +- tokio/tests/signal_drop_rt.rs | 3 +- tokio/tests/signal_multi_rt.rs | 3 +- tokio/tests/sync_rwlock.rs | 2 +- tokio/tests/task_blocking.rs | 53 +- tokio/tests/task_local.rs | 2 +- tokio/tests/task_local_set.rs | 27 +- tokio/tests/time_rt.rs | 6 +- 90 files changed, 1469 insertions(+), 1687 deletions(-) create mode 100644 tests-build/tests/fail/macros_core_no_default.rs create mode 100644 tests-build/tests/fail/macros_core_no_default.stderr delete mode 100644 tests-integration/tests/rt_shell.rs create mode 100644 tokio/src/blocking.rs create mode 100644 tokio/src/future/block_on.rs diff --git a/benches/mpsc.rs b/benches/mpsc.rs index 2f3fe963..3f7e3fca 100644 --- a/benches/mpsc.rs +++ b/benches/mpsc.rs @@ -4,6 +4,13 @@ use tokio::sync::mpsc; type Medium = [usize; 64]; type Large = [Medium; 64]; +fn rt() -> tokio::runtime::Runtime { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(6) + .build() + .unwrap() +} + fn create_1_medium(b: &mut Bencher) { b.iter(|| { black_box(&mpsc::channel::(1)); @@ -43,11 +50,7 @@ fn send_large(b: &mut Bencher) { } fn contention_bounded(b: &mut Bencher) { - let rt = tokio::runtime::Builder::new() - .core_threads(6) - .threaded_scheduler() - .build() - .unwrap(); + let rt = rt(); b.iter(|| { rt.block_on(async move { @@ -70,11 +73,7 @@ fn contention_bounded(b: &mut Bencher) { } fn contention_bounded_full(b: &mut Bencher) { - let rt = tokio::runtime::Builder::new() - .core_threads(6) - .threaded_scheduler() - .build() - .unwrap(); + let rt = rt(); b.iter(|| { rt.block_on(async move { @@ -97,11 +96,7 @@ fn contention_bounded_full(b: &mut Bencher) { } fn contention_unbounded(b: &mut Bencher) { - let rt = tokio::runtime::Builder::new() - .core_threads(6) - .threaded_scheduler() - .build() - .unwrap(); + let rt = rt(); b.iter(|| { rt.block_on(async move { @@ -124,11 +119,7 @@ fn contention_unbounded(b: &mut Bencher) { } fn uncontented_bounded(b: &mut Bencher) { - let rt = tokio::runtime::Builder::new() - .core_threads(6) - .threaded_scheduler() - .build() - .unwrap(); + let rt = rt(); b.iter(|| { rt.block_on(async move { @@ -146,11 +137,7 @@ fn uncontented_bounded(b: &mut Bencher) { } fn uncontented_unbounded(b: &mut Bencher) { - let rt = tokio::runtime::Builder::new() - .core_threads(6) - .threaded_scheduler() - .build() - .unwrap(); + let rt = rt(); b.iter(|| { rt.block_on(async move { diff --git a/benches/scheduler.rs b/benches/scheduler.rs index 801de72a..68a6d6a4 100644 --- a/benches/scheduler.rs +++ b/benches/scheduler.rs @@ -139,9 +139,8 @@ fn chained_spawn(b: &mut Bencher) { } fn rt() -> Runtime { - runtime::Builder::new() - .threaded_scheduler() - .core_threads(4) + runtime::Builder::new_multi_thread() + .worker_threads(4) .enable_all() .build() .unwrap() diff --git a/benches/signal.rs b/benches/signal.rs index 3a354c6b..fcad40da 100644 --- a/benches/signal.rs +++ b/benches/signal.rs @@ -45,9 +45,9 @@ fn many_signals(bench: &mut Bencher) { let num_signals = 10; let (tx, mut rx) = mpsc::channel(num_signals); - let rt = runtime::Builder::new() + let rt = runtime::Builder::new_multi_thread() // Intentionally single threaded to measure delays in propagating wakes - .basic_scheduler() + .worker_threads(0) .enable_all() .build() .unwrap(); diff --git a/benches/spawn.rs b/benches/spawn.rs index f76daf3f..72a40357 100644 --- a/benches/spawn.rs +++ b/benches/spawn.rs @@ -10,8 +10,7 @@ async fn work() -> usize { } fn basic_scheduler_local_spawn(bench: &mut Bencher) { - let runtime = tokio::runtime::Builder::new() - .basic_scheduler() + let runtime = tokio::runtime::Builder::new_current_thread() .build() .unwrap(); runtime.block_on(async { @@ -23,8 +22,7 @@ fn basic_scheduler_local_spawn(bench: &mut Bencher) { } fn threaded_scheduler_local_spawn(bench: &mut Bencher) { - let runtime = tokio::runtime::Builder::new() - .threaded_scheduler() + let runtime = tokio::runtime::Builder::new_current_thread() .build() .unwrap(); runtime.block_on(async { @@ -36,8 +34,7 @@ fn threaded_scheduler_local_spawn(bench: &mut Bencher) { } fn basic_scheduler_remote_spawn(bench: &mut Bencher) { - let runtime = tokio::runtime::Builder::new() - .basic_scheduler() + let runtime = tokio::runtime::Builder::new_current_thread() .build() .unwrap(); @@ -48,10 +45,7 @@ fn basic_scheduler_remote_spawn(bench: &mut Bencher) { } fn threaded_scheduler_remote_spawn(bench: &mut Bencher) { - let runtime = tokio::runtime::Builder::new() - .threaded_scheduler() - .build() - .unwrap(); + let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap(); bench.iter(|| { let h = runtime.spawn(work()); diff --git a/benches/sync_rwlock.rs b/benches/sync_rwlock.rs index 30c66e49..46eeac0c 100644 --- a/benches/sync_rwlock.rs +++ b/benches/sync_rwlock.rs @@ -3,9 +3,8 @@ use std::sync::Arc; use tokio::{sync::RwLock, task}; fn read_uncontended(b: &mut Bencher) { - let rt = tokio::runtime::Builder::new() - .core_threads(6) - .threaded_scheduler() + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(6) .build() .unwrap(); @@ -22,9 +21,8 @@ fn read_uncontended(b: &mut Bencher) { } fn read_concurrent_uncontended_multi(b: &mut Bencher) { - let rt = tokio::runtime::Builder::new() - .core_threads(6) - .threaded_scheduler() + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(6) .build() .unwrap(); @@ -51,8 +49,7 @@ fn read_concurrent_uncontended_multi(b: &mut Bencher) { } fn read_concurrent_uncontended(b: &mut Bencher) { - let rt = tokio::runtime::Builder::new() - .basic_scheduler() + let rt = tokio::runtime::Builder::new_current_thread() .build() .unwrap(); @@ -78,9 +75,8 @@ fn read_concurrent_uncontended(b: &mut Bencher) { } fn read_concurrent_contended_multi(b: &mut Bencher) { - let rt = tokio::runtime::Builder::new() - .core_threads(6) - .threaded_scheduler() + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(6) .build() .unwrap(); @@ -108,8 +104,7 @@ fn read_concurrent_contended_multi(b: &mut Bencher) { } fn read_concurrent_contended(b: &mut Bencher) { - let rt = tokio::runtime::Builder::new() - .basic_scheduler() + let rt = tokio::runtime::Builder::new_current_thread() .build() .unwrap(); diff --git a/benches/sync_semaphore.rs b/benches/sync_semaphore.rs index 32d4aa2b..19a1dd33 100644 --- a/benches/sync_semaphore.rs +++ b/benches/sync_semaphore.rs @@ -3,9 +3,8 @@ use std::sync::Arc; use tokio::{sync::Semaphore, task}; fn uncontended(b: &mut Bencher) { - let rt = tokio::runtime::Builder::new() - .core_threads(6) - .threaded_scheduler() + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(6) .build() .unwrap(); @@ -27,9 +26,8 @@ async fn task(s: Arc) { } fn uncontended_concurrent_multi(b: &mut Bencher) { - let rt = tokio::runtime::Builder::new() - .core_threads(6) - .threaded_scheduler() + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(6) .build() .unwrap(); @@ -51,8 +49,8 @@ fn uncontended_concurrent_multi(b: &mut Bencher) { } fn uncontended_concurrent_single(b: &mut Bencher) { - let rt = tokio::runtime::Builder::new() - .basic_scheduler() + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(0) .build() .unwrap(); @@ -73,9 +71,8 @@ fn uncontended_concurrent_single(b: &mut Bencher) { } fn contended_concurrent_multi(b: &mut Bencher) { - let rt = tokio::runtime::Builder::new() - .core_threads(6) - .threaded_scheduler() + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(6) .build() .unwrap(); @@ -97,8 +94,7 @@ fn contended_concurrent_multi(b: &mut Bencher) { } fn contended_concurrent_single(b: &mut Bencher) { - let rt = tokio::runtime::Builder::new() - .basic_scheduler() + let rt = tokio::runtime::Builder::new_current_thread() .build() .unwrap(); diff --git a/tests-build/Cargo.toml b/tests-build/Cargo.toml index 68231d71..e76621b4 100644 --- a/tests-build/Cargo.toml +++ b/tests-build/Cargo.toml @@ -7,6 +7,7 @@ publish = false [features] full = ["tokio/full"] +rt-core = ["tokio/rt-core", "tokio/macros"] [dependencies] tokio = { path = "../tokio", optional = true } diff --git a/tests-build/tests/fail/macros_core_no_default.rs b/tests-build/tests/fail/macros_core_no_default.rs new file mode 100644 index 00000000..23f8847d --- /dev/null +++ b/tests-build/tests/fail/macros_core_no_default.rs @@ -0,0 +1,6 @@ +use tests_build::tokio; + +#[tokio::main] +async fn my_fn() {} + +fn main() {} diff --git a/tests-build/tests/fail/macros_core_no_default.stderr b/tests-build/tests/fail/macros_core_no_default.stderr new file mode 100644 index 00000000..a3ae32cd --- /dev/null +++ b/tests-build/tests/fail/macros_core_no_default.stderr @@ -0,0 +1,7 @@ +error: The default runtime flavor is `multi_thread`, but the `rt-threaded` feature is disabled. + --> $DIR/macros_core_no_default.rs:3:1 + | +3 | #[tokio::main] + | ^^^^^^^^^^^^^^ + | + = note: this error originates in an attribute macro (in Nightly builds, run with -Z macro-backtrace for more info) diff --git a/tests-build/tests/fail/macros_invalid_input.stderr b/tests-build/tests/fail/macros_invalid_input.stderr index 96fdcb17..4c68bd93 100644 --- a/tests-build/tests/fail/macros_invalid_input.stderr +++ b/tests-build/tests/fail/macros_invalid_input.stderr @@ -4,7 +4,7 @@ error: the async keyword is missing from the function declaration 4 | fn main_is_not_async() {} | ^^ -error: Unknown attribute foo is specified; expected `basic_scheduler` or `threaded_scheduler` +error: Unknown attribute foo is specified; expected one of: `flavor`, `worker_threads` --> $DIR/macros_invalid_input.rs:6:15 | 6 | #[tokio::main(foo)] @@ -28,7 +28,7 @@ error: the test function cannot accept arguments 16 | async fn test_fn_has_args(_x: u8) {} | ^^^^^^ -error: Unknown attribute foo is specified; expected `basic_scheduler` or `threaded_scheduler` +error: Unknown attribute foo is specified; expected one of: `flavor`, `worker_threads` --> $DIR/macros_invalid_input.rs:18:15 | 18 | #[tokio::test(foo)] diff --git a/tests-build/tests/macros.rs b/tests-build/tests/macros.rs index 170db227..a12a20ef 100644 --- a/tests-build/tests/macros.rs +++ b/tests-build/tests/macros.rs @@ -1,9 +1,12 @@ #[test] -fn compile_fail() { +fn compile_fail_full() { let t = trybuild::TestCases::new(); #[cfg(feature = "full")] t.compile_fail("tests/fail/macros_invalid_input.rs"); + #[cfg(all(feature = "rt-core", not(feature = "full")))] + t.compile_fail("tests/fail/macros_core_no_default.rs"); + drop(t); } diff --git a/tests-integration/tests/macros_main.rs b/tests-integration/tests/macros_main.rs index 42f5be3b..868adb0f 100644 --- a/tests-integration/tests/macros_main.rs +++ b/tests-integration/tests/macros_main.rs @@ -1,4 +1,4 @@ -#![cfg(feature = "macros")] +#![cfg(all(feature = "macros", feature = "rt-core"))] #[tokio::main] async fn basic_main() -> usize { @@ -10,18 +10,15 @@ async fn generic_fun() -> T { T::default() } -#[cfg(feature = "rt-core")] -mod spawn { - #[tokio::main] - async fn spawning() -> usize { - let join = tokio::spawn(async { 1 }); - join.await.unwrap() - } +#[tokio::main] +async fn spawning() -> usize { + let join = tokio::spawn(async { 1 }); + join.await.unwrap() +} - #[test] - fn main_with_spawn() { - assert_eq!(1, spawning()); - } +#[test] +fn main_with_spawn() { + assert_eq!(1, spawning()); } #[test] diff --git a/tests-integration/tests/rt_shell.rs b/tests-integration/tests/rt_shell.rs deleted file mode 100644 index 012f44a7..00000000 --- a/tests-integration/tests/rt_shell.rs +++ /dev/null @@ -1,32 +0,0 @@ -#![warn(rust_2018_idioms)] -#![cfg(feature = "sync")] - -use tokio::runtime; -use tokio::sync::oneshot; - -use std::sync::mpsc; -use std::thread; - -#[test] -fn basic_shell_rt() { - let (feed_tx, feed_rx) = mpsc::channel::>(); - - let th = thread::spawn(move || { - for tx in feed_rx.iter() { - tx.send(()).unwrap(); - } - }); - - for _ in 0..1_000 { - let rt = runtime::Builder::new().build().unwrap(); - - let (tx, rx) = oneshot::channel(); - - feed_tx.send(tx).unwrap(); - - rt.block_on(rx).unwrap(); - } - - drop(feed_tx); - th.join().unwrap(); -} diff --git a/tokio-macros/src/entry.rs b/tokio-macros/src/entry.rs index 2681f50d..f3c7c230 100644 --- a/tokio-macros/src/entry.rs +++ b/tokio-macros/src/entry.rs @@ -1,13 +1,139 @@ use proc_macro::TokenStream; +use proc_macro2::Span; use quote::quote; -use std::num::NonZeroUsize; +use syn::spanned::Spanned; #[derive(Clone, Copy, PartialEq)] -enum Runtime { - Basic, +enum RuntimeFlavor { + CurrentThread, Threaded, } +impl RuntimeFlavor { + fn from_str(s: &str) -> Result { + match s { + "current_thread" => Ok(RuntimeFlavor::CurrentThread), + "multi_thread" => Ok(RuntimeFlavor::Threaded), + "single_thread" => Err("The single threaded runtime flavor is called `current_thread`.".to_string()), + "basic_scheduler" => Err("The `basic_scheduler` runtime flavor has been renamed to `current_thread`.".to_string()), + "threaded_scheduler" => Err("The `threaded_scheduler` runtime flavor has been renamed to `multi_thread`.".to_string()), + _ => Err(format!("No such runtime flavor `{}`. The runtime flavors are `current_thread` and `multi_thread`.", s)), + } + } +} + +struct FinalConfig { + flavor: RuntimeFlavor, + worker_threads: Option, +} + +struct Configuration { + rt_threaded_available: bool, + default_flavor: RuntimeFlavor, + flavor: Option, + worker_threads: Option<(usize, Span)>, +} + +impl Configuration { + fn new(is_test: bool, rt_threaded: bool) -> Self { + Configuration { + rt_threaded_available: rt_threaded, + default_flavor: match is_test { + true => RuntimeFlavor::CurrentThread, + false => RuntimeFlavor::Threaded, + }, + flavor: None, + worker_threads: None, + } + } + + fn set_flavor(&mut self, runtime: syn::Lit, span: Span) -> Result<(), syn::Error> { + if self.flavor.is_some() { + return Err(syn::Error::new(span, "`flavor` set multiple times.")); + } + + let runtime_str = parse_string(runtime, span, "flavor")?; + let runtime = + RuntimeFlavor::from_str(&runtime_str).map_err(|err| syn::Error::new(span, err))?; + self.flavor = Some(runtime); + Ok(()) + } + + fn set_worker_threads( + &mut self, + worker_threads: syn::Lit, + span: Span, + ) -> Result<(), syn::Error> { + if self.worker_threads.is_some() { + return Err(syn::Error::new( + span, + "`worker_threads` set multiple times.", + )); + } + + let worker_threads = parse_int(worker_threads, span, "worker_threads")?; + if worker_threads == 0 { + return Err(syn::Error::new(span, "`worker_threads` may not be 0.")); + } + self.worker_threads = Some((worker_threads, span)); + Ok(()) + } + + fn build(&self) -> Result { + let flavor = self.flavor.unwrap_or(self.default_flavor); + use RuntimeFlavor::*; + match (flavor, self.worker_threads) { + (CurrentThread, Some((_, worker_threads_span))) => Err(syn::Error::new( + worker_threads_span, + "The `worker_threads` option requires the `multi_thread` runtime flavor.", + )), + (CurrentThread, None) => Ok(FinalConfig { + flavor, + worker_threads: None, + }), + (Threaded, worker_threads) if self.rt_threaded_available => Ok(FinalConfig { + flavor, + worker_threads: worker_threads.map(|(val, _span)| val), + }), + (Threaded, _) => { + let msg = if self.flavor.is_none() { + "The default runtime flavor is `multi_thread`, but the `rt-threaded` feature is disabled." + } else { + "The runtime flavor `multi_thread` requires the `rt-threaded` feature." + }; + Err(syn::Error::new(Span::call_site(), msg)) + } + } + } +} + +fn parse_int(int: syn::Lit, span: Span, field: &str) -> Result { + match int { + syn::Lit::Int(lit) => match lit.base10_parse::() { + Ok(value) => Ok(value), + Err(e) => Err(syn::Error::new( + span, + format!("Failed to parse {} as integer: {}", field, e), + )), + }, + _ => Err(syn::Error::new( + span, + format!("Failed to parse {} as integer.", field), + )), + } +} + +fn parse_string(int: syn::Lit, span: Span, field: &str) -> Result { + match int { + syn::Lit::Str(s) => Ok(s.value()), + syn::Lit::Verbatim(s) => Ok(s.to_string()), + _ => Err(syn::Error::new( + span, + format!("Failed to parse {} as string.", field), + )), + } +} + fn parse_knobs( mut input: syn::ItemFn, args: syn::AttributeArgs, @@ -26,9 +152,12 @@ fn parse_knobs( sig.asyncness = None; - let mut runtime = None; - let mut core_threads = None; - let mut max_threads = None; + let macro_name = if is_test { + "tokio::test" + } else { + "tokio::main" + }; + let mut config = Configuration::new(is_test, rt_threaded); for arg in args { match arg { @@ -39,65 +168,18 @@ fn parse_knobs( return Err(syn::Error::new_spanned(namevalue, msg)); } match ident.unwrap().to_string().to_lowercase().as_str() { + "worker_threads" => { + config.set_worker_threads(namevalue.lit.clone(), namevalue.span())?; + } + "flavor" => { + config.set_flavor(namevalue.lit.clone(), namevalue.span())?; + } "core_threads" => { - if rt_threaded { - match &namevalue.lit { - syn::Lit::Int(expr) => { - let num = expr.base10_parse::().unwrap(); - if num.get() > 1 { - runtime = Some(Runtime::Threaded); - } else { - runtime = Some(Runtime::Basic); - } - - if let Some(v) = max_threads { - if v < num { - return Err(syn::Error::new_spanned( - namevalue, - "max_threads cannot be less than core_threads", - )); - } - } - - core_threads = Some(num); - } - _ => { - return Err(syn::Error::new_spanned( - namevalue, - "core_threads argument must be an int", - )) - } - } - } else { - return Err(syn::Error::new_spanned( - namevalue, - "core_threads can only be set with rt-threaded feature flag enabled", - )); - } + let msg = "Attribute `core_threads` is renamed to `worker_threads`"; + return Err(syn::Error::new_spanned(namevalue, msg)); } - "max_threads" => match &namevalue.lit { - syn::Lit::Int(expr) => { - let num = expr.base10_parse::().unwrap(); - - if let Some(v) = core_threads { - if num < v { - return Err(syn::Error::new_spanned( - namevalue, - "max_threads cannot be less than core_threads", - )); - } - } - max_threads = Some(num); - } - _ => { - return Err(syn::Error::new_spanned( - namevalue, - "max_threads argument must be an int", - )) - } - }, name => { - let msg = format!("Unknown attribute pair {} is specified; expected one of: `core_threads`, `max_threads`", name); + let msg = format!("Unknown attribute {} is specified; expected one of: `flavor`, `worker_threads`, `max_threads`", name); return Err(syn::Error::new_spanned(namevalue, msg)); } } @@ -108,16 +190,22 @@ fn parse_knobs( let msg = "Must have specified ident"; return Err(syn::Error::new_spanned(path, msg)); } - match ident.unwrap().to_string().to_lowercase().as_str() { - "threaded_scheduler" => { - runtime = Some(runtime.unwrap_or_else(|| Runtime::Threaded)) - } - "basic_scheduler" => runtime = Some(runtime.unwrap_or_else(|| Runtime::Basic)), + let name = ident.unwrap().to_string().to_lowercase(); + let msg = match name.as_str() { + "threaded_scheduler" | "multi_thread" => { + format!("Set the runtime flavor with #[{}(flavor = \"multi_thread\")].", macro_name) + }, + "basic_scheduler" | "current_thread" | "single_threaded" => { + format!("Set the runtime flavor with #[{}(flavor = \"current_thread\")].", macro_name) + }, + "flavor" | "worker_threads" => { + format!("The `{}` attribute requires an argument.", name) + }, name => { - let msg = format!("Unknown attribute {} is specified; expected `basic_scheduler` or `threaded_scheduler`", name); - return Err(syn::Error::new_spanned(path, msg)); - } - } + format!("Unknown attribute {} is specified; expected one of: `flavor`, `worker_threads`", name) + }, + }; + return Err(syn::Error::new_spanned(path, msg)); } other => { return Err(syn::Error::new_spanned( @@ -128,15 +216,18 @@ fn parse_knobs( } } - let mut rt = quote! { tokio::runtime::Builder::new().basic_scheduler() }; - if rt_threaded && (runtime == Some(Runtime::Threaded) || (runtime.is_none() && !is_test)) { - rt = quote! { #rt.threaded_scheduler() }; - } - if let Some(v) = core_threads.map(|v| v.get()) { - rt = quote! { #rt.core_threads(#v) }; - } - if let Some(v) = max_threads.map(|v| v.get()) { - rt = quote! { #rt.max_threads(#v) }; + let config = config.build()?; + + let mut rt = match config.flavor { + RuntimeFlavor::CurrentThread => quote! { + tokio::runtime::Builder::new_current_thread() + }, + RuntimeFlavor::Threaded => quote! { + tokio::runtime::Builder::new_multi_thread() + }, + }; + if let Some(v) = config.worker_threads { + rt = quote! { #rt.worker_threads(#v) }; } let header = { @@ -171,7 +262,7 @@ pub(crate) fn main(args: TokenStream, item: TokenStream, rt_threaded: bool) -> T if input.sig.ident == "main" && !input.sig.inputs.is_empty() { let msg = "the main function cannot accept arguments"; - return syn::Error::new_spanned(&input.sig.inputs, msg) + return syn::Error::new_spanned(&input.sig.ident, msg) .to_compile_error() .into(); } @@ -201,159 +292,3 @@ pub(crate) fn test(args: TokenStream, item: TokenStream, rt_threaded: bool) -> T parse_knobs(input, args, true, rt_threaded).unwrap_or_else(|e| e.to_compile_error().into()) } - -pub(crate) mod old { - use proc_macro::TokenStream; - use quote::quote; - - enum Runtime { - Basic, - Threaded, - Auto, - } - - #[cfg(not(test))] // Work around for rust-lang/rust#62127 - pub(crate) fn main(args: TokenStream, item: TokenStream) -> TokenStream { - let mut input = syn::parse_macro_input!(item as syn::ItemFn); - let args = syn::parse_macro_input!(args as syn::AttributeArgs); - - let sig = &mut input.sig; - let name = &sig.ident; - let inputs = &sig.inputs; - let body = &input.block; - let attrs = &input.attrs; - let vis = input.vis; - - if sig.asyncness.is_none() { - let msg = "the async keyword is missing from the function declaration"; - return syn::Error::new_spanned(sig.fn_token, msg) - .to_compile_error() - .into(); - } else if name == "main" && !inputs.is_empty() { - let msg = "the main function cannot accept arguments"; - return syn::Error::new_spanned(&sig.inputs, msg) - .to_compile_error() - .into(); - } - - sig.asyncness = None; - - let mut runtime = Runtime::Auto; - - for arg in args { - if let syn::NestedMeta::Meta(syn::Meta::Path(path)) = arg { - let ident = path.get_ident(); - if ident.is_none() { - let msg = "Must have specified ident"; - return syn::Error::new_spanned(path, msg).to_compile_error().into(); - } - match ident.unwrap().to_string().to_lowercase().as_str() { - "threaded_scheduler" => runtime = Runtime::Threaded, - "basic_scheduler" => runtime = Runtime::Basic, - name => { - let msg = format!("Unknown attribute {} is specified; expected `basic_scheduler` or `threaded_scheduler`", name); - return syn::Error::new_spanned(path, msg).to_compile_error().into(); - } - } - } - } - - let result = match runtime { - Runtime::Threaded | Runtime::Auto => quote! { - #(#attrs)* - #vis #sig { - tokio::runtime::Runtime::new().unwrap().block_on(async { #body }) - } - }, - Runtime::Basic => quote! { - #(#attrs)* - #vis #sig { - tokio::runtime::Builder::new() - .basic_scheduler() - .enable_all() - .build() - .unwrap() - .block_on(async { #body }) - } - }, - }; - - result.into() - } - - pub(crate) fn test(args: TokenStream, item: TokenStream) -> TokenStream { - let input = syn::parse_macro_input!(item as syn::ItemFn); - let args = syn::parse_macro_input!(args as syn::AttributeArgs); - - let ret = &input.sig.output; - let name = &input.sig.ident; - let body = &input.block; - let attrs = &input.attrs; - let vis = input.vis; - - for attr in attrs { - if attr.path.is_ident("test") { - let msg = "second test attribute is supplied"; - return syn::Error::new_spanned(&attr, msg) - .to_compile_error() - .into(); - } - } - - if input.sig.asyncness.is_none() { - let msg = "the async keyword is missing from the function declaration"; - return syn::Error::new_spanned(&input.sig.fn_token, msg) - .to_compile_error() - .into(); - } else if !input.sig.inputs.is_empty() { - let msg = "the test function cannot accept arguments"; - return syn::Error::new_spanned(&input.sig.inputs, msg) - .to_compile_error() - .into(); - } - - let mut runtime = Runtime::Auto; - - for arg in args { - if let syn::NestedMeta::Meta(syn::Meta::Path(path)) = arg { - let ident = path.get_ident(); - if ident.is_none() { - let msg = "Must have specified ident"; - return syn::Error::new_spanned(path, msg).to_compile_error().into(); - } - match ident.unwrap().to_string().to_lowercase().as_str() { - "threaded_scheduler" => runtime = Runtime::Threaded, - "basic_scheduler" => runtime = Runtime::Basic, - name => { - let msg = format!("Unknown attribute {} is specified; expected `basic_scheduler` or `threaded_scheduler`", name); - return syn::Error::new_spanned(path, msg).to_compile_error().into(); - } - } - } - } - - let result = match runtime { - Runtime::Threaded => quote! { - #[::core::prelude::v1::test] - #(#attrs)* - #vis fn #name() #ret { - tokio::runtime::Runtime::new().unwrap().block_on(async { #body }) - } - }, - Runtime::Basic | Runtime::Auto => quote! { - #[::core::prelude::v1::test] - #(#attrs)* - #vis fn #name() #ret { - tokio::runtime::Builder::new() - .basic_scheduler() - .enable_all() - .build() - .unwrap() - .block_on(async { #body }) - } - }, - }; - - result.into() - } -} diff --git a/tokio-macros/src/lib.rs b/tokio-macros/src/lib.rs index 09733ba5..1d6f577a 100644 --- a/tokio-macros/src/lib.rs +++ b/tokio-macros/src/lib.rs @@ -24,23 +24,38 @@ mod select; use proc_macro::TokenStream; -/// Marks async function to be executed by selected runtime. This macro helps set up a `Runtime` -/// without requiring the user to use [Runtime](../tokio/runtime/struct.Runtime.html) or +/// Marks async function to be executed by the selected runtime. This macro helps +/// set up a `Runtime` without requiring the user to use +/// [Runtime](../tokio/runtime/struct.Runtime.html) or /// [Builder](../tokio/runtime/struct.Builder.html) directly. /// -/// Note: This macro is designed to be simplistic and targets applications that do not require -/// a complex setup. If provided functionality is not sufficient, user may be interested in -/// using [Builder](../tokio/runtime/struct.Builder.html), which provides a more powerful -/// interface. +/// Note: This macro is designed to be simplistic and targets applications that +/// do not require a complex setup. If the provided functionality is not +/// sufficient, you may be interested in using +/// [Builder](../tokio/runtime/struct.Builder.html), which provides a more +/// powerful interface. /// -/// ## Options: +/// # Multi-threaded runtime /// -/// If you want to set the number of worker threads used for asynchronous code, use the -/// `core_threads` option. +/// To use the multi-threaded runtime, the macro can be configured using /// -/// - `core_threads=n` - Sets core threads to `n` (requires `rt-threaded` feature). -/// - `max_threads=n` - Sets max threads to `n` (requires `rt-core` or `rt-threaded` feature). -/// - `basic_scheduler` - Use the basic schduler (requires `rt-core`). +/// ``` +/// #[tokio::main(flavor = "multi_thread", worker_threads = 10)] +/// # async fn main() {} +/// ``` +/// +/// The `worker_threads` option configures the number of worker threads, and +/// defaults to the number of cpus on the system. This is the default flavor. +/// +/// # Current thread runtime +/// +/// To use the single-threaded runtime known as the `current_thread` runtime, +/// the macro can be configured using +/// +/// ``` +/// #[tokio::main(flavor = "current_thread")] +/// # async fn main() {} +/// ``` /// /// ## Function arguments: /// @@ -48,7 +63,7 @@ use proc_macro::TokenStream; /// /// ## Usage /// -/// ### Using default +/// ### Using the multi-thread runtime /// /// ```rust /// #[tokio::main] @@ -61,8 +76,7 @@ use proc_macro::TokenStream; /// /// ```rust /// fn main() { -/// tokio::runtime::Builder::new() -/// .threaded_scheduler() +/// tokio::runtime::Builder::new_multi_thread() /// .enable_all() /// .build() /// .unwrap() @@ -72,12 +86,12 @@ use proc_macro::TokenStream; /// } /// ``` /// -/// ### Using basic scheduler +/// ### Using current thread runtime /// /// The basic scheduler is single-threaded. /// /// ```rust -/// #[tokio::main(basic_scheduler)] +/// #[tokio::main(flavor = "current_thread")] /// async fn main() { /// println!("Hello world"); /// } @@ -87,8 +101,7 @@ use proc_macro::TokenStream; /// /// ```rust /// fn main() { -/// tokio::runtime::Builder::new() -/// .basic_scheduler() +/// tokio::runtime::Builder::new_current_thread() /// .enable_all() /// .build() /// .unwrap() @@ -98,89 +111,10 @@ use proc_macro::TokenStream; /// } /// ``` /// -/// ### Set number of core threads -/// -/// ```rust -/// #[tokio::main(core_threads = 2)] -/// async fn main() { -/// println!("Hello world"); -/// } -/// ``` -/// -/// Equivalent code not using `#[tokio::main]` -/// -/// ```rust -/// fn main() { -/// tokio::runtime::Builder::new() -/// .threaded_scheduler() -/// .core_threads(2) -/// .enable_all() -/// .build() -/// .unwrap() -/// .block_on(async { -/// println!("Hello world"); -/// }) -/// } -/// ``` -/// -/// ### NOTE: -/// -/// If you rename the tokio crate in your dependencies this macro -/// will not work. If you must rename the 0.2 version of tokio because -/// you're also using the 0.1 version of tokio, you _must_ make the -/// tokio 0.2 crate available as `tokio` in the module where this -/// macro is expanded. -#[proc_macro_attribute] -#[cfg(not(test))] // Work around for rust-lang/rust#62127 -pub fn main_threaded(args: TokenStream, item: TokenStream) -> TokenStream { - entry::main(args, item, true) -} - -/// Marks async function to be executed by selected runtime. This macro helps set up a `Runtime` -/// without requiring the user to use [Runtime](../tokio/runtime/struct.Runtime.html) or -/// [Builder](../tokio/runtime/struct.Builder.html) directly. -/// -/// Note: This macro is designed to be simplistic and targets applications that do not require -/// a complex setup. If provided functionality is not sufficient, user may be interested in -/// using [Builder](../tokio/runtime/struct.Builder.html), which provides a more powerful -/// interface. -/// -/// ## Options: -/// -/// - `basic_scheduler` - All tasks are executed on the current thread. -/// - `threaded_scheduler` - Uses the multi-threaded scheduler. Used by default (requires `rt-threaded` feature). -/// -/// ## Function arguments: -/// -/// Arguments are allowed for any functions aside from `main` which is special -/// -/// ## Usage -/// -/// ### Using default -/// -/// ```rust -/// #[tokio::main] -/// async fn main() { -/// println!("Hello world"); -/// } -/// ``` -/// -/// Equivalent code not using `#[tokio::main]` -/// -/// ```rust -/// fn main() { -/// tokio::runtime::Runtime::new() -/// .unwrap() -/// .block_on(async { -/// println!("Hello world"); -/// }) -/// } -/// ``` -/// -/// ### Select runtime +/// ### Set number of worker threads /// /// ```rust -/// #[tokio::main(basic_scheduler)] +/// #[tokio::main(worker_threads = 2)] /// async fn main() { /// println!("Hello world"); /// } @@ -190,8 +124,8 @@ pub fn main_threaded(args: TokenStream, item: TokenStream) -> TokenStream { /// /// ```rust /// fn main() { -/// tokio::runtime::Builder::new() -/// .basic_scheduler() +/// tokio::runtime::Builder::new_multi_thread() +/// .worker_threads(2) /// .enable_all() /// .build() /// .unwrap() @@ -203,25 +137,20 @@ pub fn main_threaded(args: TokenStream, item: TokenStream) -> TokenStream { /// /// ### NOTE: /// -/// If you rename the tokio crate in your dependencies this macro -/// will not work. If you must rename the 0.2 version of tokio because -/// you're also using the 0.1 version of tokio, you _must_ make the -/// tokio 0.2 crate available as `tokio` in the module where this -/// macro is expanded. +/// If you rename the tokio crate in your dependencies this macro will not work. +/// If you must rename the 0.2 version of tokio because you're also using the +/// 0.1 version of tokio, you _must_ make the tokio 0.2 crate available as +/// `tokio` in the module where this macro is expanded. #[proc_macro_attribute] #[cfg(not(test))] // Work around for rust-lang/rust#62127 pub fn main(args: TokenStream, item: TokenStream) -> TokenStream { - entry::old::main(args, item) + entry::main(args, item, true) } /// Marks async function to be executed by selected runtime. This macro helps set up a `Runtime` /// without requiring the user to use [Runtime](../tokio/runtime/struct.Runtime.html) or /// [Builder](../tokio/runtime/struct.builder.html) directly. /// -/// ## Options: -/// -/// - `max_threads=n` - Sets max threads to `n`. -/// /// ## Function arguments: /// /// Arguments are allowed for any functions aside from `main` which is special @@ -231,7 +160,7 @@ pub fn main(args: TokenStream, item: TokenStream) -> TokenStream { /// ### Using default /// /// ```rust -/// #[tokio::main] +/// #[tokio::main(flavor = "current_thread")] /// async fn main() { /// println!("Hello world"); /// } @@ -241,8 +170,7 @@ pub fn main(args: TokenStream, item: TokenStream) -> TokenStream { /// /// ```rust /// fn main() { -/// tokio::runtime::Builder::new() -/// .basic_scheduler() +/// tokio::runtime::Builder::new_current_thread() /// .enable_all() /// .build() /// .unwrap() @@ -261,23 +189,18 @@ pub fn main(args: TokenStream, item: TokenStream) -> TokenStream { /// macro is expanded. #[proc_macro_attribute] #[cfg(not(test))] // Work around for rust-lang/rust#62127 -pub fn main_basic(args: TokenStream, item: TokenStream) -> TokenStream { +pub fn main_rt_core(args: TokenStream, item: TokenStream) -> TokenStream { entry::main(args, item, false) } /// Marks async function to be executed by runtime, suitable to test environment /// -/// ## Options: -/// -/// - `core_threads=n` - Sets core threads to `n` (requires `rt-threaded` feature). -/// - `max_threads=n` - Sets max threads to `n` (requires `rt-core` or `rt-threaded` feature). -/// /// ## Usage /// -/// ### Select runtime +/// ### Multi-thread runtime /// /// ```no_run -/// #[tokio::test(core_threads = 1)] +/// #[tokio::test(flavor = "multi_thread", worker_threads = 1)] /// async fn my_test() { /// assert!(true); /// } @@ -285,6 +208,8 @@ pub fn main_basic(args: TokenStream, item: TokenStream) -> TokenStream { /// /// ### Using default /// +/// The default test runtime is multi-threaded. +/// /// ```no_run /// #[tokio::test] /// async fn my_test() { @@ -300,7 +225,7 @@ pub fn main_basic(args: TokenStream, item: TokenStream) -> TokenStream { /// tokio 0.2 crate available as `tokio` in the module where this /// macro is expanded. #[proc_macro_attribute] -pub fn test_threaded(args: TokenStream, item: TokenStream) -> TokenStream { +pub fn test(args: TokenStream, item: TokenStream) -> TokenStream { entry::test(args, item, true) } @@ -308,22 +233,10 @@ pub fn test_threaded(args: TokenStream, item: TokenStream) -> TokenStream { /// /// ## Options: /// -/// - `basic_scheduler` - All tasks are executed on the current thread. Used by default. -/// - `threaded_scheduler` - Use multi-threaded scheduler (requires `rt-threaded` feature). +/// - `max_threads=n` - Sets max threads to `n`. /// /// ## Usage /// -/// ### Select runtime -/// -/// ```no_run -/// #[tokio::test(threaded_scheduler)] -/// async fn my_test() { -/// assert!(true); -/// } -/// ``` -/// -/// ### Using default -/// /// ```no_run /// #[tokio::test] /// async fn my_test() { @@ -339,35 +252,36 @@ pub fn test_threaded(args: TokenStream, item: TokenStream) -> TokenStream { /// tokio 0.2 crate available as `tokio` in the module where this /// macro is expanded. #[proc_macro_attribute] -pub fn test(args: TokenStream, item: TokenStream) -> TokenStream { - entry::old::test(args, item) +pub fn test_rt_core(args: TokenStream, item: TokenStream) -> TokenStream { + entry::test(args, item, false) } -/// Marks async function to be executed by runtime, suitable to test environment -/// -/// ## Options: -/// -/// - `max_threads=n` - Sets max threads to `n`. -/// -/// ## Usage -/// -/// ```no_run -/// #[tokio::test] -/// async fn my_test() { -/// assert!(true); -/// } +/// Always fails with the error message below. +/// ```text +/// The #[tokio::main] macro requires rt-core or rt-threaded. /// ``` -/// -/// ### NOTE: -/// -/// If you rename the tokio crate in your dependencies this macro -/// will not work. If you must rename the 0.2 version of tokio because -/// you're also using the 0.1 version of tokio, you _must_ make the -/// tokio 0.2 crate available as `tokio` in the module where this -/// macro is expanded. #[proc_macro_attribute] -pub fn test_basic(args: TokenStream, item: TokenStream) -> TokenStream { - entry::test(args, item, false) +pub fn main_fail(_args: TokenStream, _item: TokenStream) -> TokenStream { + syn::Error::new( + proc_macro2::Span::call_site(), + "The #[tokio::main] macro requires rt-core or rt-threaded.", + ) + .to_compile_error() + .into() +} + +/// Always fails with the error message below. +/// ```text +/// The #[tokio::test] macro requires rt-core or rt-threaded. +/// ``` +#[proc_macro_attribute] +pub fn test_fail(_args: TokenStream, _item: TokenStream) -> TokenStream { + syn::Error::new( + proc_macro2::Span::call_site(), + "The #[tokio::test] macro requires rt-core or rt-threaded.", + ) + .to_compile_error() + .into() } /// Implementation detail of the `select!` macro. This macro is **not** intended diff --git a/tokio-test/src/lib.rs b/tokio-test/src/lib.rs index cfbf80ce..a20d0acd 100644 --- a/tokio-test/src/lib.rs +++ b/tokio-test/src/lib.rs @@ -28,8 +28,7 @@ pub mod task; pub fn block_on(future: F) -> F::Output { use tokio::runtime; - let rt = runtime::Builder::new() - .basic_scheduler() + let rt = runtime::Builder::new_current_thread() .enable_all() .build() .unwrap(); diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index 8c54f27b..f6007ce2 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -31,6 +31,7 @@ compat = ["futures-io",] codec = ["tokio/stream"] time = ["tokio/time","slab"] io = [] +rt-core = ["tokio/rt-core"] [dependencies] tokio = { version = "0.3.0", path = "../tokio" } diff --git a/tokio-util/src/cfg.rs b/tokio-util/src/cfg.rs index f9176747..a848223f 100644 --- a/tokio-util/src/cfg.rs +++ b/tokio-util/src/cfg.rs @@ -39,3 +39,13 @@ macro_rules! cfg_io { )* } } + +macro_rules! cfg_rt_core { + ($($item:item)*) => { + $( + #[cfg(feature = "rt-core")] + #[cfg_attr(docsrs, doc(cfg(feature = "rt-core")))] + $item + )* + } +} diff --git a/tokio-util/src/codec/bytes_codec.rs b/tokio-util/src/codec/bytes_codec.rs index a5e73749..275031c0 100644 --- a/tokio-util/src/codec/bytes_codec.rs +++ b/tokio-util/src/codec/bytes_codec.rs @@ -33,7 +33,7 @@ use std::io; /// # } /// # } /// # -/// # #[tokio::main(core_threads = 1)] +/// # #[tokio::main(flavor = "current_thread")] /// # async fn main() -> Result<(), std::io::Error> { /// let my_async_read = File::open("filename.txt").await?; /// let my_stream_of_bytes = FramedRead::new(my_async_read, BytesCodec::new()); diff --git a/tokio-util/src/context.rs b/tokio-util/src/context.rs index 5f6c6b9b..ae954d85 100644 --- a/tokio-util/src/context.rs +++ b/tokio-util/src/context.rs @@ -48,14 +48,14 @@ pub trait RuntimeExt { /// use tokio_util::context::RuntimeExt; /// use tokio::time::{sleep, Duration}; /// - /// let rt = tokio::runtime::Builder::new() - /// .threaded_scheduler() + /// let rt = tokio::runtime::Builder::new_multi_thread() /// .enable_all() - /// .build().unwrap(); + /// .build() + /// .unwrap(); /// - /// let rt2 = tokio::runtime::Builder::new() - /// .threaded_scheduler() - /// .build().unwrap(); + /// let rt2 = tokio::runtime::Builder::new_multi_thread() + /// .build() + /// .unwrap(); /// /// let fut = sleep(Duration::from_millis(2)); /// diff --git a/tokio-util/src/lib.rs b/tokio-util/src/lib.rs index 31a16d05..55fd67eb 100644 --- a/tokio-util/src/lib.rs +++ b/tokio-util/src/lib.rs @@ -47,7 +47,9 @@ cfg_io! { pub mod io; } -pub mod context; +cfg_rt_core! { + pub mod context; +} pub mod sync; diff --git a/tokio-util/tests/context.rs b/tokio-util/tests/context.rs index ee519130..852dcd0b 100644 --- a/tokio-util/tests/context.rs +++ b/tokio-util/tests/context.rs @@ -1,3 +1,4 @@ +#![cfg(feature = "rt-core")] #![warn(rust_2018_idioms)] use tokio::runtime::Builder; @@ -6,15 +7,13 @@ use tokio_util::context::RuntimeExt; #[test] fn tokio_context_with_another_runtime() { - let rt1 = Builder::new() - .threaded_scheduler() - .core_threads(1) + let rt1 = Builder::new_multi_thread() + .worker_threads(1) // no timer! .build() .unwrap(); - let rt2 = Builder::new() - .threaded_scheduler() - .core_threads(1) + let rt2 = Builder::new_multi_thread() + .worker_threads(1) .enable_all() .build() .unwrap(); diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 4d5f833c..cb407f93 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -30,7 +30,6 @@ default = [] # enable everything full = [ - "blocking", "dns", "fs", "io-util", @@ -47,12 +46,11 @@ full = [ "time", ] -blocking = ["rt-core"] -dns = ["rt-core"] -fs = ["rt-core", "io-util"] +dns = [] +fs = [] io-util = ["memchr"] # stdin, stdout, stderr -io-std = ["rt-core"] +io-std = [] macros = ["tokio-macros"] net = ["dns", "tcp", "udp", "uds"] process = [ diff --git a/tokio/src/blocking.rs b/tokio/src/blocking.rs new file mode 100644 index 00000000..d6ef5915 --- /dev/null +++ b/tokio/src/blocking.rs @@ -0,0 +1,48 @@ +cfg_rt_core! { + pub(crate) use crate::runtime::spawn_blocking; + pub(crate) use crate::task::JoinHandle; +} + +cfg_not_rt_core! { + use std::fmt; + use std::future::Future; + use std::pin::Pin; + use std::task::{Context, Poll}; + + pub(crate) fn spawn_blocking(_f: F) -> JoinHandle + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + assert_send_sync::>>(); + panic!("requires the `rt-core` Tokio feature flag") + + } + + pub(crate) struct JoinHandle { + _p: std::marker::PhantomData, + } + + unsafe impl Send for JoinHandle {} + unsafe impl Sync for JoinHandle {} + + impl Future for JoinHandle { + type Output = Result; + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { + unreachable!() + } + } + + impl fmt::Debug for JoinHandle + where + T: fmt::Debug, + { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("JoinHandle").finish() + } + } + + fn assert_send_sync() { + } +} diff --git a/tokio/src/coop.rs b/tokio/src/coop.rs index 27e969c5..f6cca1c5 100644 --- a/tokio/src/coop.rs +++ b/tokio/src/coop.rs @@ -1,3 +1,5 @@ +#![cfg_attr(not(feature = "full"), allow(dead_code))] + //! Opt-in yield points for improved cooperative scheduling. //! //! A single call to [`poll`] on a top-level task may potentially do a lot of @@ -96,14 +98,6 @@ pub(crate) fn budget(f: impl FnOnce() -> R) -> R { with_budget(Budget::initial(), f) } -cfg_rt_threaded! { - /// Set the current task's budget - #[cfg(feature = "blocking")] - pub(crate) fn set(budget: Budget) { - CURRENT.with(|cell| cell.set(budget)) - } -} - #[inline(always)] fn with_budget(budget: Budget, f: impl FnOnce() -> R) -> R { struct ResetGuard<'a> { @@ -129,13 +123,18 @@ fn with_budget(budget: Budget, f: impl FnOnce() -> R) -> R { } cfg_rt_threaded! { + /// Set the current task's budget + pub(crate) fn set(budget: Budget) { + CURRENT.with(|cell| cell.set(budget)) + } + #[inline(always)] pub(crate) fn has_budget_remaining() -> bool { CURRENT.with(|cell| cell.get().has_remaining()) } } -cfg_blocking_impl! { +cfg_rt_core! { /// Forcibly remove the budgeting constraints early. /// /// Returns the remaining budget diff --git a/tokio/src/fs/mod.rs b/tokio/src/fs/mod.rs index a2b062b1..d2757a5f 100644 --- a/tokio/src/fs/mod.rs +++ b/tokio/src/fs/mod.rs @@ -107,6 +107,6 @@ mod sys { pub(crate) use std::fs::File; // TODO: don't rename - pub(crate) use crate::runtime::spawn_blocking as run; - pub(crate) use crate::task::JoinHandle as Blocking; + pub(crate) use crate::blocking::spawn_blocking as run; + pub(crate) use crate::blocking::JoinHandle as Blocking; } diff --git a/tokio/src/future/block_on.rs b/tokio/src/future/block_on.rs new file mode 100644 index 00000000..9fc7abc6 --- /dev/null +++ b/tokio/src/future/block_on.rs @@ -0,0 +1,15 @@ +use std::future::Future; + +cfg_rt_core! { + pub(crate) fn block_on(f: F) -> F::Output { + let mut e = crate::runtime::enter::enter(false); + e.block_on(f).unwrap() + } +} + +cfg_not_rt_core! { + pub(crate) fn block_on(f: F) -> F::Output { + let mut park = crate::park::thread::CachedParkThread::new(); + park.block_on(f).unwrap() + } +} diff --git a/tokio/src/future/mod.rs b/tokio/src/future/mod.rs index 770753f3..f7d93c98 100644 --- a/tokio/src/future/mod.rs +++ b/tokio/src/future/mod.rs @@ -1,15 +1,24 @@ -#![allow(unused_imports, dead_code)] +#![cfg_attr(not(feature = "macros"), allow(unreachable_pub))] //! Asynchronous values. -mod maybe_done; -pub use maybe_done::{maybe_done, MaybeDone}; +#[cfg(any(feature = "macros", feature = "process"))] +pub(crate) mod maybe_done; mod poll_fn; pub use poll_fn::poll_fn; -mod ready; -pub(crate) use ready::{ok, Ready}; +cfg_not_loom! { + mod ready; + pub(crate) use ready::{ok, Ready}; +} -mod try_join; -pub(crate) use try_join::try_join3; +cfg_process! { + mod try_join; + pub(crate) use try_join::try_join3; +} + +cfg_sync! { + mod block_on; + pub(crate) use block_on::block_on; +} diff --git a/tokio/src/future/poll_fn.rs b/tokio/src/future/poll_fn.rs index 9b3d1370..0169bd5f 100644 --- a/tokio/src/future/poll_fn.rs +++ b/tokio/src/future/poll_fn.rs @@ -1,3 +1,5 @@ +#![allow(dead_code)] + //! Definition of the `PollFn` adapter combinator use std::fmt; diff --git a/tokio/src/future/try_join.rs b/tokio/src/future/try_join.rs index 5bd80dc8..8943f61a 100644 --- a/tokio/src/future/try_join.rs +++ b/tokio/src/future/try_join.rs @@ -1,4 +1,4 @@ -use crate::future::{maybe_done, MaybeDone}; +use crate::future::maybe_done::{maybe_done, MaybeDone}; use pin_project_lite::pin_project; use std::future::Future; diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs index c4f5887a..0d4133a5 100644 --- a/tokio/src/io/driver/mod.rs +++ b/tokio/src/io/driver/mod.rs @@ -1,3 +1,5 @@ +#![cfg_attr(not(feature = "rt-core"), allow(dead_code))] + mod ready; use ready::Ready; @@ -5,7 +7,6 @@ mod scheduled_io; pub(crate) use scheduled_io::ScheduledIo; // pub(crate) for tests use crate::park::{Park, Unpark}; -use crate::runtime::context; use crate::util::bit; use crate::util::slab::{self, Slab}; @@ -218,17 +219,36 @@ impl fmt::Debug for Driver { // ===== impl Handle ===== -impl Handle { - /// Returns a handle to the current reactor - /// - /// # Panics - /// - /// This function panics if there is no current reactor set. - pub(super) fn current() -> Self { - context::io_handle() - .expect("there is no reactor running, must be called from the context of Tokio runtime") +cfg_rt_core! { + impl Handle { + /// Returns a handle to the current reactor + /// + /// # Panics + /// + /// This function panics if there is no current reactor set and `rt-core` feature + /// flag is not enabled. + pub(super) fn current() -> Self { + crate::runtime::context::io_handle() + .expect("there is no reactor running, must be called from the context of Tokio runtime") + } } +} +cfg_not_rt_core! { + impl Handle { + /// Returns a handle to the current reactor + /// + /// # Panics + /// + /// This function panics if there is no current reactor set, or if the `rt-core` + /// feature flag is not enabled. + pub(super) fn current() -> Self { + panic!("there is no reactor running, must be called from the context of Tokio runtime with `rt-core` enabled.") + } + } +} + +impl Handle { /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise /// makes the next call to `turn` return immediately. /// diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs index e1a036fb..62728ac1 100644 --- a/tokio/src/io/mod.rs +++ b/tokio/src/io/mod.rs @@ -250,7 +250,7 @@ cfg_io_blocking! { /// Types in this module can be mocked out in tests. mod sys { // TODO: don't rename - pub(crate) use crate::runtime::spawn_blocking as run; - pub(crate) use crate::task::JoinHandle as Blocking; + pub(crate) use crate::blocking::spawn_blocking as run; + pub(crate) use crate::blocking::JoinHandle as Blocking; } } diff --git a/tokio/src/io/stdio_common.rs b/tokio/src/io/stdio_common.rs index 03800fcb..d21c842f 100644 --- a/tokio/src/io/stdio_common.rs +++ b/tokio/src/io/stdio_common.rs @@ -183,8 +183,7 @@ mod tests { let fut = async move { wr.write_all(data.as_bytes()).await.unwrap(); }; - crate::runtime::Builder::new() - .basic_scheduler() + crate::runtime::Builder::new_current_thread() .build() .unwrap() .block_on(fut); @@ -200,8 +199,7 @@ mod tests { data.extend(std::iter::repeat(0b1010_1010).take(MAX_BUF - checked_count + 1)); let mut writer = LoggingMockWriter::new(); let mut splitter = super::SplitByUtf8BoundaryIfWindows::new(&mut writer); - crate::runtime::Builder::new() - .basic_scheduler() + crate::runtime::Builder::new_current_thread() .build() .unwrap() .block_on(async { diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index cd05cb55..1334eb88 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -348,8 +348,7 @@ cfg_fs! { pub mod fs; } -#[doc(hidden)] -pub mod future; +mod future; pub mod io; pub mod net; @@ -363,7 +362,14 @@ cfg_process! { pub mod process; } -pub mod runtime; +#[cfg(any(feature = "dns", feature = "fs", feature = "io-std"))] +mod blocking; + +cfg_rt_core! { + pub mod runtime; +} +#[cfg(all(not(feature = "rt-core"), feature = "rt-util"))] +mod runtime; pub(crate) mod coop; @@ -389,8 +395,8 @@ cfg_not_sync! { mod sync; } +pub mod task; cfg_rt_core! { - pub mod task; pub use task::spawn; } @@ -414,24 +420,24 @@ cfg_macros! { #[cfg(not(test))] // Work around for rust-lang/rust#62127 #[cfg_attr(docsrs, doc(cfg(feature = "macros")))] - pub use tokio_macros::main_threaded as main; + pub use tokio_macros::main; #[cfg_attr(docsrs, doc(cfg(feature = "macros")))] - pub use tokio_macros::test_threaded as test; + pub use tokio_macros::test; } cfg_not_rt_threaded! { #[cfg(not(test))] // Work around for rust-lang/rust#62127 - pub use tokio_macros::main_basic as main; - pub use tokio_macros::test_basic as test; + pub use tokio_macros::main_rt_core as main; + pub use tokio_macros::test_rt_core as test; } } - // Maintains old behavior + // Always fail if rt-core is not enabled. cfg_not_rt_core! { #[cfg(not(test))] - pub use tokio_macros::main; - pub use tokio_macros::test; + pub use tokio_macros::main_fail as main; + pub use tokio_macros::test_fail as test; } } diff