summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--tokio/src/executor/blocking/mod.rs28
-rw-r--r--tokio/src/executor/enter.rs14
-rw-r--r--tokio/src/executor/loom/mod.rs1
-rw-r--r--tokio/src/executor/loom/std/atomic_usize.rs1
-rw-r--r--tokio/src/executor/loom/std/mod.rs2
-rw-r--r--tokio/src/executor/mod.rs8
-rw-r--r--tokio/src/executor/park/mod.rs4
-rw-r--r--tokio/src/executor/park/thread.rs108
-rw-r--r--tokio/src/executor/thread_pool/builder.rs75
-rw-r--r--tokio/src/executor/thread_pool/mod.rs13
-rw-r--r--tokio/src/executor/thread_pool/park.rs182
-rw-r--r--tokio/src/executor/thread_pool/pool.rs23
-rw-r--r--tokio/src/executor/thread_pool/spawner.rs4
-rw-r--r--tokio/src/executor/thread_pool/tests/loom_pool.rs63
-rw-r--r--tokio/src/executor/thread_pool/tests/pool.rs22
-rw-r--r--tokio/src/executor/thread_pool/worker.rs19
-rw-r--r--tokio/src/runtime/builder.rs2
-rw-r--r--tokio/src/timer/timer/mod.rs8
-rw-r--r--tokio/tests/executor_enter.rs17
-rw-r--r--tokio/tests/net_driver.rs2
-rw-r--r--tokio/tests/rt_thread_pool.rs8
21 files changed, 196 insertions, 408 deletions
diff --git a/tokio/src/executor/blocking/mod.rs b/tokio/src/executor/blocking/mod.rs
index 92dc1c36..2947062b 100644
--- a/tokio/src/executor/blocking/mod.rs
+++ b/tokio/src/executor/blocking/mod.rs
@@ -249,6 +249,34 @@ impl Drop for PoolWaiter {
}
}
+/// Run the provided blocking function without blocking the executor.
+///
+/// In general, issuing a blocking call or performing a lot of compute in a
+/// future without yielding is not okay, as it may prevent the executor from
+/// driving other futures forward. If you run a closure through this method,
+/// the current executor thread will relegate all its executor duties to another
+/// (possibly new) thread, and only then poll the task. Note that this requires
+/// additional synchronization.
+///
+/// # Examples
+///
+/// ```
+/// # async fn docs() {
+/// tokio::executor::blocking::in_place(move || {
+/// // do some compute-heavy work or call synchronous code
+/// });
+/// # }
+/// ```
+#[cfg(feature = "rt-full")]
+pub fn in_place<F, R>(f: F) -> R
+where
+ F: FnOnce() -> R,
+{
+ use crate::executor;
+
+ executor::enter::exit(|| executor::thread_pool::blocking(f))
+}
+
/// Run the provided closure on a thread where blocking is acceptable.
///
/// In general, issuing a blocking call or performing a lot of compute in a future without
diff --git a/tokio/src/executor/enter.rs b/tokio/src/executor/enter.rs
index 56b645a7..19ae26e0 100644
--- a/tokio/src/executor/enter.rs
+++ b/tokio/src/executor/enter.rs
@@ -1,6 +1,7 @@
use std::cell::{Cell, RefCell};
use std::error::Error;
use std::fmt;
+#[cfg(feature = "rt-full")]
use std::future::Future;
use std::marker::PhantomData;
@@ -9,13 +10,13 @@ thread_local!(static ENTERED: Cell<bool> = Cell::new(false));
/// Represents an executor context.
///
/// For more details, see [`enter` documentation](fn.enter.html)
-pub struct Enter {
+pub(crate) struct Enter {
_p: PhantomData<RefCell<()>>,
}
/// An error returned by `enter` if an execution scope has already been
/// entered.
-pub struct EnterError {
+pub(crate) struct EnterError {
_a: (),
}
@@ -49,7 +50,7 @@ impl Error for EnterError {}
/// # Error
///
/// Returns an error if the current thread is already marked
-pub fn enter() -> Result<Enter, EnterError> {
+pub(crate) fn enter() -> Result<Enter, EnterError> {
ENTERED.with(|c| {
if c.get() {
Err(EnterError { _a: () })
@@ -68,8 +69,8 @@ pub fn enter() -> Result<Enter, EnterError> {
//
// This is hidden for a reason. Do not use without fully understanding
// executors. Misuing can easily cause your program to deadlock.
-#[doc(hidden)]
-pub fn exit<F: FnOnce() -> R, R>(f: F) -> R {
+#[cfg(feature = "rt-full")]
+pub(crate) fn exit<F: FnOnce() -> R, R>(f: F) -> R {
// Reset in case the closure panics
struct Reset;
impl Drop for Reset {
@@ -100,7 +101,8 @@ pub fn exit<F: FnOnce() -> R, R>(f: F) -> R {
impl Enter {
/// Blocks the thread on the specified future, returning the value with
/// which that future completes.
- pub fn block_on<F: Future>(&mut self, mut f: F) -> F::Output {
+ #[cfg(feature = "rt-full")]
+ pub(crate) fn block_on<F: Future>(&mut self, mut f: F) -> F::Output {
use crate::executor::park::{Park, ParkThread};
use std::pin::Pin;
use std::task::Context;
diff --git a/tokio/src/executor/loom/mod.rs b/tokio/src/executor/loom/mod.rs
index fa071d05..f6f9d23c 100644
--- a/tokio/src/executor/loom/mod.rs
+++ b/tokio/src/executor/loom/mod.rs
@@ -22,6 +22,7 @@ pub(crate) mod std {
#[cfg(feature = "rt-full")]
pub(crate) use self::std::rand;
+#[cfg(any(feature = "blocking", feature = "rt-current-thread"))]
pub(crate) use self::std::sync;
#[cfg(any(feature = "blocking", feature = "rt-full"))]
pub(crate) use self::std::thread;
diff --git a/tokio/src/executor/loom/std/atomic_usize.rs b/tokio/src/executor/loom/std/atomic_usize.rs
index 3cabbded..4a424b1e 100644
--- a/tokio/src/executor/loom/std/atomic_usize.rs
+++ b/tokio/src/executor/loom/std/atomic_usize.rs
@@ -11,6 +11,7 @@ unsafe impl Send for AtomicUsize {}
unsafe impl Sync for AtomicUsize {}
impl AtomicUsize {
+ #[cfg(feature = "rt-current-thread")]
pub(crate) fn new(val: usize) -> AtomicUsize {
let inner = UnsafeCell::new(std::sync::atomic::AtomicUsize::new(val));
AtomicUsize { inner }
diff --git a/tokio/src/executor/loom/std/mod.rs b/tokio/src/executor/loom/std/mod.rs
index eb865d98..3293f440 100644
--- a/tokio/src/executor/loom/std/mod.rs
+++ b/tokio/src/executor/loom/std/mod.rs
@@ -56,11 +56,13 @@ pub(crate) mod rand {
}
pub(crate) mod sync {
+ #[cfg(any(feature = "blocking", feature = "rt-current-thread"))]
pub(crate) use std::sync::{Arc, Condvar, Mutex};
pub(crate) mod atomic {
#[cfg(feature = "rt-full")]
pub(crate) use crate::executor::loom::std::atomic_u32::AtomicU32;
+ #[cfg(feature = "rt-current-thread")]
pub(crate) use crate::executor::loom::std::atomic_usize::AtomicUsize;
#[cfg(feature = "rt-full")]
diff --git a/tokio/src/executor/mod.rs b/tokio/src/executor/mod.rs
index a888c61b..fa2b29e4 100644
--- a/tokio/src/executor/mod.rs
+++ b/tokio/src/executor/mod.rs
@@ -44,8 +44,10 @@
#[macro_use]
mod tests;
+#[cfg(feature = "rt-current-thread")]
mod enter;
-pub use self::enter::{enter, exit, Enter, EnterError};
+#[cfg(feature = "rt-current-thread")]
+pub(crate) use self::enter::enter;
mod global;
pub use self::global::spawn;
@@ -71,6 +73,4 @@ pub mod blocking;
pub(crate) mod current_thread;
#[cfg(feature = "rt-full")]
-pub mod thread_pool;
-
-pub use futures_util::future::RemoteHandle;
+pub(crate) mod thread_pool;
diff --git a/tokio/src/executor/park/mod.rs b/tokio/src/executor/park/mod.rs
index 16c78da8..9d6d508f 100644
--- a/tokio/src/executor/park/mod.rs
+++ b/tokio/src/executor/park/mod.rs
@@ -44,8 +44,10 @@
//! [up]: trait.Unpark.html
//! [mio]: https://docs.rs/mio/0.6/mio/struct.Poll.html
+#[cfg(feature = "rt-full")]
mod thread;
-pub use self::thread::{ParkError, ParkThread, UnparkThread};
+#[cfg(feature = "rt-full")]
+pub(crate) use self::thread::ParkThread;
use std::sync::Arc;
use std::time::Duration;
diff --git a/tokio/src/executor/park/thread.rs b/tokio/src/executor/park/thread.rs
index af3d632b..c48b418d 100644
--- a/tokio/src/executor/park/thread.rs
+++ b/tokio/src/executor/park/thread.rs
@@ -3,10 +3,8 @@ use crate::executor::loom::sync::{Arc, Condvar, Mutex};
use crate::executor::park::{Park, Unpark};
use std::marker::PhantomData;
-use std::mem;
use std::rc::Rc;
use std::sync::atomic::Ordering;
-use std::task::{RawWaker, RawWakerVTable, Waker};
use std::time::Duration;
/// Blocks the current thread using a condition variable.
@@ -20,7 +18,7 @@ use std::time::Duration;
/// means that an instance of `ParkThread` might be unblocked by a handle
/// associated with a different `ParkThread` instance.
#[derive(Debug)]
-pub struct ParkThread {
+pub(crate) struct ParkThread {
_anchor: PhantomData<Rc<()>>,
}
@@ -30,7 +28,7 @@ pub struct ParkThread {
///
/// [`ParkThread`]: struct.ParkThread.html
#[derive(Debug)]
-pub struct ParkError {
+pub(crate) struct ParkError {
_p: (),
}
@@ -40,7 +38,7 @@ struct Parker {
/// Unblocks a thread that was blocked by `ParkThread`.
#[derive(Clone, Debug)]
-pub struct UnparkThread {
+pub(crate) struct UnparkThread {
inner: Arc<Inner>,
}
@@ -62,7 +60,7 @@ thread_local! {
// ==== impl Parker ====
impl Parker {
- pub(crate) fn new() -> Self {
+ fn new() -> Self {
Self {
unparker: Arc::new(Inner {
state: AtomicUsize::new(IDLE),
@@ -72,15 +70,15 @@ impl Parker {
}
}
- pub(crate) fn unparker(&self) -> &Arc<Inner> {
+ fn unparker(&self) -> &Arc<Inner> {
&self.unparker
}
- pub(crate) fn park(&self) -> Result<(), ParkError> {
+ fn park(&self) -> Result<(), ParkError> {
self.unparker.park(None)
}
- pub(crate) fn park_timeout(&self, timeout: Duration) -> Result<(), ParkError> {
+ fn park_timeout(&self, timeout: Duration) -> Result<(), ParkError> {
self.unparker.park(Some(timeout))
}
}
@@ -88,17 +86,8 @@ impl Parker {
// ==== impl Inner ====
impl Inner {
- #[allow(clippy::wrong_self_convention)]
- pub(crate) fn into_raw(this: Arc<Inner>) -> *const () {
- Arc::into_raw(this) as *const ()
- }
-
- pub(crate) unsafe fn from_raw(ptr: *const ()) -> Arc<Inner> {
- Arc::from_raw(ptr as *const Inner)
- }
-
/// Park the current thread for at most `dur`.
- pub(crate) fn park(&self, timeout: Option<Duration>) -> Result<(), ParkError> {
+ fn park(&self, timeout: Option<Duration>) -> Result<(), ParkError> {
// If currently notified, then we skip sleeping. This is checked outside
// of the lock to avoid acquiring a mutex if not necessary.
match self.state.compare_and_swap(NOTIFY, IDLE, Ordering::SeqCst) {
@@ -140,7 +129,7 @@ impl Inner {
Ok(())
}
- pub(crate) fn unpark(&self) {
+ fn unpark(&self) {
// First, try transitioning from IDLE -> NOTIFY, this does not require a
// lock.
match self.state.compare_and_swap(IDLE, NOTIFY, Ordering::SeqCst) {
@@ -172,7 +161,7 @@ impl ParkThread {
///
/// This type cannot be moved to other threads, so it should be created on
/// the thread that the caller intends to park.
- pub fn new() -> ParkThread {
+ pub(crate) fn new() -> ParkThread {
ParkThread {
_anchor: PhantomData,
}
@@ -221,43 +210,64 @@ impl Unpark for UnparkThread {
}
}
-static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker);
+#[cfg(feature = "rt-full")]
+mod waker {
+ use super::{Inner, UnparkThread};
+ use crate::executor::loom::sync::Arc;
+
+ use std::mem;
+ use std::task::{RawWaker, RawWakerVTable, Waker};
-impl UnparkThread {
- pub(crate) fn into_waker(self) -> Waker {
- unsafe {
- let raw = unparker_to_raw_waker(self.inner);
- Waker::from_raw(raw)
+ impl UnparkThread {
+ pub(crate) fn into_waker(self) -> Waker {
+ unsafe {
+ let raw = unparker_to_raw_waker(self.inner);
+ Waker::from_raw(raw)
+ }
}
}
-}
-unsafe fn unparker_to_raw_waker(unparker: Arc<Inner>) -> RawWaker {
- RawWaker::new(Inner::into_raw(unparker), &VTABLE)
-}
+ impl Inner {
+ #[allow(clippy::wrong_self_convention)]
+ fn into_raw(this: Arc<Inner>) -> *const () {
+ Arc::into_raw(this) as *const ()
+ }
+
+ unsafe fn from_raw(ptr: *const ()) -> Arc<Inner> {
+ Arc::from_raw(ptr as *const Inner)
+ }
+ }
-unsafe fn clone(raw: *const ()) -> RawWaker {
- let unparker = Inner::from_raw(raw);
+ unsafe fn unparker_to_raw_waker(unparker: Arc<Inner>) -> RawWaker {
+ RawWaker::new(
+ Inner::into_raw(unparker),
+ &RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker),
+ )
+ }
- // Increment the ref count
- mem::forget(unparker.clone());
+ unsafe fn clone(raw: *const ()) -> RawWaker {
+ let unparker = Inner::from_raw(raw);
- unparker_to_raw_waker(unparker)
-}
+ // Increment the ref count
+ mem::forget(unparker.clone());
-unsafe fn drop_waker(raw: *const ()) {
- let _ = Inner::from_raw(raw);
-}
+ unparker_to_raw_waker(unparker)
+ }
-unsafe fn wake(raw: *const ()) {
- let unparker = Inner::from_raw(raw);
- unparker.unpark();
-}
+ unsafe fn drop_waker(raw: *const ()) {
+ let _ = Inner::from_raw(raw);
+ }
-unsafe fn wake_by_ref(raw: *const ()) {
- let unparker = Inner::from_raw(raw);
- unparker.unpark();
+ unsafe fn wake(raw: *const ()) {
+ let unparker = Inner::from_raw(raw);
+ unparker.unpark();
+ }
- // We don't actually own a reference to the unparker
- mem::forget(unparker);
+ unsafe fn wake_by_ref(raw: *const ()) {
+ let unparker = Inner::from_raw(raw);
+ unparker.unpark();
+
+ // We don't actually own a reference to the unparker
+ mem::forget(unparker);
+ }
}
diff --git a/tokio/src/executor/thread_pool/builder.rs b/tokio/src/executor/thread_pool/builder.rs
index c1298a5a..f9d40350 100644
--- a/tokio/src/executor/thread_pool/builder.rs
+++ b/tokio/src/executor/thread_pool/builder.rs
@@ -2,13 +2,12 @@ use crate::executor::loom::sync::Arc;
use crate::executor::loom::sys::num_cpus;
use crate::executor::loom::thread;
use crate::executor::park::Park;
-use crate::executor::thread_pool::park::DefaultPark;
use crate::executor::thread_pool::{shutdown, worker, worker::Worker, Spawner, ThreadPool};
use std::{fmt, usize};
/// Builds a thread pool with custom configuration values.
-pub struct Builder {
+pub(crate) struct Builder {
/// Number of worker threads to spawn
pool_size: usize,
@@ -29,7 +28,7 @@ type Callback = Arc<Box<dyn Fn(usize, &mut dyn FnMut()) + Send + Sync>>;
impl Builder {
/// Returns a new thread pool builder initialized with default configuration
/// values.
- pub fn new() -> Builder {
+ pub(crate) fn new() -> Builder {
Builder {
pool_size: num_cpus(),
name: "tokio-runtime-worker".to_string(),
@@ -44,17 +43,7 @@ impl Builder {
/// this value on the smaller side.
///
/// The default value is the number of cores available to the system.
- ///
- /// # Examples
- ///
- /// ```
- /// use tokio::executor::thread_pool::Builder;
- ///
- /// let thread_pool = Builder::new()
- /// .num_threads(4)
- /// .build();
- /// ```
- pub fn num_threads(&mut self, value: usize) -> &mut Self {
+ pub(crate) fn num_threads(&mut self, value: usize) -> &mut Self {
self.pool_size = value;
self
}
@@ -63,17 +52,7 @@ impl Builder {
///
/// If this configuration is not set, then the thread will use the system
/// default naming scheme.
- ///
- /// # Examples
- ///
- /// ```
- /// use tokio::executor::thread_pool::Builder;
- ///
- /// let thread_pool = Builder::new()
- /// .name("my-pool")
- /// .build();
- /// ```
- pub fn name<S: Into<String>>(&mut self, val: S) -> &mut Self {
+ pub(crate) fn name<S: Into<String>>(&mut self, val: S) -> &mut Self {
self.name = val.into();
self
}
@@ -85,17 +64,7 @@ impl Builder {
///
/// The default stack size for spawned threads is 2 MiB, though this
/// particular stack size is subject to change in the future.
- ///
- /// # Examples
- ///
- /// ```
- /// use tokio::executor::thread_pool::Builder;
- ///
- /// let thread_pool = Builder::new()
- /// .stack_size(32 * 1024)
- /// .build();
- /// ```
- pub fn stack_size(&mut self, val: usize) -> &mut Self {
+ pub(crate) fn stack_size(&mut self, val: usize) -> &mut Self {
self.stack_size = Some(val);
self
}
@@ -105,21 +74,7 @@ impl Builder {
/// This function is provided a function that executes the worker and is
/// expected to call it, otherwise the worker thread will shutdown without
/// doing any work.
- ///
- /// # Examples
- ///
- /// ```
- /// use tokio::executor::thread_pool::Builder;
- ///
- /// let thread_pool = Builder::new()
- /// .around_worker(|index, work| {
- /// println!("worker {} is starting up", index);
- /// work();
- /// println!("worker {} is shutting down", index);
- /// })
- /// .build();
- /// ```
- pub fn around_worker<F>(&mut self, f: F) -> &mut Self
+ pub(crate) fn around_worker<F>(&mut self, f: F) -> &mut Self
where
F: Fn(usize, &mut dyn FnMut()) + Send + Sync + 'static,
{
@@ -127,27 +82,11 @@ impl Builder {
self
}
- /// Create the configured `ThreadPool`.
- ///
- /// The returned `ThreadPool` instance is ready to spawn tasks.
- ///
- /// # Examples
- ///
- /// ```
- /// use tokio::executor::thread_pool::Builder;
- ///
- /// let thread_pool = Builder::new()
- /// .build();
- /// ```
- pub fn build(&self) -> ThreadPool {
- self.build_with_park(|_| DefaultPark::new())
- }
-
/// Create the configured `ThreadPool` with a custom `park` instances.
///
/// The provided closure `build_park` is called once per worker and returns
/// a `Park` instance that is used by the worker to put itself to sleep.
- pub fn build_with_park<F, P>(&self, mut build_park: F) -> ThreadPool
+ pub(crate) fn build<F, P>(&self, mut build_park: F) -> ThreadPool
where
F: FnMut(usize) -> P,
P: Park + Send + 'static,
diff --git a/tokio/src/executor/thread_pool/mod.rs b/tokio/src/executor/thread_pool/mod.rs
index c18d8766..0c7ce42a 100644
--- a/tokio/src/executor/thread_pool/mod.rs
+++ b/tokio/src/executor/thread_pool/mod.rs
@@ -1,7 +1,7 @@
//! Threadpool
mod builder;
-pub use self::builder::Builder;
+pub(crate) use self::builder::Builder;
mod current;
@@ -11,15 +11,13 @@ use self::idle::Idle;
mod owned;
use self::owned::Owned;
-mod park;
-
mod pool;
-pub use self::pool::ThreadPool;
+pub(crate) use self::pool::ThreadPool;
mod queue;
mod spawner;
-pub use self::spawner::Spawner;
+pub(crate) use self::spawner::Spawner;
mod set;
@@ -29,14 +27,13 @@ use self::shared::Shared;
mod shutdown;
mod worker;
+#[cfg(feature = "blocking")]
+pub(crate) use worker::blocking;
/// Unit tests
#[cfg(test)]
mod tests;
-#[cfg(feature = "blocking")]
-pub use worker::blocking;
-
#[cfg(not(loom))]
const LOCAL_QUEUE_CAPACITY: usize = 256;
diff --git a/tokio/src/executor/thread_pool/park.rs b/tokio/src/executor/thread_pool/park.rs
deleted file mode 100644
index 225d21db..00000000
--- a/tokio/src/executor/thread_pool/park.rs
+++ /dev/null
@@ -1,182 +0,0 @@
-use crate::executor::loom::sync::atomic::AtomicUsize;
-use crate::executor::loom::sync::{Arc, Condvar, Mutex};
-use crate::executor::park::{Park, Unpark};
-
-use std::error::Error;
-use std::fmt;
-use std::sync::atomic::Ordering::SeqCst;
-use std::time::Duration;
-
-/// Parks the thread.
-#[derive(Debug)]
-pub(crate) struct DefaultPark {
- inner: Arc<Inner>,
-}
-
-/// Unparks threads that were parked by `DefaultPark`.
-#[derive(Debug)]
-pub(crate) struct DefaultUnpark {
- inner: Arc<Inner>,
-}
-
-/// Error returned by [`ParkThread`]
-///
-/// This currently is never returned, but might at some point in the future.
-///
-/// [`ParkThread`]: struct.ParkThread.html
-#[derive(Debug)]
-pub(crate) struct ParkError {
- _p: (),
-}
-
-const EMPTY: usize = 0;
-const PARKED: usize = 1;
-const NOTIFIED: usize = 2;
-
-#[derive(Debug)]
-struct Inner {
- state: AtomicUsize,
- lock: Mutex<()>,
- cvar: Condvar,
-}
-
-impl DefaultPark {
- /// Creates a new `DefaultPark` instance.
- pub(crate) fn new() -> DefaultPark {
- DefaultPark {
- inner: Arc::new(Inner {
- state: AtomicUsize::new(EMPTY),
- lock: Mutex::new(()),
- cvar: Condvar::new(),
- }),
- }
- }
-}
-
-impl Park for DefaultPark {
- type Unpark = DefaultUnpark;
- type Error = ParkError;
-
- fn unpark(&self) -> Self::Unpark {
- let inner = self.inner.clone();
- DefaultUnpark { inner }
- }
-
- fn park(&mut self) -> Result<(), Self::Error> {
- self.inner.park(None);
- Ok(())
- }
-
- fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
- self.inner.park(Some(duration));
- Ok(())
- }
-}
-
-impl Unpark for DefaultUnpark {
- fn unpark(&self) {
- self.inner.unpark();
- }
-}
-
-impl fmt::Display for ParkError {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(fmt, "unknown park error")
- }
-}
-
-impl Error for ParkError {}
-
-impl Inner {
- fn park(&self, timeout: Option<Duration>) {
- // If we were previously notified then we consume this notification and return quickly.
- if self
- .state
- .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
- .is_ok()
- {
- return;
- }
-
- // If the timeout is zero, then there is no need to actually block.
- if let Some(ref dur) = timeout {
- if *dur == Duration::from_millis(0) {
- return;
- }
- }
-
- // Otherwise we need to coordinate going to sleep.
- let mut _m = self.lock.lock().unwrap();
-
- match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
- Ok(_) => {}
- // Consume this notification to avoid spurious wakeups in the next park.
- Err(NOTIFIED) => {
- // We must read `state` here, even though we know it will be `NOTIFIED`. This is
- // because `unpark` may have been called again since we read `NOTIFIED` in the
- // `compare_exchange` above. We must perform an acquire operation that synchronizes
- // with that `unpark` to observe any writes it made before the call to `unpark`. To
- // do that we must read from the write it made to `state`.
- let old = self.state.swap(EMPTY, SeqCst);
- assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
- return;
- }
- Err(n) => panic!("inconsistent park_timeout state: {}", n),
- }
-
- match timeout {
- None => {
- loop {
- // Block the current thread on the conditional variable.
- _m = self.cvar.wait(_m).unwrap();
-
- if self
- .state
- .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
- .is_ok()
- {
- return; // got a notification
- }
-
- // spurious wakeup, go back to sleep
- }
- }
- Some(timeout) => {
- // Wait with a timeout, and if we spuriously wake up or otherwise wake up from a
- // notification we just want to unconditionally set `state` back to `EMPTY`, either
- // consuming a notification or un-flagging ourselves as parked.
- _m = self.cvar.wait_timeout(_m, timeout).unwrap().0;
-
- match self.state.swap(EMPTY, SeqCst) {
- NOTIFIED => {} // got a notification
- PARKED => {} // no notification
- n => panic!("inconsistent park_timeout state: {}", n),
- }
- }
- }
- }
-
- fn unpark(&self) {
- // To ensure the unparked thread will observe any writes we made before this call, we must
- // perform a release operation that `park` can synchronize with. To do that we must write
- // `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather
- // than a compare-and-swap that returns if it reads `NOTIFIED` on failure.
- match self.state.swap(NOTIFIED, SeqCst) {
- EMPTY => return, // no one was waiting
- NOTIFIED => return, // already unparked
- PARKED => {} // gotta go wake someone up
- n => panic!("inconsistent state in unpark: {}", n),
- }
-
- // There is a period between when the parked thread sets `state` to `PARKED` (or last
- // checked `state` in the case of a spurious wakeup) and when it actually waits on `cvar`.
- // If we were to notify during this period it would be ignored and then when the parked
- // thread went to sleep it would never wake up. Fortunately, it has `lock` locked at this
- // stage so we can acquire `lock` to wait until it is ready to receive the notification.
- //
- // Releasing `lock` before the call to `notify_one` means that when the parked thread wakes
- // it doesn't get woken only to have to wait for us to release `lock`.
- drop(self.lock.lock());
- self.cvar.notify_one();
- }
-}
diff --git a/tokio/src/executor/thread_pool/pool.rs b/tokio/src/executor/thread_pool/pool.rs
index 67235eae..d3219d87 100644
--- a/tokio/src/executor/thread_pool/pool.rs
+++ b/tokio/src/executor/thread_pool/pool.rs
@@ -1,12 +1,12 @@
use crate::executor::blocking::PoolWaiter;
use crate::executor::task::JoinHandle;
-use crate::executor::thread_pool::{shutdown, Builder, Spawner};
+use crate::executor::thread_pool::{shutdown, Spawner};
use std::fmt;
use std::future::Future;
/// Work-stealing based thread pool for executing futures.
-pub struct ThreadPool {
+pub(crate) struct ThreadPool {
spawner: Spawner,
/// Shutdown waiter
@@ -17,11 +17,6 @@ pub struct ThreadPool {
}
impl ThreadPool {
- /// Create a new ThreadPool with default configuration
- pub fn new() -> ThreadPool {
- Builder::new().build()
- }
-
pub(super) fn from_parts(
spawner: Spawner,
shutdown_rx: shutdown::Receiver,
@@ -38,12 +33,12 @@ impl ThreadPool {
///
/// The `Spawner` handle can be cloned and enables spawning tasks from other
/// threads.
- pub fn spawner(&self) -> &Spawner {
+ pub(crate) fn spawner(&self) -> &Spawner {
&self.spawner
}
/// Spawn a task
- pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
+ pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
@@ -55,7 +50,7 @@ impl ThreadPool {
///
/// The future will execute on the current thread, but all spawned tasks
/// will be executed on the thread pool.
- pub fn block_on<F>(&self, future: F) -> F::Output
+ pub(crate) fn block_on<F>(&self, future: F) -> F::Output
where
F: Future,
{
@@ -69,7 +64,7 @@ impl ThreadPool {
}
/// Shutdown the thread pool.
- pub fn shutdown_now(&mut self) {
+ pub(crate) fn shutdown_now(&mut self) {
if self.spawner.workers().close() {
self.shutdown_rx.wait();
}
@@ -77,12 +72,6 @@ impl ThreadPool {
}
}
-impl Default for ThreadPool {
- fn default() -> ThreadPool {
- ThreadPool::new()
- }
-}
-
impl fmt::Debug for ThreadPool {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("ThreadPool").finish()
diff --git a/tokio/src/executor/thread_pool/spawner.rs b/tokio/src/executor/thread_pool/spawner.rs
index b33c7cad..4fdf9594 100644
--- a/tokio/src/executor/thread_pool/spawner.rs
+++ b/tokio/src/executor/thread_pool/spawner.rs
@@ -19,7 +19,7 @@ use std::future::Future;
///
/// [`ThreadPool::spawner`]: struct.ThreadPool.html#method.spawner
#[derive(Clone)]
-pub struct Spawner {
+pub(crate) struct Spawner {
workers: Arc<worker::Set<Box<dyn Unpark>>>,
}
@@ -29,7 +29,7 @@ impl Spawner {
}
/// Spawn a future onto the thread pool
- pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
+ pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
diff --git a/tokio/src/executor/thread_pool/tests/loom_pool.rs b/tokio/src/executor/thread_pool/tests/loom_pool.rs
index 26c794ec..6b2b5c30 100644
--- a/tokio/src/executor/thread_pool/tests/loom_pool.rs
+++ b/tokio/src/executor/thread_pool/tests/loom_pool.rs
@@ -1,16 +1,20 @@
use crate::executor::loom::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use crate::executor::loom::sync::atomic::{AtomicBool, AtomicUsize};
use crate::executor::loom::sync::{Arc, Mutex};
+use crate::executor::park::{Park, Unpark};
use crate::executor::tests::loom_oneshot as oneshot;
-use crate::executor::thread_pool::{self, Builder, ThreadPool};
+use crate::executor::thread_pool::{self, Builder};
use crate::spawn;
+use loom::sync::Notify;
+
use std::future::Future;
+use std::time::Duration;
#[test]
fn pool_multi_spawn() {
loom::model(|| {
- let pool = ThreadPool::new();
+ let pool = Builder::new().build(|_| LoomPark::new());
let c1 = Arc::new(AtomicUsize::new(0));
@@ -44,7 +48,7 @@ fn pool_multi_spawn() {
#[test]
fn only_blocking() {
loom::model(|| {
- let mut pool = Builder::new().num_threads(1).build();
+ let mut pool = Builder::new().num_threads(1).build(|_| LoomPark::new());
let (block_tx, block_rx) = oneshot::channel();
pool.spawn(async move {
@@ -62,7 +66,7 @@ fn only_blocking() {
fn blocking_and_regular() {
const NUM: usize = 3;
loom::model(|| {
- let mut pool = Builder::new().num_threads(1).build();
+ let mut pool = Builder::new().num_threads(1).build(|_| LoomPark::new());
let cnt = Arc::new(AtomicUsize::new(0));
let (block_tx, block_rx) = oneshot::channel();
@@ -96,7 +100,7 @@ fn blocking_and_regular() {
#[test]
fn pool_multi_notify() {
loom::model(|| {
- let pool = ThreadPool::new();
+ let pool = Builder::