diff options
author | Lucio Franco <luciofranco14@gmail.com> | 2020-10-12 13:44:54 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-10-12 13:44:54 -0400 |
commit | 8880222036f37c6204c8466f25e828447f16dacb (patch) | |
tree | fd623afc20f73bbce65746a3d1b1b2731ecf30a5 /tokio | |
parent | 0893841f31542b2b04c5050a8a4a3c45cf867e55 (diff) |
rt: Remove `threaded_scheduler()` and `basic_scheduler()` (#2876)
Co-authored-by: Alice Ryhl <alice@ryhl.io>
Co-authored-by: Carl Lerche <me@carllerche.com>
Diffstat (limited to 'tokio')
68 files changed, 1128 insertions, 1159 deletions
diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 4d5f833c..cb407f93 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -30,7 +30,6 @@ default = [] # enable everything full = [ - "blocking", "dns", "fs", "io-util", @@ -47,12 +46,11 @@ full = [ "time", ] -blocking = ["rt-core"] -dns = ["rt-core"] -fs = ["rt-core", "io-util"] +dns = [] +fs = [] io-util = ["memchr"] # stdin, stdout, stderr -io-std = ["rt-core"] +io-std = [] macros = ["tokio-macros"] net = ["dns", "tcp", "udp", "uds"] process = [ diff --git a/tokio/src/blocking.rs b/tokio/src/blocking.rs new file mode 100644 index 00000000..d6ef5915 --- /dev/null +++ b/tokio/src/blocking.rs @@ -0,0 +1,48 @@ +cfg_rt_core! { + pub(crate) use crate::runtime::spawn_blocking; + pub(crate) use crate::task::JoinHandle; +} + +cfg_not_rt_core! { + use std::fmt; + use std::future::Future; + use std::pin::Pin; + use std::task::{Context, Poll}; + + pub(crate) fn spawn_blocking<F, R>(_f: F) -> JoinHandle<R> + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + assert_send_sync::<JoinHandle<std::cell::Cell<()>>>(); + panic!("requires the `rt-core` Tokio feature flag") + + } + + pub(crate) struct JoinHandle<R> { + _p: std::marker::PhantomData<R>, + } + + unsafe impl<T: Send> Send for JoinHandle<T> {} + unsafe impl<T: Send> Sync for JoinHandle<T> {} + + impl<R> Future for JoinHandle<R> { + type Output = Result<R, std::io::Error>; + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> { + unreachable!() + } + } + + impl<T> fmt::Debug for JoinHandle<T> + where + T: fmt::Debug, + { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("JoinHandle").finish() + } + } + + fn assert_send_sync<T: Send + Sync>() { + } +} diff --git a/tokio/src/coop.rs b/tokio/src/coop.rs index 27e969c5..f6cca1c5 100644 --- a/tokio/src/coop.rs +++ b/tokio/src/coop.rs @@ -1,3 +1,5 @@ +#![cfg_attr(not(feature = "full"), allow(dead_code))] + //! Opt-in yield points for improved cooperative scheduling. //! //! A single call to [`poll`] on a top-level task may potentially do a lot of @@ -96,14 +98,6 @@ pub(crate) fn budget<R>(f: impl FnOnce() -> R) -> R { with_budget(Budget::initial(), f) } -cfg_rt_threaded! { - /// Set the current task's budget - #[cfg(feature = "blocking")] - pub(crate) fn set(budget: Budget) { - CURRENT.with(|cell| cell.set(budget)) - } -} - #[inline(always)] fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R { struct ResetGuard<'a> { @@ -129,13 +123,18 @@ fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R { } cfg_rt_threaded! { + /// Set the current task's budget + pub(crate) fn set(budget: Budget) { + CURRENT.with(|cell| cell.set(budget)) + } + #[inline(always)] pub(crate) fn has_budget_remaining() -> bool { CURRENT.with(|cell| cell.get().has_remaining()) } } -cfg_blocking_impl! { +cfg_rt_core! { /// Forcibly remove the budgeting constraints early. /// /// Returns the remaining budget diff --git a/tokio/src/fs/mod.rs b/tokio/src/fs/mod.rs index a2b062b1..d2757a5f 100644 --- a/tokio/src/fs/mod.rs +++ b/tokio/src/fs/mod.rs @@ -107,6 +107,6 @@ mod sys { pub(crate) use std::fs::File; // TODO: don't rename - pub(crate) use crate::runtime::spawn_blocking as run; - pub(crate) use crate::task::JoinHandle as Blocking; + pub(crate) use crate::blocking::spawn_blocking as run; + pub(crate) use crate::blocking::JoinHandle as Blocking; } diff --git a/tokio/src/future/block_on.rs b/tokio/src/future/block_on.rs new file mode 100644 index 00000000..9fc7abc6 --- /dev/null +++ b/tokio/src/future/block_on.rs @@ -0,0 +1,15 @@ +use std::future::Future; + +cfg_rt_core! { + pub(crate) fn block_on<F: Future>(f: F) -> F::Output { + let mut e = crate::runtime::enter::enter(false); + e.block_on(f).unwrap() + } +} + +cfg_not_rt_core! { + pub(crate) fn block_on<F: Future>(f: F) -> F::Output { + let mut park = crate::park::thread::CachedParkThread::new(); + park.block_on(f).unwrap() + } +} diff --git a/tokio/src/future/mod.rs b/tokio/src/future/mod.rs index 770753f3..f7d93c98 100644 --- a/tokio/src/future/mod.rs +++ b/tokio/src/future/mod.rs @@ -1,15 +1,24 @@ -#![allow(unused_imports, dead_code)] +#![cfg_attr(not(feature = "macros"), allow(unreachable_pub))] //! Asynchronous values. -mod maybe_done; -pub use maybe_done::{maybe_done, MaybeDone}; +#[cfg(any(feature = "macros", feature = "process"))] +pub(crate) mod maybe_done; mod poll_fn; pub use poll_fn::poll_fn; -mod ready; -pub(crate) use ready::{ok, Ready}; +cfg_not_loom! { + mod ready; + pub(crate) use ready::{ok, Ready}; +} -mod try_join; -pub(crate) use try_join::try_join3; +cfg_process! { + mod try_join; + pub(crate) use try_join::try_join3; +} + +cfg_sync! { + mod block_on; + pub(crate) use block_on::block_on; +} diff --git a/tokio/src/future/poll_fn.rs b/tokio/src/future/poll_fn.rs index 9b3d1370..0169bd5f 100644 --- a/tokio/src/future/poll_fn.rs +++ b/tokio/src/future/poll_fn.rs @@ -1,3 +1,5 @@ +#![allow(dead_code)] + //! Definition of the `PollFn` adapter combinator use std::fmt; diff --git a/tokio/src/future/try_join.rs b/tokio/src/future/try_join.rs index 5bd80dc8..8943f61a 100644 --- a/tokio/src/future/try_join.rs +++ b/tokio/src/future/try_join.rs @@ -1,4 +1,4 @@ -use crate::future::{maybe_done, MaybeDone}; +use crate::future::maybe_done::{maybe_done, MaybeDone}; use pin_project_lite::pin_project; use std::future::Future; diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs index c4f5887a..0d4133a5 100644 --- a/tokio/src/io/driver/mod.rs +++ b/tokio/src/io/driver/mod.rs @@ -1,3 +1,5 @@ +#![cfg_attr(not(feature = "rt-core"), allow(dead_code))] + mod ready; use ready::Ready; @@ -5,7 +7,6 @@ mod scheduled_io; pub(crate) use scheduled_io::ScheduledIo; // pub(crate) for tests use crate::park::{Park, Unpark}; -use crate::runtime::context; use crate::util::bit; use crate::util::slab::{self, Slab}; @@ -218,17 +219,36 @@ impl fmt::Debug for Driver { // ===== impl Handle ===== -impl Handle { - /// Returns a handle to the current reactor - /// - /// # Panics - /// - /// This function panics if there is no current reactor set. - pub(super) fn current() -> Self { - context::io_handle() - .expect("there is no reactor running, must be called from the context of Tokio runtime") +cfg_rt_core! { + impl Handle { + /// Returns a handle to the current reactor + /// + /// # Panics + /// + /// This function panics if there is no current reactor set and `rt-core` feature + /// flag is not enabled. + pub(super) fn current() -> Self { + crate::runtime::context::io_handle() + .expect("there is no reactor running, must be called from the context of Tokio runtime") + } } +} +cfg_not_rt_core! { + impl Handle { + /// Returns a handle to the current reactor + /// + /// # Panics + /// + /// This function panics if there is no current reactor set, or if the `rt-core` + /// feature flag is not enabled. + pub(super) fn current() -> Self { + panic!("there is no reactor running, must be called from the context of Tokio runtime with `rt-core` enabled.") + } + } +} + +impl Handle { /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise /// makes the next call to `turn` return immediately. /// diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs index e1a036fb..62728ac1 100644 --- a/tokio/src/io/mod.rs +++ b/tokio/src/io/mod.rs @@ -250,7 +250,7 @@ cfg_io_blocking! { /// Types in this module can be mocked out in tests. mod sys { // TODO: don't rename - pub(crate) use crate::runtime::spawn_blocking as run; - pub(crate) use crate::task::JoinHandle as Blocking; + pub(crate) use crate::blocking::spawn_blocking as run; + pub(crate) use crate::blocking::JoinHandle as Blocking; } } diff --git a/tokio/src/io/stdio_common.rs b/tokio/src/io/stdio_common.rs index 03800fcb..d21c842f 100644 --- a/tokio/src/io/stdio_common.rs +++ b/tokio/src/io/stdio_common.rs @@ -183,8 +183,7 @@ mod tests { let fut = async move { wr.write_all(data.as_bytes()).await.unwrap(); }; - crate::runtime::Builder::new() - .basic_scheduler() + crate::runtime::Builder::new_current_thread() .build() .unwrap() .block_on(fut); @@ -200,8 +199,7 @@ mod tests { data.extend(std::iter::repeat(0b1010_1010).take(MAX_BUF - checked_count + 1)); let mut writer = LoggingMockWriter::new(); let mut splitter = super::SplitByUtf8BoundaryIfWindows::new(&mut writer); - crate::runtime::Builder::new() - .basic_scheduler() + crate::runtime::Builder::new_current_thread() .build() .unwrap() .block_on(async { diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index cd05cb55..1334eb88 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -348,8 +348,7 @@ cfg_fs! { pub mod fs; } -#[doc(hidden)] -pub mod future; +mod future; pub mod io; pub mod net; @@ -363,7 +362,14 @@ cfg_process! { pub mod process; } -pub mod runtime; +#[cfg(any(feature = "dns", feature = "fs", feature = "io-std"))] +mod blocking; + +cfg_rt_core! { + pub mod runtime; +} +#[cfg(all(not(feature = "rt-core"), feature = "rt-util"))] +mod runtime; pub(crate) mod coop; @@ -389,8 +395,8 @@ cfg_not_sync! { mod sync; } +pub mod task; cfg_rt_core! { - pub mod task; pub use task::spawn; } @@ -414,24 +420,24 @@ cfg_macros! { #[cfg(not(test))] // Work around for rust-lang/rust#62127 #[cfg_attr(docsrs, doc(cfg(feature = "macros")))] - pub use tokio_macros::main_threaded as main; + pub use tokio_macros::main; #[cfg_attr(docsrs, doc(cfg(feature = "macros")))] - pub use tokio_macros::test_threaded as test; + pub use tokio_macros::test; } cfg_not_rt_threaded! { #[cfg(not(test))] // Work around for rust-lang/rust#62127 - pub use tokio_macros::main_basic as main; - pub use tokio_macros::test_basic as test; + pub use tokio_macros::main_rt_core as main; + pub use tokio_macros::test_rt_core as test; } } - // Maintains old behavior + // Always fail if rt-core is not enabled. cfg_not_rt_core! { #[cfg(not(test))] - pub use tokio_macros::main; - pub use tokio_macros::test; + pub use tokio_macros::main_fail as main; + pub use tokio_macros::test_fail as test; } } diff --git a/tokio/src/loom/mod.rs b/tokio/src/loom/mod.rs index 56a41f25..5957b537 100644 --- a/tokio/src/loom/mod.rs +++ b/tokio/src/loom/mod.rs @@ -1,6 +1,8 @@ //! This module abstracts over `loom` and `std::sync` depending on whether we //! are running tests or not. +#![allow(unused)] + #[cfg(not(all(test, loom)))] mod std; #[cfg(not(all(test, loom)))] diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index 8f1536f8..83102da6 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -1,70 +1,10 @@ #![allow(unused_macros)] -macro_rules! cfg_resource_drivers { - ($($item:item)*) => { - $( - #[cfg(any( - feature = "process", - all(unix, feature = "signal"), - all(not(loom), feature = "tcp"), - feature = "time", - all(not(loom), feature = "udp"), - all(not(loom), feature = "uds"), - ))] - $item - )* - } -} - -macro_rules! cfg_blocking { - ($($item:item)*) => { - $( - #[cfg(feature = "blocking")] - #[cfg_attr(docsrs, doc(cfg(feature = "blocking")))] - $item - )* - } -} - -/// Enables blocking API internals -macro_rules! cfg_blocking_impl { - ($($item:item)*) => { - $( - #[cfg(any( - feature = "blocking", - feature = "fs", - feature = "dns", - feature = "io-std", - feature = "rt-threaded", - ))] - $item - )* - } -} - -/// Enables blocking API internals -macro_rules! cfg_blocking_impl_or_task { - ($($item:item)*) => { - $( - #[cfg(any( - feature = "blocking", - feature = "fs", - feature = "dns", - feature = "io-std", - feature = "rt-threaded", - feature = "task", - ))] - $item - )* - } -} - /// Enables enter::block_on macro_rules! cfg_block_on { ($($item:item)*) => { $( #[cfg(any( - feature = "blocking", feature = "fs", feature = "dns", feature = "io-std", @@ -75,29 +15,13 @@ macro_rules! cfg_block_on { } } -/// Enables blocking API internals -macro_rules! cfg_not_blocking_impl { - ($($item:item)*) => { - $( - #[cfg(not(any( - feature = "blocking", - feature = "fs", - feature = "dns", - feature = "io-std", - feature = "rt-threaded", - )))] - $item - )* - } -} - /// Enables internal `AtomicWaker` impl macro_rules! cfg_atomic_waker_impl { ($($item:item)*) => { $( #[cfg(any( feature = "process", - all(feature = "rt-core", feature = "rt-util"), + feature = "rt-util", feature = "signal", feature = "tcp", feature = "time", @@ -324,6 +248,16 @@ macro_rules! cfg_rt_core { } } +macro_rules! cfg_task { + ($($item:item)*) => { + $( + #[cfg(any(feature = "rt-core", feature = "rt-util"))] + #[cfg_attr(docsrs, doc(cfg(any(feature = "rt-core", feature = "rt-util"))))] + $item + )* + } +} + macro_rules! doc_rt_core { ($($item:item)*) => { $( @@ -451,12 +385,12 @@ macro_rules! cfg_coop { ($($item:item)*) => { $( #[cfg(any( - feature = "blocking", feature = "dns", feature = "fs", feature = "io-std", feature = "process", feature = "rt-core", + feature = "rt-util", feature = "signal", feature = "sync", feature = "stream", diff --git a/tokio/src/macros/mod.rs b/tokio/src/macros/mod.rs index 2643c360..a9d87657 100644 --- a/tokio/src/macros/mod.rs +++ b/tokio/src/macros/mod.rs @@ -16,7 +16,7 @@ mod ready; mod thread_local; #[macro_use] -#[cfg(feature = "rt-core")] +#[cfg(any(feature = "rt-core", feature = "rt-util"))] pub(crate) mod scoped_tls; cfg_macros! { diff --git a/tokio/src/macros/support.rs b/tokio/src/macros/support.rs index fc1cdfcf..7f11bc68 100644 --- a/tokio/src/macros/support.rs +++ b/tokio/src/macros/support.rs @@ -1,5 +1,6 @@ cfg_macros! { - pub use crate::future::{maybe_done, poll_fn}; + pub use crate::future::poll_fn; + pub use crate::future::maybe_done::maybe_done; pub use crate::util::thread_rng_n; } diff --git a/tokio/src/net/addr.rs b/tokio/src/net/addr.rs index 86b0962b..e2d09d47 100644 --- a/tokio/src/net/addr.rs +++ b/tokio/src/net/addr.rs @@ -153,22 +153,22 @@ cfg_dns! { type Future = sealed::MaybeReady; fn to_socket_addrs(&self, _: sealed::Internal) -> Self::Future { - use crate::runtime::spawn_blocking; + use crate::blocking::spawn_blocking; use sealed::MaybeReady; // First check if the input parses as a socket address let res: Result<SocketAddr, _> = self.parse(); |