summaryrefslogtreecommitdiffstats
path: root/tokio/src/executor/thread_pool
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/executor/thread_pool')
-rw-r--r--tokio/src/executor/thread_pool/builder.rs259
-rw-r--r--tokio/src/executor/thread_pool/current.rs85
-rw-r--r--tokio/src/executor/thread_pool/idle.rs229
-rw-r--r--tokio/src/executor/thread_pool/join.rs42
-rw-r--r--tokio/src/executor/thread_pool/mod.rs58
-rw-r--r--tokio/src/executor/thread_pool/owned.rs77
-rw-r--r--tokio/src/executor/thread_pool/park.rs182
-rw-r--r--tokio/src/executor/thread_pool/pool.rs111
-rw-r--r--tokio/src/executor/thread_pool/queue/global.rs195
-rw-r--r--tokio/src/executor/thread_pool/queue/inject.rs36
-rw-r--r--tokio/src/executor/thread_pool/queue/local.rs298
-rw-r--r--tokio/src/executor/thread_pool/queue/mod.rs41
-rw-r--r--tokio/src/executor/thread_pool/queue/worker.rs127
-rw-r--r--tokio/src/executor/thread_pool/set.rs209
-rw-r--r--tokio/src/executor/thread_pool/shared.rs104
-rw-r--r--tokio/src/executor/thread_pool/shutdown.rs48
-rw-r--r--tokio/src/executor/thread_pool/spawner.rs61
-rw-r--r--tokio/src/executor/thread_pool/tests/loom_pool.rs138
-rw-r--r--tokio/src/executor/thread_pool/tests/loom_queue.rs68
-rw-r--r--tokio/src/executor/thread_pool/tests/mod.rs11
-rw-r--r--tokio/src/executor/thread_pool/tests/queue.rs281
-rw-r--r--tokio/src/executor/thread_pool/tests/worker.rs68
-rw-r--r--tokio/src/executor/thread_pool/worker.rs415
23 files changed, 3143 insertions, 0 deletions
diff --git a/tokio/src/executor/thread_pool/builder.rs b/tokio/src/executor/thread_pool/builder.rs
new file mode 100644
index 00000000..7955e286
--- /dev/null
+++ b/tokio/src/executor/thread_pool/builder.rs
@@ -0,0 +1,259 @@
+use crate::executor::loom::sync::Arc;
+use crate::executor::loom::sys::num_cpus;
+use crate::executor::loom::thread;
+use crate::executor::park::Park;
+use crate::executor::thread_pool::park::DefaultPark;
+use crate::executor::thread_pool::{shutdown, worker, worker::Worker, Spawner, ThreadPool};
+
+use std::{fmt, usize};
+
+/// Builds a thread pool with custom configuration values.
+pub struct Builder {
+ /// Number of worker threads to spawn
+ pool_size: usize,
+
+ /// Thread name
+ name: String,
+
+ /// Thread stack size
+ stack_size: Option<usize>,
+
+ /// Around worker callback
+ around_worker: Option<Callback>,
+}
+
+// The Arc<Box<_>> is needed because loom doesn't support Arc<T> where T: !Sized
+// loom doesn't support that because it requires CoerceUnsized, which is unstable
+type Callback = Arc<Box<dyn Fn(usize, &mut dyn FnMut()) + Send + Sync>>;
+
+impl Builder {
+ /// Returns a new thread pool builder initialized with default configuration
+ /// values.
+ pub fn new() -> Builder {
+ Builder {
+ pool_size: num_cpus(),
+ name: "tokio-runtime-worker".to_string(),
+ stack_size: None,
+ around_worker: None,
+ }
+ }
+
+ /// Set the number of threads running async tasks.
+ ///
+ /// This must be a number between 1 and 2,048 though it is advised to keep
+ /// this value on the smaller side.
+ ///
+ /// The default value is the number of cores available to the system.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::executor::thread_pool::Builder;
+ ///
+ /// let thread_pool = Builder::new()
+ /// .num_threads(4)
+ /// .build();
+ /// ```
+ pub fn num_threads(&mut self, value: usize) -> &mut Self {
+ self.pool_size = value;
+ self
+ }
+
+ /// Set name of threads spawned by the scheduler
+ ///
+ /// If this configuration is not set, then the thread will use the system
+ /// default naming scheme.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::executor::thread_pool::Builder;
+ ///
+ /// let thread_pool = Builder::new()
+ /// .name("my-pool")
+ /// .build();
+ /// ```
+ pub fn name<S: Into<String>>(&mut self, val: S) -> &mut Self {
+ self.name = val.into();
+ self
+ }
+
+ /// Set the stack size (in bytes) for worker threads.
+ ///
+ /// The actual stack size may be greater than this value if the platform
+ /// specifies minimal stack size.
+ ///
+ /// The default stack size for spawned threads is 2 MiB, though this
+ /// particular stack size is subject to change in the future.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::executor::thread_pool::Builder;
+ ///
+ /// let thread_pool = Builder::new()
+ /// .stack_size(32 * 1024)
+ /// .build();
+ /// ```
+ pub fn stack_size(&mut self, val: usize) -> &mut Self {
+ self.stack_size = Some(val);
+ self
+ }
+
+ /// Execute function `f` on each worker thread.
+ ///
+ /// This function is provided a function that executes the worker and is
+ /// expected to call it, otherwise the worker thread will shutdown without
+ /// doing any work.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::executor::thread_pool::Builder;
+ ///
+ /// let thread_pool = Builder::new()
+ /// .around_worker(|index, work| {
+ /// println!("worker {} is starting up", index);
+ /// work();
+ /// println!("worker {} is shutting down", index);
+ /// })
+ /// .build();
+ /// ```
+ pub fn around_worker<F>(&mut self, f: F) -> &mut Self
+ where
+ F: Fn(usize, &mut dyn FnMut()) + Send + Sync + 'static,
+ {
+ self.around_worker = Some(Arc::new(Box::new(f)));
+ self
+ }
+
+ /// Create the configured `ThreadPool`.
+ ///
+ /// The returned `ThreadPool` instance is ready to spawn tasks.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::executor::thread_pool::Builder;
+ ///
+ /// let thread_pool = Builder::new()
+ /// .build();
+ /// ```
+ pub fn build(&self) -> ThreadPool {
+ self.build_with_park(|_| DefaultPark::new())
+ }
+
+ /// Create the configured `ThreadPool` with a custom `park` instances.
+ ///
+ /// The provided closure `build_park` is called once per worker and returns
+ /// a `Park` instance that is used by the worker to put itself to sleep.
+ pub fn build_with_park<F, P>(&self, mut build_park: F) -> ThreadPool
+ where
+ F: FnMut(usize) -> P,
+ P: Park + Send + 'static,
+ {
+ let (shutdown_tx, shutdown_rx) = shutdown::channel();
+
+ let around_worker = self.around_worker.as_ref().map(Arc::clone);
+ let launch_worker = move |worker: Worker<BoxedPark<P>>| {
+ let shutdown_tx = shutdown_tx.clone();
+ let around_worker = around_worker.as_ref().map(Arc::clone);
+ Box::new(move || {
+ struct AbortOnPanic;
+
+ impl Drop for AbortOnPanic {
+ fn drop(&mut self) {
+ if thread::panicking() {
+ eprintln!("[ERROR] unhandled panic in Tokio scheduler. This is a bug and should be reported.");
+ std::process::abort();
+ }
+ }
+ }
+
+ let _abort_on_panic = AbortOnPanic;
+ if let Some(cb) = around_worker.as_ref() {
+ let idx = worker.id();
+ let mut f = Some(move || worker.run());
+ cb(idx, &mut || {
+ (f.take()
+ .expect("around_thread callback called closure twice"))(
+ )
+ })
+ } else {
+ worker.run()
+ }
+
+ // Dropping the handle must happen __after__ the callback
+ drop(shutdown_tx);
+ }) as Box<dyn FnOnce() + Send + 'static>
+ };
+
+ let mut blocking = crate::executor::blocking::Builder::default();
+ blocking.name(self.name.clone());
+ if let Some(ss) = self.stack_size {
+ blocking.stack_size(ss);
+ }
+ let blocking = Arc::new(blocking.build());
+
+ let (pool, workers) = worker::create_set::<_, BoxedPark<P>>(
+ self.pool_size,
+ |i| BoxedPark::new(build_park(i)),
+ blocking.clone(),
+ );
+
+ // Spawn threads for each worker
+ for worker in workers {
+ crate::executor::blocking::Pool::spawn(&blocking, launch_worker(worker))
+ }
+
+ let spawner = Spawner::new(pool);
+ let blocking = crate::executor::blocking::PoolWaiter::from(blocking);
+ ThreadPool::from_parts(spawner, shutdown_rx, blocking)
+ }
+}
+
+impl Default for Builder {
+ fn default() -> Builder {
+ Builder::new()
+ }
+}
+
+impl fmt::Debug for Builder {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Builder")
+ .field("pool_size", &self.pool_size)
+ .field("name", &self.name)
+ .field("stack_size", &self.stack_size)
+ .finish()
+ }
+}
+
+pub(crate) struct BoxedPark<P> {
+ inner: P,
+}
+
+impl<P> BoxedPark<P> {
+ pub(crate) fn new(inner: P) -> Self {
+ BoxedPark { inner }
+ }
+}
+
+impl<P> Park for BoxedPark<P>
+where
+ P: Park,
+{
+ type Unpark = Box<dyn crate::executor::park::Unpark>;
+ type Error = P::Error;
+
+ fn unpark(&self) -> Self::Unpark {
+ Box::new(self.inner.unpark())
+ }
+
+ fn park(&mut self) -> Result<(), Self::Error> {
+ self.inner.park()
+ }
+
+ fn park_timeout(&mut self, duration: std::time::Duration) -> Result<(), Self::Error> {
+ self.inner.park_timeout(duration)
+ }
+}
diff --git a/tokio/src/executor/thread_pool/current.rs b/tokio/src/executor/thread_pool/current.rs
new file mode 100644
index 00000000..6910dca1
--- /dev/null
+++ b/tokio/src/executor/thread_pool/current.rs
@@ -0,0 +1,85 @@
+use crate::executor::loom::sync::Arc;
+use crate::executor::park::Unpark;
+use crate::executor::thread_pool::{worker, Owned};
+
+use std::cell::Cell;
+use std::ptr;
+
+/// Tracks the current worker
+#[derive(Debug)]
+pub(super) struct Current {
+ inner: Inner,
+}
+
+#[derive(Debug, Copy, Clone)]
+struct Inner {
+ // thread-local variables cannot track generics. However, the current worker
+ // is only checked when `P` is already known, so the type can be figured out
+ // on demand.
+ workers: *const (),
+ idx: usize,
+}
+
+// Pointer to the current worker info
+thread_local!(static CURRENT_WORKER: Cell<Inner> = Cell::new(Inner::new()));
+
+pub(super) fn set<F, R, P>(pool: &Arc<worker::Set<P>>, index: usize, f: F) -> R
+where
+ F: FnOnce() -> R,
+ P: Unpark,
+{
+ CURRENT_WORKER.with(|cell| {
+ assert!(cell.get().workers.is_null());
+
+ struct Guard<'a>(&'a Cell<Inner>);
+
+ impl Drop for Guard<'_> {
+ fn drop(&mut self) {
+ self.0.set(Inner::new());
+ }
+ }
+
+ cell.set(Inner {
+ workers: pool.shared() as *const _ as *const (),
+ idx: index,
+ });
+
+ let _g = Guard(cell);
+
+ f()
+ })
+}
+
+pub(super) fn get<F, R>(f: F) -> R
+where
+ F: FnOnce(&Current) -> R,
+{
+ CURRENT_WORKER.with(|cell| {
+ let current = Current { inner: cell.get() };
+ f(&current)
+ })
+}
+
+impl Current {
+ pub(super) fn as_member<'a, P>(&self, set: &'a worker::Set<P>) -> Option<&'a Owned<P>>
+ where
+ P: Unpark,
+ {
+ let inner = CURRENT_WORKER.with(|cell| cell.get());
+
+ if ptr::eq(inner.workers as *const _, set.shared().as_ptr()) {
+ Some(unsafe { &*set.owned()[inner.idx].get() })
+ } else {
+ None
+ }
+ }
+}
+
+impl Inner {
+ fn new() -> Inner {
+ Inner {
+ workers: ptr::null(),
+ idx: 0,
+ }
+ }
+}
diff --git a/tokio/src/executor/thread_pool/idle.rs b/tokio/src/executor/thread_pool/idle.rs
new file mode 100644
index 00000000..0fae29bf
--- /dev/null
+++ b/tokio/src/executor/thread_pool/idle.rs
@@ -0,0 +1,229 @@
+//! Coordinates idling workers
+
+use crate::executor::loom::sync::atomic::AtomicUsize;
+use crate::executor::loom::sync::Mutex;
+
+use std::fmt;
+use std::sync::atomic::Ordering::{self, AcqRel, Relaxed, SeqCst};
+
+pub(super) struct Idle {
+ /// Tracks both the number of searching workers and the number of unparked
+ /// workers.
+ ///
+ /// Used as a fast-path to avoid acquiring the lock when needed.
+ state: AtomicUsize,
+
+ /// Sleeping workers
+ sleepers: Mutex<Vec<usize>>,
+
+ /// Total number of workers.
+ num_workers: usize,
+}
+
+const UNPARK_SHIFT: usize = 16;
+const UNPARK_MASK: usize = !SEARCH_MASK;
+const SEARCH_MASK: usize = (1 << UNPARK_SHIFT) - 1;
+
+#[derive(Copy, Clone)]
+struct State(usize);
+
+impl Idle {
+ pub(super) fn new(num_workers: usize) -> Idle {
+ let init = State::new(num_workers);
+
+ Idle {
+ state: AtomicUsize::new(init.into()),
+ sleepers: Mutex::new(Vec::with_capacity(num_workers)),
+ num_workers,
+ }
+ }
+
+ /// If there are no workers actively searching, returns the index of a
+ /// worker currently sleeping.
+ pub(super) fn worker_to_notify(&self) -> Option<usize> {
+ // If at least one worker is spinning, work being notified will
+ // eventully be found. A searching thread will find **some** work and
+ // notify another worker, eventually leading to our work being found.
+ //
+ // For this to happen, this load must happen before the thread
+ // transitioning `num_searching` to zero. Acquire / Relese does not
+ // provide sufficient guarantees, so this load is done with `SeqCst` and
+ // will pair with the `fetch_sub(1)` when transitioning out of
+ // searching.
+ if !self.notify_should_wakeup() {
+ return None;
+ }
+
+ // Acquire the lock
+ let mut sleepers = self.sleepers.lock().unwrap();
+
+ // Check again, now that the lock is acquired
+ if !self.notify_should_wakeup() {
+ return None;
+ }
+
+ // A worker should be woken up, atomically increment the number of
+ // searching workers as well as the number of unparked workers.
+ State::unpark_one(&self.state);
+
+ // Get the worker to unpark
+ let ret = sleepers.pop();
+ debug_assert!(ret.is_some());
+
+ ret
+ }
+
+ /// Returns `true` if the worker needs to do a final check for submitted
+ /// work.
+ pub(super) fn transition_worker_to_parked(&self, worker: usize, is_searching: bool) -> bool {
+ // Acquire the lock
+ let mut sleepers = self.sleepers.lock().unwrap();
+
+ // Decrement the number of unparked threads
+ let ret = State::dec_num_unparked(&self.state, is_searching);
+
+ // Track the sleeping worker
+ sleepers.push(worker);
+
+ ret
+ }
+
+ pub(super) fn transition_worker_to_searching(&self) -> bool {
+ // Using `Relaxed` ordering is acceptable here as it is just an
+ // optimization. This load has does not need to synchronize with
+ // anything, and the algorithm is correct no matter what the load
+ // returns (as in, it could return absolutely any `usize` value and the
+ // pool would be correct.
+ let state = State::load(&self.state, Relaxed);
+ if 2 * state.num_searching() >= self.num_workers {
+ return false;
+ }
+
+ // It is possible for this routine to allow more than 50% of the workers
+ // to search. That is OK. Limiting searchers is only an optimization to
+ // prevent too much contention.
+ //
+ // At this point, we do not need a hard synchronization with `notify_work`, so `AcqRel` is sufficient.
+ State::inc_num_searching(&self.state, AcqRel);
+ true
+ }
+
+ /// A lightweight transition from searching -> running.
+ ///
+ /// Returns `true` if this is the final searching worker. The caller
+ /// **must** notify a new worker.
+ pub(super) fn transition_worker_from_searching(&self) -> bool {
+ State::dec_num_searching(&self.state)
+ }
+
+ /// Unpark a specific worker. This happens if tasks are submitted from
+ /// within the worker's park routine.
+ pub(super) fn unpark_worker_by_id(&self, worker_id: usize) {
+ let mut sleepers = self.sleepers.lock().unwrap();
+
+ for index in 0..sleepers.len() {
+ if sleepers[index] == worker_id {
+ sleepers.swap_remove(index);
+
+ // Update the state accordingly whle the lock is held.
+ State::unpark_one(&self.state);
+
+ return;
+ }
+ }
+ }
+
+ /// Returns `true` if `worker_id` is contained in the sleep set
+ pub(super) fn is_parked(&self, worker_id: usize) -> bool {
+ let sleepers = self.sleepers.lock().unwrap();
+ sleepers.contains(&worker_id)
+ }
+
+ fn notify_should_wakeup(&self) -> bool {
+ let state = State::load(&self.state, SeqCst);
+ state.num_searching() == 0 && state.num_unparked() < self.num_workers
+ }
+}
+
+impl State {
+ fn new(num_workers: usize) -> State {
+ // All workers start in the unparked state
+ let ret = State(num_workers << UNPARK_SHIFT);
+ debug_assert_eq!(num_workers, ret.num_unparked());
+ debug_assert_eq!(0, ret.num_searching());
+ ret
+ }
+
+ fn load(cell: &AtomicUsize, ordering: Ordering) -> State {
+ State(cell.load(ordering))
+ }
+
+ fn unpark_one(cell: &AtomicUsize) {
+ cell.fetch_add(1 | (1 << UNPARK_SHIFT), SeqCst);
+ }
+
+ fn inc_num_searching(cell: &AtomicUsize, ordering: Ordering) {
+ cell.fetch_add(1, ordering);
+ }
+
+ /// Returns `true` if this is the final searching worker
+ fn dec_num_searching(cell: &AtomicUsize) -> bool {
+ let state = State(cell.fetch_sub(1, SeqCst));
+ state.num_searching() == 1
+ }
+
+ /// Track a sleeping worker
+ ///
+ /// Returns `true` if this is the final searching worker.
+ fn dec_num_unparked(cell: &AtomicUsize, is_searching: bool) -> bool {
+ let mut dec = 1 << UNPARK_SHIFT;
+
+ if is_searching {
+ dec += 1;
+ }
+
+ let prev = State(cell.fetch_sub(dec, SeqCst));
+ is_searching && prev.num_searching() == 1
+ }
+
+ /// Number of workers currently searching
+ fn num_searching(self) -> usize {
+ self.0 & SEARCH_MASK
+ }
+
+ /// Number of workers currently unparked
+ fn num_unparked(self) -> usize {
+ (self.0 & UNPARK_MASK) >> UNPARK_SHIFT
+ }
+}
+
+impl From<usize> for State {
+ fn from(src: usize) -> State {
+ State(src)
+ }
+}
+
+impl From<State> for usize {
+ fn from(src: State) -> usize {
+ src.0
+ }
+}
+
+impl fmt::Debug for State {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("worker::State")
+ .field("num_unparked", &self.num_unparked())
+ .field("num_searching", &self.num_searching())
+ .finish()
+ }
+}
+
+#[test]
+fn test_state() {
+ assert_eq!(0, UNPARK_MASK & SEARCH_MASK);
+ assert_eq!(0, !(UNPARK_MASK | SEARCH_MASK));
+
+ let state = State::new(10);
+ assert_eq!(10, state.num_unparked());
+ assert_eq!(0, state.num_searching());
+}
diff --git a/tokio/src/executor/thread_pool/join.rs b/tokio/src/executor/thread_pool/join.rs
new file mode 100644
index 00000000..e066882a
--- /dev/null
+++ b/tokio/src/executor/thread_pool/join.rs
@@ -0,0 +1,42 @@
+use crate::executor::park::Unpark;
+use crate::executor::task;
+use crate::executor::thread_pool::Shared;
+
+use std::fmt;
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+/// An owned permission to join on a task (await its termination).
+pub struct JoinHandle<T> {
+ task: task::JoinHandle<T, Shared<Box<dyn Unpark>>>,
+}
+
+impl<T> JoinHandle<T>
+where
+ T: Send + 'static,
+{
+ pub(super) fn new(task: task::JoinHandle<T, Shared<Box<dyn Unpark>>>) -> JoinHandle<T> {
+ JoinHandle { task }
+ }
+}
+
+impl<T> Future for JoinHandle<T>
+where
+ T: Send + 'static,
+{
+ type Output = task::Result<T>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ Pin::new(&mut self.task).poll(cx)
+ }
+}
+
+impl<T> fmt::Debug for JoinHandle<T>
+where
+ T: fmt::Debug,
+{
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("JoinHandle").finish()
+ }
+}
diff --git a/tokio/src/executor/thread_pool/mod.rs b/tokio/src/executor/thread_pool/mod.rs
new file mode 100644
index 00000000..bab594ef
--- /dev/null
+++ b/tokio/src/executor/thread_pool/mod.rs
@@ -0,0 +1,58 @@
+//! Threadpool
+
+mod builder;
+pub use self::builder::Builder;
+
+mod current;
+
+mod idle;
+use self::idle::Idle;
+
+mod join;
+pub use self::join::JoinHandle;
+
+mod owned;
+use self::owned::Owned;
+
+mod park;
+
+mod pool;
+pub use self::pool::ThreadPool;
+
+mod queue;
+
+mod spawner;
+pub use self::spawner::Spawner;
+
+mod set;
+
+mod shared;
+use self::shared::Shared;
+
+mod shutdown;
+
+mod worker;
+
+/// Unit tests
+#[cfg(test)]
+mod tests;
+
+// Re-export `task::Error`
+pub use crate::executor::task::Error;
+
+// These exports are used in tests
+#[cfg(test)]
+#[allow(warnings)]
+pub(crate) use self::worker::create_set as create_pool;
+
+pub(crate) type BoxFuture =
+ std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + 'static>>;
+
+#[cfg(not(loom))]
+const LOCAL_QUEUE_CAPACITY: usize = 256;
+
+// Shrink the size of the local queue when using loom. This shouldn't impact
+// logic, but allows loom to test more edge cases in a reasonable a mount of
+// time.
+#[cfg(loom)]
+const LOCAL_QUEUE_CAPACITY: usize = 2;
diff --git a/tokio/src/executor/thread_pool/owned.rs b/tokio/src/executor/thread_pool/owned.rs
new file mode 100644
index 00000000..f0aece57
--- /dev/null
+++ b/tokio/src/executor/thread_pool/owned.rs
@@ -0,0 +1,77 @@
+use crate::executor::task::{self, Task};
+use crate::executor::thread_pool::{queue, Shared};
+use crate::executor::util::FastRand;
+
+use std::cell::Cell;
+
+/// Per-worker data accessible only by the thread driving the worker.
+#[derive(Debug)]
+pub(super) struct Owned<P: 'static> {
+ /// Worker tick number. Used to schedule bookkeeping tasks every so often.
+ pub(super) tick: Cell<u16>,
+
+ /// Caches the pool run state.
+ pub(super) is_running: Cell<bool>,
+
+ /// `true` if the worker is currently searching for more work.
+ pub(super) is_searching: Cell<bool>,
+
+ /// `true` when worker notification should be delayed.
+ ///
+ /// This is used to batch notifications triggered by the parker.
+ pub(super) defer_notification: Cell<bool>,
+
+ /// `true` if a task was submitted while `defer_notification` was set
+ pub(super) did_submit_task: Cell<bool>,
+
+ /// Fast random number generator
+ pub(super) rand: FastRand,
+
+ /// Work queue
+ pub(super) work_queue: queue::Worker<Shared<P>>,
+
+ /// List of tasks owned by the worker
+ pub(super) owned_tasks: task::OwnedList<Shared<P>>,
+}
+
+impl<P> Owned<P>
+where
+ P: 'static,
+{
+ pub(super) fn new(work_queue: queue::Worker<Shared<P>>, rand: FastRand) -> Owned<P> {
+ Owned {
+ tick: Cell::new(1),
+ is_running: Cell::new(true),
+ is_searching: Cell::new(false),
+ defer_notification: Cell::new(false),
+ did_submit_task: Cell::new(false),
+ rand,
+ work_queue,
+ owned_tasks: task::OwnedList::new(),
+ }
+ }
+
+ /// Returns `true` if a worker should be notified
+ pub(super) fn submit_local(&self, task: Task<Shared<P>>) -> bool {
+ let ret = self.work_queue.push(task);
+
+ if self.defer_notification.get() {
+ self.did_submit_task.set(true);
+ false
+ } else {
+ ret
+ }
+ }
+
+ pub(super) fn submit_local_yield(&self, task: Task<Shared<P>>) {
+ self.work_queue.push_yield(task);
+ }
+
+ pub(super) fn bind_task(&mut self, task: &Task<Shared<P>>) {
+ self.owned_tasks.insert(task);
+ }
+
+ pub(super) fn release_task(&mut self, task: &Task<Shared<P>>) {
+ self.owned_tasks.remove(task);
+ }
+}
diff --git a/tokio/src/executor/thread_pool/park.rs b/tokio/src/executor/thread_pool/park.rs
new file mode 100644
index 00000000..225d21db
--- /dev/null
+++ b/tokio/src/executor/thread_pool/park.rs
@@ -0,0 +1,182 @@
+use crate::executor::loom::sync::atomic::AtomicUsize;
+use crate::executor::loom::sync::{Arc, Condvar, Mutex};
+use crate::executor::park::{Park, Unpark};
+
+use std::error::Error;
+use std::fmt;
+use std::sync::atomic::Ordering::SeqCst;
+use std::time::Duration;
+
+/// Parks the thread.
+#[derive(Debug)]
+pub(crate) struct DefaultPark {
+ inner: Arc<Inner>,
+}
+
+/// Unparks threads that were parked by `DefaultPark`.
+#[derive(Debug)]
+pub(crate) struct DefaultUnpark {
+ inner: Arc<Inner>,
+}
+
+/// Error returned by [`ParkThread`]
+///
+/// This currently is never returned, but might at some point in the future.
+///
+/// [`ParkThread`]: struct.ParkThread.html
+#[derive(Debug)]
+pub(crate) struct ParkError {
+ _p: (),
+}
+
+const EMPTY: usize = 0;
+const PARKED: usize = 1;
+const NOTIFIED: usize = 2;
+
+#[derive(Debug)]
+struct Inner {
+ state: AtomicUsize,
+ lock: Mutex<()>,
+ cvar: Condvar,
+}
+
+impl DefaultPark {
+ /// Creates a new `DefaultPark` instance.
+ pub(crate) fn new() -> DefaultPark {
+ DefaultPark {
+ inner: Arc::new(Inner {
+ state: AtomicUsize::new(EMPTY),
+ lock: Mutex::new(()),
+ cvar: Condvar::new(),
+ }),
+ }
+ }
+}
+
+impl Park for DefaultPark {
+ type Unpark = DefaultUnpark;
+ type Error = ParkError;
+
+ fn unpark(&self) -> Self::Unpark {
+ let inner = self.inner.clone();
+ DefaultUnpark { inner }
+ }
+
+ fn park(&mut self) -> Result<(), Self::Error> {
+ self.inner.park(None);
+ Ok(())
+ }
+
+ fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
+ self.inner.park(Some(duration));
+ Ok(())
+ }
+}
+
+impl Unpark for DefaultUnpark {
+ fn unpark(&self) {
+ self.inner.unpark();
+ }
+}
+
+impl fmt::Display for ParkError {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(fmt, "unknown park error")
+ }
+}
+
+impl Error for ParkError {}
+
+impl Inner {
+ fn park(&self, timeout: Option<Duration>) {
+ // If we were previously notified then we consume this notification and return quickly.
+ if self
+ .state
+ .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
+ .is_ok()
+ {
+ return;
+ }
+
+ // If the timeout is zero, then there is no need to actually block.
+ if let Some(ref dur) = timeout {
+ if *dur == Duration::from_millis(0) {
+ return;
+ }
+ }
+
+ // Otherwise we need to coordinate going to sleep.
+ let mut _m = self.lock.lock().unwrap();
+
+ match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
+ Ok(_) => {}
+ // Consume this notification to avoid spurious wakeups in the next park.
+ Err(NOTIFIED) => {
+ // We must read `state` here, even though we know it will be `NOTIFIED`. This is
+ // because `unpark` may have been called again since we read `NOTIFIED` in the
+ // `compare_exchange` above. We must perform an acquire operation that synchronizes
+ // with that `unpark` to observe any writes it made before the call to `unpark`. To
+ // do that we must read from the write it made to `state`.
+ let old = self.state.swap(EMPTY, SeqCst);
+ assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
+ return;
+ }
+ Err(n) => panic!("inconsistent park_timeout state: {}", n),
+ }
+
+ match timeout {
+ None => {
+ loop {
+ // Block the current thread on the conditional variable.
+ _m = self.cvar.wait(_m).unwrap();
+
+ if self
+ .state
+ .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
+ .is_ok()
+ {
+ return; // got a notification
+ }
+
+ // spurious wakeup, go back to sleep
+ }
+ }
+ Some(timeout) => {
+ // Wait with a timeout, and if we spuriously wake up or otherwise wake up from a
+ // notification we just want to unconditionally set `state` back to `EMPTY`, either
+ // consuming a notification or un-flagging ourselves as parked.
+ _m = self.cvar.wait_timeout(_m, timeout).unwrap().0;
+
+ match self.state.swap(EMPTY, SeqCst) {
+ NOTIFIED => {} // got a notification
+ PARKED => {} // no notification