diff options
Diffstat (limited to 'tokio/src/executor/thread_pool')
23 files changed, 3143 insertions, 0 deletions
diff --git a/tokio/src/executor/thread_pool/builder.rs b/tokio/src/executor/thread_pool/builder.rs new file mode 100644 index 00000000..7955e286 --- /dev/null +++ b/tokio/src/executor/thread_pool/builder.rs @@ -0,0 +1,259 @@ +use crate::executor::loom::sync::Arc; +use crate::executor::loom::sys::num_cpus; +use crate::executor::loom::thread; +use crate::executor::park::Park; +use crate::executor::thread_pool::park::DefaultPark; +use crate::executor::thread_pool::{shutdown, worker, worker::Worker, Spawner, ThreadPool}; + +use std::{fmt, usize}; + +/// Builds a thread pool with custom configuration values. +pub struct Builder { + /// Number of worker threads to spawn + pool_size: usize, + + /// Thread name + name: String, + + /// Thread stack size + stack_size: Option<usize>, + + /// Around worker callback + around_worker: Option<Callback>, +} + +// The Arc<Box<_>> is needed because loom doesn't support Arc<T> where T: !Sized +// loom doesn't support that because it requires CoerceUnsized, which is unstable +type Callback = Arc<Box<dyn Fn(usize, &mut dyn FnMut()) + Send + Sync>>; + +impl Builder { + /// Returns a new thread pool builder initialized with default configuration + /// values. + pub fn new() -> Builder { + Builder { + pool_size: num_cpus(), + name: "tokio-runtime-worker".to_string(), + stack_size: None, + around_worker: None, + } + } + + /// Set the number of threads running async tasks. + /// + /// This must be a number between 1 and 2,048 though it is advised to keep + /// this value on the smaller side. + /// + /// The default value is the number of cores available to the system. + /// + /// # Examples + /// + /// ``` + /// use tokio::executor::thread_pool::Builder; + /// + /// let thread_pool = Builder::new() + /// .num_threads(4) + /// .build(); + /// ``` + pub fn num_threads(&mut self, value: usize) -> &mut Self { + self.pool_size = value; + self + } + + /// Set name of threads spawned by the scheduler + /// + /// If this configuration is not set, then the thread will use the system + /// default naming scheme. + /// + /// # Examples + /// + /// ``` + /// use tokio::executor::thread_pool::Builder; + /// + /// let thread_pool = Builder::new() + /// .name("my-pool") + /// .build(); + /// ``` + pub fn name<S: Into<String>>(&mut self, val: S) -> &mut Self { + self.name = val.into(); + self + } + + /// Set the stack size (in bytes) for worker threads. + /// + /// The actual stack size may be greater than this value if the platform + /// specifies minimal stack size. + /// + /// The default stack size for spawned threads is 2 MiB, though this + /// particular stack size is subject to change in the future. + /// + /// # Examples + /// + /// ``` + /// use tokio::executor::thread_pool::Builder; + /// + /// let thread_pool = Builder::new() + /// .stack_size(32 * 1024) + /// .build(); + /// ``` + pub fn stack_size(&mut self, val: usize) -> &mut Self { + self.stack_size = Some(val); + self + } + + /// Execute function `f` on each worker thread. + /// + /// This function is provided a function that executes the worker and is + /// expected to call it, otherwise the worker thread will shutdown without + /// doing any work. + /// + /// # Examples + /// + /// ``` + /// use tokio::executor::thread_pool::Builder; + /// + /// let thread_pool = Builder::new() + /// .around_worker(|index, work| { + /// println!("worker {} is starting up", index); + /// work(); + /// println!("worker {} is shutting down", index); + /// }) + /// .build(); + /// ``` + pub fn around_worker<F>(&mut self, f: F) -> &mut Self + where + F: Fn(usize, &mut dyn FnMut()) + Send + Sync + 'static, + { + self.around_worker = Some(Arc::new(Box::new(f))); + self + } + + /// Create the configured `ThreadPool`. + /// + /// The returned `ThreadPool` instance is ready to spawn tasks. + /// + /// # Examples + /// + /// ``` + /// use tokio::executor::thread_pool::Builder; + /// + /// let thread_pool = Builder::new() + /// .build(); + /// ``` + pub fn build(&self) -> ThreadPool { + self.build_with_park(|_| DefaultPark::new()) + } + + /// Create the configured `ThreadPool` with a custom `park` instances. + /// + /// The provided closure `build_park` is called once per worker and returns + /// a `Park` instance that is used by the worker to put itself to sleep. + pub fn build_with_park<F, P>(&self, mut build_park: F) -> ThreadPool + where + F: FnMut(usize) -> P, + P: Park + Send + 'static, + { + let (shutdown_tx, shutdown_rx) = shutdown::channel(); + + let around_worker = self.around_worker.as_ref().map(Arc::clone); + let launch_worker = move |worker: Worker<BoxedPark<P>>| { + let shutdown_tx = shutdown_tx.clone(); + let around_worker = around_worker.as_ref().map(Arc::clone); + Box::new(move || { + struct AbortOnPanic; + + impl Drop for AbortOnPanic { + fn drop(&mut self) { + if thread::panicking() { + eprintln!("[ERROR] unhandled panic in Tokio scheduler. This is a bug and should be reported."); + std::process::abort(); + } + } + } + + let _abort_on_panic = AbortOnPanic; + if let Some(cb) = around_worker.as_ref() { + let idx = worker.id(); + let mut f = Some(move || worker.run()); + cb(idx, &mut || { + (f.take() + .expect("around_thread callback called closure twice"))( + ) + }) + } else { + worker.run() + } + + // Dropping the handle must happen __after__ the callback + drop(shutdown_tx); + }) as Box<dyn FnOnce() + Send + 'static> + }; + + let mut blocking = crate::executor::blocking::Builder::default(); + blocking.name(self.name.clone()); + if let Some(ss) = self.stack_size { + blocking.stack_size(ss); + } + let blocking = Arc::new(blocking.build()); + + let (pool, workers) = worker::create_set::<_, BoxedPark<P>>( + self.pool_size, + |i| BoxedPark::new(build_park(i)), + blocking.clone(), + ); + + // Spawn threads for each worker + for worker in workers { + crate::executor::blocking::Pool::spawn(&blocking, launch_worker(worker)) + } + + let spawner = Spawner::new(pool); + let blocking = crate::executor::blocking::PoolWaiter::from(blocking); + ThreadPool::from_parts(spawner, shutdown_rx, blocking) + } +} + +impl Default for Builder { + fn default() -> Builder { + Builder::new() + } +} + +impl fmt::Debug for Builder { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Builder") + .field("pool_size", &self.pool_size) + .field("name", &self.name) + .field("stack_size", &self.stack_size) + .finish() + } +} + +pub(crate) struct BoxedPark<P> { + inner: P, +} + +impl<P> BoxedPark<P> { + pub(crate) fn new(inner: P) -> Self { + BoxedPark { inner } + } +} + +impl<P> Park for BoxedPark<P> +where + P: Park, +{ + type Unpark = Box<dyn crate::executor::park::Unpark>; + type Error = P::Error; + + fn unpark(&self) -> Self::Unpark { + Box::new(self.inner.unpark()) + } + + fn park(&mut self) -> Result<(), Self::Error> { + self.inner.park() + } + + fn park_timeout(&mut self, duration: std::time::Duration) -> Result<(), Self::Error> { + self.inner.park_timeout(duration) + } +} diff --git a/tokio/src/executor/thread_pool/current.rs b/tokio/src/executor/thread_pool/current.rs new file mode 100644 index 00000000..6910dca1 --- /dev/null +++ b/tokio/src/executor/thread_pool/current.rs @@ -0,0 +1,85 @@ +use crate::executor::loom::sync::Arc; +use crate::executor::park::Unpark; +use crate::executor::thread_pool::{worker, Owned}; + +use std::cell::Cell; +use std::ptr; + +/// Tracks the current worker +#[derive(Debug)] +pub(super) struct Current { + inner: Inner, +} + +#[derive(Debug, Copy, Clone)] +struct Inner { + // thread-local variables cannot track generics. However, the current worker + // is only checked when `P` is already known, so the type can be figured out + // on demand. + workers: *const (), + idx: usize, +} + +// Pointer to the current worker info +thread_local!(static CURRENT_WORKER: Cell<Inner> = Cell::new(Inner::new())); + +pub(super) fn set<F, R, P>(pool: &Arc<worker::Set<P>>, index: usize, f: F) -> R +where + F: FnOnce() -> R, + P: Unpark, +{ + CURRENT_WORKER.with(|cell| { + assert!(cell.get().workers.is_null()); + + struct Guard<'a>(&'a Cell<Inner>); + + impl Drop for Guard<'_> { + fn drop(&mut self) { + self.0.set(Inner::new()); + } + } + + cell.set(Inner { + workers: pool.shared() as *const _ as *const (), + idx: index, + }); + + let _g = Guard(cell); + + f() + }) +} + +pub(super) fn get<F, R>(f: F) -> R +where + F: FnOnce(&Current) -> R, +{ + CURRENT_WORKER.with(|cell| { + let current = Current { inner: cell.get() }; + f(¤t) + }) +} + +impl Current { + pub(super) fn as_member<'a, P>(&self, set: &'a worker::Set<P>) -> Option<&'a Owned<P>> + where + P: Unpark, + { + let inner = CURRENT_WORKER.with(|cell| cell.get()); + + if ptr::eq(inner.workers as *const _, set.shared().as_ptr()) { + Some(unsafe { &*set.owned()[inner.idx].get() }) + } else { + None + } + } +} + +impl Inner { + fn new() -> Inner { + Inner { + workers: ptr::null(), + idx: 0, + } + } +} diff --git a/tokio/src/executor/thread_pool/idle.rs b/tokio/src/executor/thread_pool/idle.rs new file mode 100644 index 00000000..0fae29bf --- /dev/null +++ b/tokio/src/executor/thread_pool/idle.rs @@ -0,0 +1,229 @@ +//! Coordinates idling workers + +use crate::executor::loom::sync::atomic::AtomicUsize; +use crate::executor::loom::sync::Mutex; + +use std::fmt; +use std::sync::atomic::Ordering::{self, AcqRel, Relaxed, SeqCst}; + +pub(super) struct Idle { + /// Tracks both the number of searching workers and the number of unparked + /// workers. + /// + /// Used as a fast-path to avoid acquiring the lock when needed. + state: AtomicUsize, + + /// Sleeping workers + sleepers: Mutex<Vec<usize>>, + + /// Total number of workers. + num_workers: usize, +} + +const UNPARK_SHIFT: usize = 16; +const UNPARK_MASK: usize = !SEARCH_MASK; +const SEARCH_MASK: usize = (1 << UNPARK_SHIFT) - 1; + +#[derive(Copy, Clone)] +struct State(usize); + +impl Idle { + pub(super) fn new(num_workers: usize) -> Idle { + let init = State::new(num_workers); + + Idle { + state: AtomicUsize::new(init.into()), + sleepers: Mutex::new(Vec::with_capacity(num_workers)), + num_workers, + } + } + + /// If there are no workers actively searching, returns the index of a + /// worker currently sleeping. + pub(super) fn worker_to_notify(&self) -> Option<usize> { + // If at least one worker is spinning, work being notified will + // eventully be found. A searching thread will find **some** work and + // notify another worker, eventually leading to our work being found. + // + // For this to happen, this load must happen before the thread + // transitioning `num_searching` to zero. Acquire / Relese does not + // provide sufficient guarantees, so this load is done with `SeqCst` and + // will pair with the `fetch_sub(1)` when transitioning out of + // searching. + if !self.notify_should_wakeup() { + return None; + } + + // Acquire the lock + let mut sleepers = self.sleepers.lock().unwrap(); + + // Check again, now that the lock is acquired + if !self.notify_should_wakeup() { + return None; + } + + // A worker should be woken up, atomically increment the number of + // searching workers as well as the number of unparked workers. + State::unpark_one(&self.state); + + // Get the worker to unpark + let ret = sleepers.pop(); + debug_assert!(ret.is_some()); + + ret + } + + /// Returns `true` if the worker needs to do a final check for submitted + /// work. + pub(super) fn transition_worker_to_parked(&self, worker: usize, is_searching: bool) -> bool { + // Acquire the lock + let mut sleepers = self.sleepers.lock().unwrap(); + + // Decrement the number of unparked threads + let ret = State::dec_num_unparked(&self.state, is_searching); + + // Track the sleeping worker + sleepers.push(worker); + + ret + } + + pub(super) fn transition_worker_to_searching(&self) -> bool { + // Using `Relaxed` ordering is acceptable here as it is just an + // optimization. This load has does not need to synchronize with + // anything, and the algorithm is correct no matter what the load + // returns (as in, it could return absolutely any `usize` value and the + // pool would be correct. + let state = State::load(&self.state, Relaxed); + if 2 * state.num_searching() >= self.num_workers { + return false; + } + + // It is possible for this routine to allow more than 50% of the workers + // to search. That is OK. Limiting searchers is only an optimization to + // prevent too much contention. + // + // At this point, we do not need a hard synchronization with `notify_work`, so `AcqRel` is sufficient. + State::inc_num_searching(&self.state, AcqRel); + true + } + + /// A lightweight transition from searching -> running. + /// + /// Returns `true` if this is the final searching worker. The caller + /// **must** notify a new worker. + pub(super) fn transition_worker_from_searching(&self) -> bool { + State::dec_num_searching(&self.state) + } + + /// Unpark a specific worker. This happens if tasks are submitted from + /// within the worker's park routine. + pub(super) fn unpark_worker_by_id(&self, worker_id: usize) { + let mut sleepers = self.sleepers.lock().unwrap(); + + for index in 0..sleepers.len() { + if sleepers[index] == worker_id { + sleepers.swap_remove(index); + + // Update the state accordingly whle the lock is held. + State::unpark_one(&self.state); + + return; + } + } + } + + /// Returns `true` if `worker_id` is contained in the sleep set + pub(super) fn is_parked(&self, worker_id: usize) -> bool { + let sleepers = self.sleepers.lock().unwrap(); + sleepers.contains(&worker_id) + } + + fn notify_should_wakeup(&self) -> bool { + let state = State::load(&self.state, SeqCst); + state.num_searching() == 0 && state.num_unparked() < self.num_workers + } +} + +impl State { + fn new(num_workers: usize) -> State { + // All workers start in the unparked state + let ret = State(num_workers << UNPARK_SHIFT); + debug_assert_eq!(num_workers, ret.num_unparked()); + debug_assert_eq!(0, ret.num_searching()); + ret + } + + fn load(cell: &AtomicUsize, ordering: Ordering) -> State { + State(cell.load(ordering)) + } + + fn unpark_one(cell: &AtomicUsize) { + cell.fetch_add(1 | (1 << UNPARK_SHIFT), SeqCst); + } + + fn inc_num_searching(cell: &AtomicUsize, ordering: Ordering) { + cell.fetch_add(1, ordering); + } + + /// Returns `true` if this is the final searching worker + fn dec_num_searching(cell: &AtomicUsize) -> bool { + let state = State(cell.fetch_sub(1, SeqCst)); + state.num_searching() == 1 + } + + /// Track a sleeping worker + /// + /// Returns `true` if this is the final searching worker. + fn dec_num_unparked(cell: &AtomicUsize, is_searching: bool) -> bool { + let mut dec = 1 << UNPARK_SHIFT; + + if is_searching { + dec += 1; + } + + let prev = State(cell.fetch_sub(dec, SeqCst)); + is_searching && prev.num_searching() == 1 + } + + /// Number of workers currently searching + fn num_searching(self) -> usize { + self.0 & SEARCH_MASK + } + + /// Number of workers currently unparked + fn num_unparked(self) -> usize { + (self.0 & UNPARK_MASK) >> UNPARK_SHIFT + } +} + +impl From<usize> for State { + fn from(src: usize) -> State { + State(src) + } +} + +impl From<State> for usize { + fn from(src: State) -> usize { + src.0 + } +} + +impl fmt::Debug for State { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("worker::State") + .field("num_unparked", &self.num_unparked()) + .field("num_searching", &self.num_searching()) + .finish() + } +} + +#[test] +fn test_state() { + assert_eq!(0, UNPARK_MASK & SEARCH_MASK); + assert_eq!(0, !(UNPARK_MASK | SEARCH_MASK)); + + let state = State::new(10); + assert_eq!(10, state.num_unparked()); + assert_eq!(0, state.num_searching()); +} diff --git a/tokio/src/executor/thread_pool/join.rs b/tokio/src/executor/thread_pool/join.rs new file mode 100644 index 00000000..e066882a --- /dev/null +++ b/tokio/src/executor/thread_pool/join.rs @@ -0,0 +1,42 @@ +use crate::executor::park::Unpark; +use crate::executor::task; +use crate::executor::thread_pool::Shared; + +use std::fmt; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// An owned permission to join on a task (await its termination). +pub struct JoinHandle<T> { + task: task::JoinHandle<T, Shared<Box<dyn Unpark>>>, +} + +impl<T> JoinHandle<T> +where + T: Send + 'static, +{ + pub(super) fn new(task: task::JoinHandle<T, Shared<Box<dyn Unpark>>>) -> JoinHandle<T> { + JoinHandle { task } + } +} + +impl<T> Future for JoinHandle<T> +where + T: Send + 'static, +{ + type Output = task::Result<T>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + Pin::new(&mut self.task).poll(cx) + } +} + +impl<T> fmt::Debug for JoinHandle<T> +where + T: fmt::Debug, +{ + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("JoinHandle").finish() + } +} diff --git a/tokio/src/executor/thread_pool/mod.rs b/tokio/src/executor/thread_pool/mod.rs new file mode 100644 index 00000000..bab594ef --- /dev/null +++ b/tokio/src/executor/thread_pool/mod.rs @@ -0,0 +1,58 @@ +//! Threadpool + +mod builder; +pub use self::builder::Builder; + +mod current; + +mod idle; +use self::idle::Idle; + +mod join; +pub use self::join::JoinHandle; + +mod owned; +use self::owned::Owned; + +mod park; + +mod pool; +pub use self::pool::ThreadPool; + +mod queue; + +mod spawner; +pub use self::spawner::Spawner; + +mod set; + +mod shared; +use self::shared::Shared; + +mod shutdown; + +mod worker; + +/// Unit tests +#[cfg(test)] +mod tests; + +// Re-export `task::Error` +pub use crate::executor::task::Error; + +// These exports are used in tests +#[cfg(test)] +#[allow(warnings)] +pub(crate) use self::worker::create_set as create_pool; + +pub(crate) type BoxFuture = + std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + 'static>>; + +#[cfg(not(loom))] +const LOCAL_QUEUE_CAPACITY: usize = 256; + +// Shrink the size of the local queue when using loom. This shouldn't impact +// logic, but allows loom to test more edge cases in a reasonable a mount of +// time. +#[cfg(loom)] +const LOCAL_QUEUE_CAPACITY: usize = 2; diff --git a/tokio/src/executor/thread_pool/owned.rs b/tokio/src/executor/thread_pool/owned.rs new file mode 100644 index 00000000..f0aece57 --- /dev/null +++ b/tokio/src/executor/thread_pool/owned.rs @@ -0,0 +1,77 @@ +use crate::executor::task::{self, Task}; +use crate::executor::thread_pool::{queue, Shared}; +use crate::executor::util::FastRand; + +use std::cell::Cell; + +/// Per-worker data accessible only by the thread driving the worker. +#[derive(Debug)] +pub(super) struct Owned<P: 'static> { + /// Worker tick number. Used to schedule bookkeeping tasks every so often. + pub(super) tick: Cell<u16>, + + /// Caches the pool run state. + pub(super) is_running: Cell<bool>, + + /// `true` if the worker is currently searching for more work. + pub(super) is_searching: Cell<bool>, + + /// `true` when worker notification should be delayed. + /// + /// This is used to batch notifications triggered by the parker. + pub(super) defer_notification: Cell<bool>, + + /// `true` if a task was submitted while `defer_notification` was set + pub(super) did_submit_task: Cell<bool>, + + /// Fast random number generator + pub(super) rand: FastRand, + + /// Work queue + pub(super) work_queue: queue::Worker<Shared<P>>, + + /// List of tasks owned by the worker + pub(super) owned_tasks: task::OwnedList<Shared<P>>, +} + +impl<P> Owned<P> +where + P: 'static, +{ + pub(super) fn new(work_queue: queue::Worker<Shared<P>>, rand: FastRand) -> Owned<P> { + Owned { + tick: Cell::new(1), + is_running: Cell::new(true), + is_searching: Cell::new(false), + defer_notification: Cell::new(false), + did_submit_task: Cell::new(false), + rand, + work_queue, + owned_tasks: task::OwnedList::new(), + } + } + + /// Returns `true` if a worker should be notified + pub(super) fn submit_local(&self, task: Task<Shared<P>>) -> bool { + let ret = self.work_queue.push(task); + + if self.defer_notification.get() { + self.did_submit_task.set(true); + false + } else { + ret + } + } + + pub(super) fn submit_local_yield(&self, task: Task<Shared<P>>) { + self.work_queue.push_yield(task); + } + + pub(super) fn bind_task(&mut self, task: &Task<Shared<P>>) { + self.owned_tasks.insert(task); + } + + pub(super) fn release_task(&mut self, task: &Task<Shared<P>>) { + self.owned_tasks.remove(task); + } +} diff --git a/tokio/src/executor/thread_pool/park.rs b/tokio/src/executor/thread_pool/park.rs new file mode 100644 index 00000000..225d21db --- /dev/null +++ b/tokio/src/executor/thread_pool/park.rs @@ -0,0 +1,182 @@ +use crate::executor::loom::sync::atomic::AtomicUsize; +use crate::executor::loom::sync::{Arc, Condvar, Mutex}; +use crate::executor::park::{Park, Unpark}; + +use std::error::Error; +use std::fmt; +use std::sync::atomic::Ordering::SeqCst; +use std::time::Duration; + +/// Parks the thread. +#[derive(Debug)] +pub(crate) struct DefaultPark { + inner: Arc<Inner>, +} + +/// Unparks threads that were parked by `DefaultPark`. +#[derive(Debug)] +pub(crate) struct DefaultUnpark { + inner: Arc<Inner>, +} + +/// Error returned by [`ParkThread`] +/// +/// This currently is never returned, but might at some point in the future. +/// +/// [`ParkThread`]: struct.ParkThread.html +#[derive(Debug)] +pub(crate) struct ParkError { + _p: (), +} + +const EMPTY: usize = 0; +const PARKED: usize = 1; +const NOTIFIED: usize = 2; + +#[derive(Debug)] +struct Inner { + state: AtomicUsize, + lock: Mutex<()>, + cvar: Condvar, +} + +impl DefaultPark { + /// Creates a new `DefaultPark` instance. + pub(crate) fn new() -> DefaultPark { + DefaultPark { + inner: Arc::new(Inner { + state: AtomicUsize::new(EMPTY), + lock: Mutex::new(()), + cvar: Condvar::new(), + }), + } + } +} + +impl Park for DefaultPark { + type Unpark = DefaultUnpark; + type Error = ParkError; + + fn unpark(&self) -> Self::Unpark { + let inner = self.inner.clone(); + DefaultUnpark { inner } + } + + fn park(&mut self) -> Result<(), Self::Error> { + self.inner.park(None); + Ok(()) + } + + fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + self.inner.park(Some(duration)); + Ok(()) + } +} + +impl Unpark for DefaultUnpark { + fn unpark(&self) { + self.inner.unpark(); + } +} + +impl fmt::Display for ParkError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "unknown park error") + } +} + +impl Error for ParkError {} + +impl Inner { + fn park(&self, timeout: Option<Duration>) { + // If we were previously notified then we consume this notification and return quickly. + if self + .state + .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) + .is_ok() + { + return; + } + + // If the timeout is zero, then there is no need to actually block. + if let Some(ref dur) = timeout { + if *dur == Duration::from_millis(0) { + return; + } + } + + // Otherwise we need to coordinate going to sleep. + let mut _m = self.lock.lock().unwrap(); + + match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) { + Ok(_) => {} + // Consume this notification to avoid spurious wakeups in the next park. + Err(NOTIFIED) => { + // We must read `state` here, even though we know it will be `NOTIFIED`. This is + // because `unpark` may have been called again since we read `NOTIFIED` in the + // `compare_exchange` above. We must perform an acquire operation that synchronizes + // with that `unpark` to observe any writes it made before the call to `unpark`. To + // do that we must read from the write it made to `state`. + let old = self.state.swap(EMPTY, SeqCst); + assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); + return; + } + Err(n) => panic!("inconsistent park_timeout state: {}", n), + } + + match timeout { + None => { + loop { + // Block the current thread on the conditional variable. + _m = self.cvar.wait(_m).unwrap(); + + if self + .state + .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) + .is_ok() + { + return; // got a notification + } + + // spurious wakeup, go back to sleep + } + } + Some(timeout) => { + // Wait with a timeout, and if we spuriously wake up or otherwise wake up from a + // notification we just want to unconditionally set `state` back to `EMPTY`, either + // consuming a notification or un-flagging ourselves as parked. + _m = self.cvar.wait_timeout(_m, timeout).unwrap().0; + + match self.state.swap(EMPTY, SeqCst) { + NOTIFIED => {} // got a notification + PARKED => {} // no notification |