From 8880222036f37c6204c8466f25e828447f16dacb Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Mon, 12 Oct 2020 13:44:54 -0400 Subject: rt: Remove `threaded_scheduler()` and `basic_scheduler()` (#2876) Co-authored-by: Alice Ryhl Co-authored-by: Carl Lerche --- tokio/Cargo.toml | 8 +- tokio/src/blocking.rs | 48 ++ tokio/src/coop.rs | 17 +- tokio/src/fs/mod.rs | 4 +- tokio/src/future/block_on.rs | 15 + tokio/src/future/mod.rs | 23 +- tokio/src/future/poll_fn.rs | 2 + tokio/src/future/try_join.rs | 2 +- tokio/src/io/driver/mod.rs | 40 +- tokio/src/io/mod.rs | 4 +- tokio/src/io/stdio_common.rs | 6 +- tokio/src/lib.rs | 28 +- tokio/src/loom/mod.rs | 2 + tokio/src/macros/cfg.rs | 90 +--- tokio/src/macros/mod.rs | 2 +- tokio/src/macros/support.rs | 3 +- tokio/src/net/addr.rs | 45 +- tokio/src/park/either.rs | 2 + tokio/src/park/mod.rs | 11 +- tokio/src/park/thread.rs | 25 +- tokio/src/process/unix/driver.rs | 2 + tokio/src/runtime/basic_scheduler.rs | 2 +- tokio/src/runtime/blocking/mod.rs | 20 +- tokio/src/runtime/blocking/pool.rs | 3 - tokio/src/runtime/builder.rs | 262 +++++------ tokio/src/runtime/context.rs | 6 +- tokio/src/runtime/driver.rs | 20 +- tokio/src/runtime/enter.rs | 214 +++++---- tokio/src/runtime/mod.rs | 648 +++++++++++++-------------- tokio/src/runtime/spawner.rs | 2 - tokio/src/runtime/task/error.rs | 2 +- tokio/src/runtime/task/join.rs | 2 +- tokio/src/runtime/task/mod.rs | 34 +- tokio/src/runtime/tests/loom_blocking.rs | 5 +- tokio/src/runtime/tests/loom_pool.rs | 5 +- tokio/src/runtime/thread_pool/atomic_cell.rs | 1 - tokio/src/runtime/thread_pool/mod.rs | 4 +- tokio/src/runtime/thread_pool/worker.rs | 161 ++++--- tokio/src/signal/registry.rs | 2 +- tokio/src/signal/unix/driver.rs | 45 +- tokio/src/signal/windows.rs | 3 +- tokio/src/stream/mod.rs | 2 +- tokio/src/sync/mod.rs | 6 +- tokio/src/sync/mpsc/bounded.rs | 8 +- tokio/src/sync/mpsc/chan.rs | 2 +- tokio/src/task/blocking.rs | 153 +++---- tokio/src/task/local.rs | 2 + tokio/src/task/mod.rs | 19 +- tokio/src/task/yield_now.rs | 2 +- tokio/src/time/clock.rs | 38 +- tokio/src/time/driver/handle.rs | 72 ++- tokio/src/time/driver/mod.rs | 2 + tokio/src/util/linked_list.rs | 2 + tokio/src/util/mod.rs | 16 +- tokio/src/util/slab.rs | 2 + tokio/src/util/trace.rs | 4 +- tokio/tests/io_driver.rs | 3 +- tokio/tests/io_driver_drop.rs | 3 +- tokio/tests/rt_basic.rs | 3 +- tokio/tests/rt_common.rs | 15 +- tokio/tests/rt_threaded.rs | 17 +- tokio/tests/signal_drop_rt.rs | 3 +- tokio/tests/signal_multi_rt.rs | 3 +- tokio/tests/sync_rwlock.rs | 2 +- tokio/tests/task_blocking.rs | 53 +-- tokio/tests/task_local.rs | 2 +- tokio/tests/task_local_set.rs | 27 +- tokio/tests/time_rt.rs | 6 +- 68 files changed, 1128 insertions(+), 1159 deletions(-) create mode 100644 tokio/src/blocking.rs create mode 100644 tokio/src/future/block_on.rs (limited to 'tokio') diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 4d5f833c..cb407f93 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -30,7 +30,6 @@ default = [] # enable everything full = [ - "blocking", "dns", "fs", "io-util", @@ -47,12 +46,11 @@ full = [ "time", ] -blocking = ["rt-core"] -dns = ["rt-core"] -fs = ["rt-core", "io-util"] +dns = [] +fs = [] io-util = ["memchr"] # stdin, stdout, stderr -io-std = ["rt-core"] +io-std = [] macros = ["tokio-macros"] net = ["dns", "tcp", "udp", "uds"] process = [ diff --git a/tokio/src/blocking.rs b/tokio/src/blocking.rs new file mode 100644 index 00000000..d6ef5915 --- /dev/null +++ b/tokio/src/blocking.rs @@ -0,0 +1,48 @@ +cfg_rt_core! { + pub(crate) use crate::runtime::spawn_blocking; + pub(crate) use crate::task::JoinHandle; +} + +cfg_not_rt_core! { + use std::fmt; + use std::future::Future; + use std::pin::Pin; + use std::task::{Context, Poll}; + + pub(crate) fn spawn_blocking(_f: F) -> JoinHandle + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + assert_send_sync::>>(); + panic!("requires the `rt-core` Tokio feature flag") + + } + + pub(crate) struct JoinHandle { + _p: std::marker::PhantomData, + } + + unsafe impl Send for JoinHandle {} + unsafe impl Sync for JoinHandle {} + + impl Future for JoinHandle { + type Output = Result; + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { + unreachable!() + } + } + + impl fmt::Debug for JoinHandle + where + T: fmt::Debug, + { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("JoinHandle").finish() + } + } + + fn assert_send_sync() { + } +} diff --git a/tokio/src/coop.rs b/tokio/src/coop.rs index 27e969c5..f6cca1c5 100644 --- a/tokio/src/coop.rs +++ b/tokio/src/coop.rs @@ -1,3 +1,5 @@ +#![cfg_attr(not(feature = "full"), allow(dead_code))] + //! Opt-in yield points for improved cooperative scheduling. //! //! A single call to [`poll`] on a top-level task may potentially do a lot of @@ -96,14 +98,6 @@ pub(crate) fn budget(f: impl FnOnce() -> R) -> R { with_budget(Budget::initial(), f) } -cfg_rt_threaded! { - /// Set the current task's budget - #[cfg(feature = "blocking")] - pub(crate) fn set(budget: Budget) { - CURRENT.with(|cell| cell.set(budget)) - } -} - #[inline(always)] fn with_budget(budget: Budget, f: impl FnOnce() -> R) -> R { struct ResetGuard<'a> { @@ -129,13 +123,18 @@ fn with_budget(budget: Budget, f: impl FnOnce() -> R) -> R { } cfg_rt_threaded! { + /// Set the current task's budget + pub(crate) fn set(budget: Budget) { + CURRENT.with(|cell| cell.set(budget)) + } + #[inline(always)] pub(crate) fn has_budget_remaining() -> bool { CURRENT.with(|cell| cell.get().has_remaining()) } } -cfg_blocking_impl! { +cfg_rt_core! { /// Forcibly remove the budgeting constraints early. /// /// Returns the remaining budget diff --git a/tokio/src/fs/mod.rs b/tokio/src/fs/mod.rs index a2b062b1..d2757a5f 100644 --- a/tokio/src/fs/mod.rs +++ b/tokio/src/fs/mod.rs @@ -107,6 +107,6 @@ mod sys { pub(crate) use std::fs::File; // TODO: don't rename - pub(crate) use crate::runtime::spawn_blocking as run; - pub(crate) use crate::task::JoinHandle as Blocking; + pub(crate) use crate::blocking::spawn_blocking as run; + pub(crate) use crate::blocking::JoinHandle as Blocking; } diff --git a/tokio/src/future/block_on.rs b/tokio/src/future/block_on.rs new file mode 100644 index 00000000..9fc7abc6 --- /dev/null +++ b/tokio/src/future/block_on.rs @@ -0,0 +1,15 @@ +use std::future::Future; + +cfg_rt_core! { + pub(crate) fn block_on(f: F) -> F::Output { + let mut e = crate::runtime::enter::enter(false); + e.block_on(f).unwrap() + } +} + +cfg_not_rt_core! { + pub(crate) fn block_on(f: F) -> F::Output { + let mut park = crate::park::thread::CachedParkThread::new(); + park.block_on(f).unwrap() + } +} diff --git a/tokio/src/future/mod.rs b/tokio/src/future/mod.rs index 770753f3..f7d93c98 100644 --- a/tokio/src/future/mod.rs +++ b/tokio/src/future/mod.rs @@ -1,15 +1,24 @@ -#![allow(unused_imports, dead_code)] +#![cfg_attr(not(feature = "macros"), allow(unreachable_pub))] //! Asynchronous values. -mod maybe_done; -pub use maybe_done::{maybe_done, MaybeDone}; +#[cfg(any(feature = "macros", feature = "process"))] +pub(crate) mod maybe_done; mod poll_fn; pub use poll_fn::poll_fn; -mod ready; -pub(crate) use ready::{ok, Ready}; +cfg_not_loom! { + mod ready; + pub(crate) use ready::{ok, Ready}; +} -mod try_join; -pub(crate) use try_join::try_join3; +cfg_process! { + mod try_join; + pub(crate) use try_join::try_join3; +} + +cfg_sync! { + mod block_on; + pub(crate) use block_on::block_on; +} diff --git a/tokio/src/future/poll_fn.rs b/tokio/src/future/poll_fn.rs index 9b3d1370..0169bd5f 100644 --- a/tokio/src/future/poll_fn.rs +++ b/tokio/src/future/poll_fn.rs @@ -1,3 +1,5 @@ +#![allow(dead_code)] + //! Definition of the `PollFn` adapter combinator use std::fmt; diff --git a/tokio/src/future/try_join.rs b/tokio/src/future/try_join.rs index 5bd80dc8..8943f61a 100644 --- a/tokio/src/future/try_join.rs +++ b/tokio/src/future/try_join.rs @@ -1,4 +1,4 @@ -use crate::future::{maybe_done, MaybeDone}; +use crate::future::maybe_done::{maybe_done, MaybeDone}; use pin_project_lite::pin_project; use std::future::Future; diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs index c4f5887a..0d4133a5 100644 --- a/tokio/src/io/driver/mod.rs +++ b/tokio/src/io/driver/mod.rs @@ -1,3 +1,5 @@ +#![cfg_attr(not(feature = "rt-core"), allow(dead_code))] + mod ready; use ready::Ready; @@ -5,7 +7,6 @@ mod scheduled_io; pub(crate) use scheduled_io::ScheduledIo; // pub(crate) for tests use crate::park::{Park, Unpark}; -use crate::runtime::context; use crate::util::bit; use crate::util::slab::{self, Slab}; @@ -218,17 +219,36 @@ impl fmt::Debug for Driver { // ===== impl Handle ===== -impl Handle { - /// Returns a handle to the current reactor - /// - /// # Panics - /// - /// This function panics if there is no current reactor set. - pub(super) fn current() -> Self { - context::io_handle() - .expect("there is no reactor running, must be called from the context of Tokio runtime") +cfg_rt_core! { + impl Handle { + /// Returns a handle to the current reactor + /// + /// # Panics + /// + /// This function panics if there is no current reactor set and `rt-core` feature + /// flag is not enabled. + pub(super) fn current() -> Self { + crate::runtime::context::io_handle() + .expect("there is no reactor running, must be called from the context of Tokio runtime") + } } +} +cfg_not_rt_core! { + impl Handle { + /// Returns a handle to the current reactor + /// + /// # Panics + /// + /// This function panics if there is no current reactor set, or if the `rt-core` + /// feature flag is not enabled. + pub(super) fn current() -> Self { + panic!("there is no reactor running, must be called from the context of Tokio runtime with `rt-core` enabled.") + } + } +} + +impl Handle { /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise /// makes the next call to `turn` return immediately. /// diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs index e1a036fb..62728ac1 100644 --- a/tokio/src/io/mod.rs +++ b/tokio/src/io/mod.rs @@ -250,7 +250,7 @@ cfg_io_blocking! { /// Types in this module can be mocked out in tests. mod sys { // TODO: don't rename - pub(crate) use crate::runtime::spawn_blocking as run; - pub(crate) use crate::task::JoinHandle as Blocking; + pub(crate) use crate::blocking::spawn_blocking as run; + pub(crate) use crate::blocking::JoinHandle as Blocking; } } diff --git a/tokio/src/io/stdio_common.rs b/tokio/src/io/stdio_common.rs index 03800fcb..d21c842f 100644 --- a/tokio/src/io/stdio_common.rs +++ b/tokio/src/io/stdio_common.rs @@ -183,8 +183,7 @@ mod tests { let fut = async move { wr.write_all(data.as_bytes()).await.unwrap(); }; - crate::runtime::Builder::new() - .basic_scheduler() + crate::runtime::Builder::new_current_thread() .build() .unwrap() .block_on(fut); @@ -200,8 +199,7 @@ mod tests { data.extend(std::iter::repeat(0b1010_1010).take(MAX_BUF - checked_count + 1)); let mut writer = LoggingMockWriter::new(); let mut splitter = super::SplitByUtf8BoundaryIfWindows::new(&mut writer); - crate::runtime::Builder::new() - .basic_scheduler() + crate::runtime::Builder::new_current_thread() .build() .unwrap() .block_on(async { diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index cd05cb55..1334eb88 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -348,8 +348,7 @@ cfg_fs! { pub mod fs; } -#[doc(hidden)] -pub mod future; +mod future; pub mod io; pub mod net; @@ -363,7 +362,14 @@ cfg_process! { pub mod process; } -pub mod runtime; +#[cfg(any(feature = "dns", feature = "fs", feature = "io-std"))] +mod blocking; + +cfg_rt_core! { + pub mod runtime; +} +#[cfg(all(not(feature = "rt-core"), feature = "rt-util"))] +mod runtime; pub(crate) mod coop; @@ -389,8 +395,8 @@ cfg_not_sync! { mod sync; } +pub mod task; cfg_rt_core! { - pub mod task; pub use task::spawn; } @@ -414,24 +420,24 @@ cfg_macros! { #[cfg(not(test))] // Work around for rust-lang/rust#62127 #[cfg_attr(docsrs, doc(cfg(feature = "macros")))] - pub use tokio_macros::main_threaded as main; + pub use tokio_macros::main; #[cfg_attr(docsrs, doc(cfg(feature = "macros")))] - pub use tokio_macros::test_threaded as test; + pub use tokio_macros::test; } cfg_not_rt_threaded! { #[cfg(not(test))] // Work around for rust-lang/rust#62127 - pub use tokio_macros::main_basic as main; - pub use tokio_macros::test_basic as test; + pub use tokio_macros::main_rt_core as main; + pub use tokio_macros::test_rt_core as test; } } - // Maintains old behavior + // Always fail if rt-core is not enabled. cfg_not_rt_core! { #[cfg(not(test))] - pub use tokio_macros::main; - pub use tokio_macros::test; + pub use tokio_macros::main_fail as main; + pub use tokio_macros::test_fail as test; } } diff --git a/tokio/src/loom/mod.rs b/tokio/src/loom/mod.rs index 56a41f25..5957b537 100644 --- a/tokio/src/loom/mod.rs +++ b/tokio/src/loom/mod.rs @@ -1,6 +1,8 @@ //! This module abstracts over `loom` and `std::sync` depending on whether we //! are running tests or not. +#![allow(unused)] + #[cfg(not(all(test, loom)))] mod std; #[cfg(not(all(test, loom)))] diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index 8f1536f8..83102da6 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -1,70 +1,10 @@ #![allow(unused_macros)] -macro_rules! cfg_resource_drivers { - ($($item:item)*) => { - $( - #[cfg(any( - feature = "process", - all(unix, feature = "signal"), - all(not(loom), feature = "tcp"), - feature = "time", - all(not(loom), feature = "udp"), - all(not(loom), feature = "uds"), - ))] - $item - )* - } -} - -macro_rules! cfg_blocking { - ($($item:item)*) => { - $( - #[cfg(feature = "blocking")] - #[cfg_attr(docsrs, doc(cfg(feature = "blocking")))] - $item - )* - } -} - -/// Enables blocking API internals -macro_rules! cfg_blocking_impl { - ($($item:item)*) => { - $( - #[cfg(any( - feature = "blocking", - feature = "fs", - feature = "dns", - feature = "io-std", - feature = "rt-threaded", - ))] - $item - )* - } -} - -/// Enables blocking API internals -macro_rules! cfg_blocking_impl_or_task { - ($($item:item)*) => { - $( - #[cfg(any( - feature = "blocking", - feature = "fs", - feature = "dns", - feature = "io-std", - feature = "rt-threaded", - feature = "task", - ))] - $item - )* - } -} - /// Enables enter::block_on macro_rules! cfg_block_on { ($($item:item)*) => { $( #[cfg(any( - feature = "blocking", feature = "fs", feature = "dns", feature = "io-std", @@ -75,29 +15,13 @@ macro_rules! cfg_block_on { } } -/// Enables blocking API internals -macro_rules! cfg_not_blocking_impl { - ($($item:item)*) => { - $( - #[cfg(not(any( - feature = "blocking", - feature = "fs", - feature = "dns", - feature = "io-std", - feature = "rt-threaded", - )))] - $item - )* - } -} - /// Enables internal `AtomicWaker` impl macro_rules! cfg_atomic_waker_impl { ($($item:item)*) => { $( #[cfg(any( feature = "process", - all(feature = "rt-core", feature = "rt-util"), + feature = "rt-util", feature = "signal", feature = "tcp", feature = "time", @@ -324,6 +248,16 @@ macro_rules! cfg_rt_core { } } +macro_rules! cfg_task { + ($($item:item)*) => { + $( + #[cfg(any(feature = "rt-core", feature = "rt-util"))] + #[cfg_attr(docsrs, doc(cfg(any(feature = "rt-core", feature = "rt-util"))))] + $item + )* + } +} + macro_rules! doc_rt_core { ($($item:item)*) => { $( @@ -451,12 +385,12 @@ macro_rules! cfg_coop { ($($item:item)*) => { $( #[cfg(any( - feature = "blocking", feature = "dns", feature = "fs", feature = "io-std", feature = "process", feature = "rt-core", + feature = "rt-util", feature = "signal", feature = "sync", feature = "stream", diff --git a/tokio/src/macros/mod.rs b/tokio/src/macros/mod.rs index 2643c360..a9d87657 100644 --- a/tokio/src/macros/mod.rs +++ b/tokio/src/macros/mod.rs @@ -16,7 +16,7 @@ mod ready; mod thread_local; #[macro_use] -#[cfg(feature = "rt-core")] +#[cfg(any(feature = "rt-core", feature = "rt-util"))] pub(crate) mod scoped_tls; cfg_macros! { diff --git a/tokio/src/macros/support.rs b/tokio/src/macros/support.rs index fc1cdfcf..7f11bc68 100644 --- a/tokio/src/macros/support.rs +++ b/tokio/src/macros/support.rs @@ -1,5 +1,6 @@ cfg_macros! { - pub use crate::future::{maybe_done, poll_fn}; + pub use crate::future::poll_fn; + pub use crate::future::maybe_done::maybe_done; pub use crate::util::thread_rng_n; } diff --git a/tokio/src/net/addr.rs b/tokio/src/net/addr.rs index 86b0962b..e2d09d47 100644 --- a/tokio/src/net/addr.rs +++ b/tokio/src/net/addr.rs @@ -153,22 +153,22 @@ cfg_dns! { type Future = sealed::MaybeReady; fn to_socket_addrs(&self, _: sealed::Internal) -> Self::Future { - use crate::runtime::spawn_blocking; + use crate::blocking::spawn_blocking; use sealed::MaybeReady; // First check if the input parses as a socket address let res: Result = self.parse(); if let Ok(addr) = res { - return MaybeReady::Ready(Some(addr)); + return MaybeReady(sealed::State::Ready(Some(addr))); } // Run DNS lookup on the blocking pool let s = self.to_owned(); - MaybeReady::Blocking(spawn_blocking(move || { + MaybeReady(sealed::State::Blocking(spawn_blocking(move || { std::net::ToSocketAddrs::to_socket_addrs(&s) - })) + }))) } } @@ -181,7 +181,7 @@ cfg_dns! { type Future = sealed::MaybeReady; fn to_socket_addrs(&self, _: sealed::Internal) -> Self::Future { - use crate::runtime::spawn_blocking; + use crate::blocking::spawn_blocking; use sealed::MaybeReady; let (host, port) = *self; @@ -191,21 +191,21 @@ cfg_dns! { let addr = SocketAddrV4::new(addr, port); let addr = SocketAddr::V4(addr); - return MaybeReady::Ready(Some(addr)); + return MaybeReady(sealed::State::Ready(Some(addr))); } if let Ok(addr) = host.parse::() { let addr = SocketAddrV6::new(addr, port, 0, 0); let addr = SocketAddr::V6(addr); - return MaybeReady::Ready(Some(addr)); + return MaybeReady(sealed::State::Ready(Some(addr))); } let host = host.to_owned(); - MaybeReady::Blocking(spawn_blocking(move || { + MaybeReady(sealed::State::Blocking(spawn_blocking(move || { std::net::ToSocketAddrs::to_socket_addrs(&(&host[..], port)) - })) + }))) } } @@ -245,15 +245,6 @@ pub(crate) mod sealed { use std::io; use std::net::SocketAddr; - cfg_dns! { - use crate::task::JoinHandle; - - use std::option; - use std::pin::Pin; - use std::task::{Context, Poll}; - use std::vec; - } - #[doc(hidden)] pub trait ToSocketAddrsPriv { type Iter: Iterator + Send + 'static; @@ -266,9 +257,19 @@ pub(crate) mod sealed { pub struct Internal; cfg_dns! { + use crate::blocking::JoinHandle; + + use std::option; + use std::pin::Pin; + use std::task::{Context, Poll}; + use std::vec; + #[doc(hidden)] #[derive(Debug)] - pub enum MaybeReady { + pub struct MaybeReady(pub(super) State); + + #[derive(Debug)] + pub(super) enum State { Ready(Option), Blocking(JoinHandle>>), } @@ -284,12 +285,12 @@ pub(crate) mod sealed { type Output = io::Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match *self { - MaybeReady::Ready(ref mut i) => { + match self.0 { + State::Ready(ref mut i) => { let iter = OneOrMore::One(i.take().into_iter()); Poll::Ready(Ok(iter)) } - MaybeReady::Blocking(ref mut rx) => { + State::Blocking(ref mut rx) => { let res = ready!(Pin::new(rx).poll(cx))?.map(OneOrMore::More); Poll::Ready(res) diff --git a/tokio/src/park/either.rs b/tokio/src/park/either.rs index c66d1213..ee02ec15 100644 --- a/tokio/src/park/either.rs +++ b/tokio/src/park/either.rs @@ -1,3 +1,5 @@ +#![cfg_attr(not(feature = "full"), allow(dead_code))] + use crate::park::{Park, Unpark}; use std::fmt; diff --git a/tokio/src/park/mod.rs b/tokio/src/park/mod.rs index 4085a99a..e4b97512 100644 --- a/tokio/src/park/mod.rs +++ b/tokio/src/park/mod.rs @@ -34,15 +34,12 @@ //! * `park_timeout` does the same as `park` but allows specifying a maximum //! time to block the thread for. -cfg_resource_drivers! { - mod either; - pub(crate) use self::either::Either; +cfg_rt_core! { + pub(crate) mod either; } -mod thread; -pub(crate) use self::thread::ParkThread; - -pub(crate) use self::thread::{CachedParkThread, ParkError}; +#[cfg(any(feature = "rt-core", feature = "rt-util", feature = "sync"))] +pub(crate) mod thread; use std::sync::Arc; use std::time::Duration; diff --git a/tokio/src/park/thread.rs b/tokio/src/park/thread.rs index 494c02b4..2725e456 100644 --- a/tokio/src/park/thread.rs +++ b/tokio/src/park/thread.rs @@ -1,3 +1,5 @@ +#![cfg_attr(not(feature = "full"), allow(dead_code))] + use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::{Arc, Condvar, Mutex}; use crate::park::{Park, Unpark}; @@ -212,10 +214,10 @@ impl Unpark for UnparkThread { } } +use std::future::Future; use std::marker::PhantomData; -use std::rc::Rc; - use std::mem; +use std::rc::Rc; use std::task::{RawWaker, RawWakerVTable, Waker}; /// Blocks the current thread using a condition variable. @@ -246,6 +248,25 @@ impl CachedParkThread { { CURRENT_PARKER.try_with(|inner| f(inner)).map_err(|_| ()) } + + pub(crate) fn block_on(&mut self, f: F) -> Result { + use std::task::Context; + use std::task::Poll::Ready; + + // `get_unpark()` should not return a Result + let waker = self.get_unpark()?.into_waker(); + let mut cx = Context::from_waker(&waker); + + pin!(f); + + loop { + if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) { + return Ok(v); + } + + self.park()?; + } + } } impl Park for CachedParkThread { diff --git a/tokio/src/process/unix/driver.rs b/tokio/src/process/unix/driver.rs index 2eea0043..62fe8095 100644 --- a/tokio/src/process/unix/driver.rs +++ b/tokio/src/process/unix/driver.rs @@ -1,3 +1,5 @@ +#![cfg_attr(not(feature = "rt-core"), allow(dead_code))] + //! Process driver use crate::park::Park; diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index 60fe92c3..5ca84671 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -2,7 +2,7 @@ use crate::future::poll_fn; use crate::loom::sync::Mutex; use crate::park::{Park, Unpark}; use crate::runtime::task::{self, JoinHandle, Schedule, Task}; -use crate::sync::Notify; +use crate::sync::notify::Notify; use crate::util::linked_list::{Link, LinkedList}; use crate::util::{waker_ref, Wake, WakerRef}; diff --git a/tokio/src/runtime/blocking/mod.rs b/tokio/src/runtime/blocking/mod.rs index a819e9e9..fece3c27 100644 --- a/tokio/src/runtime/blocking/mod.rs +++ b/tokio/src/runtime/blocking/mod.rs @@ -3,21 +3,20 @@ //! shells. This isolates the complexity of dealing with conditional //! compilation. -cfg_blocking_impl! { - mod pool; - pub(crate) use pool::{spawn_blocking, try_spawn_blocking, BlockingPool, Spawner}; +mod pool; +pub(crate) use pool::{spawn_blocking, BlockingPool, Spawner}; - mod schedule; - mod shutdown; - pub(crate) mod task; +mod schedule; +mod shutdown; +pub(crate) mod task; - use crate::runtime::Builder; +use crate::runtime::Builder; - pub(crate) fn create_blocking_pool(builder: &Builder, thread_cap: usize) -> BlockingPool { - BlockingPool::new(builder, thread_cap) - } +pub(crate) fn create_blocking_pool(builder: &Builder, thread_cap: usize) -> BlockingPool { + BlockingPool::new(builder, thread_cap) } +/* cfg_not_blocking_impl! { use crate::runtime::Builder; use std::time::Duration; @@ -40,3 +39,4 @@ cfg_not_blocking_impl! { } } } +*/ diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index df0175b1..2d44f896 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -94,10 +94,7 @@ where impl BlockingPool { pub(crate) fn new(builder: &Builder, thread_cap: usize) -> BlockingPool { let (shutdown_tx, shutdown_rx) = shutdown::channel(); - #[cfg(feature = "blocking")] let keep_alive = builder.keep_alive.unwrap_or(KEEP_ALIVE); - #[cfg(not(feature = "blocking"))] - let keep_alive = KEEP_ALIVE; BlockingPool { spawner: Spawner { diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index d43666d3..8e76f52b 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -1,9 +1,8 @@ use crate::runtime::handle::Handle; -use crate::runtime::shell::Shell; -use crate::runtime::{blocking, driver, io, Callback, Runtime, Spawner}; +use crate::runtime::{blocking, driver, Callback, Runtime, Spawner}; use std::fmt; -#[cfg(feature = "blocking")] +use std::io; use std::time::Duration; /// Builds Tokio Runtime with custom configuration values. @@ -26,9 +25,8 @@ use std::time::Duration; /// /// fn main() { /// // build runtime -/// let runtime = Builder::new() -/// .threaded_scheduler() -/// .core_threads(4) +/// let runtime = Builder::new_multi_thread() +/// .worker_threads(4) /// .thread_name("my-custom-name") /// .thread_stack_size(3 * 1024 * 1024) /// .build() @@ -38,7 +36,7 @@ use std::time::Duration; /// } /// ``` pub struct Builder { - /// The task execution model to use. + /// Runtime type kind: Kind, /// Whether or not to enable the I/O driver @@ -50,7 +48,7 @@ pub struct Builder { /// The number of worker threads, used by Runtime. /// /// Only used when not using the current-thread executor. - core_threads: Option, + worker_threads: Option, /// Cap on thread usage. max_threads: usize, @@ -67,32 +65,37 @@ pub struct Builder { /// To run before each worker thread stops pub(super) before_stop: Option, - #[cfg(feature = "blocking")] - #[cfg_attr(docsrs, doc(cfg(feature = "blocking")))] /// Customizable keep alive timeout for BlockingPool pub(super) keep_alive: Option, } pub(crate) type ThreadNameFn = std::sync::Arc String + Send + Sync + 'static>; -#[derive(Debug, Clone, Copy)] -enum Kind { - Shell, - #[cfg(feature = "rt-core")] - Basic, +pub(crate) enum Kind { + CurrentThread, #[cfg(feature = "rt-threaded")] - ThreadPool, + MultiThread, } impl Builder { + /// TODO + pub fn new_current_thread() -> Builder { + Builder::new(Kind::CurrentThread) + } + + /// TODO + #[cfg(feature = "rt-threaded")] + pub fn new_multi_thread() -> Builder { + Builder::new(Kind::MultiThread) + } + /// Returns a new runtime builder initialized with default configuration /// values. /// /// Configuration methods can be chained on the return value. - pub fn new() -> Builder { + pub(crate) fn new(kind: Kind) -> Builder { Builder { - // No task execution by default - kind: Kind::Shell, + kind, // I/O defaults to "off" enable_io: false, @@ -101,7 +104,7 @@ impl Builder { enable_time: false, // Default to lazy auto-detection (one thread per CPU core) - core_threads: None, + worker_threads: None, max_threads: 512, @@ -115,7 +118,6 @@ impl Builder { after_start: None, before_stop: None, - #[cfg(feature = "blocking")] keep_alive: None, } } @@ -131,8 +133,7 @@ impl Builder { /// ``` /// use tokio::runtime; /// - /// let rt = runtime::Builder::new() - /// .threaded_scheduler() + /// let rt = runtime::Builder::new_multi_thread() /// .enable_all() /// .build() /// .unwrap(); @@ -152,51 +153,63 @@ impl Builder { self } - #[deprecated(note = "In future will be replaced by core_threads method")] - /// Sets the maximum number of worker threads for the `Runtime`'s thread pool. + /// Sets the number of worker threads the `Runtime` will use. + /// + /// This should be a number between 0 and 32,768 though it is advised to + /// keep this value on the smaller side. /// - /// This must be a number between 1 and 32,768 though it is advised to keep - /// this value on the smaller side. + /// # Default /// /// The default value is the number of cores available to the system. - pub fn num_threads(&mut self, val: usize) -> &mut Self { - self.core_threads = Some(val); - self - } - - /// Sets the core number of worker threads for the `Runtime`'s thread pool. /// - /// This should be a number between 1 and 32,768 though it is advised to keep - /// this value on the smaller side. + /// # Panic /// - /// The default value is the number of cores available to the system. + /// When using the `current_thread` runtime this method will panic, since + /// those variants do not allow setting worker thread counts. /// - /// These threads will be always active and running. /// /// # Examples /// + /// ## Multi threaded runtime with 4 threads + /// /// ``` /// use tokio::runtime; /// - /// let rt = runtime::Builder::new() - /// .threaded_scheduler() - /// .core_threads(4) + /// // This will spawn a work-stealing runtime with 4 worker threads. + /// let rt = runtime::Builder::new_multi_thread() + /// .worker_threads(4) /// .build() /// .unwrap(); + /// + /// rt.spawn(async move {}); /// ``` - pub fn core_threads(&mut self, val: usize) -> &mut Self { - assert_ne!(val, 0, "Core threads cannot be zero"); - self.core_threads = Some(val); + /// + /// ## Current thread runtime (will only run on the current thread via `Runtime::block_on`) + /// + /// ``` + /// use tokio::runtime; + /// + /// // Create a runtime that _must_ be driven from a call + /// // to `Runtime::block_on`. + /// let rt = runtime::Builder::new_current_thread() + /// .build() + /// .unwrap(); + /// + /// // This will run the runtime and future on the current thread + /// rt.block_on(async move {}); + /// ``` + pub fn worker_threads(&mut self, val: usize) -> &mut Self { + self.worker_threads = Some(val); self } /// Specifies limit for threads, spawned by the Runtime. /// /// This is number of threads to be used by Runtime, including `core_threads` - /// Having `max_threads` less than `core_threads` results in invalid configuration + /// Having `max_threads` less than `worker_threads` results in invalid configuration /// when building multi-threaded `Runtime`, which would cause a panic. /// - /// Similarly to the `core_threads`, this number should be between 1 and 32,768. + /// Similarly to the `worker_threads`, this number should be between 0 and 32,768. /// /// The default value is 512. /// @@ -205,7 +218,6 @@ impl Builder { /// Otherwise as `core_threads` are always active, it limits additional threads (e.g. for /// blocking annotations) as `max_threads - core_threads`. pub fn max_threads(&mut self, val: usize) -> &mut Self { - assert_ne!(val, 0, "Thread limit cannot be zero"); self.max_threads = val; self } @@ -220,7 +232,7 @@ impl Builder { /// # use tokio::runtime; /// /// # pub fn main() { - /// let rt = runtime::Builder::new() + /// let rt = runtime::Builder::new_multi_thread() /// .thread_name("my-pool") /// .build(); /// # } @@ -242,7 +254,7 @@ impl Builder { /// # use std::sync::atomic::{AtomicUsize, Ordering}; /// /// # pub fn main() { - /// let rt = runtime::Builder::new() + /// let rt = runtime::Builder::new_multi_thread() /// .thread_name_fn(|| { /// static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); /// let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst); @@ -273,8 +285,7 @@ impl Builder { /// # use tokio::runtime; /// /// # pub fn main() { - /// let rt = runtime::Builder::new() - /// .threaded_scheduler() + /// let rt = runtime::Builder::new_multi_thread() /// .thread_stack_size(32 * 1024) /// .build(); /// # } @@ -295,8 +306,7 @@ impl Builder { /// # use tokio::runtime; /// /// # pub fn main() { - /// let runtime = runtime::Builder::new() - /// .threaded_scheduler() + /// let runtime = runtime::Builder::new_multi_thread() /// .on_thread_start(|| { /// println!("thread started"); /// }) @@ -322,8 +332,7 @@ impl Builder { /// # use tokio::runtime; /// /// # pub fn main() { - /// let runtime = runtime::Builder::new() - /// .threaded_scheduler() + /// let runtime = runtime::Builder::new_multi_thread() /// .on_thread_stop(|| { /// println!("thread stopping"); /// }) @@ -341,26 +350,24 @@ impl Builder { /// Creates the configured `Runtime`. /// - /// The returned `ThreadPool` instance is ready to spawn tasks. + /// The returned `Runtime` instance is ready to spawn tasks. /// /// # Examples /// /// ``` /// use tokio::runtime::Builder; /// - /// let rt = Builder::new().build().unwrap(); + /// let rt = Builder::new_multi_thread().build().unwrap(); /// /// rt.block_on(async { /// println!("Hello from the Tokio runtime"); /// }); /// ``` pub fn build(&mut self) -> io::Result { - match self.kind { - Kind::Shell => self.build_shell_runtime(), - #[cfg(feature = "rt-core")] - Kind::Basic => self.build_basic_runtime(), + match &self.kind { + Kind::CurrentThread => self.build_basic_runtime(), #[cfg(feature = "rt-threaded")] - Kind::ThreadPool => self.build_threaded_runtime(), + Kind::MultiThread => self.build_threaded_runtime(), } } @@ -371,32 +378,6 @@ impl Builder { } } - fn build_shell_runtime(&mut self) -> io::Result { - use crate::runtime::Kind; - - let (driver, resources) = driver::Driver::new(self.get_cfg())?; - - let spawner = Spawner::Shell; - - let blocking_pool = blocking::create_blocking_pool(self, self.max_threads); - let blocking_spawner = blocking_pool.spawner().clone(); - - Ok(Runtime { - kind: Kind::Shell(Shell::new(driver)), - handle: Handle { - spawner, - io_handle: resources.io_handle, - time_handle: resources.time_handle, - signal_handle: resources.signal_handle, - clock: resources.clock, - blocking_spawner, - }, - blocking_pool, - }) - } - - #[cfg(feature = "blocking")] - #[cfg_attr(docsrs, doc(cfg(feature = "blocking")))] /// Sets a custom timeout for a thread in the blocking pool. /// /// By default, the timeout for a thread is set to 10 seconds. This can @@ -409,7 +390,7 @@ impl Builder { /// # use std::time::Duration; /// /// # pub fn main() { - /// let rt = runtime::Builder::new() + /// let rt = runtime::Builder::new_multi_thread() /// .thread_keep_alive(Duration::from_millis(100)) /// .build(); /// # } @@ -418,6 +399,36 @@ impl Builder { self.keep_alive = Some(duration); self } + + fn build_basic_runtime(&mut self) -> io::Result { + use crate::runtime::{BasicScheduler, Kind}; + + let (driver, resources) = driver::Driver::new(self.get_cfg())?; + + // And now put a single-threaded scheduler on top of the timer. When + // there are no futures ready to do something, it'll let the timer or + // the reactor to generate some new stimuli for the futures to continue + // in their life. + let scheduler = BasicScheduler::new(driver); + let spawner = Spawner::Basic(scheduler.spawner().clone()); + + // Blocking pool + let blocking_pool = blocking::create_blocking_pool(self, self.max_threads); + let blocking_spawner = blocking_pool.spawner().clone(); + + Ok(Runtime { + kind: Kind::CurrentThread(scheduler), + handle: Handle { + spawner, + io_handle: resources.io_handle, + time_handle: resources.time_handle, + signal_handle: resources.signal_handle, + clock: resources.clock, + blocking_spawner, + }, + blocking_pool, + }) + } } cfg_io_driver! { @@ -432,7 +443,7 @@ cfg_io_driver! { /// ``` /// use tokio::runtime; /// - /// let rt = runtime::Builder::new() + /// let rt = runtime::Builder::new_multi_thread() /// .enable_io() /// .build() /// .unwrap(); @@ -455,7 +466,7 @@ cfg_time! { /// ``` /// use tokio::runtime; /// - /// let rt = runtime::Builder::new() + /// let rt = runtime::Builder::new_multi_thread() /// .enable_time() /// .build() /// .unwrap(); @@ -467,75 +478,15 @@ cfg_time! { } } -cfg_rt_core! { - impl Builder { - /// Sets runtime to use a simpler scheduler that runs all tasks on the current-thread. - /// - /// The executor and all necessary drivers will all be run on the current - /// thread during [`block_on`] calls. - /// - /// See also [the module level documentation][1], which has a section on scheduler - /// types. - /// - /// [1]: index.html#runtime-configurations - /// [`block_on`]: Runtime::block_on - pub fn basic_scheduler(&mut self) -> &mut Self { - self.kind = Kind::Basic; - self - } - - fn build_basic_runtime(&mut self) -> io::Result { - use crate::runtime::{BasicScheduler, Kind}; - - let (driver, resources) = driver::Driver::new(self.get_cfg())?; - - // And now put a single-threaded scheduler on top of the timer. When - // there are no futures ready to do something, it'll let the timer or - // the reactor to generate some new stimuli for the futures to continue - // in their life. - let scheduler = BasicScheduler::new(driver); - let spawner = Spawner::Basic(scheduler.spawner().clone()); - - // Blocking pool - let blocking_pool = blocking::create_blocking_pool(self, self.max_threads); - let blocking_spawner = blocking_pool.spawner().clone(); - - Ok(Runtime { - kind: Kind::Basic(scheduler), - handle: Handle { - spawner, - io_handle: resources.io_handle, - time_handle: resources.time_handle, - signal_handle: resources.signal_handle, - clock: resources.clock, - blocking_spawner, - }, - blocking_pool, - }) - } - } -} - cfg_rt_threaded! { impl Builder { - /// Sets runtime to use a multi-threaded scheduler for executing tasks. - /// - /// See also [the module level documentation][1], which has a section on scheduler - /// types. - /// - /// [1]: index.html#runtime-configurations - pub fn threaded_scheduler(&mut self) -> &mut Self { - self.kind = Kind::ThreadPool; - self - } - fn build_threaded_runtime(&mut self) -> io::Result { use crate::loom::sys::num_cpus; use crate::runtime::{Kind, ThreadPool}; use crate::runtime::park::Parker; use std::cmp; - let core_threads = self.core_threads.unwrap_or_else(|| cmp::min(self.max_threads, num_cpus())); + let core_threads = self.worker_threads.unwrap_or_else(|| cmp::min(self.max_threads, num_cpus())); assert!(core_threads <= self.max_threads, "Core threads number cannot be above max limit"); let (driver, resources) = driver::Driver::new(self.get_cfg())?; @@ -569,17 +520,10 @@ cfg_rt_threaded! { } } -impl Default for Builder { - fn default() -> Self { - Self::new() - } -} - impl fmt::Debug for Builder { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("Builder") - .field("kind", &self.kind) - .field("core_threads", &self.core_threads) + .field("worker_threads", &self.worker_threads) .field("max_threads", &self.max_threads) .field( "thread_name", diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs index a4f88e90..e28d5282 100644 --- a/tokio/src/runtime/context.rs +++ b/tokio/src/runtime/context.rs @@ -7,10 +7,8 @@ thread_local! { static CONTEXT: RefCell> = RefCell::new(None) } -cfg_blocking_impl! { - pub(crate) fn current() -> Option { - CONTEXT.with(|ctx| ctx.borrow().clone()) - } +pub(crate) fn current() -> Option { + CONTEXT.with(|ctx| ctx.borrow().clone()) } cfg_io_driver! { diff --git a/tokio/src/runtime/driver.rs b/tokio/src/runtime/driver.rs index af8e17a3..6fccb11e 100644 --- a/tokio/src/runtime/driver.rs +++ b/tokio/src/runtime/driver.rs @@ -1,16 +1,18 @@ //! Abstracts out the entire chain of runtime sub-drivers into common types. -use crate::park::{Park, ParkThread}; +use crate::park::thread::ParkThread; +use crate::park::Park; + use std::io; use std::time::Duration; // ===== io driver ===== cfg_io_driver! { - type IoDriver = crate::park::Either; + type IoDriver = crate::park::either::Either; pub(crate) type IoHandle = Option; fn create_io_driver(enable: bool) -> io::Result<(IoDriver, IoHandle)> { - use crate::park::Either; + use crate::park::either::Either; #[cfg(loom)] assert!(!enable); @@ -47,11 +49,11 @@ macro_rules! cfg_signal_internal_and_unix { } cfg_signal_internal_and_unix! { - type SignalDriver = crate::park::Either; + type SignalDriver = crate::park::either::Either; pub(crate) type SignalHandle = Option; fn create_signal_driver(io_driver: IoDriver) -> io::Result<(SignalDriver, SignalHandle)> { - use crate::park::Either; + use crate::park::either::Either; // Enable the signal driver if IO is also enabled match io_driver { @@ -77,10 +79,10 @@ cfg_not_signal_internal! { // ===== process driver ===== cfg_process_driver! { - type ProcessDriver = crate::park::Either; + type ProcessDriver = crate::park::either::Either; fn create_process_driver(signal_driver: SignalDriver) -> io::Result { - use crate::park::Either; + use crate::park::either::Either; // Enable the signal driver if IO is also enabled match signal_driver { @@ -104,7 +106,7 @@ cfg_not_process_driver! { // ===== time driver ===== cfg_time! { - type TimeDriver = crate::park::Either, ProcessDriver>; + type TimeDriver = crate::park::either::Either, ProcessDriver>; pub(crate) type Clock = crate::time::Clock; pub(crate) type TimeHandle = Option; @@ -118,7 +120,7 @@ cfg_time! { process_driver: ProcessDriver, clock: Clock, ) -> (TimeDriver, TimeHandle) { - use crate::park::Either; + use crate::park::either::Either; if enable { let driver = crate::time::driver::Driver::new(process_driver, clock); diff --git a/tokio/src/runtime/enter.rs b/tokio/src/runtime/enter.rs index f934162b..79ed4d17 100644 --- a/tokio/src/runtime/enter.rs +++ b/tokio/src/runtime/enter.rs @@ -4,8 +4,8 @@ use std::marker::PhantomData; #[derive(Debug, Clone, Copy)] pub(crate) enum EnterContext { + #[cfg_attr(not(feature = "rt-core"), allow(dead_code))] Entered { - #[allow(dead_code)] allow_blocking: bool, }, NotEntered, @@ -24,32 +24,38 @@ pub(crate) struct Enter { _p: PhantomData>, } -/// Marks the current thread as being within the dynamic extent of an -/// executor. -pub(crate) fn enter(allow_blocking: bool) -> Enter { - if let Some(enter) = try_enter(allow_blocking) { - return enter; - } +cfg_rt_core! { + use crate::park::thread::ParkError; - panic!( - "Cannot start a runtime from within a runtime. This happens \ - because a function (like `block_on`) attempted to block the \ - current thread while the thread is being used to drive \ - asynchronous tasks." - ); -} + use std::time::Duration; -/// Tries to enter a runtime context, returns `None` if already in a runtime -/// context. -pub(crate) fn try_enter(allow_blocking: bool) -> Option { - ENTERED.with(|c| { - if c.get().is_entered() { - None - } else { - c.set(EnterContext::Entered { allow_blocking }); - Some(Enter { _p: PhantomData }) + /// Marks the current thread as being within the dynamic extent of an + /// executor. + pub(crate) fn enter(allow_blocking: bool) -> Enter { + if let Some(enter) = try_enter(allow_blocking) { + return enter; } - }) + + panic!( + "Cannot start a runtime from within a runtime. This happens \ + because a function (like `block_on`) attempted to block the \ + current thread while the thread is being used to drive \ + asynchronous tasks." + ); + } + + /// Tries to enter a runtime context, returns `None` if already in a runtime + /// context. + pub(crate) fn try_enter(allow_blocking: bool) -> Option { + ENTERED.with(|c| { + if c.get().is_entered() { + None + } else { + c.set(EnterContext::Entered { allow_blocking }); + Some(Enter { _p: PhantomData }) + } + }) + } } // Forces the current "entered" state to be cleared while the closure @@ -59,113 +65,92 @@ pub(crate) fn try_enter(allow_blocking: bool) -> Option { // // This is hidden for a reason. Do not use without fully understanding // executors. Misuing can easily cause your program to deadlock. -#[cfg(all(feature = "rt-threaded", feature = "blocking"))] -pub(crate) fn exit R, R>(f: F) -> R { - // Reset in case the closure panics - struct Reset(EnterContext); - impl Drop for Reset { - fn drop(&mut self) { - ENTERED.with(|c| { - assert!(!c.get().is_entered(), "closure claimed permanent executor"); - c.set(self.0); - }); +cfg_rt_threaded! { + pub(crate) fn exit R, R>(f: F) -> R { + // Reset in case the closure panics + struct Reset(EnterContext); + impl Drop for Reset { + fn drop(&mut self) { + ENTERED.with(|c| { + assert!(!c.get().is_entered(), "closure claimed permanent executor"); + c.set(self.0); + }); + } } - } - let was = ENTERED.with(|c| { - let e = c.get(); - assert!(e.is_entered(), "asked to exit when not entered"); - c.set(EnterContext::NotEntered); - e - }); + let was = ENTERED.with(|c| { + let e = c.get(); + assert!(e.is_entered(), "asked to exit when not entered"); + c.set(EnterContext::NotEntered); + e + }); - let _reset = Reset(was); - // dropping _reset after f() will reset ENTERED - f() + let _reset = Reset(was); + // dropping _reset after f() will reset ENTERED + f() + } } -cfg_rt_core! { - cfg_rt_util! { - /// Disallow blocking in the current runtime context until the guard is dropped. - pub(crate) fn disallow_blocking() -> DisallowBlockingGuard { - let reset = ENTERED.with(|c| { - if let EnterContext::Entered { - allow_blocking: true, - } = c.get() - { - c.set(EnterContext::Entered { - allow_blocking: false, - }); - true - } else { - false - } - }); - DisallowBlockingGuard(reset) - } +cfg_rt_util! { + /// Disallow blocking in the current runtime context until the guard is dropped. + pub(crate) fn disallow_blocking() -> DisallowBlockingGuard { + let reset = ENTERED.with(|c| { + if let EnterContext::Entered { + allow_blocking: true, + } = c.get() + { + c.set(EnterContext::Entered { + allow_blocking: false, + }); + true + } else { + false + } + }); + DisallowBlockingGuard(reset) + } - pub(crate) struct DisallowBlockingGuard(bool); - impl Drop for DisallowBlockingGuard { - fn drop(&mut self) { - if self.0 { - // XXX: Do we want some kind of assertion here, or is "best effort" okay? - ENTERED.with(|c| { - if let EnterContext::Entered { - allow_blocking: false, - } = c.get() - { - c.set(EnterContext::Entered { - allow_blocking: true, - }); - } - }) - } + pub(crate) struct DisallowBlockingGuard(bool); + impl Drop for DisallowBlockingGuard { + fn drop(&mut self) { + if self.0 { + // XXX: Do we want some kind of assertion here, or is "best effort" okay? + ENTERED.with(|c| { + if let EnterContext::Entered { + allow_blocking: false, + } = c.get() + { + c.set(EnterContext::Entered { + allow_blocking: true, + }); + } + }) } } } } cfg_rt_threaded! { - cfg_blocking! { - /// Returns true if in a runtime context. - pub(crate) fn context() -> EnterContext { - ENTERED.with(|c| c.get()) - } + /// Returns true if in a runtime context. + pub(crate) fn context() -> EnterContext { + ENTERED.with(|c| c.get()) } } -impl Enter { - /// Blocks the thread on the specified future, returning the value with - /// which that future completes. - pub(crate) fn block_on(&mut self, f: F) -> Result - where - F: std::future::Future, - { - use crate::park::{CachedParkThread, Park}; - use std::task::Context; - use std::task::Poll::Ready; - - let mut park = CachedParkThread::new(); - let waker = park.get_unpark()?.into_waker(); - let mut cx = Context::from_waker(&waker); - - pin!(f); - - loop { - if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) { - return Ok(v); - } +cfg_rt_core! { + impl Enter { + /// Blocks the thread on the specified future, returning the value with + /// which that future completes. + pub(crate) fn block_on(&mut self, f: F) -> Result + where + F: std::future::Future, + { + use crate::park::thread::CachedParkThread; - park.park()?; + let mut park = CachedParkThread::new(); + park.block_on(f) } - } -} -cfg_blocking_impl! { - use crate::park::ParkError; - use std::time::Duration; - - impl Enter { /// Blocks the thread on the specified future for **at most** `timeout` /// /// If the future completes before `timeout`, the result is returned. If @@ -174,7 +159,8 @@ cfg_blocking_impl! { where F: std::future::Future, { - use crate::park::{CachedParkThread, Park}; + use crate::park::Park; + use crate::park::thread::CachedParkThread; use std::task::Context; use std::task::Poll::Ready; use std::time::Instant; diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index 8e70db85..c79a942f 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -1,8 +1,7 @@ //! The Tokio runtime. //! -//! Unlike other Rust programs, asynchronous applications require -//! runtime support. In particular, the following runtime services are -//! necessary: +//! Unlike other Rust programs, asynchronous applications require runtime +//! support. In particular, the following runtime services are necessary: //! //! * An **I/O event loop**, called the driver, which drives I/O resources and //! dispatches I/O events to tasks that depend on them. @@ -10,14 +9,14 @@ //! * A **timer** for scheduling work to run after a set period of time. //! //! Tokio's [`Runtime`] bundles all of these services as a single type, allowing -//! them to be started, shut down, and configured together. However, often -//! it is not required to configure a [`Runtime`] manually, and user may just -//! use the [`tokio::main`] attribute macro, which creates a [`Runtime`] under -//! the hood. +//! them to be started, shut down, and configured together. However, often it is +//! not required to configure a [`Runtime`] manually, and user may just use the +//! [`tokio::main`] attribute macro, which creates a [`Runtime`] under the hood. //! //! # Usage //! -//! When no fine tuning is required, the [`tokio::main`] attribute macro can be used. +//! When no fine tuning is required, the [`tokio::main`] attribute macro can be +//! used. //! //! ```no_run //! use tokio::net::TcpListener; @@ -111,47 +110,37 @@ //! applications. The [runtime builder] or `#[tokio::main]` attribute may be //! used to select which scheduler to use. //! -//! #### Basic Scheduler +//! #### Current-Thread Scheduler //! -//! The basic scheduler provides a _single-threaded_ future executor. All tasks -//! will be created and executed on the current thread. The basic scheduler -//! requires the `rt-core` feature flag, and can be selected using the -//! [`Builder::basic_scheduler`] method: +//! The current-thread scheduler provides a _single-threaded_ future executor. +//! All tasks will be created and executed on the current thread. This requires +//! the `rt-core` feature flag. //! ``` //! use tokio::runtime; //! //! # fn main() -> Result<(), Box> { -//! let basic_rt = runtime::Builder::new() -//! .basic_scheduler() +//! let basic_rt = runtime::Builder::new_current_thread() //! .build()?; //! # Ok(()) } //! ``` //! -//! If the `rt-core` feature is enabled and `rt-threaded` is not, -//! [`Runtime::new`] will return a basic scheduler runtime by default. +//! #### Multi-Thread Scheduler //! -//! #### Threaded Scheduler -//! -//! The threaded scheduler executes futures on a _thread pool_, using a +//! The multi-thread scheduler executes futures on a _thread pool_, using a //! work-stealing strategy. By default, it will start a worker thread for each //! CPU core available on the system. This tends to be the ideal configurations -//! for most applications. The threaded scheduler requires the `rt-threaded` feature -//! flag, and can be selected using the [`Builder::threaded_scheduler`] method: +//! for most applications. The multi-thread scheduler requires the `rt-threaded` +//! feature flag, and is selected by default: //! ``` //! use tokio::runtime; //! //! # fn main() -> Result<(), Box> { -//! let threaded_rt = runtime::Builder::new() -//! .threaded_scheduler() -//! .build()?; +//! let threaded_rt = runtime::Runtime::new()?; //! # Ok(()) } //! ``` //! -//! If the `rt-threaded` feature flag is enabled, [`Runtime::new`] will return a -//! threaded scheduler runtime by default. -//! -//! Most applications should use the threaded scheduler, except in some niche -//! use-cases, such as when running only a single thread is required. +//! Most applications should use the multi-thread scheduler, except in some +//! niche use-cases, such as when running only a single thread is required. //! //! #### Resource drivers //! @@ -188,37 +177,31 @@ #[macro_use] mod tests; -pub(crate) mod context; +pub(crate) mod enter; + +pub(crate) mod task; cfg_rt_core! { mod basic_scheduler; use basic_scheduler::BasicScheduler; - pub(crate) mod task; -} + mod blocking; + use blocking::BlockingPool; + pub(crate) use blocking::spawn_blocking; -mod blocking; -use blocking::BlockingPool; + mod builder; + pub use self::builder::Builder; -cfg_blocking_impl! { - #[allow(unused_imports)] - pub(crate) use blocking::{spawn_blocking, try_spawn_blocking}; -} - -mod builder; -pub use self::builder::Builder; + pub(crate) mod context; + pub(crate) mod driver; -pub(crate) mod driver; + use self::enter::enter; -pub(crate) mod enter; -use self::enter::enter; + mod handle; + use handle::Handle; -mod handle; -use handle::Handle; - -mod io { - /// Re-exported for convenience. - pub(crate) use std::io::Result; + mod spawner; + use self::spawner::Spawner; } cfg_rt_threaded! { @@ -226,12 +209,6 @@ cfg_rt_threaded! { use park::Parker; } -mod shell; -use self::shell::Shell; - -mod spawner; -use self::spawner::Spawner; - cfg_rt_threaded! { mod queue; @@ -241,318 +218,293 @@ cfg_rt_threaded! { cfg_rt_core! { use crate::task::JoinHandle; -} - -use std::future::Future; -use std::time::Duration; - -/// The Tokio runtime. -/// -/// The runtime provides an I/O driver, task scheduler, [timer], and blocking -/// pool, necessary for running asynchronous tasks. -/// -/// Instances of `Runtime` can be created using [`new`] or [`Builder`]. However, -/// most users will use the `#[tokio::main]` annotation on their entry point instead. -/// -/// See [module level][mod] documentation for more details. -/// -/// # Shutdown -/// -/// Shutting down the runtime is done by dropping the value. The current thread -/// will block until the shut down operation has completed. -/// -/// * Drain any scheduled work queues. -/// * Drop any futures that have not yet completed. -/// * Drop the reactor. -/// -/// Once the reactor has dropped, any outstanding I/O resources bound to -/// that reactor will no longer function. Calling any method on them will -/// result in an error. -/// -/// [timer]: crate::time -/// [mod]: index.html -/// [`new`]: method@Self::new -/// [`Builder`]: struct@Builder -/// [`tokio::run`]: fn@run -#[derive(Debug)] -pub struct Runtime { - /// Task executor - kind: Kind, - - /// Handle to runtime, also contains driver handles - handle: Handle, - - /// Blocking pool handle, used to signal shutdown - blocking_pool: BlockingPool, -} - -///