author    bdonlan <bdonlan@gmail.com>  2020-11-23 10:42:50 -0800
committer GitHub <noreply@github.com>  2020-11-23 10:42:50 -0800
commit    ae67851f11b7cc1f577de8ce21767ce3e2c7aff9 (patch)
tree      be43cb76333b0e9e42a101d659f9b2e41555d779 /tokio/src
parent    f927f01a34d7cedf0cdc820f729a7a6cd56e83dd (diff)
time: use intrusive lists for timer tracking (#3080)
More-or-less a half-rewrite of the current time driver, supporting the use of intrusive futures for timer registration. Fixes: #3028, #3069
Diffstat (limited to 'tokio/src')
-rw-r--r--  tokio/src/loom/std/mod.rs | 2
-rw-r--r--  tokio/src/stream/throttle.rs | 2
-rw-r--r--  tokio/src/stream/timeout.rs | 2
-rw-r--r--  tokio/src/time/driver/atomic_stack.rs | 124
-rw-r--r--  tokio/src/time/driver/entry.rs | 854
-rw-r--r--  tokio/src/time/driver/handle.rs | 41
-rw-r--r--  tokio/src/time/driver/mod.rs | 494
-rw-r--r--  tokio/src/time/driver/sleep.rs (renamed from tokio/src/time/sleep.rs) | 44
-rw-r--r--  tokio/src/time/driver/tests/mod.rs | 246
-rw-r--r--  tokio/src/time/driver/wheel/level.rs (renamed from tokio/src/time/wheel/level.rs) | 68
-rw-r--r--  tokio/src/time/driver/wheel/mod.rs (renamed from tokio/src/time/wheel/mod.rs) | 155
-rw-r--r--  tokio/src/time/driver/wheel/stack.rs (renamed from tokio/src/time/wheel/stack.rs) | 0
-rw-r--r--  tokio/src/time/error.rs | 26
-rw-r--r--  tokio/src/time/mod.rs | 43
-rw-r--r--  tokio/src/time/tests/mod.rs | 2
-rw-r--r--  tokio/src/time/tests/test_sleep.rs | 12
-rw-r--r--  tokio/src/time/timeout.rs | 2
-rw-r--r--  tokio/src/util/mod.rs | 1
18 files changed, 1306 insertions(+), 812 deletions(-)
diff --git a/tokio/src/loom/std/mod.rs b/tokio/src/loom/std/mod.rs
index 414ef906..c3f74efb 100644
--- a/tokio/src/loom/std/mod.rs
+++ b/tokio/src/loom/std/mod.rs
@@ -47,7 +47,7 @@ pub(crate) mod rand {
}
pub(crate) mod sync {
- pub(crate) use std::sync::Arc;
+ pub(crate) use std::sync::{Arc, Weak};
// Below, make sure all the feature-influenced types are exported for
// internal use. Note however that some are not _currently_ named by
diff --git a/tokio/src/stream/throttle.rs b/tokio/src/stream/throttle.rs
index 8f4a256d..ff1fbf01 100644
--- a/tokio/src/stream/throttle.rs
+++ b/tokio/src/stream/throttle.rs
@@ -17,7 +17,7 @@ where
let delay = if duration == Duration::from_millis(0) {
None
} else {
- Some(Sleep::new_timeout(Instant::now() + duration, duration))
+ Some(Sleep::new_timeout(Instant::now() + duration))
};
Throttle {
diff --git a/tokio/src/stream/timeout.rs b/tokio/src/stream/timeout.rs
index 669973ff..61154da0 100644
--- a/tokio/src/stream/timeout.rs
+++ b/tokio/src/stream/timeout.rs
@@ -23,7 +23,7 @@ pin_project! {
impl<S: Stream> Timeout<S> {
pub(super) fn new(stream: S, duration: Duration) -> Self {
let next = Instant::now() + duration;
- let deadline = Sleep::new_timeout(next, duration);
+ let deadline = Sleep::new_timeout(next);
Timeout {
stream: Fuse::new(stream),
diff --git a/tokio/src/time/driver/atomic_stack.rs b/tokio/src/time/driver/atomic_stack.rs
deleted file mode 100644
index 5dcc4726..00000000
--- a/tokio/src/time/driver/atomic_stack.rs
+++ /dev/null
@@ -1,124 +0,0 @@
-use crate::time::driver::Entry;
-use crate::time::error::Error;
-
-use std::ptr;
-use std::sync::atomic::AtomicPtr;
-use std::sync::atomic::Ordering::SeqCst;
-use std::sync::Arc;
-
-/// A stack of `Entry` nodes
-#[derive(Debug)]
-pub(crate) struct AtomicStack {
- /// Stack head
- head: AtomicPtr<Entry>,
-}
-
-/// Entries that were removed from the stack
-#[derive(Debug)]
-pub(crate) struct AtomicStackEntries {
- ptr: *mut Entry,
-}
-
-/// Used to indicate that the timer has shutdown.
-const SHUTDOWN: *mut Entry = 1 as *mut _;
-
-impl AtomicStack {
- pub(crate) fn new() -> AtomicStack {
- AtomicStack {
- head: AtomicPtr::new(ptr::null_mut()),
- }
- }
-
- /// Pushes an entry onto the stack.
- ///
- /// Returns `true` if the entry was pushed, `false` if the entry is already
- /// on the stack, `Err` if the timer is shutdown.
- pub(crate) fn push(&self, entry: &Arc<Entry>) -> Result<bool, Error> {
- // First, set the queued bit on the entry
- let queued = entry.queued.fetch_or(true, SeqCst);
-
- if queued {
- // Already queued, nothing more to do
- return Ok(false);
- }
-
- let ptr = Arc::into_raw(entry.clone()) as *mut _;
-
- let mut curr = self.head.load(SeqCst);
-
- loop {
- if curr == SHUTDOWN {
- // Don't leak the entry node
- let _ = unsafe { Arc::from_raw(ptr) };
-
- return Err(Error::shutdown());
- }
-
- // Update the `next` pointer. This is safe because setting the queued
- // bit is a "lock" on this field.
- unsafe {
- *(entry.next_atomic.get()) = curr;
- }
-
- let actual = self.head.compare_and_swap(curr, ptr, SeqCst);
-
- if actual == curr {
- break;
- }
-
- curr = actual;
- }
-
- Ok(true)
- }
-
- /// Takes all entries from the stack
- pub(crate) fn take(&self) -> AtomicStackEntries {
- let ptr = self.head.swap(ptr::null_mut(), SeqCst);
- AtomicStackEntries { ptr }
- }
-
- /// Drains all remaining nodes in the stack and prevent any new nodes from
- /// being pushed onto the stack.
- pub(crate) fn shutdown(&self) {
- // Shutdown the processing queue
- let ptr = self.head.swap(SHUTDOWN, SeqCst);
-
- // Let the drop fn of `AtomicStackEntries` handle draining the stack
- drop(AtomicStackEntries { ptr });
- }
-}
-
-// ===== impl AtomicStackEntries =====
-
-impl Iterator for AtomicStackEntries {
- type Item = Arc<Entry>;
-
- fn next(&mut self) -> Option<Self::Item> {
- if self.ptr.is_null() || self.ptr == SHUTDOWN {
- return None;
- }
-
- // Convert the pointer to an `Arc<Entry>`
- let entry = unsafe { Arc::from_raw(self.ptr) };
-
- // Update `self.ptr` to point to the next element of the stack
- self.ptr = unsafe { *entry.next_atomic.get() };
-
- // Unset the queued flag
- let res = entry.queued.fetch_and(false, SeqCst);
- debug_assert!(res);
-
- // Return the entry
- Some(entry)
- }
-}
-
-impl Drop for AtomicStackEntries {
- fn drop(&mut self) {
- for entry in self {
- // Flag the entry as errored
- entry.error(Error::shutdown());
- }
- }
-}
diff --git a/tokio/src/time/driver/entry.rs b/tokio/src/time/driver/entry.rs
index b40cae73..e0926797 100644
--- a/tokio/src/time/driver/entry.rs
+++ b/tokio/src/time/driver/entry.rs
@@ -1,362 +1,684 @@
-use crate::loom::sync::atomic::AtomicU64;
-use crate::sync::AtomicWaker;
-use crate::time::driver::{Handle, Inner};
-use crate::time::{error::Error, Duration, Instant};
-
-use std::cell::UnsafeCell;
-use std::ptr;
-use std::sync::atomic::Ordering::SeqCst;
-use std::sync::atomic::{AtomicBool, AtomicU8};
-use std::sync::{Arc, Weak};
-use std::task::{self, Poll};
-use std::u64;
-
-/// Internal state shared between a `Sleep` instance and the timer.
-///
-/// This struct is used as a node in two intrusive data structures:
-///
-/// * An atomic stack used to signal to the timer thread that the entry state
-/// has changed. The timer thread will observe the entry on this stack and
-/// perform any actions as necessary.
-///
-/// * A doubly linked list used **only** by the timer thread. Each slot in the
-/// timer wheel is a head pointer to the list of entries that must be
-/// processed during that timer tick.
-#[derive(Debug)]
-pub(crate) struct Entry {
- /// Only accessed from `Registration`.
- time: CachePadded<UnsafeCell<Time>>,
-
- /// Timer internals. Using a weak pointer allows the timer to shutdown
- /// without all `Sleep` instances having completed.
- ///
- /// When empty, it means that the entry has not yet been linked with a
- /// timer instance.
- inner: Weak<Inner>,
-
- /// Tracks the entry state. This value contains the following information:
- ///
- /// * The deadline at which the entry must be "fired".
- /// * A flag indicating if the entry has already been fired.
- /// * Whether or not the entry transitioned to the error state.
- ///
- /// When an `Entry` is created, `state` is initialized to the instant at
- /// which the entry must be fired. When a timer is reset to a different
- /// instant, this value is changed.
- state: AtomicU64,
+//! Timer state structures.
+//!
+//! This module contains the heart of the intrusive timer implementation, and as
+//! such the structures inside are full of tricky concurrency and unsafe code.
+//!
+//! # Ground rules
+//!
+//! The heart of the timer implementation here is the `TimerShared` structure,
+//! shared between the `TimerEntry` and the driver. Generally, we permit access
+//! to `TimerShared` ONLY via either 1) a mutable reference to `TimerEntry` or
+//! 2) a held driver lock.
+//!
+//! It follows from this that any changes made while holding BOTH 1 and 2 will
+//! be reliably visible, regardless of ordering. This is because of the acq/rel
+//! fences on the driver lock ensuring ordering with 2, and rust mutable
+//! reference rules for 1 (a mutable reference to an object can't be passed
+//! between threads without an acq/rel barrier, and same-thread we have local
+//! happens-before ordering).
+//!
+//! # State field
+//!
+//! Each timer has a state field associated with it. This field contains either
+//! the current scheduled time, or a special flag value indicating its state.
+//! This state can either indicate that the timer is on the 'pending' queue (and
+//! thus will be fired with an `Ok(())` result soon) or that it has already been
+//! fired/deregistered.
+//!
+//! This single state field allows for code that is firing the timer to
+//! synchronize with any racing `reset` calls reliably.
+//!
+//! # Cached vs true timeouts
+//!
+//! To allow for the use case of a timeout that is periodically reset before
+//! expiration to be as lightweight as possible, we support optimistically
+//! lock-free timer resets, in the case where a timer is rescheduled to a later
+//! point than it was originally scheduled for.
+//!
+//! This is accomplished by lazily rescheduling timers. That is, we update the
+//! state field with the true expiration of the timer from the holder of the
+//! [`TimerEntry`]. When the driver services timers (i.e., whenever it's
+//! walking lists of timers), it checks this "true when" value, and reschedules
+//! based on it.
+//!
+//! We do, however, also need to track what the expiration time was when we
+//! originally registered the timer; this is used to locate the right linked
+//! list when the timer is being cancelled. This is referred to as the "cached
+//! when" internally.
+//!
+//! There is of course a race condition between timer reset and timer
+//! expiration. If the driver fails to observe the updated expiration time, it
+//! could trigger expiration of the timer too early. However, because
+//! `mark_pending` performs a compare-and-swap, it will identify this race and
+//! refuse to mark the timer as pending.
+
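To make the state encoding above concrete, here is a small self-contained sketch (simplified names, not the types from this patch) of how a single u64 carries either a scheduled tick or one of the two sentinel values:

    const STATE_DEREGISTERED: u64 = u64::MAX;
    const STATE_PENDING_FIRE: u64 = STATE_DEREGISTERED - 1;

    enum TimerState {
        /// Registered with the wheel, expiring at this tick.
        ScheduledAt(u64),
        /// On the pending queue; will be fired with Ok(()) soon.
        PendingFire,
        /// Already fired, or never registered.
        Deregistered,
    }

    fn decode(state: u64) -> TimerState {
        match state {
            STATE_DEREGISTERED => TimerState::Deregistered,
            STATE_PENDING_FIRE => TimerState::PendingFire,
            tick => TimerState::ScheduledAt(tick),
        }
    }

Because every value below STATE_PENDING_FIRE is a valid schedule, a single compare-and-swap on this one field is enough for the firing path to detect a racing `reset`.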
+use crate::loom::cell::UnsafeCell;
+use crate::loom::sync::atomic::Ordering;
- /// Stores the actual error. If `state` indicates that an error occurred,
- /// this is guaranteed to be a non-zero value representing the first error
- /// that occurred. Otherwise its value is undefined.
- error: AtomicU8,
+use crate::sync::AtomicWaker;
+use crate::time::Instant;
+use crate::util::linked_list;
- /// Task to notify once the deadline is reached.
- waker: AtomicWaker,
+use super::Handle;
- /// True when the entry is queued in the "process" stack. This value
- /// is set before pushing the value and unset after popping the value.
- ///
- /// TODO: This could possibly be rolled up into `state`.
- pub(super) queued: AtomicBool,
-
- /// Next entry in the "process" linked list.
- ///
- /// Access to this field is coordinated by the `queued` flag.
- ///
- /// Represents a strong Arc ref.
- pub(super) next_atomic: UnsafeCell<*mut Entry>,
+use std::cell::UnsafeCell as StdUnsafeCell;
+use std::task::{Context, Poll, Waker};
+use std::{marker::PhantomPinned, pin::Pin, ptr::NonNull};
- /// When the entry expires, relative to the `start` of the timer
- /// (Inner::start). This is only used by the timer.
- ///
- /// A `Sleep` instance can be reset to a different deadline by the thread
- /// that owns the `Sleep` instance. In this case, the timer thread will not
- /// immediately know that this has happened. The timer thread must know the
- /// last deadline that it saw as it uses this value to locate the entry in
- /// its wheel.
- ///
- /// Once the timer thread observes that the instant has changed, it updates
- /// the wheel and sets this value. The idea is that this value eventually
- /// converges to the value of `state` as the timer thread makes updates.
- when: UnsafeCell<Option<u64>>,
+type TimerResult = Result<(), crate::time::error::Error>;
- /// Next entry in the State's linked list.
- ///
- /// This is only accessed by the timer
- pub(crate) next_stack: UnsafeCell<Option<Arc<Entry>>>,
+const STATE_DEREGISTERED: u64 = u64::max_value();
+const STATE_PENDING_FIRE: u64 = STATE_DEREGISTERED - 1;
+const STATE_MIN_VALUE: u64 = STATE_PENDING_FIRE;
- /// Previous entry in the State's linked list.
- ///
- /// This is only accessed by the timer and is used to unlink a canceled
- /// entry.
- ///
- /// This is a weak reference.
- pub(crate) prev_stack: UnsafeCell<*const Entry>,
-}
-
-/// Stores the info for `Sleep`.
+/// Not all platforms support 64-bit compare-and-swap. This hack replaces the
+/// AtomicU64 with a mutex around a u64 on platforms that don't. This is slow,
+/// unfortunately, but 32-bit platforms are a bit niche so it'll do for now.
+///
+/// Note: We use "x86 or 64-bit pointers" as the condition here because
+/// target_has_atomic is not stable.
+#[cfg(all(
+ not(tokio_force_time_entry_locked),
+ any(target_arch = "x86", target_pointer_width = "64")
+))]
+type AtomicU64 = crate::loom::sync::atomic::AtomicU64;
+
+#[cfg(not(all(
+ not(tokio_force_time_entry_locked),
+ any(target_arch = "x86", target_pointer_width = "64")
+)))]
#[derive(Debug)]
-pub(crate) struct Time {
- pub(crate) deadline: Instant,
- pub(crate) duration: Duration,
+struct AtomicU64 {
+ inner: crate::loom::sync::Mutex<u64>,
}
-/// Flag indicating a timer entry has elapsed
-const ELAPSED: u64 = 1 << 63;
-
-/// Flag indicating a timer entry has reached an error state
-const ERROR: u64 = u64::MAX;
+#[cfg(not(all(
+ not(tokio_force_time_entry_locked),
+ any(target_arch = "x86", target_pointer_width = "64")
+)))]
+impl AtomicU64 {
+ fn new(v: u64) -> Self {
+ Self {
+ inner: crate::loom::sync::Mutex::new(v),
+ }
+ }
-// ===== impl Entry =====
+ fn load(&self, _order: Ordering) -> u64 {
+ debug_assert_ne!(_order, Ordering::SeqCst); // we only provide AcqRel with the lock
+ *self.inner.lock()
+ }
-impl Entry {
- pub(crate) fn new(handle: &Handle, deadline: Instant, duration: Duration) -> Arc<Entry> {
- let inner = handle.inner().unwrap();
+ fn store(&self, v: u64, _order: Ordering) {
+ debug_assert_ne!(_order, Ordering::SeqCst); // we only provide AcqRel with the lock
+ *self.inner.lock() = v;
+ }
- // Attempt to increment the number of active timeouts
- let entry = if let Err(err) = inner.increment() {
- let entry = Entry::new2(deadline, duration, Weak::new(), ERROR);
- entry.error(err);
- entry
+ fn compare_exchange(
+ &self,
+ current: u64,
+ new: u64,
+ _success: Ordering,
+ _failure: Ordering,
+ ) -> Result<u64, u64> {
+ debug_assert_ne!(_success, Ordering::SeqCst); // we only provide AcqRel with the lock
+ debug_assert_ne!(_failure, Ordering::SeqCst);
+
+ let mut lock = self.inner.lock();
+
+ if *lock == current {
+ *lock = new;
+ Ok(current)
} else {
- let when = inner.normalize_deadline(deadline);
- let state = if when <= inner.elapsed() {
- ELAPSED
- } else {
- when
- };
- Entry::new2(deadline, duration, Arc::downgrade(&inner), state)
- };
-
- let entry = Arc::new(entry);
- if let Err(err) = inner.queue(&entry) {
- entry.error(err);
+ Err(*lock)
}
-
- entry
}
- /// Only called by `Registration`
- pub(crate) fn time_ref(&self) -> &Time {
- unsafe { &*self.time.0.get() }
+ fn compare_exchange_weak(
+ &self,
+ current: u64,
+ new: u64,
+ success: Ordering,
+ failure: Ordering,
+ ) -> Result<u64, u64> {
+ self.compare_exchange(current, new, success, failure)
}
+}
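The mutex-based fallback above only needs to match the observable contract of the real `AtomicU64`. A quick sanity sketch of that contract (hypothetical test code using a plain `std::sync::Mutex` rather than loom's; not part of the patch):

    use std::sync::Mutex;

    struct FallbackU64 {
        inner: Mutex<u64>,
    }

    impl FallbackU64 {
        // Same success/failure shape as the standard compare_exchange.
        fn compare_exchange(&self, current: u64, new: u64) -> Result<u64, u64> {
            let mut lock = self.inner.lock().unwrap();
            if *lock == current {
                *lock = new;
                Ok(current)
            } else {
                Err(*lock)
            }
        }
    }

    fn main() {
        let a = FallbackU64 { inner: Mutex::new(5) };
        assert_eq!(a.compare_exchange(5, 7), Ok(5)); // succeeds, returns old value
        assert_eq!(a.compare_exchange(5, 9), Err(7)); // fails, reports actual value
    }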
- /// Only called by `Registration`
- #[allow(clippy::mut_from_ref)] // https://github.com/rust-lang/rust-clippy/issues/4281
- pub(crate) unsafe fn time_mut(&self) -> &mut Time {
- &mut *self.time.0.get()
- }
+/// This structure holds the current shared state of the timer - its scheduled
+/// time (if registered), or otherwise the result of the timer completing, as
+/// well as the registered waker.
+///
+/// Generally, the StateCell is only permitted to be accessed from two contexts:
+/// Either a thread holding the corresponding &mut TimerEntry, or a thread
+/// holding the timer driver lock. The write actions on the StateCell amount to
+/// passing "ownership" of the StateCell between these contexts; moving a timer
+/// from the TimerEntry to the driver requires _both_ holding the &mut
+/// TimerEntry and the driver lock, while moving it back (firing the timer)
+/// requires only the driver lock.
+pub(super) struct StateCell {
+ /// Holds either the scheduled expiration time for this timer, or (if the
+ /// timer has been fired and is unregistered), [`u64::max_value()`].
+ state: AtomicU64,
+ /// If the timer is fired (an Acquire order read on state shows
+ /// `u64::max_value()`), holds the result that should be returned from
+ /// polling the timer. Otherwise, the contents are unspecified and reading
+ /// without holding the driver lock is undefined behavior.
+ result: UnsafeCell<TimerResult>,
+ /// The currently-registered waker
+ waker: CachePadded<AtomicWaker>,
+}
- pub(crate) fn when(&self) -> u64 {
- self.when_internal().expect("invalid internal state")
+impl Default for StateCell {
+ fn default() -> Self {
+ Self::new()
}
+}
- /// The current entry state as known by the timer. This is not the value of
- /// `state`, but lets the timer know how to converge its state to `state`.
- pub(crate) fn when_internal(&self) -> Option<u64> {
- unsafe { *self.when.get() }
+impl std::fmt::Debug for StateCell {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "StateCell({:?})", self.read_state())
}
+}
- pub(crate) fn set_when_internal(&self, when: Option<u64>) {
- unsafe {
- *self.when.get() = when;
+impl StateCell {
+ fn new() -> Self {
+ Self {
+ state: AtomicU64::new(STATE_DEREGISTERED),
+ result: UnsafeCell::new(Ok(())),
+ waker: CachePadded(AtomicWaker::new()),
}
}
- /// Called by `Timer` to load the current value of `state` for processing
- pub(crate) fn load_state(&self) -> Option<u64> {
- let state = self.state.load(SeqCst);
+ fn is_pending(&self) -> bool {
+ self.state.load(Ordering::Relaxed) == STATE_PENDING_FIRE
+ }
- if is_elapsed(state) {
+ /// Returns the current expiration time, or None if not currently scheduled.
+ fn when(&self) -> Option<u64> {
+ let cur_state = self.state.load(Ordering::Relaxed);
+
+ if cur_state == u64::max_value() {
None
} else {
- Some(state)
+ Some(cur_state)
}
}
- pub(crate) fn is_elapsed(&self) -> bool {
- let state = self.state.load(SeqCst);
- is_elapsed(state)
+ /// If the timer is completed, returns the result of the timer. Otherwise,
+ /// registers the waker and returns `Poll::Pending`.
+ fn poll(&self, waker: &Waker) -> Poll<TimerResult> {
+ // We must register first. This ensures that either `fire` will
+ // observe the new waker, or we will observe a racing fire to have set
+ // the state, or both.
+ self.waker.0.register_by_ref(waker);
+
+ self.read_state()
}
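The register-before-read ordering in `poll` is what prevents a lost wakeup. A standalone model of the same pattern (hypothetical simplified types, with a plain mutex slot standing in for `AtomicWaker`):

    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Mutex;
    use std::task::Waker;

    struct Slot {
        fired: AtomicBool,
        waker: Mutex<Option<Waker>>,
    }

    impl Slot {
        fn poll(&self, w: &Waker) -> bool {
            // Register FIRST, then check. If `fire` ran in between, we still
            // observe `fired == true` here; if it runs after, it sees our waker.
            *self.waker.lock().unwrap() = Some(w.clone());
            self.fired.load(Ordering::Acquire)
        }

        fn fire(&self) {
            // Publish the state FIRST, then consume the waker, mirroring poll.
            self.fired.store(true, Ordering::Release);
            if let Some(w) = self.waker.lock().unwrap().take() {
                w.wake();
            }
        }
    }

Whatever the interleaving, at least one side observes the other: either `fire` takes the freshly registered waker, or `poll` sees the published state.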
- pub(crate) fn fire(&self, when: u64) {
- let mut curr = self.state.load(SeqCst);
+ fn read_state(&self) -> Poll<TimerResult> {
+ let cur_state = self.state.load(Ordering::Acquire);
+
+ if cur_state == STATE_DEREGISTERED {
+ // SAFETY: The driver has fired this timer; this involves writing
+ // the result, and then writing (with release ordering) the state
+ // field.
+ Poll::Ready(unsafe { self.result.with(|p| *p) })
+ } else {
+ Poll::Pending
+ }
+ }
+
+ /// Marks this timer as being moved to the pending list, if its scheduled
+ /// time is not after `not_after`.
+ ///
+ /// If the timer is scheduled for a time after `not_after`, returns an Err
+ /// containing the current scheduled time.
+ ///
+ /// SAFETY: Must hold the driver lock.
+ unsafe fn mark_pending(&self, not_after: u64) -> Result<(), u64> {
+ // Quick initial debug check to see if the timer is already fired. Since
+ // firing the timer can only happen with the driver lock held, we know
+ // we shouldn't be able to "miss" a transition to a fired state, even
+ // with relaxed ordering.
+ let mut cur_state = self.state.load(Ordering::Relaxed);
loop {
- if is_elapsed(curr) || curr > when {
- return;
- }
+ debug_assert!(cur_state < STATE_MIN_VALUE);
- let next = ELAPSED | curr;
- let actual = self.state.compare_and_swap(curr, next, SeqCst);
+ if cur_state > not_after {
+ break Err(cur_state);
+ }
- if curr == actual {
- break;
+ match self.state.compare_exchange(
+ cur_state,
+ STATE_PENDING_FIRE,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => {
+ break Ok(());
+ }
+ Err(actual_state) => {
+ cur_state = actual_state;
+ }
}
+ }
+ }
- curr = actual;
+ /// Fires the timer, setting the result to the provided result.
+ ///
+ /// Returns:
+ /// * `Some(waker)` - if fired and a waker needs to be invoked once the
+ /// driver lock is released
+ /// * `None` - if fired and a waker does not need to be invoked, or if
+ /// already fired
+ ///
+ /// SAFETY: The driver lock must be held.
+ unsafe fn fire(&self, result: TimerResult) -> Option<Waker> {
+ // Quick initial check to see if the timer is already fired. Since
+ // firing the timer can only happen with the driver lock held, we know
+ // we shouldn't be able to "miss" a transition to a fired state, even
+ // with relaxed ordering.
+ let cur_state = self.state.load(Ordering::Relaxed);
+ if cur_state == STATE_DEREGISTERED {
+ return None;
}
- self.waker.wake();
- }
+ // SAFETY: We assume the driver lock is held and the timer is not
+ // fired, so only the driver is accessing this field.
+ //
+ // We perform a release-ordered store to state below, to ensure this
+ // write is visible before the state update is visible.
+ unsafe { self.result.with_mut(|p| *p = result) };
+
+ self.state.store(STATE_DEREGISTERED, Ordering::Release);
- pub(crate) fn error(&self, error: Error) {
- // Record the precise nature of the error, if there isn't already an
- // error present. If we don't actually transition to the error state
- // below, that's fine, as the error details we set here will be ignored.
- self.error.compare_and_swap(0, error.as_u8(), SeqCst);
+ self.waker.0.take_waker()
+ }
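Returning the waker from `fire` rather than waking inline lets the driver invoke user wakers only after releasing its lock. A minimal sketch of that caller-side pattern (hypothetical; the real driver code lives elsewhere in this patch):

    use std::sync::Mutex;
    use std::task::Waker;

    fn fire_expired(pending: &Mutex<Vec<Waker>>) {
        // Collect wakers while holding the lock...
        let wakers: Vec<Waker> = pending.lock().unwrap().drain(..).collect();
        // ...then wake with the lock released, so arbitrary user code running
        // inside wake() can never re-enter or contend on the driver lock.
        for w in wakers {
            w.wake();
        }
    }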
- // Only transition to the error state if not currently elapsed
- let mut curr = self.state.load(SeqCst);
+ /// Marks the timer as registered (poll will return None) and sets the
+ /// expiration time.
+ ///
+ /// While this function is memory-safe, it should only be called from a
+ /// context holding both `&mut TimerEntry` and the driver lock.
+ fn set_expiration(&self, timestamp: u64) {
+ debug_assert!(timestamp < STATE_MIN_VALUE);
+
+ // We can use relaxed ordering because we hold the driver lock and will
+ // fence when we release the lock.
+ self.state.store(timestamp, Ordering::Relaxed);
+ }
+ /// Attempts to adjust the timer to a new timestamp.
+ ///
+ /// If the timer has already been fired, is pending firing, or the new
+ /// timestamp is earlier than the old timestamp (or occasionally,
+ /// spuriously), this returns Err without changing the timer's state. In
+ /// this case, the timer must be deregistered and re-registered.
+ fn extend_expiration(&self, new_timestamp: u64) -> Result<(), ()> {
+ let mut prior = self.state.load(Ordering::Relaxed);
loop {
- if is_elapsed(curr) {
- return;
+ if new_timestamp < prior || prior >= STATE_MIN_VALUE {
+ return Err(());
}
- let next = ERROR;
+ match self.state.compare_exchange_weak(
+ prior,
+ new_timestamp,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => {
+ return Ok(());
+ }
+ Err(true_prior) => {
+ prior = true_prior;
+ }
+ }
+ }
+ }
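From the user's side, this is the fast path behind rescheduling a sleep to a later deadline. A hedged usage sketch against tokio's public API of this era (`sleep_until`, `Sleep::reset`):

    use tokio::time::{sleep_until, Duration, Instant};

    #[tokio::main]
    async fn main() {
        let timer = sleep_until(Instant::now() + Duration::from_millis(100));
        tokio::pin!(timer);

        // Pushing the deadline later can take the lock-free
        // `extend_expiration` path; moving it earlier (or after the timer
        // fired) falls back to deregistering and re-registering.
        timer.as_mut().reset(Instant::now() + Duration::from_millis(200));
        timer.await;
    }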
- let actual = self.state.compare_and_swap(curr, next, SeqCst);
+ /// Returns true if the state of this timer indicates that the timer might
+ /// be registered with the driver. This check is performed with relaxed
+ /// ordering, but is conservative - if it returns false, the timer is
+ /// definitely _not_ registered.
+ pub(super) fn might_be_registered(&self) -> bool {
+ self.state.load(Ordering::Relaxed) != u64::max_value()
+ }
+}
- if curr == actual {
- break;
- }
+/// A timer entry.
+///
+/// This is the handle to a timer that is controlled by the requester of the
+/// timer. As this participates in intrusive data structures, it must be pinned
+/// before polling.
+#[derive(Debug)]
+pub(super) struct TimerEntry {
+ /// Arc reference to the driver. We can only free the driver after
+ /// deregistering everything from their respective timer wheels.
+ driver: Handle,
+ /// Shared inner structure; this is part of an intrusive linked list, and
+ /// therefore other references can exist to it while mutable references to
+ /// Entry exist.
+ ///
+ /// This is manipulated only under the inner mutex. TODO: Can we use loom
+ /// cells for this?
+ inner: StdUnsafeCell<TimerShared>,
+ /// Initial deadline for the timer. This is used to register on the first
+ /// poll, as we can't register prior to being pinned.
+ initial_deadline: Option<Instant>,
+}
+
+unsafe impl Send for TimerEntry {}
+unsafe impl Sync for TimerEntry {}
+
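The pinning requirement exists because, once an entry is linked into the wheel, the driver holds raw pointers into it. A hypothetical illustration of why `PhantomPinned` matters for intrusive nodes:

    use std::marker::PhantomPinned;
    use std::ptr::NonNull;

    struct Node {
        next: Option<NonNull<Node>>, // the list points INTO other nodes
        _pin: PhantomPinned,         // opt out of Unpin: moving a linked Node
                                     // would leave `next` pointers dangling
    }

If `Node` were `Unpin`, safe code could move it out of a `Pin<&mut Node>` and silently invalidate every pointer the driver holds.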
+/// A TimerHandle is the (non-enforced) "unique" pointer from the driver to the
+/// timer entry. Generally, at most one TimerHandle exists for a timer at a time
+/// (enforced by the timer state machine).
+///
+/// SAFETY: A TimerHandle is essentially a raw pointer, and the usual caveats
+/// of pointer safety apply. In particular, TimerHandle does not itself enforce
+/// that the timer still exists; however, normally a TimerHandle is created
+/// immediately before registering the timer, and is consumed when firing the
+/// timer, to help minimize mistakes. Still, because TimerHandle cannot enforce
+/// memory safety, all operations are unsafe.
+#[derive(Debug)]
+pub(crate) struct TimerHandle {
+ inner: NonNull<TimerShared>,
+}
+
+pub(super) type EntryList = crate::util::linked_list::LinkedList<TimerShared, TimerShared>;
+
+/// The shared state structure of a timer. This structure is shared between the
+/// frontend (`Entry`) and driver backend.
+///
+/// Note that this structure is located inside the `TimerEntry` structure.
+#[derive(Debug)]
+pub(crate) struct TimerShared {
+ /// Current state. This records whether the timer entry is currently under
+ /// the ownership of the driver, and if not, its current state (not
+ /// complete, fired, error, etc).
+ state: StateCell,
+
+ /// Data manipulated by the driver thread itself, only.
+ driver_state: CachePadded<TimerSharedPadded>,
- curr = actual;
+ _p: PhantomPinned,
+}
+
+impl TimerShared {
+ pub(super) fn new() -> Self {
+ Self {
+ state: StateCell::default(),
+ driver_state: CachePadded(TimerSharedPadded::new()),
+ _p: PhantomPinned,
}
+ }
- self.waker.wake();
+ /// Gets the cached time-of-expiration value
+ pub(super) fn cached_when(&self) -> u64 {
+ // Cached-when is only accessed under the driver lock, so we can use relaxed
+ self.driver_state.0.cached_when.load(Ordering::Relaxed)
}
- pub(crate) fn cancel(entry: &Arc<Entry>) {
- let state = entry.state.fetch_or(ELAPSED, SeqCst);
+ /// Gets the true time-of-expiration value, and copies it into the cached
+ /// time-of-expiration value.
+ ///
+ /// SAFETY: Must be called with the driver lock held, and when this entry is
+ /// not in any timer wheel lists.
+ pub(super) unsafe fn sync_when(&self) -> u64 {
+ let true_when = self.true_when();
- if is_elapsed(state) {
- // Nothing more to do
- return;
- }
+ self.driver_state
+ .0
+ .cached_when
+ .store(true_when, Ordering::Relaxed);
+
+ true_when
+ }
- // If registered with a timer instance, try to upgrade the Arc.
- let inner = match entry.upgrade_inner() {
- Some(inner) => inner,
- None => return,
- };
+ /// Returns the true time-of-expiration value, with relaxed memory ordering.
+ pub(super) fn true_when(&self) -> u64 {
+ self.state.when().expect("Timer already fired")
+ }
- let _ = inner.queue(entry);
+ /// Sets the true time-of-expiration value, even if it is less than the
+ /// current expiration or the timer is deregistered.
+ ///
+ /// SAFETY: Must only be called with the driver lock held and the entry not
+ /// in the timer wheel.
+ pub(super) unsafe fn set_expiration(&self, t: u64) {
+ self.state.set_expiration(t);
+ self.driver_state.0.cached_when.store(t, Ordering::Relaxed);
}
- pub(crate) fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> {
- let mut curr = self.state.load(SeqCst);
+ /// Sets the true time-of-expiration only if it is after the current.
+ pub(super) fn extend_expiration(&self, t: u64) -> Result<(), ()> {
+ self.state.extend_expiration(t)
+ }
- if is_elapsed(curr) {
- return Poll::Ready(if curr == ERROR {
- Err(Error::from_u8(self.error.load(SeqCst)))
- } else {
- Ok(())
- });
+ /// Returns a TimerHandle for this timer.
+ pub(super) fn handle(&self) -> TimerHandle {
+ TimerHandle {
+ inner: NonNull::from(self),
}
+ }
- self.waker.register_by_ref(cx.waker());
+ /// Returns true if the state of this timer indicates that the timer might
+ /// be registered with the driver. This check is performed with relaxed
+ /// ordering, but is conservative - if it returns false, the timer is
+ /// definitely _not_ registered.
+ pub(super) fn might_be_registered(&self) -> bool {
+ self.state.might_be_registered()
+ }
+}
- curr = self.state.load(SeqCst);
+/// Additional shared state between the driver and the timer which is cache
+/// padded. This contains the information that the driver thread accesses most
+/// frequently to minimize contention. In particular, we move it away from the
+/// waker, as the waker is updated on every poll.
+struct TimerSharedPadded {
+ /// The expiration time for which this ent