diff options
67 files changed, 1657 insertions, 1358 deletions
diff --git a/tokio-macros/src/lib.rs b/tokio-macros/src/lib.rs index 62c8d94b..65c8fdee 100644 --- a/tokio-macros/src/lib.rs +++ b/tokio-macros/src/lib.rs @@ -110,6 +110,7 @@ pub fn main(args: TokenStream, item: TokenStream) -> TokenStream { fn #name(#inputs) #ret { tokio::runtime::Builder::new() .basic_scheduler() + .enable_all() .build() .unwrap() .block_on(async { #body }) @@ -211,6 +212,7 @@ pub fn test(args: TokenStream, item: TokenStream) -> TokenStream { fn #name() #ret { tokio::runtime::Builder::new() .basic_scheduler() + .enable_all() .build() .unwrap() .block_on(async { #body }) diff --git a/tokio-test/src/io.rs b/tokio-test/src/io.rs index 5a2b74bf..e6a243a1 100644 --- a/tokio-test/src/io.rs +++ b/tokio-test/src/io.rs @@ -1,3 +1,5 @@ +#![cfg(not(loom))] + //! A mock type implementing [`AsyncRead`] and [`AsyncWrite`]. //! //! diff --git a/tokio-test/src/lib.rs b/tokio-test/src/lib.rs index bdd4a9f9..d70a0c22 100644 --- a/tokio-test/src/lib.rs +++ b/tokio-test/src/lib.rs @@ -14,6 +14,7 @@ //! Tokio and Futures based testing utilites pub mod io; + mod macros; pub mod task; @@ -27,7 +28,11 @@ pub mod task; pub fn block_on<F: std::future::Future>(future: F) -> F::Output { use tokio::runtime; - let mut rt = runtime::Builder::new().basic_scheduler().build().unwrap(); + let mut rt = runtime::Builder::new() + .basic_scheduler() + .enable_all() + .build() + .unwrap(); rt.block_on(future) } diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 0619a50f..fdec4695 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -99,7 +99,7 @@ fnv = { version = "1.0.6", optional = true } futures-core = { version = "0.3.0", optional = true } lazy_static = { version = "1.0.2", optional = true } memchr = { version = "2.2", optional = true } -mio = { version = "0.6.14", optional = true } +mio = { version = "0.6.20", optional = true } num_cpus = { version = "1.8.0", optional = true } # Backs `DelayQueue` slab = { version = "0.4.1", optional = true } diff --git a/tokio/src/fs/mod.rs b/tokio/src/fs/mod.rs index c9e4e637..1ded892c 100644 --- a/tokio/src/fs/mod.rs +++ b/tokio/src/fs/mod.rs @@ -1,3 +1,5 @@ +#![cfg(not(loom))] + //! Asynchronous file and standard stream adaptation. //! //! This module contains utility methods and adapter types for input/output to @@ -96,6 +98,6 @@ mod sys { pub(crate) use std::fs::File; // TODO: don't rename - pub(crate) use crate::blocking::spawn_blocking as run; + pub(crate) use crate::runtime::spawn_blocking as run; pub(crate) use crate::task::JoinHandle as Blocking; } diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs index 4f47dc38..bb784541 100644 --- a/tokio/src/io/driver/mod.rs +++ b/tokio/src/io/driver/mod.rs @@ -4,7 +4,7 @@ mod scheduled_io; pub(crate) use scheduled_io::ScheduledIo; // pub(crate) for tests use crate::loom::sync::atomic::AtomicUsize; -use crate::runtime::{Park, Unpark}; +use crate::park::{Park, Unpark}; use crate::util::slab::{Address, Slab}; use mio::event::Evented; diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs index 86f2653b..627e643f 100644 --- a/tokio/src/io/mod.rs +++ b/tokio/src/io/mod.rs @@ -1,3 +1,5 @@ +#![cfg_attr(loom, allow(dead_code, unreachable_pub))] + //! Asynchronous I/O. //! //! This module is the asynchronous version of `std::io`. Primarily, it @@ -204,7 +206,7 @@ cfg_io_blocking! { /// Types in this module can be mocked out in tests. mod sys { // TODO: don't rename - pub(crate) use crate::blocking::spawn_blocking as run; + pub(crate) use crate::runtime::spawn_blocking as run; pub(crate) use crate::task::JoinHandle as Blocking; } } diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index 88290f98..a0f1c194 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -208,9 +208,6 @@ #[macro_use] mod macros; -// Blocking task implementation -pub(crate) mod blocking; - cfg_fs! { pub mod fs; } @@ -218,10 +215,10 @@ cfg_fs! { mod future; pub mod io; - pub mod net; mod loom; +mod park; pub mod prelude; diff --git a/tokio/src/loom/std/mod.rs b/tokio/src/loom/std/mod.rs index 2c5b7eaa..e6faa3b1 100644 --- a/tokio/src/loom/std/mod.rs +++ b/tokio/src/loom/std/mod.rs @@ -1,4 +1,4 @@ -#![cfg_attr(not(feature = "full"), allow(unused_imports, dead_code))] +#![cfg_attr(any(not(feature = "full"), loom), allow(unused_imports, dead_code))] mod atomic_u32; mod atomic_u64; @@ -45,8 +45,8 @@ pub(crate) mod sync { pub(crate) use crate::loom::std::atomic_u64::AtomicU64; pub(crate) use crate::loom::std::atomic_usize::AtomicUsize; - pub(crate) use std::sync::atomic::spin_loop_hint; pub(crate) use std::sync::atomic::{fence, AtomicPtr}; + pub(crate) use std::sync::atomic::{spin_loop_hint, AtomicBool}; } } diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index 5e84a3ac..a3146688 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -1,8 +1,12 @@ #![allow(unused_macros)] -macro_rules! cfg_atomic_waker { +macro_rules! cfg_resource_drivers { ($($item:item)*) => { - $( #[cfg(any(feature = "io-driver", feature = "time"))] $item )* + $( + #[cfg(any(feature = "io-driver", feature = "time"))] + #[cfg(not(loom))] + $item + )* } } diff --git a/tokio/src/net/addr.rs b/tokio/src/net/addr.rs index 108557e4..aa66c5fa 100644 --- a/tokio/src/net/addr.rs +++ b/tokio/src/net/addr.rs @@ -124,7 +124,7 @@ cfg_dns! { type Future = sealed::MaybeReady; fn to_socket_addrs(&self) -> Self::Future { - use crate::blocking; + use crate::runtime::spawn_blocking; use sealed::MaybeReady; // First check if the input parses as a socket address @@ -137,7 +137,7 @@ cfg_dns! { // Run DNS lookup on the blocking pool let s = self.to_owned(); - MaybeReady::Blocking(blocking::spawn_blocking(move || { + MaybeReady::Blocking(spawn_blocking(move || { std::net::ToSocketAddrs::to_socket_addrs(&s) })) } @@ -152,7 +152,7 @@ cfg_dns! { type Future = sealed::MaybeReady; fn to_socket_addrs(&self) -> Self::Future { - use crate::blocking; + use crate::runtime::spawn_blocking; use sealed::MaybeReady; let (host, port) = *self; @@ -174,7 +174,7 @@ cfg_dns! { let host = host.to_owned(); - MaybeReady::Blocking(blocking::spawn_blocking(move || { + MaybeReady::Blocking(spawn_blocking(move || { std::net::ToSocketAddrs::to_socket_addrs(&(&host[..], port)) })) } diff --git a/tokio/src/net/mod.rs b/tokio/src/net/mod.rs index f02b6259..ac913b21 100644 --- a/tokio/src/net/mod.rs +++ b/tokio/src/net/mod.rs @@ -1,3 +1,5 @@ +#![cfg(not(loom))] + //! TCP/UDP/Unix bindings for `tokio`. //! //! This module contains the TCP/UDP/Unix networking types, similar to the standard diff --git a/tokio/src/park/either.rs b/tokio/src/park/either.rs new file mode 100644 index 00000000..67f1e172 --- /dev/null +++ b/tokio/src/park/either.rs @@ -0,0 +1,65 @@ +use crate::park::{Park, Unpark}; + +use std::fmt; +use std::time::Duration; + +pub(crate) enum Either<A, B> { + A(A), + B(B), +} + +impl<A, B> Park for Either<A, B> +where + A: Park, + B: Park, +{ + type Unpark = Either<A::Unpark, B::Unpark>; + type Error = Either<A::Error, B::Error>; + + fn unpark(&self) -> Self::Unpark { + match self { + Either::A(a) => Either::A(a.unpark()), + Either::B(b) => Either::B(b.unpark()), + } + } + + fn park(&mut self) -> Result<(), Self::Error> { + match self { + Either::A(a) => a.park().map_err(Either::A), + Either::B(b) => b.park().map_err(Either::B), + } + } + + fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + match self { + Either::A(a) => a.park_timeout(duration).map_err(Either::A), + Either::B(b) => b.park_timeout(duration).map_err(Either::B), + } + } +} + +impl<A, B> Unpark for Either<A, B> +where + A: Unpark, + B: Unpark, +{ + fn unpark(&self) { + match self { + Either::A(a) => a.unpark(), + Either::B(b) => b.unpark(), + } + } +} + +impl<A, B> fmt::Debug for Either<A, B> +where + A: fmt::Debug, + B: fmt::Debug, +{ + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Either::A(a) => a.fmt(fmt), + Either::B(b) => b.fmt(fmt), + } + } +} diff --git a/tokio/src/runtime/park/mod.rs b/tokio/src/park/mod.rs index bae96d9d..e6b0c72b 100644 --- a/tokio/src/runtime/park/mod.rs +++ b/tokio/src/park/mod.rs @@ -44,12 +44,18 @@ //! [up]: trait.Unpark.html //! [mio]: https://docs.rs/mio/0.6/mio/struct.Poll.html +cfg_resource_drivers! { + mod either; + pub(crate) use self::either::Either; +} + mod thread; -#[cfg(feature = "rt-threaded")] -pub(crate) use self::thread::CachedParkThread; -#[cfg(not(feature = "io-driver"))] pub(crate) use self::thread::ParkThread; +cfg_blocking_impl! { + pub(crate) use self::thread::CachedParkThread; +} + use std::sync::Arc; use std::time::Duration; @@ -58,7 +64,7 @@ use std::time::Duration; /// See [module documentation][mod] for more details. /// /// [mod]: ../index.html -pub trait Park { +pub(crate) trait Park { /// Unpark handle type for the `Park` implementation. type Unpark: Unpark; @@ -112,7 +118,7 @@ pub trait Park { /// /// [mod]: ../index.html /// [`Park`]: trait.Park.html -pub trait Unpark: Sync + Send + 'static { +pub(crate) trait Unpark: Sync + Send + 'static { /// Unblock a thread that is blocked by the associated `Park` handle. /// /// Calling `unpark` atomically makes available the unpark token, if it is diff --git a/tokio/src/park/thread.rs b/tokio/src/park/thread.rs new file mode 100644 index 00000000..dc844871 --- /dev/null +++ b/tokio/src/park/thread.rs @@ -0,0 +1,316 @@ +use crate::loom::sync::atomic::AtomicUsize; +use crate::loom::sync::{Arc, Condvar, Mutex}; +use crate::park::{Park, Unpark}; + +use std::sync::atomic::Ordering::SeqCst; +use std::time::Duration; + +#[derive(Debug)] +pub(crate) struct ParkThread { + inner: Arc<Inner>, +} + +/// Error returned by [`ParkThread`] +/// +/// This currently is never returned, but might at some point in the future. +/// +/// [`ParkThread`]: struct.ParkThread.html +#[derive(Debug)] +pub(crate) struct ParkError { + _p: (), +} + +/// Unblocks a thread that was blocked by `ParkThread`. +#[derive(Clone, Debug)] +pub(crate) struct UnparkThread { + inner: Arc<Inner>, +} + +#[derive(Debug)] +struct Inner { + state: AtomicUsize, + mutex: Mutex<()>, + condvar: Condvar, +} + +const EMPTY: usize = 0; +const PARKED: usize = 1; +const NOTIFIED: usize = 2; + +thread_local! { + static CURRENT_PARKER: ParkThread = ParkThread::new(); +} + +// ==== impl ParkThread ==== + +impl ParkThread { + pub(crate) fn new() -> Self { + Self { + inner: Arc::new(Inner { + state: AtomicUsize::new(EMPTY), + mutex: Mutex::new(()), + condvar: Condvar::new(), + }), + } + } +} + +impl Park for ParkThread { + type Unpark = UnparkThread; + type Error = ParkError; + + fn unpark(&self) -> Self::Unpark { + let inner = self.inner.clone(); + UnparkThread { inner } + } + + fn park(&mut self) -> Result<(), Self::Error> { + self.inner.park(); + Ok(()) + } + + fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + self.inner.park_timeout(duration); + Ok(()) + } +} + +// ==== impl Inner ==== + +impl Inner { + /// Park the current thread for at most `dur`. + fn park(&self) { + // If we were previously notified then we consume this notification and + // return quickly. + if self + .state + .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) + .is_ok() + { + return; + } + + // Otherwise we need to coordinate going to sleep + let mut m = self.mutex.lock().unwrap(); + + match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) { + Ok(_) => {} + Err(NOTIFIED) => { + // We must read here, even though we know it will be `NOTIFIED`. + // This is because `unpark` may have been called again since we read + // `NOTIFIED` in the `compare_exchange` above. We must perform an + // acquire operation that synchronizes with that `unpark` to observe + // any writes it made before the call to unpark. To do that we must + // read from the write it made to `state`. + let old = self.state.swap(EMPTY, SeqCst); + debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); + + return; + } + Err(actual) => panic!("inconsistent park state; actual = {}", actual), + } + + loop { + m = self.condvar.wait(m).unwrap(); + + if self + .state + .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) + .is_ok() + { + // got a notification + return; + } + + // spurious wakeup, go back to sleep + } + } + + fn park_timeout(&self, dur: Duration) { + // Like `park` above we have a fast path for an already-notified thread, + // and afterwards we start coordinating for a sleep. return quickly. + if self + .state + .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) + .is_ok() + { + return; + } + + let m = self.mutex.lock().unwrap(); + + match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) { + Ok(_) => {} + Err(NOTIFIED) => { + // We must read again here, see `park`. + let old = self.state.swap(EMPTY, SeqCst); + debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); + + return; + } + Err(actual) => panic!("inconsistent park_timeout state; actual = {}", actual), + } + + // Wait with a timeout, and if we spuriously wake up or otherwise wake up + // from a notification, we just want to unconditionally set the state back to + // empty, either consuming a notification or un-flagging ourselves as + // parked. + let (_m, _result) = self.condvar.wait_timeout(m, dur).unwrap(); + + match self.state.swap(EMPTY, SeqCst) { + NOTIFIED => {} // got a notification, hurray! + PARKED => {} // no notification, alas + n => panic!("inconsistent park_timeout state: {}", n), + } + } + + fn unpark(&self) { + // To ensure the unparked thread will observe any writes we made before + // this call, we must perform a release operation that `park` can + // synchronize with. To do that we must write `NOTIFIED` even if `state` + // is already `NOTIFIED`. That is why this must be a swap rather than a + // compare-and-swap that returns if it reads `NOTIFIED` on failure. + match self.state.swap(NOTIFIED, SeqCst) { + EMPTY => return, // no one was waiting + NOTIFIED => return, // already unparked + PARKED => {} // gotta go wake someone up + _ => panic!("inconsistent state in unpark"), + } + + // There is a period between when the parked thread sets `state` to + // `PARKED` (or last checked `state` in the case of a spurious wake + // up) and when it actually waits on `cvar`. If we were to notify + // during this period it would be ignored and then when the parked + // thread went to sleep it would never wake up. Fortunately, it has + // `lock` locked at this stage so we can acquire `lock` to wait until + // it is ready to receive the notification. + // + // Releasing `lock` before the call to `notify_one` means that when the + // parked thread wakes it doesn't get woken only to have to wait for us + // to release `lock`. + drop(self.mutex.lock().unwrap()); + + self.condvar.notify_one() + } +} + +impl Default for ParkThread { + fn default() -> Self { + Self::new() + } +} + +// ===== impl UnparkThread ===== + +impl Unpark for UnparkThread { + fn unpark(&self) { + self.inner.unpark(); + } +} + +cfg_blocking_impl! { + use std::marker::PhantomData; + use std::rc::Rc; + + use std::mem; + use std::task::{RawWaker, RawWakerVTable, Waker}; + + /// Blocks the current thread using a condition variable. + #[derive(Debug)] + pub(crate) struct CachedParkThread { + _anchor: PhantomData<Rc<()>>, + } + + impl CachedParkThread { + /// Create a new `ParkThread` handle for the current thread. + /// + /// This type cannot be moved to other threads, so it should be created on + /// the thread that the caller intends to park. + pub(crate) fn new() -> CachedParkThread { + CachedParkThread { + _anchor: PhantomData, + } + } + + /// Get a reference to the `ParkThread` handle for this thread. + fn with_current<F, R>(&self, f: F) -> R + where + F: FnOnce(&ParkThread) -> R, + { + |