diff options
author | Carl Lerche <me@carllerche.com> | 2019-11-04 14:12:24 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-11-04 14:12:24 -0800 |
commit | 94f9b04b066cfc3da5c3ee2c961c21a9496135dd (patch) | |
tree | 8909528fef4351469835564f48cc7577559c19b3 /tokio/src | |
parent | 966ccd5d5306adf6b6c39721331c2a3c32be6fa8 (diff) |
executor: switch some APIs to crate private. (#1731)
* switch `enter` to crate private
* make executor types pub(crate)
Diffstat (limited to 'tokio/src')
-rw-r--r-- | tokio/src/executor/blocking/mod.rs | 28 | ||||
-rw-r--r-- | tokio/src/executor/enter.rs | 14 | ||||
-rw-r--r-- | tokio/src/executor/loom/mod.rs | 1 | ||||
-rw-r--r-- | tokio/src/executor/loom/std/atomic_usize.rs | 1 | ||||
-rw-r--r-- | tokio/src/executor/loom/std/mod.rs | 2 | ||||
-rw-r--r-- | tokio/src/executor/mod.rs | 8 | ||||
-rw-r--r-- | tokio/src/executor/park/mod.rs | 4 | ||||
-rw-r--r-- | tokio/src/executor/park/thread.rs | 108 | ||||
-rw-r--r-- | tokio/src/executor/thread_pool/builder.rs | 75 | ||||
-rw-r--r-- | tokio/src/executor/thread_pool/mod.rs | 13 | ||||
-rw-r--r-- | tokio/src/executor/thread_pool/park.rs | 182 | ||||
-rw-r--r-- | tokio/src/executor/thread_pool/pool.rs | 23 | ||||
-rw-r--r-- | tokio/src/executor/thread_pool/spawner.rs | 4 | ||||
-rw-r--r-- | tokio/src/executor/thread_pool/tests/loom_pool.rs | 63 | ||||
-rw-r--r-- | tokio/src/executor/thread_pool/tests/pool.rs | 22 | ||||
-rw-r--r-- | tokio/src/executor/thread_pool/worker.rs | 19 | ||||
-rw-r--r-- | tokio/src/runtime/builder.rs | 2 | ||||
-rw-r--r-- | tokio/src/timer/timer/mod.rs | 8 |
18 files changed, 193 insertions, 384 deletions
diff --git a/tokio/src/executor/blocking/mod.rs b/tokio/src/executor/blocking/mod.rs index 92dc1c36..2947062b 100644 --- a/tokio/src/executor/blocking/mod.rs +++ b/tokio/src/executor/blocking/mod.rs @@ -249,6 +249,34 @@ impl Drop for PoolWaiter { } } +/// Run the provided blocking function without blocking the executor. +/// +/// In general, issuing a blocking call or performing a lot of compute in a +/// future without yielding is not okay, as it may prevent the executor from +/// driving other futures forward. If you run a closure through this method, +/// the current executor thread will relegate all its executor duties to another +/// (possibly new) thread, and only then poll the task. Note that this requires +/// additional synchronization. +/// +/// # Examples +/// +/// ``` +/// # async fn docs() { +/// tokio::executor::blocking::in_place(move || { +/// // do some compute-heavy work or call synchronous code +/// }); +/// # } +/// ``` +#[cfg(feature = "rt-full")] +pub fn in_place<F, R>(f: F) -> R +where + F: FnOnce() -> R, +{ + use crate::executor; + + executor::enter::exit(|| executor::thread_pool::blocking(f)) +} + /// Run the provided closure on a thread where blocking is acceptable. /// /// In general, issuing a blocking call or performing a lot of compute in a future without diff --git a/tokio/src/executor/enter.rs b/tokio/src/executor/enter.rs index 56b645a7..19ae26e0 100644 --- a/tokio/src/executor/enter.rs +++ b/tokio/src/executor/enter.rs @@ -1,6 +1,7 @@ use std::cell::{Cell, RefCell}; use std::error::Error; use std::fmt; +#[cfg(feature = "rt-full")] use std::future::Future; use std::marker::PhantomData; @@ -9,13 +10,13 @@ thread_local!(static ENTERED: Cell<bool> = Cell::new(false)); /// Represents an executor context. /// /// For more details, see [`enter` documentation](fn.enter.html) -pub struct Enter { +pub(crate) struct Enter { _p: PhantomData<RefCell<()>>, } /// An error returned by `enter` if an execution scope has already been /// entered. -pub struct EnterError { +pub(crate) struct EnterError { _a: (), } @@ -49,7 +50,7 @@ impl Error for EnterError {} /// # Error /// /// Returns an error if the current thread is already marked -pub fn enter() -> Result<Enter, EnterError> { +pub(crate) fn enter() -> Result<Enter, EnterError> { ENTERED.with(|c| { if c.get() { Err(EnterError { _a: () }) @@ -68,8 +69,8 @@ pub fn enter() -> Result<Enter, EnterError> { // // This is hidden for a reason. Do not use without fully understanding // executors. Misuing can easily cause your program to deadlock. -#[doc(hidden)] -pub fn exit<F: FnOnce() -> R, R>(f: F) -> R { +#[cfg(feature = "rt-full")] +pub(crate) fn exit<F: FnOnce() -> R, R>(f: F) -> R { // Reset in case the closure panics struct Reset; impl Drop for Reset { @@ -100,7 +101,8 @@ pub fn exit<F: FnOnce() -> R, R>(f: F) -> R { impl Enter { /// Blocks the thread on the specified future, returning the value with /// which that future completes. - pub fn block_on<F: Future>(&mut self, mut f: F) -> F::Output { + #[cfg(feature = "rt-full")] + pub(crate) fn block_on<F: Future>(&mut self, mut f: F) -> F::Output { use crate::executor::park::{Park, ParkThread}; use std::pin::Pin; use std::task::Context; diff --git a/tokio/src/executor/loom/mod.rs b/tokio/src/executor/loom/mod.rs index fa071d05..f6f9d23c 100644 --- a/tokio/src/executor/loom/mod.rs +++ b/tokio/src/executor/loom/mod.rs @@ -22,6 +22,7 @@ pub(crate) mod std { #[cfg(feature = "rt-full")] pub(crate) use self::std::rand; +#[cfg(any(feature = "blocking", feature = "rt-current-thread"))] pub(crate) use self::std::sync; #[cfg(any(feature = "blocking", feature = "rt-full"))] pub(crate) use self::std::thread; diff --git a/tokio/src/executor/loom/std/atomic_usize.rs b/tokio/src/executor/loom/std/atomic_usize.rs index 3cabbded..4a424b1e 100644 --- a/tokio/src/executor/loom/std/atomic_usize.rs +++ b/tokio/src/executor/loom/std/atomic_usize.rs @@ -11,6 +11,7 @@ unsafe impl Send for AtomicUsize {} unsafe impl Sync for AtomicUsize {} impl AtomicUsize { + #[cfg(feature = "rt-current-thread")] pub(crate) fn new(val: usize) -> AtomicUsize { let inner = UnsafeCell::new(std::sync::atomic::AtomicUsize::new(val)); AtomicUsize { inner } diff --git a/tokio/src/executor/loom/std/mod.rs b/tokio/src/executor/loom/std/mod.rs index eb865d98..3293f440 100644 --- a/tokio/src/executor/loom/std/mod.rs +++ b/tokio/src/executor/loom/std/mod.rs @@ -56,11 +56,13 @@ pub(crate) mod rand { } pub(crate) mod sync { + #[cfg(any(feature = "blocking", feature = "rt-current-thread"))] pub(crate) use std::sync::{Arc, Condvar, Mutex}; pub(crate) mod atomic { #[cfg(feature = "rt-full")] pub(crate) use crate::executor::loom::std::atomic_u32::AtomicU32; + #[cfg(feature = "rt-current-thread")] pub(crate) use crate::executor::loom::std::atomic_usize::AtomicUsize; #[cfg(feature = "rt-full")] diff --git a/tokio/src/executor/mod.rs b/tokio/src/executor/mod.rs index a888c61b..fa2b29e4 100644 --- a/tokio/src/executor/mod.rs +++ b/tokio/src/executor/mod.rs @@ -44,8 +44,10 @@ #[macro_use] mod tests; +#[cfg(feature = "rt-current-thread")] mod enter; -pub use self::enter::{enter, exit, Enter, EnterError}; +#[cfg(feature = "rt-current-thread")] +pub(crate) use self::enter::enter; mod global; pub use self::global::spawn; @@ -71,6 +73,4 @@ pub mod blocking; pub(crate) mod current_thread; #[cfg(feature = "rt-full")] -pub mod thread_pool; - -pub use futures_util::future::RemoteHandle; +pub(crate) mod thread_pool; diff --git a/tokio/src/executor/park/mod.rs b/tokio/src/executor/park/mod.rs index 16c78da8..9d6d508f 100644 --- a/tokio/src/executor/park/mod.rs +++ b/tokio/src/executor/park/mod.rs @@ -44,8 +44,10 @@ //! [up]: trait.Unpark.html //! [mio]: https://docs.rs/mio/0.6/mio/struct.Poll.html +#[cfg(feature = "rt-full")] mod thread; -pub use self::thread::{ParkError, ParkThread, UnparkThread}; +#[cfg(feature = "rt-full")] +pub(crate) use self::thread::ParkThread; use std::sync::Arc; use std::time::Duration; diff --git a/tokio/src/executor/park/thread.rs b/tokio/src/executor/park/thread.rs index af3d632b..c48b418d 100644 --- a/tokio/src/executor/park/thread.rs +++ b/tokio/src/executor/park/thread.rs @@ -3,10 +3,8 @@ use crate::executor::loom::sync::{Arc, Condvar, Mutex}; use crate::executor::park::{Park, Unpark}; use std::marker::PhantomData; -use std::mem; use std::rc::Rc; use std::sync::atomic::Ordering; -use std::task::{RawWaker, RawWakerVTable, Waker}; use std::time::Duration; /// Blocks the current thread using a condition variable. @@ -20,7 +18,7 @@ use std::time::Duration; /// means that an instance of `ParkThread` might be unblocked by a handle /// associated with a different `ParkThread` instance. #[derive(Debug)] -pub struct ParkThread { +pub(crate) struct ParkThread { _anchor: PhantomData<Rc<()>>, } @@ -30,7 +28,7 @@ pub struct ParkThread { /// /// [`ParkThread`]: struct.ParkThread.html #[derive(Debug)] -pub struct ParkError { +pub(crate) struct ParkError { _p: (), } @@ -40,7 +38,7 @@ struct Parker { /// Unblocks a thread that was blocked by `ParkThread`. #[derive(Clone, Debug)] -pub struct UnparkThread { +pub(crate) struct UnparkThread { inner: Arc<Inner>, } @@ -62,7 +60,7 @@ thread_local! { // ==== impl Parker ==== impl Parker { - pub(crate) fn new() -> Self { + fn new() -> Self { Self { unparker: Arc::new(Inner { state: AtomicUsize::new(IDLE), @@ -72,15 +70,15 @@ impl Parker { } } - pub(crate) fn unparker(&self) -> &Arc<Inner> { + fn unparker(&self) -> &Arc<Inner> { &self.unparker } - pub(crate) fn park(&self) -> Result<(), ParkError> { + fn park(&self) -> Result<(), ParkError> { self.unparker.park(None) } - pub(crate) fn park_timeout(&self, timeout: Duration) -> Result<(), ParkError> { + fn park_timeout(&self, timeout: Duration) -> Result<(), ParkError> { self.unparker.park(Some(timeout)) } } @@ -88,17 +86,8 @@ impl Parker { // ==== impl Inner ==== impl Inner { - #[allow(clippy::wrong_self_convention)] - pub(crate) fn into_raw(this: Arc<Inner>) -> *const () { - Arc::into_raw(this) as *const () - } - - pub(crate) unsafe fn from_raw(ptr: *const ()) -> Arc<Inner> { - Arc::from_raw(ptr as *const Inner) - } - /// Park the current thread for at most `dur`. - pub(crate) fn park(&self, timeout: Option<Duration>) -> Result<(), ParkError> { + fn park(&self, timeout: Option<Duration>) -> Result<(), ParkError> { // If currently notified, then we skip sleeping. This is checked outside // of the lock to avoid acquiring a mutex if not necessary. match self.state.compare_and_swap(NOTIFY, IDLE, Ordering::SeqCst) { @@ -140,7 +129,7 @@ impl Inner { Ok(()) } - pub(crate) fn unpark(&self) { + fn unpark(&self) { // First, try transitioning from IDLE -> NOTIFY, this does not require a // lock. match self.state.compare_and_swap(IDLE, NOTIFY, Ordering::SeqCst) { @@ -172,7 +161,7 @@ impl ParkThread { /// /// This type cannot be moved to other threads, so it should be created on /// the thread that the caller intends to park. - pub fn new() -> ParkThread { + pub(crate) fn new() -> ParkThread { ParkThread { _anchor: PhantomData, } @@ -221,43 +210,64 @@ impl Unpark for UnparkThread { } } -static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker); +#[cfg(feature = "rt-full")] +mod waker { + use super::{Inner, UnparkThread}; + use crate::executor::loom::sync::Arc; + + use std::mem; + use std::task::{RawWaker, RawWakerVTable, Waker}; -impl UnparkThread { - pub(crate) fn into_waker(self) -> Waker { - unsafe { - let raw = unparker_to_raw_waker(self.inner); - Waker::from_raw(raw) + impl UnparkThread { + pub(crate) fn into_waker(self) -> Waker { + unsafe { + let raw = unparker_to_raw_waker(self.inner); + Waker::from_raw(raw) + } } } -} -unsafe fn unparker_to_raw_waker(unparker: Arc<Inner>) -> RawWaker { - RawWaker::new(Inner::into_raw(unparker), &VTABLE) -} + impl Inner { + #[allow(clippy::wrong_self_convention)] + fn into_raw(this: Arc<Inner>) -> *const () { + Arc::into_raw(this) as *const () + } + + unsafe fn from_raw(ptr: *const ()) -> Arc<Inner> { + Arc::from_raw(ptr as *const Inner) + } + } -unsafe fn clone(raw: *const ()) -> RawWaker { - let unparker = Inner::from_raw(raw); + unsafe fn unparker_to_raw_waker(unparker: Arc<Inner>) -> RawWaker { + RawWaker::new( + Inner::into_raw(unparker), + &RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker), + ) + } - // Increment the ref count - mem::forget(unparker.clone()); + unsafe fn clone(raw: *const ()) -> RawWaker { + let unparker = Inner::from_raw(raw); - unparker_to_raw_waker(unparker) -} + // Increment the ref count + mem::forget(unparker.clone()); -unsafe fn drop_waker(raw: *const ()) { - let _ = Inner::from_raw(raw); -} + unparker_to_raw_waker(unparker) + } -unsafe fn wake(raw: *const ()) { - let unparker = Inner::from_raw(raw); - unparker.unpark(); -} + unsafe fn drop_waker(raw: *const ()) { + let _ = Inner::from_raw(raw); + } -unsafe fn wake_by_ref(raw: *const ()) { - let unparker = Inner::from_raw(raw); - unparker.unpark(); + unsafe fn wake(raw: *const ()) { + let unparker = Inner::from_raw(raw); + unparker.unpark(); + } - // We don't actually own a reference to the unparker - mem::forget(unparker); + unsafe fn wake_by_ref(raw: *const ()) { + let unparker = Inner::from_raw(raw); + unparker.unpark(); + + // We don't actually own a reference to the unparker + mem::forget(unparker); + } } diff --git a/tokio/src/executor/thread_pool/builder.rs b/tokio/src/executor/thread_pool/builder.rs index c1298a5a..f9d40350 100644 --- a/tokio/src/executor/thread_pool/builder.rs +++ b/tokio/src/executor/thread_pool/builder.rs @@ -2,13 +2,12 @@ use crate::executor::loom::sync::Arc; use crate::executor::loom::sys::num_cpus; use crate::executor::loom::thread; use crate::executor::park::Park; -use crate::executor::thread_pool::park::DefaultPark; use crate::executor::thread_pool::{shutdown, worker, worker::Worker, Spawner, ThreadPool}; use std::{fmt, usize}; /// Builds a thread pool with custom configuration values. -pub struct Builder { +pub(crate) struct Builder { /// Number of worker threads to spawn pool_size: usize, @@ -29,7 +28,7 @@ type Callback = Arc<Box<dyn Fn(usize, &mut dyn FnMut()) + Send + Sync>>; impl Builder { /// Returns a new thread pool builder initialized with default configuration /// values. - pub fn new() -> Builder { + pub(crate) fn new() -> Builder { Builder { pool_size: num_cpus(), name: "tokio-runtime-worker".to_string(), @@ -44,17 +43,7 @@ impl Builder { /// this value on the smaller side. /// /// The default value is the number of cores available to the system. - /// - /// # Examples - /// - /// ``` - /// use tokio::executor::thread_pool::Builder; - /// - /// let thread_pool = Builder::new() - /// .num_threads(4) - /// .build(); - /// ``` - pub fn num_threads(&mut self, value: usize) -> &mut Self { + pub(crate) fn num_threads(&mut self, value: usize) -> &mut Self { self.pool_size = value; self } @@ -63,17 +52,7 @@ impl Builder { /// /// If this configuration is not set, then the thread will use the system /// default naming scheme. - /// - /// # Examples - /// - /// ``` - /// use tokio::executor::thread_pool::Builder; - /// - /// let thread_pool = Builder::new() - /// .name("my-pool") - /// .build(); - /// ``` - pub fn name<S: Into<String>>(&mut self, val: S) -> &mut Self { + pub(crate) fn name<S: Into<String>>(&mut self, val: S) -> &mut Self { self.name = val.into(); self } @@ -85,17 +64,7 @@ impl Builder { /// /// The default stack size for spawned threads is 2 MiB, though this /// particular stack size is subject to change in the future. - /// - /// # Examples - /// - /// ``` - /// use tokio::executor::thread_pool::Builder; - /// - /// let thread_pool = Builder::new() - /// .stack_size(32 * 1024) - /// .build(); - /// ``` - pub fn stack_size(&mut self, val: usize) -> &mut Self { + pub(crate) fn stack_size(&mut self, val: usize) -> &mut Self { self.stack_size = Some(val); self } @@ -105,21 +74,7 @@ impl Builder { /// This function is provided a function that executes the worker and is /// expected to call it, otherwise the worker thread will shutdown without /// doing any work. - /// - /// # Examples - /// - /// ``` - /// use tokio::executor::thread_pool::Builder; - /// - /// let thread_pool = Builder::new() - /// .around_worker(|index, work| { - /// println!("worker {} is starting up", index); - /// work(); - /// println!("worker {} is shutting down", index); - /// }) - /// .build(); - /// ``` - pub fn around_worker<F>(&mut self, f: F) -> &mut Self + pub(crate) fn around_worker<F>(&mut self, f: F) -> &mut Self where F: Fn(usize, &mut dyn FnMut()) + Send + Sync + 'static, { @@ -127,27 +82,11 @@ impl Builder { self } - /// Create the configured `ThreadPool`. - /// - /// The returned `ThreadPool` instance is ready to spawn tasks. - /// - /// # Examples - /// - /// ``` - /// use tokio::executor::thread_pool::Builder; - /// - /// let thread_pool = Builder::new() - /// .build(); - /// ``` - pub fn build(&self) -> ThreadPool { - self.build_with_park(|_| DefaultPark::new()) - } - /// Create the configured `ThreadPool` with a custom `park` instances. /// /// The provided closure `build_park` is called once per worker and returns /// a `Park` instance that is used by the worker to put itself to sleep. - pub fn build_with_park<F, P>(&self, mut build_park: F) -> ThreadPool + pub(crate) fn build<F, P>(&self, mut build_park: F) -> ThreadPool where F: FnMut(usize) -> P, P: Park + Send + 'static, diff --git a/tokio/src/executor/thread_pool/mod.rs b/tokio/src/executor/thread_pool/mod.rs index c18d8766..0c7ce42a 100644 --- a/tokio/src/executor/thread_pool/mod.rs +++ b/tokio/src/executor/thread_pool/mod.rs @@ -1,7 +1,7 @@ //! Threadpool mod builder; -pub use self::builder::Builder; +pub(crate) use self::builder::Builder; mod current; @@ -11,15 +11,13 @@ use self::idle::Idle; mod owned; use self::owned::Owned; -mod park; - mod pool; -pub use self::pool::ThreadPool; +pub(crate) use self::pool::ThreadPool; mod queue; mod spawner; -pub use self::spawner::Spawner; +pub(crate) use self::spawner::Spawner; mod set; @@ -29,14 +27,13 @@ use self::shared::Shared; mod shutdown; mod worker; +#[cfg(feature = "blocking")] +pub(crate) use worker::blocking; /// Unit tests #[cfg(test)] mod tests; -#[cfg(feature = "blocking")] -pub use worker::blocking; - #[cfg(not(loom))] const LOCAL_QUEUE_CAPACITY: usize = 256; diff --git a/tokio/src/executor/thread_pool/park.rs b/tokio/src/executor/thread_pool/park.rs deleted file mode 100644 index 225d21db..00000000 --- a/tokio/src/executor/thread_pool/park.rs +++ /dev/null @@ -1,182 +0,0 @@ -use crate::executor::loom::sync::atomic::AtomicUsize; -use crate::executor::loom::sync::{Arc, Condvar, Mutex}; -use crate::executor::park::{Park, Unpark}; - -use std::error::Error; -use std::fmt; -use std::sync::atomic::Ordering::SeqCst; -use std::time::Duration; - -/// Parks the thread. -#[derive(Debug)] -pub(crate) struct DefaultPark { - inner: Arc<Inner>, -} - -/// Unparks threads that were parked by `DefaultPark`. -#[derive(Debug)] -pub(crate) struct DefaultUnpark { - inner: Arc<Inner>, -} - -/// Error returned by [`ParkThread`] -/// -/// This currently is never returned, but might at some point in the future. -/// -/// [`ParkThread`]: struct.ParkThread.html -#[derive(Debug)] -pub(crate) struct ParkError { - _p: (), -} - -const EMPTY: usize = 0; -const PARKED: usize = 1; -const NOTIFIED: usize = 2; - -#[derive(Debug)] -struct Inner { - state: AtomicUsize, - lock: Mutex<()>, - cvar: Condvar, -} - -impl DefaultPark { - /// Creates a new `DefaultPark` instance. - pub(crate) fn new() -> DefaultPark { - DefaultPark { - inner: Arc::new(Inner { - state: AtomicUsize::new(EMPTY), - lock: Mutex::new(()), - cvar: Condvar::new(), - }), - } - } -} - -impl Park for DefaultPark { - type Unpark = DefaultUnpark; - type Error = ParkError; - - fn unpark(&self) -> Self::Unpark { - let inner = self.inner.clone(); - DefaultUnpark { inner } - } - - fn park(&mut self) -> Result<(), Self::Error> { - self.inner.park(None); - Ok(()) - } - - fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { - self.inner.park(Some(duration)); - Ok(()) - } -} - -impl Unpark for DefaultUnpark { - fn unpark(&self) { - self.inner.unpark(); - } -} - -impl fmt::Display for ParkError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "unknown park error") - } -} - -impl Error for ParkError {} - -impl Inner { - fn park(&self, timeout: Option<Duration>) { - // If we were previously notified then we consume this notification and return quickly. - if self - .state - .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) - .is_ok() - { - return; - } - - // If the timeout is zero, then there is no need to actually block. - if let Some(ref dur) = timeout { - if *dur == Duration::from_millis(0) { - return; - } - } - - // Otherwise we need to coordinate going to sleep. - let mut _m = self.lock.lock().unwrap(); - - match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) { - Ok(_) => {} - // Consume this notification to avoid spurious wakeups in the next park. - Err(NOTIFIED) => { - // We must read `state` here, even though we know it will be `NOTIFIED`. This is - // because `unpark` may have been called again since we read `NOTIFIED` in the - // `compare_exchange` above. We must perform an acquire operation that synchronizes - // with that `unpark` to observe any writes it made before the call to `unpark`. To - // do that we must read from the write it made to `state`. - let old = self.state.swap(EMPTY, SeqCst); - assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); - return; - } - Err(n) => panic!("inconsistent park_timeout state: {}", n), - } - - match timeout { - None => { - loop { - // Block the current thread on the conditional variable. - _m = self.cvar.wait(_m).unwrap(); - - if self - .state - .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) - .is_ok() - { - return; // got a notification - } - - // spurious wakeup, go back to sleep - } - } - Some(timeout) => { - // Wait with a timeout, and if we spuriously wake up or otherwise wake up from a - // notification we just want to unconditionally set `state` back to `EMPTY`, either - // consuming a notification or un-flagging ourselves as parked. - _m = self.cvar.wait_timeout(_m, timeout).unwrap().0; - - match self.state.swap(EMPTY, SeqCst) { - NOTIFIED => {} // got a notification - PARKED => {} // no notification - n => panic!("inconsistent park_timeout state: {}", n), - } - } - } - } - - fn unpark(&self) { - // To ensure the unparked thread will observe any writes we made before this call, we must - // perform a release operation that `park` can synchronize with. To do that we must write - // `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather - // than a compare-and-swap that returns if it reads `NOTIFIED` on failure. - match self.state.swap(NOTIFIED, SeqCst) { - EMPTY => return, // no one was waiting - NOTIFIED => return, // already unparked - PARKED => {} // gotta go wake someone up - n => panic!("inconsistent state in unpark: {}", n), - } - - // There is a period between when the parked thread sets `state` to `PARKED` (or last - // checked `state` in the case of a spurious wakeup) and when it actually waits on `cvar`. - // If we were to notify during this period it would be ignored and then when the parked - // thread went to sleep it would never wake up. Fortunately, it has `lock` locked at this - // stage so we can acquire `lock` to wait until it is ready to receive the notification. - // - // Releasing `lock` before the call to `notify_one` means that when the parked thread wakes - // it doesn't get woken only to have to wait for us to release `lock`. - drop(self.lock.lock()); - self.cvar.notify_one(); - } -} diff --git a/tokio/src/executor/thread_pool/pool.rs b/tokio/src/executor/thread_pool/pool.rs index 67235eae..d3219d87 100644 --- a/tokio/src/executor/thread_pool/pool.rs +++ b/tokio/src/executor/thread_pool/pool.rs @@ -1,12 +1,12 @@ use crate::executor::blocking::PoolWaiter; use crate::executor::task::JoinHandle; -use crate::executor::thread_pool::{shutdown, Builder, Spawner}; +use crate::executor::thread_pool::{shutdown, Spawner}; use std::fmt; use std::future::Future; /// Work-stealing based thread pool for executing futures. -pub struct ThreadPool { +pub(crate) struct ThreadPool { spawner: Spawner, /// Shutdown waiter @@ -17,11 +17,6 @@ pub struct ThreadPool { } impl ThreadPool { - /// Create a new ThreadPool with default configuration - pub fn new() -> ThreadPool { - Builder::new().build() - } - pub(super) fn from_parts( spawner: Spawner, shutdown_rx: shutdown::Receiver, @@ -38,12 +33,12 @@ impl ThreadPool { /// /// The `Spawner` handle can be cloned and enables spawning tasks from other /// threads. - pub fn spawner(&self) -> &Spawner { + pub(crate) fn spawner(&self) -> &Spawner { &self.spawner } /// Spawn a task - pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> + pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> where F: Future + Send + 'static, F::Output: Send + 'static, @@ -55,7 +50,7 @@ impl ThreadPool { /// /// The future will execute on the current thread, but all spawned tasks /// will be executed on the thread pool. - pub fn block_on<F>(&self, future: F) -> F::Output + pub(crate) fn block_on<F>(&self, future: F) -> F::Output where F: Future, { @@ -69,7 +64,7 @@ impl ThreadPool { } /// Shutdown the thread pool. - pub fn shutdown_now(&mut self) { + pub(crate) fn shutdown_now(&mut self) { if self.spawner.workers().close() { self.shutdown_rx.wait(); } @@ -77,12 +72,6 @@ impl ThreadPool { } } -impl Default for ThreadPool { - fn default() -> ThreadPool { - ThreadPool::new() - } -} - impl fmt::Debug for ThreadPool { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("ThreadPool").finish() diff --git a/tokio/src/executor/thread_pool/spawner.rs b/tokio/src/executor/thread_pool/spawner.rs index b33c7cad..4fdf9594 100644 --- a/tokio/src/executor/thread_pool/spawner.rs +++ b/tokio/src/executor/thread_pool/spawner.rs @@ -19,7 +19,7 @@ use std::future::Future; /// /// [`ThreadPool::spawner`]: struct.ThreadPool.html#method.spawner #[derive(Clone)] -pub struct Spawner { +pub(crate) struct Spawner { workers: Arc<worker::Set<Box<dyn Unpark>>>, } @@ -29,7 +29,7 @@ impl Spawner { } /// Spawn a future onto the thread pool - pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> + pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> where F: Future + Send + 'static, F::Output: Send + 'static, diff --git a/tokio/src/executor/thread_pool/tests/loom_pool.rs b/tokio/src/executor/thread_pool/tests/loom_pool.rs index 26c794ec..6b2b5c30 100644 --- a/tokio/src/executor/thread_pool/tests/loom_pool.rs +++ b/tokio/src/executor/thread_pool/tests/loom_pool.rs @@ -1,16 +1,20 @@ use crate::executor::loom::sync::atomic::Ordering::{Acquire, Relaxed, Release}; use crate::executor::loom::sync::atomic::{AtomicBool, AtomicUsize}; use crate::executor::loom::sync::{Arc, Mutex}; +use crate::executor::park::{Park, Unpark}; use crate::executor::tests::loom_oneshot as oneshot; -use crate::executor::thread_pool::{self, Builder, ThreadPool}; +use crate::executor::thread_pool::{self, Builder}; use crate::spawn; +use loom::sync::Notify; + use std::future::Future; +use std::time::Duration; #[test] fn pool_multi_spawn() { loom::model(|| { - let pool = ThreadPool::new(); + let pool = Builder::new().build(|_| LoomPark::new()); let c1 = Arc::new(AtomicUsize::new(0)); @@ -44,7 +48,7 @@ fn pool_multi_spawn() { #[test] fn only_blocking() { loom::model(|| { - let mut pool = Builder::new().num_threads(1).build(); + let mut pool = Builder::new().num_threads(1).build(|_| LoomPark::new()); let (block_tx, block_rx) = oneshot::channel(); pool.spawn(async move { @@ -62,7 +66,7 @@ fn only_blocking() { fn blocking_and_regular() { const NUM: usize = 3; loom::model(|| { - let mut pool = Builder::new().num_threads(1).build(); + let mut pool = Builder::new().num_threads(1).build(|_| LoomPark::new()); |