//! Timer state structures.
//!
//! This module contains the heart of the intrusive timer implementation, and
//! as such the structures inside are full of tricky concurrency and unsafe
//! code.
//!
//! # Ground rules
//!
//! The heart of the timer implementation here is the `TimerShared` structure,
//! shared between the `TimerEntry` and the driver. Generally, we permit access
//! to `TimerShared` ONLY via either 1) a mutable reference to `TimerEntry` or
//! 2) a held driver lock.
//!
//! It follows from this that any changes made while holding BOTH 1 and 2 will
//! be reliably visible, regardless of ordering. This is because of the acq/rel
//! fences on the driver lock ensuring ordering with 2, and Rust mutable
//! reference rules for 1 (a mutable reference to an object can't be passed
//! between threads without an acq/rel barrier, and same-thread we have local
//! happens-before ordering).
//!
//! # State field
//!
//! Each timer has a state field associated with it. This field contains either
//! the current scheduled time, or a special flag value indicating its state.
//! This state can either indicate that the timer is on the 'pending' queue
//! (and thus will be fired with an `Ok(())` result soon) or that it has
//! already been fired/deregistered.
//!
//! This single state field allows for code that is firing the timer to
//! synchronize with any racing `reset` calls reliably.
//!
//! # Cached vs true timeouts
//!
//! To make the common case of a timeout that is repeatedly reset before
//! expiration as lightweight as possible, we support optimistic, lock-free
//! timer resets in the case where a timer is rescheduled to a later point than
//! it was originally scheduled for.
//!
//! This is accomplished by lazily rescheduling timers. That is, we update the
//! state field with the true expiration of the timer from the holder of the
//! [`TimerEntry`]. When the driver services timers (i.e., whenever it's
//! walking lists of timers), it checks this "true when" value, and reschedules
//! based on it.
//!
//! We do, however, also need to track what the expiration time was when we
//! originally registered the timer; this is used to locate the right linked
//! list when the timer is being cancelled. This is referred to as the "cached
//! when" internally.
//!
//! There is of course a race condition between timer reset and timer
//! expiration. If the driver fails to observe the updated expiration time, it
//! could trigger expiration of the timer too early. However, because
//! `mark_pending` performs a compare-and-swap, it will identify this race and
//! refuse to mark the timer as pending.

use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::Ordering;

use crate::sync::AtomicWaker;
use crate::time::Instant;
use crate::util::linked_list;

use super::Handle;

use std::cell::UnsafeCell as StdUnsafeCell;
use std::task::{Context, Poll, Waker};
use std::{marker::PhantomPinned, pin::Pin, ptr::NonNull};

type TimerResult = Result<(), crate::time::error::Error>;

const STATE_DEREGISTERED: u64 = u64::max_value();
const STATE_PENDING_FIRE: u64 = STATE_DEREGISTERED - 1;
const STATE_MIN_VALUE: u64 = STATE_PENDING_FIRE;
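// Illustrative sanity check (an addition for exposition, not part of the
// driver logic): demonstrates the state encoding described in the module docs.
// Any tick value stored in the state field must stay strictly below
// `STATE_MIN_VALUE` so it can never collide with the two flag values above.
#[cfg(all(test, not(loom)))]
mod state_encoding_sketch {
    use super::*;

    #[test]
    fn flag_values_do_not_collide_with_ticks() {
        assert_eq!(STATE_DEREGISTERED, u64::max_value());
        assert_eq!(STATE_PENDING_FIRE, STATE_DEREGISTERED - 1);

        // A plausible tick value; `set_expiration` debug-asserts this bound.
        let tick: u64 = 12_345;
        assert!(tick < STATE_MIN_VALUE);
    }
}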
/// Not all platforms support 64-bit compare-and-swap. This hack replaces the
/// AtomicU64 with a mutex around a u64 on platforms that don't. This is slow,
/// unfortunately, but 32-bit platforms are a bit niche so it'll do for now.
///
/// Note: We use "x86 or 64-bit pointers" as the condition here because
/// target_has_atomic is not stable.
#[cfg(all(
    not(tokio_force_time_entry_locked),
    any(target_arch = "x86", target_pointer_width = "64")
))]
type AtomicU64 = crate::loom::sync::atomic::AtomicU64;

#[cfg(not(all(
    not(tokio_force_time_entry_locked),
    any(target_arch = "x86", target_pointer_width = "64")
)))]
#[derive(Debug)]
struct AtomicU64 {
    inner: crate::loom::sync::Mutex<u64>,
}

#[cfg(not(all(
    not(tokio_force_time_entry_locked),
    any(target_arch = "x86", target_pointer_width = "64")
)))]
impl AtomicU64 {
    fn new(v: u64) -> Self {
        Self {
            inner: crate::loom::sync::Mutex::new(v),
        }
    }

    fn load(&self, _order: Ordering) -> u64 {
        debug_assert_ne!(_order, Ordering::SeqCst); // we only provide AcqRel with the lock
        *self.inner.lock()
    }

    fn store(&self, v: u64, _order: Ordering) {
        debug_assert_ne!(_order, Ordering::SeqCst); // we only provide AcqRel with the lock
        *self.inner.lock() = v;
    }

    fn compare_exchange(
        &self,
        current: u64,
        new: u64,
        _success: Ordering,
        _failure: Ordering,
    ) -> Result<u64, u64> {
        debug_assert_ne!(_success, Ordering::SeqCst); // we only provide AcqRel with the lock
        debug_assert_ne!(_failure, Ordering::SeqCst);

        let mut lock = self.inner.lock();

        if *lock == current {
            *lock = new;
            Ok(current)
        } else {
            Err(*lock)
        }
    }

    fn compare_exchange_weak(
        &self,
        current: u64,
        new: u64,
        success: Ordering,
        failure: Ordering,
    ) -> Result<u64, u64> {
        self.compare_exchange(current, new, success, failure)
    }
}
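// Illustrative sketch (an addition, not part of the original module): the
// mutex-based fallback above deliberately mirrors the native atomic's
// interface, so every call site compiles identically under either cfg branch.
// This exercises that interface parity with orderings the fallback accepts.
#[cfg(all(test, not(loom)))]
mod atomic_u64_fallback_sketch {
    use super::*;

    #[test]
    fn compare_exchange_semantics() {
        let v = AtomicU64::new(5);

        // A CAS against the wrong current value fails and reports the actual
        // stored value.
        assert_eq!(
            v.compare_exchange(4, 9, Ordering::AcqRel, Ordering::Acquire),
            Err(5)
        );

        // A CAS against the right current value succeeds and returns the
        // previous value.
        assert_eq!(
            v.compare_exchange(5, 9, Ordering::AcqRel, Ordering::Acquire),
            Ok(5)
        );
        assert_eq!(v.load(Ordering::Relaxed), 9);
    }
}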
/// This structure holds the current shared state of the timer - its scheduled
/// time (if registered), or otherwise the result of the timer completing, as
/// well as the registered waker.
///
/// Generally, the StateCell is only permitted to be accessed from two contexts:
/// Either a thread holding the corresponding &mut TimerEntry, or a thread
/// holding the timer driver lock. The write actions on the StateCell amount to
/// passing "ownership" of the StateCell between these contexts; moving a timer
/// from the TimerEntry to the driver requires _both_ holding the &mut
/// TimerEntry and the driver lock, while moving it back (firing the timer)
/// requires only the driver lock.
pub(super) struct StateCell {
    /// Holds either the scheduled expiration time for this timer, or (if the
    /// timer has been fired and is unregistered), [`u64::max_value()`].
    state: AtomicU64,
    /// If the timer is fired (an Acquire order read on state shows
    /// `u64::max_value()`), holds the result that should be returned from
    /// polling the timer. Otherwise, the contents are unspecified and reading
    /// without holding the driver lock is undefined behavior.
    result: UnsafeCell<TimerResult>,
    /// The currently-registered waker
    waker: CachePadded<AtomicWaker>,
}

impl Default for StateCell {
    fn default() -> Self {
        Self::new()
    }
}

impl std::fmt::Debug for StateCell {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "StateCell({:?})", self.read_state())
    }
}

impl StateCell {
    fn new() -> Self {
        Self {
            state: AtomicU64::new(STATE_DEREGISTERED),
            result: UnsafeCell::new(Ok(())),
            waker: CachePadded(AtomicWaker::new()),
        }
    }

    fn is_pending(&self) -> bool {
        self.state.load(Ordering::Relaxed) == STATE_PENDING_FIRE
    }

    /// Returns the current expiration time, or None if not currently scheduled.
    fn when(&self) -> Option<u64> {
        let cur_state = self.state.load(Ordering::Relaxed);

        if cur_state == STATE_DEREGISTERED {
            None
        } else {
            Some(cur_state)
        }
    }

    /// If the timer is completed, returns the result of the timer. Otherwise,
    /// returns None and registers the waker.
    fn poll(&self, waker: &Waker) -> Poll<TimerResult> {
        // We must register first. This ensures that either `fire` will
        // observe the new waker, or we will observe a racing fire to have set
        // the state, or both.
        self.waker.0.register_by_ref(waker);

        self.read_state()
    }

    fn read_state(&self) -> Poll<TimerResult> {
        let cur_state = self.state.load(Ordering::Acquire);

        if cur_state == STATE_DEREGISTERED {
            // SAFETY: The driver has fired this timer; this involves writing
            // the result, and then writing (with release ordering) the state
            // field.
            Poll::Ready(unsafe { self.result.with(|p| *p) })
        } else {
            Poll::Pending
        }
    }

    /// Marks this timer as being moved to the pending list, if its scheduled
    /// time is not after `not_after`.
    ///
    /// If the timer is scheduled for a time after `not_after`, returns an Err
    /// containing the current scheduled time.
    ///
    /// SAFETY: Must hold the driver lock.
    unsafe fn mark_pending(&self, not_after: u64) -> Result<(), u64> {
        // Quick initial debug check to see if the timer is already fired. Since
        // firing the timer can only happen with the driver lock held, we know
        // we shouldn't be able to "miss" a transition to a fired state, even
        // with relaxed ordering.
        let mut cur_state = self.state.load(Ordering::Relaxed);

        loop {
            debug_assert!(cur_state < STATE_MIN_VALUE);

            if cur_state > not_after {
                break Err(cur_state);
            }

            match self.state.compare_exchange(
                cur_state,
                STATE_PENDING_FIRE,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    break Ok(());
                }
                Err(actual_state) => {
                    cur_state = actual_state;
                }
            }
        }
    }

    /// Fires the timer, setting the result to the provided result.
    ///
    /// Returns:
    /// * `Some(waker)` - if fired and a waker needs to be invoked once the
    ///   driver lock is released
    /// * `None` - if fired and a waker does not need to be invoked, or if
    ///   already fired
    ///
    /// SAFETY: The driver lock must be held.
    unsafe fn fire(&self, result: TimerResult) -> Option<Waker> {
        // Quick initial check to see if the timer is already fired. Since
        // firing the timer can only happen with the driver lock held, we know
        // we shouldn't be able to "miss" a transition to a fired state, even
        // with relaxed ordering.
        let cur_state = self.state.load(Ordering::Relaxed);
        if cur_state == STATE_DEREGISTERED {
            return None;
        }

        // SAFETY: We assume the driver lock is held and the timer is not
        // fired, so only the driver is accessing this field.
        //
        // We perform a release-ordered store to state below, to ensure this
        // write is visible before the state update is visible.
        unsafe { self.result.with_mut(|p| *p = result) };

        self.state.store(STATE_DEREGISTERED, Ordering::Release);

        self.waker.0.take_waker()
    }

    /// Marks the timer as registered (poll will return Pending) and sets the
    /// expiration time.
    ///
    /// While this function is memory-safe, it should only be called from a
    /// context holding both `&mut TimerEntry` and the driver lock.
    fn set_expiration(&self, timestamp: u64) {
        debug_assert!(timestamp < STATE_MIN_VALUE);

        // We can use relaxed ordering because we hold the driver lock and will
        // fence when we release the lock.
        self.state.store(timestamp, Ordering::Relaxed);
    }

    /// Attempts to adjust the timer to a new timestamp.
    ///
    /// If the timer has already been fired, is pending firing, or the new
    /// timestamp is earlier than the old timestamp (or, occasionally,
    /// spuriously), returns Err without changing the timer's state. In this
    /// case, the timer must be deregistered and re-registered.
    fn extend_expiration(&self, new_timestamp: u64) -> Result<(), ()> {
        let mut prior = self.state.load(Ordering::Relaxed);

        loop {
            if new_timestamp < prior || prior >= STATE_MIN_VALUE {
                return Err(());
            }

            match self.state.compare_exchange_weak(
                prior,
                new_timestamp,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    return Ok(());
                }
                Err(true_prior) => {
                    prior = true_prior;
                }
            }
        }
    }

    /// Returns true if the state of this timer indicates that the timer might
    /// be registered with the driver. This check is performed with relaxed
    /// ordering, but is conservative - if it returns false, the timer is
    /// definitely _not_ registered.
    pub(super) fn might_be_registered(&self) -> bool {
        self.state.load(Ordering::Relaxed) != STATE_DEREGISTERED
    }
}
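// Illustrative, single-threaded walkthrough (an addition for exposition): this
// exercises the reset-vs-expiration race described in the module docs. The
// "must hold the driver lock" preconditions are vacuous here because no driver
// or second thread exists; only the state machine itself is exercised.
#[cfg(all(test, not(loom)))]
mod state_cell_sketch {
    use super::*;

    #[test]
    fn stale_deadline_does_not_fire() {
        let cell = StateCell::new();

        // A fresh cell reads as already fired (Ok), which is why TimerEntry
        // defers registration via `initial_deadline`.
        assert!(!cell.might_be_registered());

        // Register at tick 100, then lazily extend the true deadline to 200.
        cell.set_expiration(100);
        assert_eq!(cell.extend_expiration(200), Ok(()));

        // A driver servicing the wheel as of tick 150 attempts to mark the
        // timer pending; the CAS observes the true deadline (200) and refuses.
        assert_eq!(unsafe { cell.mark_pending(150) }, Err(200));

        // Once the true deadline is reached, marking and firing succeed.
        assert_eq!(unsafe { cell.mark_pending(200) }, Ok(()));
        assert!(cell.is_pending());
        assert!(unsafe { cell.fire(Ok(())) }.is_none()); // no waker registered
        assert!(!cell.might_be_registered());
    }
}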
/// A timer entry.
///
/// This is the handle to a timer that is controlled by the requester of the
/// timer. As this participates in intrusive data structures, it must be pinned
/// before polling.
#[derive(Debug)]
pub(super) struct TimerEntry {
    /// Arc reference to the driver. We can only free the driver after
    /// deregistering everything from their respective timer wheels.
    driver: Handle,
    /// Shared inner structure; this is part of an intrusive linked list, and
    /// therefore other references can exist to it while mutable references to
    /// Entry exist.
    ///
    /// This is manipulated only under the inner mutex. TODO: Can we use loom
    /// cells for this?
    inner: StdUnsafeCell<TimerShared>,
    /// Initial deadline for the timer. This is used to register on the first
    /// poll, as we can't register prior to being pinned.
    initial_deadline: Option<Instant>,
}

unsafe impl Send for TimerEntry {}
unsafe impl Sync for TimerEntry {}

/// A TimerHandle is the (non-enforced) "unique" pointer from the driver to the
/// timer entry. Generally, at most one TimerHandle exists for a timer at a time
/// (enforced by the timer state machine).
///
/// SAFETY: A TimerHandle is essentially a raw pointer, and the usual caveats
/// of pointer safety apply. In particular, a TimerHandle does not itself enforce
/// that the timer does still exist; however, normally a TimerHandle is created
/// immediately before registering the timer, and is consumed when firing the
/// timer, to help minimize mistakes. Still, because TimerHandle cannot enforce
/// memory safety, all operations are unsafe.
#[derive(Debug)]
pub(crate) struct TimerHandle {
    inner: NonNull<TimerShared>,
}

pub(super) type EntryList =
    crate::util::linked_list::LinkedList<TimerShared, <TimerShared as linked_list::Link>::Target>;

/// The shared state structure of a timer. This structure is shared between the
/// frontend (`Entry`) and driver backend.
///
/// Note that this structure is located inside the `TimerEntry` structure.
#[derive(Debug)]
pub(crate) struct TimerShared {
    /// Current state. This records whether the timer entry is currently under
    /// the ownership of the driver, and if not, its current state (not
    /// complete, fired, error, etc).
    state: StateCell,

    /// Data manipulated by the driver thread itself, only.
    driver_state: CachePadded<TimerSharedPadded>,

    _p: PhantomPinned,
}
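// Illustrative check (an addition): a freshly created TimerShared is not
// registered with any driver, matching the `might_be_registered` contract
// below. `TimerEntry::is_elapsed` builds directly on this invariant.
#[cfg(all(test, not(loom)))]
mod timer_shared_sketch {
    use super::*;

    #[test]
    fn fresh_timer_is_not_registered() {
        let shared = TimerShared::new();
        assert!(!shared.might_be_registered());
    }
}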
impl TimerShared {
    pub(super) fn new() -> Self {
        Self {
            state: StateCell::default(),
            driver_state: CachePadded(TimerSharedPadded::new()),
            _p: PhantomPinned,
        }
    }

    /// Gets the cached time-of-expiration value.
    pub(super) fn cached_when(&self) -> u64 {
        // Cached-when is only accessed under the driver lock, so we can use relaxed
        self.driver_state.0.cached_when.load(Ordering::Relaxed)
    }

    /// Gets the true time-of-expiration value, and copies it into the cached
    /// time-of-expiration value.
    ///
    /// SAFETY: Must be called with the driver lock held, and when this entry is
    /// not in any timer wheel lists.
    pub(super) unsafe fn sync_when(&self) -> u64 {
        let true_when = self.true_when();

        self.driver_state
            .0
            .cached_when
            .store(true_when, Ordering::Relaxed);

        true_when
    }

    /// Sets the cached time-of-expiration value.
    ///
    /// SAFETY: Must be called with the driver lock held, and when this entry is
    /// not in any timer wheel lists.
    unsafe fn set_cached_when(&self, when: u64) {
        self.driver_state
            .0
            .cached_when
            .store(when, Ordering::Relaxed);
    }

    /// Returns the true time-of-expiration value, with relaxed memory ordering.
    pub(super) fn true_when(&self) -> u64 {
        self.state.when().expect("Timer already fired")
    }

    /// Sets the true time-of-expiration value, even if it is less than the
    /// current expiration or the timer is deregistered.
    ///
    /// SAFETY: Must only be called with the driver lock held and the entry not
    /// in the timer wheel.
    pub(super) unsafe fn set_expiration(&self, t: u64) {
        self.state.set_expiration(t);
        self.driver_state.0.cached_when.store(t, Ordering::Relaxed);
    }

    /// Sets the true time-of-expiration only if it is after the current.
    pub(super) fn extend_expiration(&self, t: u64) -> Result<(), ()> {
        self.state.extend_expiration(t)
    }

    /// Returns a TimerHandle for this timer.
    pub(super) fn handle(&self) -> TimerHandle {
        TimerHandle {
            inner: NonNull::from(self),
        }
    }

    /// Returns true if the state of this timer indicates that the timer might
    /// be registered with the driver. This check is performed with relaxed
    /// ordering, but is conservative - if it returns false, the timer is
    /// definitely _not_ registered.
    pub(super) fn might_be_registered(&self) -> bool {
        self.state.might_be_registered()
    }
}
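// Illustrative walkthrough (an addition): demonstrates the "cached when" vs
// "true when" split documented at the top of the module. Single-threaded, so
// the driver-lock preconditions are vacuous here.
#[cfg(all(test, not(loom)))]
mod cached_when_sketch {
    use super::*;

    #[test]
    fn sync_when_reconciles_cached_deadline() {
        let shared = TimerShared::new();

        // Registration sets both the true and cached deadlines.
        unsafe { shared.set_expiration(10) };
        assert_eq!(shared.cached_when(), 10);
        assert_eq!(shared.true_when(), 10);

        // A lock-free extension updates only the true deadline; the entry
        // stays filed under the old wheel slot.
        assert_eq!(shared.extend_expiration(20), Ok(()));
        assert_eq!(shared.cached_when(), 10);
        assert_eq!(shared.true_when(), 20);

        // When the driver services the entry, it syncs the cached value.
        assert_eq!(unsafe { shared.sync_when() }, 20);
        assert_eq!(shared.cached_when(), 20);
    }
}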
/// Additional shared state between the driver and the timer which is cache
/// padded. This contains the information that the driver thread accesses most
/// frequently to minimize contention. In particular, we move it away from the
/// waker, as the waker is updated on every poll.
struct TimerSharedPadded {
    /// The expiration time for which this entry is currently registered.
    /// Generally owned by the driver, but is accessed by the entry when not
    /// registered.
    cached_when: AtomicU64,

    /// The true expiration time. Set by the timer future, read by the driver.
    true_when: AtomicU64,

    /// A link within the doubly-linked list of timers on a particular level
    /// and slot. Valid only if state is equal to Registered.
    ///
    /// Only accessed under the entry lock.
    pointers: StdUnsafeCell<linked_list::Pointers<TimerShared>>,
}

impl std::fmt::Debug for TimerSharedPadded {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TimerSharedPadded")
            .field("when", &self.true_when.load(Ordering::Relaxed))
            .field("cached_when", &self.cached_when.load(Ordering::Relaxed))
            .finish()
    }
}

impl TimerSharedPadded {
    fn new() -> Self {
        Self {
            cached_when: AtomicU64::new(0),
            true_when: AtomicU64::new(0),
            pointers: StdUnsafeCell::new(linked_list::Pointers::new()),
        }
    }
}

unsafe impl Send for TimerShared {}
unsafe impl Sync for TimerShared {}

unsafe impl linked_list::Link for TimerShared {
    type Handle = TimerHandle;
    type Target = TimerShared;

    fn as_raw(handle: &Self::Handle) -> NonNull<Self::Target> {
        handle.inner
    }

    unsafe fn from_raw(ptr: NonNull<Self::Target>) -> Self::Handle {
        TimerHandle { inner: ptr }
    }

    unsafe fn pointers(
        target: NonNull<Self::Target>,
    ) -> NonNull<linked_list::Pointers<Self::Target>> {
        unsafe { NonNull::new(target.as_ref().driver_state.0.pointers.get()).unwrap() }
    }
}
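// Illustrative check (an addition): the Link implementation above round-trips
// a TimerHandle through its raw-pointer form without moving the target, which
// is what the intrusive list relies on. No list insertion is performed here.
#[cfg(all(test, not(loom)))]
mod link_sketch {
    use super::*;

    #[test]
    fn handle_round_trips_through_raw_pointer() {
        let shared = TimerShared::new();
        let handle = shared.handle();

        let raw = <TimerShared as linked_list::Link>::as_raw(&handle);
        assert_eq!(raw, NonNull::from(&shared));

        let handle2 = unsafe { <TimerShared as linked_list::Link>::from_raw(raw) };
        assert_eq!(<TimerShared as linked_list::Link>::as_raw(&handle2), raw);
    }
}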
// ===== impl Entry =====

impl TimerEntry {
    pub(crate) fn new(handle: &Handle, deadline: Instant) -> Self {
        let driver = handle.clone();

        Self {
            driver,
            inner: StdUnsafeCell::new(TimerShared::new()),
            initial_deadline: Some(deadline),
        }
    }

    fn inner(&self) -> &TimerShared {
        unsafe { &*self.inner.get() }
    }

    pub(crate) fn is_elapsed(&self) -> bool {
        !self.inner().state.might_be_registered() && self.initial_deadline.is_none()
    }

    /// Cancels and deregisters the timer. This operation is irreversible.
    pub(crate) fn cancel(self: Pin<&mut Self>) {
        // We need to perform an acq/rel fence with the driver thread, and the
        // simplest way to do so is to grab the driver lock.
        //
        // Why is this necessary? We're about to release this timer's memory for
        // some other non-timer use. However, we've been doing a bunch of
        // relaxed (or even non-atomic) writes from the driver thread, and we'll
        // be doing more from _this thread_ (as this memory is interpreted as
        // something else).
        //
        // It is critical to ensure that, from the point of view of the driver,
        // those future non-timer writes happen-after the timer is fully fired,
        // and from the perspective of this thread, the driver's writes all
        // happen-before we drop the timer. This in turn requires us to perform
        // an acquire-release barrier in _both_ directions between the driver
        // and dropping thread.
        //
        // The lock acquisition in clear_entry serves this purpose. All of the
        // driver manipulations happen with the lock held, so we can just take
        // the lock and be sure that this drop happens-after everything the
        // driver did so far and happens-before everything the driver does in
        // the future. While we have the lock held, we also go ahead and
        // deregister the entry if necessary.
        unsafe { self.driver.clear_entry(NonNull::from(self.inner())) };
    }

    pub(crate) fn reset(mut self: Pin<&mut Self>, new_time: Instant) {
        unsafe { self.as_mut().get_unchecked_mut() }.initial_deadline = None;

        let tick = self.driver.time_source().deadline_to_tick(new_time);

        if self.inner().extend_expiration(tick).is_ok() {
            return;
        }

        unsafe {
            self.driver.reregister(tick, self.inner().into());
        }
    }

    pub(crate) fn poll_elapsed(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<TimerResult> {
        if let Some(deadline) = self.initial_deadline {
            self.as_mut().reset(deadline);
        }

        let this = unsafe { self.get_unchecked_mut() };

        this.inner().state.poll(cx.waker())
    }
}

impl TimerHandle {
    pub(super) unsafe fn cached_when(&self) -> u64 {
        unsafe { self.inner.as_ref().cached_when() }
    }

    pub(super) unsafe fn sync_when(&self) -> u64 {
        unsafe { self.inner.as_ref().sync_when() }
    }

    pub(super) unsafe fn is_pending(&self) -> bool {
        unsafe { self.inner.as_ref().state.is_pending() }
    }

    /// Forcibly sets the true and cached expiration times to the given tick.
    ///
    /// SAFETY: The caller must ensure that the handle remains valid, the driver
    /// lock is held, and that the timer is not in any wheel linked lists.
    pub(super) unsafe fn set_expiration(&self, tick: u64) {
        self.inner.as_ref().set_expiration(tick);
    }

    /// Attempts to mark this entry as pending. If the expiration time is after
    /// `not_after`, however, returns an Err with the current expiration time.
    ///
    /// If an `Err` is returned, the `cached_when` value will be updated to this
    /// new expiration time.
    ///
    /// SAFETY: The caller must ensure that the handle remains valid, the driver
    /// lock is held, and that the timer is not in any wheel linked lists.
    /// After returning Ok, the entry must be added to the pending list.
    pub(super) unsafe fn mark_pending(&self, not_after: u64) -> Result<(), u64> {
        match self.inner.as_ref().state.mark_pending(not_after) {
            Ok(()) => {
                // mark this as being on the pending queue in cached_when
                self.inner.as_ref().set_cached_when(u64::max_value());
                Ok(())
            }
            Err(tick) => {
                self.inner.as_ref().set_cached_when(tick);
                Err(tick)
            }
        }
    }

    /// Attempts to transition to a terminal state. If the state is already a
    /// terminal state, does nothing.
    ///
    /// Because the entry might be dropped after the state is moved to a
    /// terminal state, this function consumes the handle to ensure we don't
    /// access the entry afterwards.
    ///
    /// Returns the last-registered waker, if any.
    ///
    /// SAFETY: The driver lock must be held while invoking this function, and
    /// the entry must not be in any wheel linked lists.
    pub(super) unsafe fn fire(self, completed_state: TimerResult) -> Option<Waker> {
        self.inner.as_ref().state.fire(completed_state)
    }
}

impl Drop for TimerEntry {
    fn drop(&mut self) {
        unsafe { Pin::new_unchecked(self) }.as_mut().cancel()
    }
}

#[cfg_attr(target_arch = "x86_64", repr(align(128)))]
#[cfg_attr(not(target_arch = "x86_64"), repr(align(64)))]
#[derive(Debug, Default)]
struct CachePadded<T>(T);
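// Illustrative check (an addition): CachePadded's alignment matches the
// cfg_attr selection above - 128 bytes on x86_64 (two cache lines, to account
// for the adjacent-line prefetcher), 64 bytes elsewhere.
#[cfg(all(test, not(loom)))]
mod cache_padded_sketch {
    use super::*;

    #[test]
    fn alignment_matches_target() {
        let align = std::mem::align_of::<CachePadded<u8>>();
        if cfg!(target_arch = "x86_64") {
            assert_eq!(align, 128);
        } else {
            assert_eq!(align, 64);
        }
    }
}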