diff options
48 files changed, 132 insertions, 239 deletions
diff --git a/tokio/src/executor/blocking/builder.rs b/tokio/src/executor/blocking/builder.rs index 05ea28c2..e755ae23 100644 --- a/tokio/src/executor/blocking/builder.rs +++ b/tokio/src/executor/blocking/builder.rs @@ -1,5 +1,5 @@ use crate::executor::blocking::Pool; -use crate::executor::loom::thread; +use crate::loom::thread; use std::usize; diff --git a/tokio/src/executor/blocking/mod.rs b/tokio/src/executor/blocking/mod.rs index 2947062b..d105fe22 100644 --- a/tokio/src/executor/blocking/mod.rs +++ b/tokio/src/executor/blocking/mod.rs @@ -1,7 +1,7 @@ //! Thread pool for blocking operations -use crate::executor::loom::sync::{Arc, Condvar, Mutex}; -use crate::executor::loom::thread; +use crate::loom::sync::{Arc, Condvar, Mutex}; +use crate::loom::thread; #[cfg(feature = "blocking")] use crate::sync::oneshot; diff --git a/tokio/src/executor/loom/mod.rs b/tokio/src/executor/loom/mod.rs deleted file mode 100644 index f6f9d23c..00000000 --- a/tokio/src/executor/loom/mod.rs +++ /dev/null @@ -1,30 +0,0 @@ -//! Stub out the necessary APIs to model with loom. - -#[cfg(not(all(test, loom)))] -pub(crate) mod std; - -#[cfg(all(test, loom))] -pub(crate) mod std { - pub(crate) use loom::{alloc, cell, sync, thread}; - - pub(crate) mod rand { - pub(crate) fn seed() -> u64 { - 1 - } - } - - pub(crate) mod sys { - pub(crate) fn num_cpus() -> usize { - 2 - } - } -} - -#[cfg(feature = "rt-full")] -pub(crate) use self::std::rand; -#[cfg(any(feature = "blocking", feature = "rt-current-thread"))] -pub(crate) use self::std::sync; -#[cfg(any(feature = "blocking", feature = "rt-full"))] -pub(crate) use self::std::thread; -#[cfg(feature = "rt-current-thread")] -pub(crate) use self::std::{alloc, cell, sys}; diff --git a/tokio/src/executor/mod.rs b/tokio/src/executor/mod.rs index fa2b29e4..6134ea3b 100644 --- a/tokio/src/executor/mod.rs +++ b/tokio/src/executor/mod.rs @@ -52,8 +52,6 @@ pub(crate) use self::enter::enter; mod global; pub use self::global::spawn; -pub(crate) mod loom; - pub mod park; #[cfg(feature = "rt-current-thread")] diff --git a/tokio/src/executor/park/thread.rs b/tokio/src/executor/park/thread.rs index c48b418d..880dfb49 100644 --- a/tokio/src/executor/park/thread.rs +++ b/tokio/src/executor/park/thread.rs @@ -1,6 +1,6 @@ -use crate::executor::loom::sync::atomic::AtomicUsize; -use crate::executor::loom::sync::{Arc, Condvar, Mutex}; use crate::executor::park::{Park, Unpark}; +use crate::loom::sync::atomic::AtomicUsize; +use crate::loom::sync::{Arc, Condvar, Mutex}; use std::marker::PhantomData; use std::rc::Rc; @@ -213,7 +213,7 @@ impl Unpark for UnparkThread { #[cfg(feature = "rt-full")] mod waker { use super::{Inner, UnparkThread}; - use crate::executor::loom::sync::Arc; + use crate::loom::sync::Arc; use std::mem; use std::task::{RawWaker, RawWakerVTable, Waker}; diff --git a/tokio/src/executor/task/core.rs b/tokio/src/executor/task/core.rs index d0e1e3dd..7a3016ee 100644 --- a/tokio/src/executor/task/core.rs +++ b/tokio/src/executor/task/core.rs @@ -1,9 +1,9 @@ -use crate::executor::loom::alloc::Track; -use crate::executor::loom::cell::CausalCell; use crate::executor::task::raw::{self, Vtable}; use crate::executor::task::state::State; use crate::executor::task::waker::waker_ref; use crate::executor::task::Schedule; +use crate::loom::alloc::Track; +use crate::loom::cell::CausalCell; use std::cell::UnsafeCell; use std::future::Future; diff --git a/tokio/src/executor/task/harness.rs b/tokio/src/executor/task/harness.rs index e71e9a64..56b3cfbb 100644 --- a/tokio/src/executor/task/harness.rs +++ b/tokio/src/executor/task/harness.rs @@ -1,8 +1,8 @@ -use crate::executor::loom::alloc::Track; -use crate::executor::loom::cell::CausalCheck; use crate::executor::task::core::{Cell, Core, Header, Trailer}; use crate::executor::task::state::Snapshot; use crate::executor::task::{JoinError, Schedule, Task}; +use crate::loom::alloc::Track; +use crate::loom::cell::CausalCheck; use std::future::Future; use std::marker::PhantomData; diff --git a/tokio/src/executor/task/join.rs b/tokio/src/executor/task/join.rs index b395b0ec..faf6eb56 100644 --- a/tokio/src/executor/task/join.rs +++ b/tokio/src/executor/task/join.rs @@ -1,5 +1,5 @@ -use crate::executor::loom::alloc::Track; use crate::executor::task::raw::RawTask; +use crate::loom::alloc::Track; use std::fmt; use std::future::Future; diff --git a/tokio/src/executor/task/raw.rs b/tokio/src/executor/task/raw.rs index 033bd91b..6a34a3d4 100644 --- a/tokio/src/executor/task/raw.rs +++ b/tokio/src/executor/task/raw.rs @@ -1,8 +1,8 @@ -use crate::executor::loom::alloc::Track; use crate::executor::task::core::Cell; use crate::executor::task::harness::Harness; use crate::executor::task::state::{Snapshot, State}; use crate::executor::task::{Header, Schedule}; +use crate::loom::alloc::Track; use std::future::Future; use std::ptr::NonNull; diff --git a/tokio/src/executor/task/stack.rs b/tokio/src/executor/task/stack.rs index 20ac5c46..cd675e10 100644 --- a/tokio/src/executor/task/stack.rs +++ b/tokio/src/executor/task/stack.rs @@ -1,5 +1,5 @@ -use crate::executor::loom::sync::atomic::AtomicPtr; use crate::executor::task::{Header, Task}; +use crate::loom::sync::atomic::AtomicPtr; use std::marker::PhantomData; use std::ptr::{self, NonNull}; diff --git a/tokio/src/executor/task/state.rs b/tokio/src/executor/task/state.rs index 09b18f37..3adfea91 100644 --- a/tokio/src/executor/task/state.rs +++ b/tokio/src/executor/task/state.rs @@ -1,4 +1,4 @@ -use crate::executor::loom::sync::atomic::AtomicUsize; +use crate::loom::sync::atomic::AtomicUsize; use std::fmt; use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; @@ -224,7 +224,7 @@ impl State { /// /// Returns a snapshot of the state **after** the transition. pub(super) fn release_task(&self) -> Snapshot { - use crate::executor::loom::sync::atomic; + use crate::loom::sync::atomic; const DELTA: usize = RELEASED; @@ -283,7 +283,7 @@ impl State { /// /// Returns a snapshot of the state **after** the transition. pub(super) fn complete_join_handle(&self) -> Snapshot { - use crate::executor::loom::sync::atomic; + use crate::loom::sync::atomic; const DELTA: usize = JOIN_INTEREST; @@ -337,7 +337,7 @@ impl State { /// Store the join waker. pub(super) fn store_join_waker(&self) -> Snapshot { - use crate::executor::loom::sync::atomic; + use crate::loom::sync::atomic; const DELTA: usize = JOIN_WAKER; @@ -407,7 +407,7 @@ impl State { /// Returns `true` if the task should be released. pub(super) fn ref_dec(&self) -> bool { - use crate::executor::loom::sync::atomic; + use crate::loom::sync::atomic; let prev = self.val.fetch_sub(WAKER_ONE, Release); let next = Snapshot(prev - WAKER_ONE); diff --git a/tokio/src/executor/thread_pool/builder.rs b/tokio/src/executor/thread_pool/builder.rs index f9d40350..c1b1ed19 100644 --- a/tokio/src/executor/thread_pool/builder.rs +++ b/tokio/src/executor/thread_pool/builder.rs @@ -1,8 +1,7 @@ -use crate::executor::loom::sync::Arc; -use crate::executor::loom::sys::num_cpus; -use crate::executor::loom::thread; use crate::executor::park::Park; use crate::executor::thread_pool::{shutdown, worker, worker::Worker, Spawner, ThreadPool}; +use crate::loom::sync::Arc; +use crate::loom::sys::num_cpus; use std::{fmt, usize}; @@ -109,7 +108,7 @@ impl Builder { impl Drop for AbortOnPanic { fn drop(&mut self) { - if thread::panicking() { + if std::thread::panicking() { eprintln!("[ERROR] unhandled panic in Tokio scheduler. This is a bug and should be reported."); std::process::abort(); } diff --git a/tokio/src/executor/thread_pool/current.rs b/tokio/src/executor/thread_pool/current.rs index f02be101..7bbe3e34 100644 --- a/tokio/src/executor/thread_pool/current.rs +++ b/tokio/src/executor/thread_pool/current.rs @@ -1,6 +1,6 @@ -use crate::executor::loom::sync::Arc; use crate::executor::park::Unpark; use crate::executor::thread_pool::{worker, Owned}; +use crate::loom::sync::Arc; use std::cell::Cell; use std::ptr; diff --git a/tokio/src/executor/thread_pool/idle.rs b/tokio/src/executor/thread_pool/idle.rs index 0fae29bf..acf80df8 100644 --- a/tokio/src/executor/thread_pool/idle.rs +++ b/tokio/src/executor/thread_pool/idle.rs @@ -1,7 +1,7 @@ //! Coordinates idling workers -use crate::executor::loom::sync::atomic::AtomicUsize; -use crate::executor::loom::sync::Mutex; +use crate::loom::sync::atomic::AtomicUsize; +use crate::loom::sync::Mutex; use std::fmt; use std::sync::atomic::Ordering::{self, AcqRel, Relaxed, SeqCst}; diff --git a/tokio/src/executor/thread_pool/queue/global.rs b/tokio/src/executor/thread_pool/queue/global.rs index 6bcd6216..20a9d36f 100644 --- a/tokio/src/executor/thread_pool/queue/global.rs +++ b/tokio/src/executor/thread_pool/queue/global.rs @@ -1,6 +1,6 @@ -use crate::executor::loom::sync::atomic::AtomicUsize; -use crate::executor::loom::sync::Mutex; use crate::executor::task::{Header, Task}; +use crate::loom::sync::atomic::AtomicUsize; +use crate::loom::sync::Mutex; use std::marker::PhantomData; use std::ptr::{self, NonNull}; diff --git a/tokio/src/executor/thread_pool/queue/inject.rs b/tokio/src/executor/thread_pool/queue/inject.rs index 5f8caf1a..045ce922 100644 --- a/tokio/src/executor/thread_pool/queue/inject.rs +++ b/tokio/src/executor/thread_pool/queue/inject.rs @@ -1,6 +1,6 @@ -use crate::executor::loom::sync::Arc; use crate::executor::task::Task; use crate::executor::thread_pool::queue::Cluster; +use crate::loom::sync::Arc; pub(crate) struct Inject<T: 'static> { cluster: Arc<Cluster<T>>, diff --git a/tokio/src/executor/thread_pool/queue/local.rs b/tokio/src/executor/thread_pool/queue/local.rs index e3323cb4..62472b0b 100644 --- a/tokio/src/executor/thread_pool/queue/local.rs +++ b/tokio/src/executor/thread_pool/queue/local.rs @@ -1,8 +1,8 @@ -use crate::executor::loom::cell::{CausalCell, CausalCheck}; -use crate::executor::loom::sync::atomic::{self, AtomicU32}; use crate::executor::task::Task; use crate::executor::thread_pool::queue::global; use crate::executor::thread_pool::LOCAL_QUEUE_CAPACITY; +use crate::loom::cell::{CausalCell, CausalCheck}; +use crate::loom::sync::atomic::{self, AtomicU32}; use std::fmt; use std::mem::MaybeUninit; diff --git a/tokio/src/executor/thread_pool/queue/mod.rs b/tokio/src/executor/thread_pool/queue/mod.rs index e873c31a..88633ee3 100644 --- a/tokio/src/executor/thread_pool/queue/mod.rs +++ b/tokio/src/executor/thread_pool/queue/mod.rs @@ -8,7 +8,7 @@ mod worker; pub(crate) use self::inject::Inject; pub(crate) use self::worker::Worker; -use crate::executor::loom::sync::Arc; +use crate::loom::sync::Arc; pub(crate) fn build<T: 'static>(workers: usize) -> Vec<Worker<T>> { let local: Vec<_> = (0..workers).map(|_| local::Queue::new()).collect(); diff --git a/tokio/src/executor/thread_pool/queue/worker.rs b/tokio/src/executor/thread_pool/queue/worker.rs index 2cd34a97..be255225 100644 --- a/tokio/src/executor/thread_pool/queue/worker.rs +++ b/tokio/src/executor/thread_pool/queue/worker.rs @@ -1,6 +1,6 @@ -use crate::executor::loom::sync::Arc; use crate::executor::task::Task; use crate::executor::thread_pool::queue::{local, Cluster, Inject}; +use crate::loom::sync::Arc; use std::cell::Cell; use std::fmt; diff --git a/tokio/src/executor/thread_pool/set.rs b/tokio/src/executor/thread_pool/set.rs index e878cad5..c55f5d7d 100644 --- a/tokio/src/executor/thread_pool/set.rs +++ b/tokio/src/executor/thread_pool/set.rs @@ -2,12 +2,12 @@ //! //! - Attempt to spin. -use crate::executor::loom::rand::seed; -use crate::executor::loom::sync::Arc; use crate::executor::park::Unpark; use crate::executor::task::{self, JoinHandle, Task}; use crate::executor::thread_pool::{current, queue, Idle, Owned, Shared}; use crate::executor::util::{CachePadded, FastRand}; +use crate::loom::rand::seed; +use crate::loom::sync::Arc; use std::cell::UnsafeCell; use std::future::Future; diff --git a/tokio/src/executor/thread_pool/shutdown.rs b/tokio/src/executor/thread_pool/shutdown.rs index b7c4177f..9a0ee479 100644 --- a/tokio/src/executor/thread_pool/shutdown.rs +++ b/tokio/src/executor/thread_pool/shutdown.rs @@ -3,7 +3,7 @@ //! Each worker holds the `Sender` half. When all the `Sender` halves are //! dropped, the `Receiver` receives a notification. -use crate::executor::loom::sync::Arc; +use crate::loom::sync::Arc; use crate::sync::oneshot; #[derive(Debug, Clone)] diff --git a/tokio/src/executor/thread_pool/spawner.rs b/tokio/src/executor/thread_pool/spawner.rs index 4fdf9594..28ebe164 100644 --- a/tokio/src/executor/thread_pool/spawner.rs +++ b/tokio/src/executor/thread_pool/spawner.rs @@ -1,7 +1,7 @@ -use crate::executor::loom::sync::Arc; use crate::executor::park::Unpark; use crate::executor::task::JoinHandle; use crate::executor::thread_pool::worker; +use crate::loom::sync::Arc; use std::fmt; use std::future::Future; diff --git a/tokio/src/executor/thread_pool/tests/loom_pool.rs b/tokio/src/executor/thread_pool/tests/loom_pool.rs index 6b2b5c30..fff00d67 100644 --- a/tokio/src/executor/thread_pool/tests/loom_pool.rs +++ b/tokio/src/executor/thread_pool/tests/loom_pool.rs @@ -1,14 +1,13 @@ -use crate::executor::loom::sync::atomic::Ordering::{Acquire, Relaxed, Release}; -use crate::executor::loom::sync::atomic::{AtomicBool, AtomicUsize}; -use crate::executor::loom::sync::{Arc, Mutex}; use crate::executor::park::{Park, Unpark}; use crate::executor::tests::loom_oneshot as oneshot; use crate::executor::thread_pool::{self, Builder}; use crate::spawn; -use loom::sync::Notify; +use loom::sync::atomic::{AtomicBool, AtomicUsize}; +use loom::sync::{Arc, Mutex, Notify}; use std::future::Future; +use std::sync::atomic::Ordering::{Acquire, Relaxed, Release}; use std::time::Duration; #[test] @@ -190,7 +189,7 @@ fn gated() -> impl Future<Output = &'static str> { } fn gated2(thread: bool) -> impl Future<Output = &'static str> { - use crate::executor::loom::thread; + use loom::thread; use std::sync::Arc; let gate = Arc::new(AtomicBool::new(false)); diff --git a/tokio/src/executor/thread_pool/worker.rs b/tokio/src/executor/thread_pool/worker.rs index 25d25966..10aeeae4 100644 --- a/tokio/src/executor/thread_pool/worker.rs +++ b/tokio/src/executor/thread_pool/worker.rs @@ -1,7 +1,7 @@ -use crate::executor::loom::sync::Arc; use crate::executor::park::{Park, Unpark}; use crate::executor::task::Task; use crate::executor::thread_pool::{current, Owned, Shared, Spawner}; +use crate::loom::sync::Arc; use std::cell::Cell; use std::ops::{Deref, DerefMut}; diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index 5d18e97c..89547c1d 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -98,7 +98,7 @@ pub mod io; #[cfg(feature = "net-driver")] pub mod net; -#[cfg(feature = "net-driver")] +#[cfg(any(feature = "sync", feature = "blocking", feature = "rt-current-thread"))] mod loom; pub mod prelude; diff --git a/tokio/src/loom.rs b/tokio/src/loom.rs deleted file mode 100644 index 57ce2df6..00000000 --- a/tokio/src/loom.rs +++ /dev/null @@ -1,45 +0,0 @@ -//! This module abstracts over `loom` and `std::sync` depending on whether we -//! are running tests or not. -pub(crate) use self::inner::*; - -#[cfg(all(test, loom))] -mod inner { - pub(crate) use loom::sync::CausalCell; - pub(crate) use loom::sync::Mutex; - pub(crate) mod atomic { - pub(crate) use loom::sync::atomic::*; - pub(crate) use std::sync::atomic::Ordering; - } -} - -#[cfg(not(all(test, loom)))] -mod inner { - use std::cell::UnsafeCell; - pub(crate) use std::sync::atomic; - pub(crate) use std::sync::Mutex; - - #[derive(Debug)] - pub(crate) struct CausalCell<T>(UnsafeCell<T>); - - impl<T> CausalCell<T> { - pub(crate) fn new(data: T) -> CausalCell<T> { - CausalCell(UnsafeCell::new(data)) - } - - #[inline(always)] - pub(crate) fn with<F, R>(&self, f: F) -> R - where - F: FnOnce(*const T) -> R, - { - f(self.0.get()) - } - - #[inline(always)] - pub(crate) fn with_mut<F, R>(&self, f: F) -> R - where - F: FnOnce(*mut T) -> R, - { - f(self.0.get()) - } - } -} diff --git a/tokio/src/loom/mocked.rs b/tokio/src/loom/mocked.rs new file mode 100644 index 00000000..78913952 --- /dev/null +++ b/tokio/src/loom/mocked.rs @@ -0,0 +1,13 @@ +pub(crate) use loom::*; + +pub(crate) mod rand { + pub(crate) fn seed() -> u64 { + 1 + } +} + +pub(crate) mod sys { + pub(crate) fn num_cpus() -> usize { + 2 + } +} diff --git a/tokio/src/loom/mod.rs b/tokio/src/loom/mod.rs new file mode 100644 index 00000000..56a41f25 --- /dev/null +++ b/tokio/src/loom/mod.rs @@ -0,0 +1,12 @@ +//! This module abstracts over `loom` and `std::sync` depending on whether we +//! are running tests or not. + +#[cfg(not(all(test, loom)))] +mod std; +#[cfg(not(all(test, loom)))] +pub(crate) use self::std::*; + +#[cfg(all(test, loom))] +mod mocked; +#[cfg(all(test, loom))] +pub(crate) use self::mocked::*; diff --git a/tokio/src/loom/std/alloc.rs b/tokio/src/loom/std/alloc.rs new file mode 100644 index 00000000..25b199b1 --- /dev/null +++ b/tokio/src/loom/std/alloc.rs @@ -0,0 +1,18 @@ +#[derive(Debug)] +pub(crate) struct Track<T> { + value: T, +} + +impl<T> Track<T> { + pub(crate) fn new(value: T) -> Track<T> { + Track { value } + } + + pub(crate) fn get_mut(&mut self) -> &mut T { + &mut self.value + } + + pub(crate) fn into_inner(self) -> T { + self.value + } +} diff --git a/tokio/src/executor/loom/std/atomic_u32.rs b/tokio/src/loom/std/atomic_u32.rs index 0128ab2b..0128ab2b 100644 --- a/tokio/src/executor/loom/std/atomic_u32.rs +++ b/tokio/src/loom/std/atomic_u32.rs diff --git a/tokio/src/executor/loom/std/atomic_usize.rs b/tokio/src/loom/std/atomic_usize.rs index 4a424b1e..d255d087 100644 --- a/tokio/src/executor/loom/std/atomic_usize.rs +++ b/tokio/src/loom/std/atomic_usize.rs @@ -1,6 +1,6 @@ use std::cell::UnsafeCell; use std::fmt; -use std::ops::Deref; +use std::ops; /// `AtomicUsize` providing an additional `load_unsync` function. pub(crate) struct AtomicUsize { @@ -11,7 +11,6 @@ unsafe impl Send for AtomicUsize {} unsafe impl Sync for AtomicUsize {} impl AtomicUsize { - #[cfg(feature = "rt-current-thread")] pub(crate) fn new(val: usize) -> AtomicUsize { let inner = UnsafeCell::new(std::sync::atomic::AtomicUsize::new(val)); AtomicUsize { inner } @@ -23,13 +22,12 @@ impl AtomicUsize { /// /// All mutations must have happened before the unsynchronized load. /// Additionally, there must be no concurrent mutations. - #[cfg(feature = "rt-full")] pub(crate) unsafe fn unsync_load(&self) -> usize { *(*self.inner.get()).get_mut() } } -impl Deref for AtomicUsize { +impl ops::Deref for AtomicUsize { type Target = std::sync::atomic::AtomicUsize; fn deref(&self) -> &Self::Target { @@ -39,8 +37,15 @@ impl Deref for AtomicUsize { } } +impl ops::DerefMut for AtomicUsize { + fn deref_mut(&mut self) -> &mut Self::Target { + // safety: we hold `&mut self` + unsafe { &mut *self.inner.get() } + } +} + impl fmt::Debug for AtomicUsize { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - self.deref().fmt(fmt) + (**self).fmt(fmt) } } diff --git a/tokio/src/executor/loom/std/causal_cell.rs b/tokio/src/loom/std/causal_cell.rs index bb9b31e5..c4917e5f 100644 --- a/tokio/src/executor/loom/std/causal_cell.rs +++ b/tokio/src/loom/std/causal_cell.rs @@ -1,5 +1,6 @@ use std::cell::UnsafeCell; +#[derive(Debug)] pub(crate) struct CausalCell<T>(UnsafeCell<T>); #[derive(Default)] @@ -44,6 +45,5 @@ impl<T> CausalCell<T> { impl CausalCheck { pub(crate) fn check(self) {} - #[cfg(feature = "rt-full")] pub(crate) fn join(&mut self, _other: CausalCheck) {} } diff --git a/tokio/src/executor/loom/std/mod.rs b/tokio/src/loom/std/mod.rs index 3293f440..5bbf1531 100644 --- a/tokio/src/executor/loom/std/mod.rs +++ b/tokio/src/loom/std/mod.rs @@ -1,39 +1,21 @@ // rt-full implies rt-current-thread -#[cfg(feature = "rt-full")] +#![cfg_attr(not(feature = "rt-full"), allow(unused_imports, dead_code))] + mod atomic_u32; mod atomic_usize; -#[cfg(feature = "rt-current-thread")] mod causal_cell; -#[cfg(feature = "rt-current-thread")] -pub(crate) mod alloc { - #[derive(Debug)] - pub(crate) struct Track<T> { - value: T, - } - - impl<T> Track<T> { - pub(crate) fn new(value: T) -> Track<T> { - Track { value } - } +pub(crate) mod alloc; - pub(crate) fn get_mut(&mut self) -> &mut T { - &mut self.value - } - - pub(crate) fn into_inner(self) -> T { - self.value - } - |