From ae67851f11b7cc1f577de8ce21767ce3e2c7aff9 Mon Sep 17 00:00:00 2001 From: bdonlan Date: Mon, 23 Nov 2020 10:42:50 -0800 Subject: time: use intrusive lists for timer tracking (#3080) More-or-less a half-rewrite of the current time driver, supporting the use of intrusive futures for timer registration. Fixes: #3028, #3069 --- .github/workflows/ci.yml | 1 + tokio-util/src/time/delay_queue.rs | 22 +- tokio-util/tests/time_delay_queue.rs | 24 +- tokio/src/loom/std/mod.rs | 2 +- tokio/src/stream/throttle.rs | 2 +- tokio/src/stream/timeout.rs | 2 +- tokio/src/time/driver/atomic_stack.rs | 124 ----- tokio/src/time/driver/entry.rs | 854 +++++++++++++++++++++++----------- tokio/src/time/driver/handle.rs | 41 +- tokio/src/time/driver/mod.rs | 494 +++++++++++--------- tokio/src/time/driver/sleep.rs | 133 ++++++ tokio/src/time/driver/tests/mod.rs | 246 +++++++++- tokio/src/time/driver/wheel/level.rs | 277 +++++++++++ tokio/src/time/driver/wheel/mod.rs | 350 ++++++++++++++ tokio/src/time/driver/wheel/stack.rs | 112 +++++ tokio/src/time/error.rs | 26 +- tokio/src/time/mod.rs | 43 +- tokio/src/time/sleep.rs | 141 ------ tokio/src/time/tests/mod.rs | 2 +- tokio/src/time/tests/test_sleep.rs | 12 +- tokio/src/time/timeout.rs | 2 +- tokio/src/time/wheel/level.rs | 259 ----------- tokio/src/time/wheel/mod.rs | 295 ------------ tokio/src/time/wheel/stack.rs | 112 ----- tokio/src/util/mod.rs | 1 + tokio/tests/macros_select.rs | 8 +- tokio/tests/stream_timeout.rs | 2 +- tokio/tests/sync_mutex.rs | 7 +- tokio/tests/sync_mutex_owned.rs | 7 +- tokio/tests/time_interval.rs | 3 +- tokio/tests/time_rt.rs | 2 +- tokio/tests/time_sleep.rs | 145 +++++- 32 files changed, 2195 insertions(+), 1556 deletions(-) delete mode 100644 tokio/src/time/driver/atomic_stack.rs create mode 100644 tokio/src/time/driver/sleep.rs create mode 100644 tokio/src/time/driver/wheel/level.rs create mode 100644 tokio/src/time/driver/wheel/mod.rs create mode 100644 tokio/src/time/driver/wheel/stack.rs delete mode 100644 tokio/src/time/sleep.rs delete mode 100644 tokio/src/time/wheel/level.rs delete mode 100644 tokio/src/time/wheel/mod.rs delete mode 100644 tokio/src/time/wheel/stack.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9bf58b4a..dd4972ed 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -260,6 +260,7 @@ jobs: - loom_pool::group_b - loom_pool::group_c - loom_pool::group_d + - time::driver steps: - uses: actions/checkout@v2 - name: Install Rust diff --git a/tokio-util/src/time/delay_queue.rs b/tokio-util/src/time/delay_queue.rs index 000b4423..4edd5cd6 100644 --- a/tokio-util/src/time/delay_queue.rs +++ b/tokio-util/src/time/delay_queue.rs @@ -14,7 +14,7 @@ use std::cmp; use std::future::Future; use std::marker::PhantomData; use std::pin::Pin; -use std::task::{self, Poll}; +use std::task::{self, Poll, Waker}; /// A queue of delayed elements. /// @@ -145,6 +145,11 @@ pub struct DelayQueue { /// Instant at which the timer starts start: Instant, + + /// Waker that is invoked when we potentially need to reset the timer. + /// Because we lazily create the timer when the first entry is created, we + /// need to awaken any poller that polled us before that point. + waker: Option, } /// An entry in `DelayQueue` that has expired and removed. 
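The `waker` field added above, together with the `will_wake` check in `poll_expired` in the next hunk, is the standard manual-future pattern for caching a waker: re-clone only when the cached waker would not wake the current task. A minimal self-contained sketch of the pattern (illustrative only; `LazyWake` is hypothetical and not part of this patch):

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, Waker};

struct LazyWake {
    /// Last waker passed to poll; re-cloned only when stale.
    waker: Option<Waker>,
    ready: bool,
}

impl Future for LazyWake {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.ready {
            return Poll::Ready(());
        }
        // Same test as the patched poll_expired: skip the clone when the
        // cached waker already wakes the current task.
        if !self
            .waker
            .as_ref()
            .map(|w| w.will_wake(cx.waker()))
            .unwrap_or(false)
        {
            self.waker = Some(cx.waker().clone());
        }
        Poll::Pending
    }
}

Whoever later completes the operation calls `wake()` on the stored waker, which is what `insert_at` does in the next hunk when the timer must be reset.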
@@ -253,6 +258,7 @@ impl DelayQueue { delay: None, wheel_now: 0, start: Instant::now(), + waker: None, } } @@ -330,6 +336,10 @@ impl DelayQueue { }; if should_set_delay { + if let Some(waker) = self.waker.take() { + waker.wake(); + } + let delay_time = self.start + Duration::from_millis(when); if let Some(ref mut delay) = &mut self.delay { delay.reset(delay_time); @@ -348,6 +358,15 @@ impl DelayQueue { &mut self, cx: &mut task::Context<'_>, ) -> Poll, Error>>> { + if !self + .waker + .as_ref() + .map(|w| w.will_wake(cx.waker())) + .unwrap_or(false) + { + self.waker = Some(cx.waker().clone()); + } + let item = ready!(self.poll_idx(cx)); Poll::Ready(item.map(|result| { result.map(|idx| { @@ -533,6 +552,7 @@ impl DelayQueue { let next_deadline = self.next_deadline(); if let (Some(ref mut delay), Some(deadline)) = (&mut self.delay, next_deadline) { + // This should awaken us if necessary (ie, if already expired) delay.reset(deadline); } } diff --git a/tokio-util/tests/time_delay_queue.rs b/tokio-util/tests/time_delay_queue.rs index 42a56b8b..d42dca87 100644 --- a/tokio-util/tests/time_delay_queue.rs +++ b/tokio-util/tests/time_delay_queue.rs @@ -2,7 +2,7 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] -use tokio::time::{self, sleep, Duration, Instant}; +use tokio::time::{self, sleep, sleep_until, Duration, Instant}; use tokio_test::{assert_ok, assert_pending, assert_ready, task}; use tokio_util::time::DelayQueue; @@ -107,9 +107,10 @@ async fn multi_delay_at_start() { assert_pending!(poll!(queue)); assert!(!queue.is_woken()); + let start = Instant::now(); for elapsed in 0..1200 { - sleep(ms(1)).await; let elapsed = elapsed + 1; + tokio::time::sleep_until(start + ms(elapsed)).await; if delays.contains(&elapsed) { assert!(queue.is_woken()); @@ -117,7 +118,12 @@ async fn multi_delay_at_start() { assert_pending!(poll!(queue)); } else if queue.is_woken() { let cascade = &[192, 960]; - assert!(cascade.contains(&elapsed), "elapsed={}", elapsed); + assert!( + cascade.contains(&elapsed), + "elapsed={} dt={:?}", + elapsed, + Instant::now() - start + ); assert_pending!(poll!(queue)); } @@ -205,7 +211,7 @@ async fn reset_much_later() { sleep(ms(3)).await; - queue.reset_at(&key, now + ms(5)); + queue.reset_at(&key, now + ms(10)); sleep(ms(20)).await; @@ -402,7 +408,7 @@ async fn insert_before_first_after_poll() { sleep(ms(99)).await; - assert!(!queue.is_woken()); + assert_pending!(poll!(queue)); sleep(ms(1)).await; @@ -457,7 +463,7 @@ async fn reset_later_after_slot_starts() { assert_pending!(poll!(queue)); - sleep(ms(80)).await; + sleep_until(now + Duration::from_millis(80)).await; assert!(!queue.is_woken()); @@ -472,7 +478,7 @@ async fn reset_later_after_slot_starts() { assert_pending!(poll!(queue)); - sleep(ms(39)).await; + sleep_until(now + Duration::from_millis(119)).await; assert!(!queue.is_woken()); sleep(ms(1)).await; @@ -515,7 +521,7 @@ async fn reset_earlier_after_slot_starts() { assert_pending!(poll!(queue)); - sleep(ms(80)).await; + sleep_until(now + Duration::from_millis(80)).await; assert!(!queue.is_woken()); @@ -530,7 +536,7 @@ async fn reset_earlier_after_slot_starts() { assert_pending!(poll!(queue)); - sleep(ms(39)).await; + sleep_until(now + Duration::from_millis(119)).await; assert!(!queue.is_woken()); sleep(ms(1)).await; diff --git a/tokio/src/loom/std/mod.rs b/tokio/src/loom/std/mod.rs index 414ef906..c3f74efb 100644 --- a/tokio/src/loom/std/mod.rs +++ b/tokio/src/loom/std/mod.rs @@ -47,7 +47,7 @@ pub(crate) mod rand { } pub(crate) mod sync { - pub(crate) use std::sync::Arc; + 
pub(crate) use std::sync::{Arc, Weak}; // Below, make sure all the feature-influenced types are exported for // internal use. Note however that some are not _currently_ named by diff --git a/tokio/src/stream/throttle.rs b/tokio/src/stream/throttle.rs index 8f4a256d..ff1fbf01 100644 --- a/tokio/src/stream/throttle.rs +++ b/tokio/src/stream/throttle.rs @@ -17,7 +17,7 @@ where let delay = if duration == Duration::from_millis(0) { None } else { - Some(Sleep::new_timeout(Instant::now() + duration, duration)) + Some(Sleep::new_timeout(Instant::now() + duration)) }; Throttle { diff --git a/tokio/src/stream/timeout.rs b/tokio/src/stream/timeout.rs index 669973ff..61154da0 100644 --- a/tokio/src/stream/timeout.rs +++ b/tokio/src/stream/timeout.rs @@ -23,7 +23,7 @@ pin_project! { impl Timeout { pub(super) fn new(stream: S, duration: Duration) -> Self { let next = Instant::now() + duration; - let deadline = Sleep::new_timeout(next, duration); + let deadline = Sleep::new_timeout(next); Timeout { stream: Fuse::new(stream), diff --git a/tokio/src/time/driver/atomic_stack.rs b/tokio/src/time/driver/atomic_stack.rs deleted file mode 100644 index 5dcc4726..00000000 --- a/tokio/src/time/driver/atomic_stack.rs +++ /dev/null @@ -1,124 +0,0 @@ -use crate::time::driver::Entry; -use crate::time::error::Error; - -use std::ptr; -use std::sync::atomic::AtomicPtr; -use std::sync::atomic::Ordering::SeqCst; -use std::sync::Arc; - -/// A stack of `Entry` nodes -#[derive(Debug)] -pub(crate) struct AtomicStack { - /// Stack head - head: AtomicPtr, -} - -/// Entries that were removed from the stack -#[derive(Debug)] -pub(crate) struct AtomicStackEntries { - ptr: *mut Entry, -} - -/// Used to indicate that the timer has shutdown. -const SHUTDOWN: *mut Entry = 1 as *mut _; - -impl AtomicStack { - pub(crate) fn new() -> AtomicStack { - AtomicStack { - head: AtomicPtr::new(ptr::null_mut()), - } - } - - /// Pushes an entry onto the stack. - /// - /// Returns `true` if the entry was pushed, `false` if the entry is already - /// on the stack, `Err` if the timer is shutdown. - pub(crate) fn push(&self, entry: &Arc) -> Result { - // First, set the queued bit on the entry - let queued = entry.queued.fetch_or(true, SeqCst); - - if queued { - // Already queued, nothing more to do - return Ok(false); - } - - let ptr = Arc::into_raw(entry.clone()) as *mut _; - - let mut curr = self.head.load(SeqCst); - - loop { - if curr == SHUTDOWN { - // Don't leak the entry node - let _ = unsafe { Arc::from_raw(ptr) }; - - return Err(Error::shutdown()); - } - - // Update the `next` pointer. This is safe because setting the queued - // bit is a "lock" on this field. - unsafe { - *(entry.next_atomic.get()) = curr; - } - - let actual = self.head.compare_and_swap(curr, ptr, SeqCst); - - if actual == curr { - break; - } - - curr = actual; - } - - Ok(true) - } - - /// Takes all entries from the stack - pub(crate) fn take(&self) -> AtomicStackEntries { - let ptr = self.head.swap(ptr::null_mut(), SeqCst); - AtomicStackEntries { ptr } - } - - /// Drains all remaining nodes in the stack and prevent any new nodes from - /// being pushed onto the stack. 
- pub(crate) fn shutdown(&self) { - // Shutdown the processing queue - let ptr = self.head.swap(SHUTDOWN, SeqCst); - - // Let the drop fn of `AtomicStackEntries` handle draining the stack - drop(AtomicStackEntries { ptr }); - } -} - -// ===== impl AtomicStackEntries ===== - -impl Iterator for AtomicStackEntries { - type Item = Arc; - - fn next(&mut self) -> Option { - if self.ptr.is_null() || self.ptr == SHUTDOWN { - return None; - } - - // Convert the pointer to an `Arc` - let entry = unsafe { Arc::from_raw(self.ptr) }; - - // Update `self.ptr` to point to the next element of the stack - self.ptr = unsafe { *entry.next_atomic.get() }; - - // Unset the queued flag - let res = entry.queued.fetch_and(false, SeqCst); - debug_assert!(res); - - // Return the entry - Some(entry) - } -} - -impl Drop for AtomicStackEntries { - fn drop(&mut self) { - for entry in self { - // Flag the entry as errored - entry.error(Error::shutdown()); - } - } -} diff --git a/tokio/src/time/driver/entry.rs b/tokio/src/time/driver/entry.rs index b40cae73..e0926797 100644 --- a/tokio/src/time/driver/entry.rs +++ b/tokio/src/time/driver/entry.rs @@ -1,362 +1,684 @@ -use crate::loom::sync::atomic::AtomicU64; -use crate::sync::AtomicWaker; -use crate::time::driver::{Handle, Inner}; -use crate::time::{error::Error, Duration, Instant}; - -use std::cell::UnsafeCell; -use std::ptr; -use std::sync::atomic::Ordering::SeqCst; -use std::sync::atomic::{AtomicBool, AtomicU8}; -use std::sync::{Arc, Weak}; -use std::task::{self, Poll}; -use std::u64; - -/// Internal state shared between a `Sleep` instance and the timer. -/// -/// This struct is used as a node in two intrusive data structures: -/// -/// * An atomic stack used to signal to the timer thread that the entry state -/// has changed. The timer thread will observe the entry on this stack and -/// perform any actions as necessary. -/// -/// * A doubly linked list used **only** by the timer thread. Each slot in the -/// timer wheel is a head pointer to the list of entries that must be -/// processed during that timer tick. -#[derive(Debug)] -pub(crate) struct Entry { - /// Only accessed from `Registration`. - time: CachePadded>, - - /// Timer internals. Using a weak pointer allows the timer to shutdown - /// without all `Sleep` instances having completed. - /// - /// When empty, it means that the entry has not yet been linked with a - /// timer instance. - inner: Weak, - - /// Tracks the entry state. This value contains the following information: - /// - /// * The deadline at which the entry must be "fired". - /// * A flag indicating if the entry has already been fired. - /// * Whether or not the entry transitioned to the error state. - /// - /// When an `Entry` is created, `state` is initialized to the instant at - /// which the entry must be fired. When a timer is reset to a different - /// instant, this value is changed. - state: AtomicU64, +//! Timer state structures. +//! +//! This module contains the heart of the intrusive timer implementation, and as +//! such the structures inside are full of tricky concurrency and unsafe code. +//! +//! # Ground rules +//! +//! The heart of the timer implementation here is the `TimerShared` structure, +//! shared between the `TimerEntry` and the driver. Generally, we permit access +//! to `TimerShared` ONLY via either 1) a mutable reference to `TimerEntry` or +//! 2) a held driver lock. +//! +//! It follows from this that any changes made while holding BOTH 1 and 2 will +//! be reliably visible, regardless of ordering. 
This is because of the acq/rel
+//! fences on the driver lock ensuring ordering with 2, and rust mutable
+//! reference rules for 1 (a mutable reference to an object can't be passed
+//! between threads without an acq/rel barrier, and same-thread we have local
+//! happens-before ordering).
+//!
+//! # State field
+//!
+//! Each timer has a state field associated with it. This field contains either
+//! the current scheduled time, or a special flag value indicating its state.
+//! This state can either indicate that the timer is on the 'pending' queue (and
+//! thus will be fired with an `Ok(())` result soon) or that it has already been
+//! fired/deregistered.
+//!
+//! This single state field allows for code that is firing the timer to
+//! synchronize with any racing `reset` calls reliably.
+//!
+//! # Cached vs true timeouts
+//!
+//! To allow for the use case of a timeout that is periodically reset before
+//! expiration to be as lightweight as possible, we support optimistically
+//! lock-free timer resets, in the case where a timer is rescheduled to a later
+//! point than it was originally scheduled for.
+//!
+//! This is accomplished by lazily rescheduling timers. That is, we update the
+//! state field with the true expiration of the timer from the holder of
+//! the [`TimerEntry`]. When the driver services timers (i.e., whenever it's
+//! walking lists of timers), it checks this "true when" value, and reschedules
+//! based on it.
+//!
+//! We do, however, also need to track what the expiration time was when we
+//! originally registered the timer; this is used to locate the right linked
+//! list when the timer is being cancelled. This is referred to as the "cached
+//! when" internally.
+//!
+//! There is of course a race condition between timer reset and timer
+//! expiration. If the driver fails to observe the updated expiration time, it
+//! could trigger expiration of the timer too early. However, because
+//! `mark_pending` performs a compare-and-swap, it will identify this race and
+//! refuse to mark the timer as pending.
+
+use crate::loom::cell::UnsafeCell;
+use crate::loom::sync::atomic::Ordering;

-    /// Stores the actual error. If `state` indicates that an error occurred,
-    /// this is guaranteed to be a non-zero value representing the first error
-    /// that occurred. Otherwise its value is undefined.
-    error: AtomicU8,
+use crate::sync::AtomicWaker;
+use crate::time::Instant;
+use crate::util::linked_list;

-    /// Task to notify once the deadline is reached.
-    waker: AtomicWaker,
+use super::Handle;

-    /// True when the entry is queued in the "process" stack. This value
-    /// is set before pushing the value and unset after popping the value.
-    ///
-    /// TODO: This could possibly be rolled up into `state`.
-    pub(super) queued: AtomicBool,
-
-    /// Next entry in the "process" linked list.
-    ///
-    /// Access to this field is coordinated by the `queued` flag.
-    ///
-    /// Represents a strong Arc ref.
-    pub(super) next_atomic: UnsafeCell<*mut Entry>,
+use std::cell::UnsafeCell as StdUnsafeCell;
+use std::task::{Context, Poll, Waker};
+use std::{marker::PhantomPinned, pin::Pin, ptr::NonNull};

-    /// When the entry expires, relative to the `start` of the timer
-    /// (Inner::start). This is only used by the timer.
-    ///
-    /// A `Sleep` instance can be reset to a different deadline by the thread
-    /// that owns the `Sleep` instance. In this case, the timer thread will not
-    /// immediately know that this has happened.
The timer thread must know the - /// last deadline that it saw as it uses this value to locate the entry in - /// its wheel. - /// - /// Once the timer thread observes that the instant has changed, it updates - /// the wheel and sets this value. The idea is that this value eventually - /// converges to the value of `state` as the timer thread makes updates. - when: UnsafeCell>, +type TimerResult = Result<(), crate::time::error::Error>; - /// Next entry in the State's linked list. - /// - /// This is only accessed by the timer - pub(crate) next_stack: UnsafeCell>>, +const STATE_DEREGISTERED: u64 = u64::max_value(); +const STATE_PENDING_FIRE: u64 = STATE_DEREGISTERED - 1; +const STATE_MIN_VALUE: u64 = STATE_PENDING_FIRE; - /// Previous entry in the State's linked list. - /// - /// This is only accessed by the timer and is used to unlink a canceled - /// entry. - /// - /// This is a weak reference. - pub(crate) prev_stack: UnsafeCell<*const Entry>, -} - -/// Stores the info for `Sleep`. +/// Not all platforms support 64-bit compare-and-swap. This hack replaces the +/// AtomicU64 with a mutex around a u64 on platforms that don't. This is slow, +/// unfortunately, but 32-bit platforms are a bit niche so it'll do for now. +/// +/// Note: We use "x86 or 64-bit pointers" as the condition here because +/// target_has_atomic is not stable. +#[cfg(all( + not(tokio_force_time_entry_locked), + any(target_arch = "x86", target_pointer_width = "64") +))] +type AtomicU64 = crate::loom::sync::atomic::AtomicU64; + +#[cfg(not(all( + not(tokio_force_time_entry_locked), + any(target_arch = "x86", target_pointer_width = "64") +)))] #[derive(Debug)] -pub(crate) struct Time { - pub(crate) deadline: Instant, - pub(crate) duration: Duration, +struct AtomicU64 { + inner: crate::loom::sync::Mutex, } -/// Flag indicating a timer entry has elapsed -const ELAPSED: u64 = 1 << 63; - -/// Flag indicating a timer entry has reached an error state -const ERROR: u64 = u64::MAX; +#[cfg(not(all( + not(tokio_force_time_entry_locked), + any(target_arch = "x86", target_pointer_width = "64") +)))] +impl AtomicU64 { + fn new(v: u64) -> Self { + Self { + inner: crate::loom::sync::Mutex::new(v), + } + } -// ===== impl Entry ===== + fn load(&self, _order: Ordering) -> u64 { + debug_assert_ne!(_order, Ordering::SeqCst); // we only provide AcqRel with the lock + *self.inner.lock() + } -impl Entry { - pub(crate) fn new(handle: &Handle, deadline: Instant, duration: Duration) -> Arc { - let inner = handle.inner().unwrap(); + fn store(&self, v: u64, _order: Ordering) { + debug_assert_ne!(_order, Ordering::SeqCst); // we only provide AcqRel with the lock + *self.inner.lock() = v; + } - // Attempt to increment the number of active timeouts - let entry = if let Err(err) = inner.increment() { - let entry = Entry::new2(deadline, duration, Weak::new(), ERROR); - entry.error(err); - entry + fn compare_exchange( + &self, + current: u64, + new: u64, + _success: Ordering, + _failure: Ordering, + ) -> Result { + debug_assert_ne!(_success, Ordering::SeqCst); // we only provide AcqRel with the lock + debug_assert_ne!(_failure, Ordering::SeqCst); + + let mut lock = self.inner.lock(); + + if *lock == current { + *lock = new; + Ok(current) } else { - let when = inner.normalize_deadline(deadline); - let state = if when <= inner.elapsed() { - ELAPSED - } else { - when - }; - Entry::new2(deadline, duration, Arc::downgrade(&inner), state) - }; - - let entry = Arc::new(entry); - if let Err(err) = inner.queue(&entry) { - entry.error(err); + Err(*lock) } - - entry } 
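The mutex-based `AtomicU64` stand-in above is the entire portability trick: on targets without native 64-bit compare-and-swap, every atomic operation simply takes a lock. A self-contained sketch of the same idea (simplified names, `std::sync::Mutex` rather than loom's, not the patch's exact code):

use std::sync::Mutex;

struct FallbackU64 {
    inner: Mutex<u64>,
}

impl FallbackU64 {
    fn new(v: u64) -> Self {
        Self { inner: Mutex::new(v) }
    }

    fn load(&self) -> u64 {
        *self.inner.lock().unwrap()
    }

    /// Mirrors the compare_exchange above: succeed and return the old value
    /// if it matched `current`, otherwise return the value actually seen.
    fn compare_exchange(&self, current: u64, new: u64) -> Result<u64, u64> {
        let mut guard = self.inner.lock().unwrap();
        if *guard == current {
            *guard = new;
            Ok(current)
        } else {
            Err(*guard)
        }
    }
}

A caller retries a CAS loop against the `Err` value exactly as `mark_pending` and `extend_expiration` do later in this file.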
- /// Only called by `Registration` - pub(crate) fn time_ref(&self) -> &Time { - unsafe { &*self.time.0.get() } + fn compare_exchange_weak( + &self, + current: u64, + new: u64, + success: Ordering, + failure: Ordering, + ) -> Result { + self.compare_exchange(current, new, success, failure) } +} - /// Only called by `Registration` - #[allow(clippy::mut_from_ref)] // https://github.com/rust-lang/rust-clippy/issues/4281 - pub(crate) unsafe fn time_mut(&self) -> &mut Time { - &mut *self.time.0.get() - } +/// This structure holds the current shared state of the timer - its scheduled +/// time (if registered), or otherwise the result of the timer completing, as +/// well as the registered waker. +/// +/// Generally, the StateCell is only permitted to be accessed from two contexts: +/// Either a thread holding the corresponding &mut TimerEntry, or a thread +/// holding the timer driver lock. The write actions on the StateCell amount to +/// passing "ownership" of the StateCell between these contexts; moving a timer +/// from the TimerEntry to the driver requires _both_ holding the &mut +/// TimerEntry and the driver lock, while moving it back (firing the timer) +/// requires only the driver lock. +pub(super) struct StateCell { + /// Holds either the scheduled expiration time for this timer, or (if the + /// timer has been fired and is unregistered), [`u64::max_value()`]. + state: AtomicU64, + /// If the timer is fired (an Acquire order read on state shows + /// `u64::max_value()`), holds the result that should be returned from + /// polling the timer. Otherwise, the contents are unspecified and reading + /// without holding the driver lock is undefined behavior. + result: UnsafeCell, + /// The currently-registered waker + waker: CachePadded, +} - pub(crate) fn when(&self) -> u64 { - self.when_internal().expect("invalid internal state") +impl Default for StateCell { + fn default() -> Self { + Self::new() } +} - /// The current entry state as known by the timer. This is not the value of - /// `state`, but lets the timer know how to converge its state to `state`. - pub(crate) fn when_internal(&self) -> Option { - unsafe { *self.when.get() } +impl std::fmt::Debug for StateCell { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "StateCell({:?})", self.read_state()) } +} - pub(crate) fn set_when_internal(&self, when: Option) { - unsafe { - *self.when.get() = when; +impl StateCell { + fn new() -> Self { + Self { + state: AtomicU64::new(STATE_DEREGISTERED), + result: UnsafeCell::new(Ok(())), + waker: CachePadded(AtomicWaker::new()), } } - /// Called by `Timer` to load the current value of `state` for processing - pub(crate) fn load_state(&self) -> Option { - let state = self.state.load(SeqCst); + fn is_pending(&self) -> bool { + self.state.load(Ordering::Relaxed) == STATE_PENDING_FIRE + } - if is_elapsed(state) { + /// Returns the current expiration time, or None if not currently scheduled. + fn when(&self) -> Option { + let cur_state = self.state.load(Ordering::Relaxed); + + if cur_state == u64::max_value() { None } else { - Some(state) + Some(cur_state) } } - pub(crate) fn is_elapsed(&self) -> bool { - let state = self.state.load(SeqCst); - is_elapsed(state) + /// If the timer is completed, returns the result of the timer. Otherwise, + /// returns None and registers the waker. + fn poll(&self, waker: &Waker) -> Poll { + // We must register first. 
This ensures that either `fire` will
+        // observe the new waker, or we will observe a racing fire to have set
+        // the state, or both.
+        self.waker.0.register_by_ref(waker);
+
+        self.read_state()
+    }

-    pub(crate) fn fire(&self, when: u64) {
-        let mut curr = self.state.load(SeqCst);
+    fn read_state(&self) -> Poll<TimerResult> {
+        let cur_state = self.state.load(Ordering::Acquire);
+
+        if cur_state == STATE_DEREGISTERED {
+            // SAFETY: The driver has fired this timer; this involves writing
+            // the result, and then writing (with release ordering) the state
+            // field.
+            Poll::Ready(unsafe { self.result.with(|p| *p) })
+        } else {
+            Poll::Pending
+        }
+    }
+
+    /// Marks this timer as being moved to the pending list, if its scheduled
+    /// time is not after `not_after`.
+    ///
+    /// If the timer is scheduled for a time after `not_after`, returns an Err
+    /// containing the current scheduled time.
+    ///
+    /// SAFETY: Must hold the driver lock.
+    unsafe fn mark_pending(&self, not_after: u64) -> Result<(), u64> {
+        // Quick initial debug check to see if the timer is already fired. Since
+        // firing the timer can only happen with the driver lock held, we know
+        // we shouldn't be able to "miss" a transition to a fired state, even
+        // with relaxed ordering.
+        let mut cur_state = self.state.load(Ordering::Relaxed);

         loop {
-            if is_elapsed(curr) || curr > when {
-                return;
-            }
+            debug_assert!(cur_state < STATE_MIN_VALUE);

-            let next = ELAPSED | curr;
-            let actual = self.state.compare_and_swap(curr, next, SeqCst);
+            if cur_state > not_after {
+                break Err(cur_state);
+            }

-            if curr == actual {
-                break;
+            match self.state.compare_exchange(
+                cur_state,
+                STATE_PENDING_FIRE,
+                Ordering::AcqRel,
+                Ordering::Acquire,
+            ) {
+                Ok(_) => {
+                    break Ok(());
+                }
+                Err(actual_state) => {
+                    cur_state = actual_state;
+                }
             }
+        }
+    }

-            curr = actual;
+    /// Fires the timer, setting the result to the provided result.
+    ///
+    /// Returns:
+    /// * `Some(waker)` - if fired and a waker needs to be invoked once the
+    ///   driver lock is released
+    /// * `None` - if fired and a waker does not need to be invoked, or if
+    ///   already fired
+    ///
+    /// SAFETY: The driver lock must be held.
+    unsafe fn fire(&self, result: TimerResult) -> Option<Waker> {
+        // Quick initial check to see if the timer is already fired. Since
+        // firing the timer can only happen with the driver lock held, we know
+        // we shouldn't be able to "miss" a transition to a fired state, even
+        // with relaxed ordering.
+        let cur_state = self.state.load(Ordering::Relaxed);
+        if cur_state == STATE_DEREGISTERED {
+            return None;
         }

-        self.waker.wake();
-    }
+        // SAFETY: We assume the driver lock is held and the timer is not
+        // fired, so only the driver is accessing this field.
+        //
+        // We perform a release-ordered store to state below, to ensure this
+        // write is visible before the state update is visible.
+        unsafe { self.result.with_mut(|p| *p = result) };
+
+        self.state.store(STATE_DEREGISTERED, Ordering::Release);

-    pub(crate) fn error(&self, error: Error) {
-        // Record the precise nature of the error, if there isn't already an
-        // error present. If we don't actually transition to the error state
-        // below, that's fine, as the error details we set here will be ignored.
-        self.error.compare_and_swap(0, error.as_u8(), SeqCst);
+        self.waker.0.take_waker()
+    }

-        // Only transition to the error state if not currently elapsed
-        let mut curr = self.state.load(SeqCst);
+    /// Marks the timer as registered (poll will return None) and sets the
+    /// expiration time.
+    ///
+    /// While this function is memory-safe, it should only be called from a
+    /// context holding both `&mut TimerEntry` and the driver lock.
+    fn set_expiration(&self, timestamp: u64) {
+        debug_assert!(timestamp < STATE_MIN_VALUE);
+
+        // We can use relaxed ordering because we hold the driver lock and will
+        // fence when we release the lock.
+        self.state.store(timestamp, Ordering::Relaxed);
+    }
+
+    /// Attempts to adjust the timer to a new timestamp.
+    ///
+    /// If the timer has already been fired, is pending firing, or the new
+    /// timestamp is earlier than the old timestamp (or occasionally
+    /// spuriously), returns Err without changing the timer's state. In this
+    /// case, the timer must be deregistered and re-registered.
+    fn extend_expiration(&self, new_timestamp: u64) -> Result<(), ()> {
+        let mut prior = self.state.load(Ordering::Relaxed);

         loop {
-            if is_elapsed(curr) {
-                return;
+            if new_timestamp < prior || prior >= STATE_MIN_VALUE {
+                return Err(());
             }

-            let next = ERROR;
+            match self.state.compare_exchange_weak(
+                prior,
+                new_timestamp,
+                Ordering::AcqRel,
+                Ordering::Acquire,
+            ) {
+                Ok(_) => {
+                    return Ok(());
+                }
+                Err(true_prior) => {
+                    prior = true_prior;
+                }
+            }
+        }
+    }

-            let actual = self.state.compare_and_swap(curr, next, SeqCst);
+    /// Returns true if the state of this timer indicates that the timer might
+    /// be registered with the driver. This check is performed with relaxed
+    /// ordering, but is conservative - if it returns false, the timer is
+    /// definitely _not_ registered.
+    pub(super) fn might_be_registered(&self) -> bool {
+        self.state.load(Ordering::Relaxed) != u64::max_value()
+    }
+}

-            if curr == actual {
-                break;
-            }
+/// A timer entry.
+///
+/// This is the handle to a timer that is controlled by the requester of the
+/// timer. As this participates in intrusive data structures, it must be pinned
+/// before polling.
+#[derive(Debug)]
+pub(super) struct TimerEntry {
+    /// Arc reference to the driver. We can only free the driver after
+    /// deregistering everything from their respective timer wheels.
+    driver: Handle,
+    /// Shared inner structure; this is part of an intrusive linked list, and
+    /// therefore other references can exist to it while mutable references to
+    /// Entry exist.
+    ///
+    /// This is manipulated only under the inner mutex. TODO: Can we use loom
+    /// cells for this?
+    inner: StdUnsafeCell<TimerShared>,
+    /// Initial deadline for the timer. This is used to register on the first
+    /// poll, as we can't register prior to being pinned.
+    initial_deadline: Option<Instant>,
+}
+
+unsafe impl Send for TimerEntry {}
+unsafe impl Sync for TimerEntry {}
+
+/// A TimerHandle is the (non-enforced) "unique" pointer from the driver to the
+/// timer entry. Generally, at most one TimerHandle exists for a timer at a time
+/// (enforced by the timer state machine).
+///
+/// SAFETY: A TimerHandle is essentially a raw pointer, and the usual caveats
+/// of pointer safety apply. In particular, TimerHandle does not itself enforce
+/// that the timer does still exist; however, normally a TimerHandle is created
+/// immediately before registering the timer, and is consumed when firing the
+/// timer, to help minimize mistakes. Still, because TimerHandle cannot enforce
+/// memory safety, all operations are unsafe.
+#[derive(Debug)]
+pub(crate) struct TimerHandle {
+    inner: NonNull<TimerShared>,
+}
+
+pub(super) type EntryList = crate::util::linked_list::LinkedList<TimerShared, TimerShared>;
+
+/// The shared state structure of a timer.
This structure is shared between the +/// frontend (`Entry`) and driver backend. +/// +/// Note that this structure is located inside the `TimerEntry` structure. +#[derive(Debug)] +pub(crate) struct TimerShared { + /// Current state. This records whether the timer entry is currently under + /// the ownership of the driver, and if not, its current state (not + /// complete, fired, error, etc). + state: StateCell, + + /// Data manipulated by the driver thread itself, only. + driver_state: CachePadded, - curr = actual; + _p: PhantomPinned, +} + +impl TimerShared { + pub(super) fn new() -> Self { + Self { + state: StateCell::default(), + driver_state: CachePadded(TimerSharedPadded::new()), + _p: PhantomPinned, } + } - self.waker.wake(); + /// Gets the cached time-of-expiration value + pub(super) fn cached_when(&self) -> u64 { + // Cached-when is only accessed under the driver lock, so we can use relaxed + self.driver_state.0.cached_when.load(Ordering::Relaxed) } - pub(crate) fn cancel(entry: &Arc) { - let state = entry.state.fetch_or(ELAPSED, SeqCst); + /// Gets the true time-of-expiration value, and copies it into the cached + /// time-of-expiration value. + /// + /// SAFETY: Must be called with the driver lock held, and when this entry is + /// not in any timer wheel lists. + pub(super) unsafe fn sync_when(&self) -> u64 { + let true_when = self.true_when(); - if is_elapsed(state) { - // Nothing more to do - return; - } + self.driver_state + .0 + .cached_when + .store(true_when, Ordering::Relaxed); + + true_when + } - // If registered with a timer instance, try to upgrade the Arc. - let inner = match entry.upgrade_inner() { - Some(inner) => inner, - None => return, - }; + /// Returns the true time-of-expiration value, with relaxed memory ordering. + pub(super) fn true_when(&self) -> u64 { + self.state.when().expect("Timer already fired") + } - let _ = inner.queue(entry); + /// Sets the true time-of-expiration value, even if it is less than the + /// current expiration or the timer is deregistered. + /// + /// SAFETY: Must only be called with the driver lock held and the entry not + /// in the timer wheel. + pub(super) unsafe fn set_expiration(&self, t: u64) { + self.state.set_expiration(t); + self.driver_state.0.cached_when.store(t, Ordering::Relaxed); } - pub(crate) fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll> { - let mut curr = self.state.load(SeqCst); + /// Sets the true time-of-expiration only if it is after the current. + pub(super) fn extend_expiration(&self, t: u64) -> Result<(), ()> { + self.state.extend_expiration(t) + } - if is_elapsed(curr) { - return Poll::Ready(if curr == ERROR { - Err(Error::from_u8(self.error.load(SeqCst))) - } else { - Ok(()) - }); + /// Returns a TimerHandle for this timer. + pub(super) fn handle(&self) -> TimerHandle { + TimerHandle { + inner: NonNull::from(self), } + } - self.waker.register_by_ref(cx.waker()); + /// Returns true if the state of this timer indicates that the timer might + /// be registered with the driver. This check is performed with relaxed + /// ordering, but is conservative - if it returns false, the timer is + /// definitely _not_ registered. + pub(super) fn might_be_registered(&self) -> bool { + self.state.might_be_registered() + } +} - curr = self.state.load(SeqCst); +/// Additional shared state between the driver and the timer which is cache +/// padded. This contains the information that the driver thread accesses most +/// frequently to minimize contention. 
In particular, we move it away from the +/// waker, as the waker is updated on every poll. +struct TimerSharedPadded { + /// The expiration time for which this entry is currently registered. + /// Generally owned by the driver, but is accessed by the entry when not + /// registered. + cached_when: AtomicU64, + + /// The true expiration time. Set by the timer future, read by the driver. + true_when: AtomicU64, + + /// A link within the doubly-linked list of timers on a particular level and + /// slot. Valid only if state is equal to Registered. + /// + /// Only accessed under the entry lock. + pointers: StdUnsafeCell>, +} - if is_elapsed(curr) { - return Poll::Ready(if curr == ERROR { - Err(Error::from_u8(self.error.load(SeqCst))) - } else { - Ok(()) - }); - } +impl std::fmt::Debug for TimerSharedPadded { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TimerSharedPadded") + .field("when", &self.true_when.load(Ordering::Relaxed)) + .field("cached_when", &self.cached_when.load(Ordering::Relaxed)) + .finish() + } +} - Poll::Pending +impl TimerSharedPadded { + fn new() -> Self { + Self { + cached_when: AtomicU64::new(0), + true_when: AtomicU64::new(0), + pointers: StdUnsafeCell::new(linked_list::Pointers::new()), + } } +} - /// Only called by `Registration` - pub(crate) fn reset(entry: &mut Arc) { - let inner = match entry.upgrade_inner() { - Some(inner) => inner, - None => return, - }; +unsafe impl Send for TimerShared {} +unsafe impl Sync for TimerShared {} - let deadline = entry.time_ref().deadline; - let when = inner.normalize_deadline(deadline); - let elapsed = inner.elapsed(); +unsafe impl linked_list::Link for TimerShared { + type Handle = TimerHandle; - let next = if when <= elapsed { ELAPSED } else { when }; + type Target = TimerShared; - let mut curr = entry.state.load(SeqCst); + fn as_raw(handle: &Self::Handle) -> NonNull { + handle.inner + } - loop { - // In these two cases, there is no work to do when resetting the - // timer. If the `Entry` is in an error state, then it cannot be - // used anymore. If resetting the entry to the current value, then - // the reset is a noop. - if curr == ERROR || curr == when { - return; - } + unsafe fn from_raw(ptr: NonNull) -> Self::Handle { + TimerHandle { inner: ptr } + } - let actual = entry.state.compare_and_swap(curr, next, SeqCst); + unsafe fn pointers( + target: NonNull, + ) -> NonNull> { + unsafe { NonNull::new(target.as_ref().driver_state.0.pointers.get()).unwrap() } + } +} - if curr == actual { - break; - } +// ===== impl Entry ===== + +impl TimerEntry { + pub(crate) fn new(handle: &Handle, deadline: Instant) -> Self { + let driver = handle.clone(); - curr = actual; + Self { + driver, + inner: StdUnsafeCell::new(TimerShared::new()), + initial_deadline: Some(deadline), } + } + + fn inner(&self) -> &TimerShared { + unsafe { &*self.inner.get() } + } + + pub(crate) fn is_elapsed(&self) -> bool { + !self.inner().state.might_be_registered() && self.initial_deadline.is_none() + } + + /// Cancels and deregisters the timer. This operation is irreversible. + pub(crate) fn cancel(self: Pin<&mut Self>) { + // We need to perform an acq/rel fence with the driver thread, and the + // simplest way to do so is to grab the driver lock. + // + // Why is this necessary? We're about to release this timer's memory for + // some other non-timer use. 
However, we've been doing a bunch of
+        // relaxed (or even non-atomic) writes from the driver thread, and we'll
+        // be doing more from _this thread_ (as this memory is interpreted as
+        // something else).
+        //
+        // It is critical to ensure that, from the point of view of the driver,
+        // those future non-timer writes happen-after the timer is fully fired,
+        // and from the perspective of this thread, the driver's writes all
+        // happen-before we drop the timer. This in turn requires us to perform
+        // an acquire-release barrier in _both_ directions between the driver
+        // and dropping thread.
+        //
+        // The lock acquisition in clear_entry serves this purpose. All of the
+        // driver manipulations happen with the lock held, so we can just take
+        // the lock and be sure that this drop happens-after everything the
+        // driver did so far and happens-before everything the driver does in
+        // the future. While we have the lock held, we also go ahead and
+        // deregister the entry if necessary.
+        unsafe { self.driver.clear_entry(NonNull::from(self.inner())) };
+    }
+
+    pub(crate) fn reset(mut self: Pin<&mut Self>, new_time: Instant) {
+        unsafe { self.as_mut().get_unchecked_mut() }.initial_deadline = None;

-        // If the state has transitioned to 'elapsed' then wake the task as
-        // this entry is ready to be polled.
-        if !is_elapsed(curr) && is_elapsed(next) {
-            entry.waker.wake();
+        let tick = self.driver.time_source().deadline_to_tick(new_time);
+
+        if self.inner().extend_expiration(tick).is_ok() {
+            return;
         }

-        // The driver tracks all non-elapsed entries; notify the driver that it
-        // should update its state for this entry unless the entry had already
-        // elapsed and remains elapsed.
-        if !is_elapsed(curr) || !is_elapsed(next) {
-            let _ = inner.queue(entry);
+        unsafe {
+            self.driver.reregister(tick, self.inner().into());
         }
     }

-    fn new2(deadline: Instant, duration: Duration, inner: Weak<Inner>, state: u64) -> Self {
-        Self {
-            time: CachePadded(UnsafeCell::new(Time { deadline, duration })),
-            inner,
-            waker: AtomicWaker::new(),
-            state: AtomicU64::new(state),
-            queued: AtomicBool::new(false),
-            error: AtomicU8::new(0),
-            next_atomic: UnsafeCell::new(ptr::null_mut()),
-            when: UnsafeCell::new(None),
-            next_stack: UnsafeCell::new(None),
-            prev_stack: UnsafeCell::new(ptr::null_mut()),
+    pub(crate) fn poll_elapsed(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Result<(), super::Error>> {
+        if let Some(deadline) = self.initial_deadline {
+            self.as_mut().reset(deadline);
         }
-    }

-    fn upgrade_inner(&self) -> Option<Arc<Inner>> {
-        self.inner.upgrade()
+        let this = unsafe { self.get_unchecked_mut() };
+
+        this.inner().state.poll(cx.waker())
     }
 }

-fn is_elapsed(state: u64) -> bool {
-    state & ELAPSED == ELAPSED
-}
+impl TimerHandle {
+    pub(super) unsafe fn cached_when(&self) -> u64 {
+        unsafe { self.inner.as_ref().cached_when() }
+    }

-impl Drop for Entry {
-    fn drop(&mut self) {
-        let inner = match self.upgrade_inner() {
-            Some(inner) => inner,
-            None => return,
-        };
+    pub(super) unsafe fn sync_when(&self) -> u64 {
+        unsafe { self.inner.as_ref().sync_when() }
+    }
+
+    pub(super) unsafe fn is_pending(&self) -> bool {
+        unsafe { self.inner.as_ref().state.is_pending() }
+    }
+
+    /// Forcibly sets the true and cached expiration times to the given tick.
+    ///
+    /// SAFETY: The caller must ensure that the handle remains valid, the driver
+    /// lock is held, and that the timer is not in any wheel linked lists.
+    pub(super) unsafe fn set_expiration(&self, tick: u64) {
+        self.inner.as_ref().set_expiration(tick);
+    }

-        inner.decrement();
+    /// Attempts to mark this entry as pending. If the expiration time is after
+    /// `not_after`, however, returns an Err with the current expiration time.
+    ///
+    /// If an `Err` is returned, the `cached_when` value will be updated to this
+    /// new expiration time.
+    ///
+    /// SAFETY: The caller must ensure that the handle remains valid, the driver
+    /// lock is held, and that the timer is not in any wheel linked lists.
+    /// After returning Ok, the entry must be added to the pending list.
+    pub(super) unsafe fn mark_pending(&self, not_after: u64) -> Result<(), u64> {
+        match self.inner.as_ref().state.mark_pending(not_after) {
+            Ok(()) => Ok(()),
+            Err(tick) => {
+                self.inner
+                    .as_ref()
+                    .driver_state
+                    .0
+                    .cached_when
+                    .store(tick, Ordering::Relaxed);
+                Err(tick)
+            }
+        }
+    }
+
+    /// Attempts to transition to a terminal state. If the state is already a
+    /// terminal state, does nothing.
+    ///
+    /// Because the entry might be dropped after the state is moved to a
+    /// terminal state, this function consumes the handle to ensure we don't
+    /// access the entry afterwards.
+    ///
+    /// Returns the last-registered waker, if any.
+    ///
+    /// SAFETY: The driver lock must be held while invoking this function, and
+    /// the entry must not be in any wheel linked lists.
+    pub(super) unsafe fn fire(self, completed_state: TimerResult) -> Option<Waker> {
+        self.inner.as_ref().state.fire(completed_state)
     }
 }

-unsafe impl Send for Entry {}
-unsafe impl Sync for Entry {}
+impl Drop for TimerEntry {
+    fn drop(&mut self) {
+        unsafe { Pin::new_unchecked(self) }.as_mut().cancel()
+    }
+}

 #[cfg_attr(target_arch = "x86_64", repr(align(128)))]
 #[cfg_attr(not(target_arch = "x86_64"), repr(align(64)))]
-#[derive(Debug)]
+#[derive(Debug, Default)]
 struct CachePadded<T>(T);
diff --git a/tokio/src/time/driver/handle.rs b/tokio/src/time/driver/handle.rs
index 54b8a8bd..d4d315dc 100644
--- a/tokio/src/time/driver/handle.rs
+++ b/tokio/src/time/driver/handle.rs
@@ -1,22 +1,29 @@
-use crate::time::driver::Inner;
+use crate::loom::sync::{Arc, Mutex};
+use crate::time::driver::ClockTime;

 use std::fmt;
-use std::sync::{Arc, Weak};

 /// Handle to time driver instance.
 #[derive(Clone)]
 pub(crate) struct Handle {
-    inner: Weak<Inner>,
+    time_source: ClockTime,
+    inner: Arc<Mutex<super::Inner>>,
 }

 impl Handle {
     /// Creates a new timer `Handle` from a shared `Inner` timer state.
-    pub(crate) fn new(inner: Weak<Inner>) -> Self {
-        Handle { inner }
+    pub(super) fn new(inner: Arc<Mutex<super::Inner>>) -> Self {
+        let time_source = inner.lock().time_source.clone();
+        Handle { time_source, inner }
     }

-    /// Tries to return a strong ref to the inner
-    pub(crate) fn inner(&self) -> Option<Arc<Inner>> {
-        self.inner.upgrade()
+    /// Returns the time source associated with this handle
+    pub(super) fn time_source(&self) -> &ClockTime {
+        &self.time_source
+    }
+
+    /// Locks the driver's inner structure
+    pub(super) fn lock(&self) -> crate::loom::sync::MutexGuard<'_, super::Inner> {
+        self.inner.lock()
+    }
 }

@@ -31,12 +38,12 @@ cfg_rt! {
         /// It can be triggered when `Builder::enable_time()` or
         /// `Builder::enable_all()` are not included in the builder.
         ///
-        /// It can also panic whenever a timer is created outside of a Tokio
-        /// runtime. That is why `rt.block_on(delay_for(...))` will panic,
-        /// since the function is executed outside of the runtime.
-        /// Whereas `rt.block_on(async {delay_for(...).await})` doesn't
-        /// panic.
And this is because wrapping the function on an async makes it
-        /// lazy, and so gets executed inside the runtime successfuly without
+        /// It can also panic whenever a timer is created outside of a
+        /// Tokio runtime. That is why `rt.block_on(delay_for(...))` will panic,
+        /// since the function is executed outside of the runtime.
+        /// Whereas `rt.block_on(async {delay_for(...).await})` doesn't panic.
+        /// And this is because wrapping the function on an async makes it lazy,
+        /// and so gets executed inside the runtime successfully without
         /// panicking.
         pub(crate) fn current() -> Self {
             crate::runtime::context::time_handle()
@@ -56,12 +63,12 @@ cfg_not_rt! {
         /// It can be triggered when `Builder::enable_time()` or
         /// `Builder::enable_all()` are not included in the builder.
         ///
         /// It can also panic whenever a timer is created outside of a Tokio
         /// runtime. That is why `rt.block_on(delay_for(...))` will panic,
         /// since the function is executed outside of the runtime.
         /// Whereas `rt.block_on(async {delay_for(...).await})` doesn't
         /// panic. And this is because wrapping the function on an async makes it
-        /// lazy, and so gets executed inside the runtime successfuly without
+        /// lazy, and so gets executed inside the runtime successfully without
         /// panicking.
         pub(crate) fn current() -> Self {
             panic!("there is no timer running, must be called from the context of Tokio runtime or \
diff --git a/tokio/src/time/driver/mod.rs b/tokio/src/time/driver/mod.rs
index 8532c551..917078ef 100644
--- a/tokio/src/time/driver/mod.rs
+++ b/tokio/src/time/driver/mod.rs
@@ -1,26 +1,29 @@
+// Currently, rust warns when an unsafe fn contains an unsafe {} block. However,
+// in the future, this will change to the reverse. For now, suppress this
+// warning and generally stick with being explicit about unsafety.
+#![allow(unused_unsafe)]
 #![cfg_attr(not(feature = "rt"), allow(dead_code))]

 //! Time driver

-mod atomic_stack;
-use self::atomic_stack::AtomicStack;
-
 mod entry;
-pub(super) use self::entry::Entry;
+pub(self) use self::entry::{EntryList, TimerEntry, TimerHandle, TimerShared};

 mod handle;
 pub(crate) use self::handle::Handle;

-use crate::loom::sync::atomic::{AtomicU64, AtomicUsize};
+mod wheel;
+
+pub(super) mod sleep;
+
+use crate::loom::sync::{Arc, Mutex};
 use crate::park::{Park, Unpark};
-use crate::time::{error::Error, wheel};
+use crate::time::error::Error;
 use crate::time::{Clock, Duration, Instant};

-use std::sync::atomic::Ordering::{Acquire, Relaxed, Release, SeqCst};
-
-use std::sync::Arc;
-use std::usize;
-use std::{cmp, fmt};
+use std::convert::TryInto;
+use std::fmt;
+use std::{num::NonZeroU64, ptr::NonNull, task::Waker};

 /// Time implementation that drives [`Sleep`][sleep], [`Interval`][interval], and [`Timeout`][timeout].
 ///
@@ -78,63 +81,96 @@ use std::{cmp, fmt};
 /// [timeout]: crate::time::Timeout
 /// [interval]: crate::time::Interval
 #[derive(Debug)]
-pub(crate) struct Driver<T> {
+pub(crate) struct Driver<P: Park + 'static> {
+    /// Timing backend in use
+    time_source: ClockTime,
+
     /// Shared state
-    inner: Arc<Inner>,
+    inner: Handle,

-    /// Timer wheel
-    wheel: wheel::Wheel,
+    /// Parker to delegate to
+    park: P,
+}
+
+/// A structure which handles conversion from Instants to u64 timestamps.
+#[derive(Debug, Clone)]
+pub(self) struct ClockTime {
+    clock: super::clock::Clock,
+    start_time: Instant,
+}

-    /// Thread parker.
The `Driver` park implementation delegates to this.
-    park: T,
+impl ClockTime {
+    pub(self) fn new(clock: Clock) -> Self {
+        Self {
+            clock,
+            start_time: super::clock::now(),
+        }
+    }

-    /// Source of "now" instances
-    clock: Clock,
+    pub(self) fn deadline_to_tick(&self, t: Instant) -> u64 {
+        // Round up to the end of a ms
+        self.instant_to_tick(t + Duration::from_nanos(999_999))
+    }

-    /// True if the driver is being shutdown
-    is_shutdown: bool,
+    pub(self) fn instant_to_tick(&self, t: Instant) -> u64 {
+        // round up
+        let dur: Duration = t
+            .checked_duration_since(self.start_time)
+            .unwrap_or_else(|| Duration::from_secs(0));
+        let ms = dur.as_millis();
+
+        ms.try_into().expect("Duration too far into the future")
+    }
+
+    pub(self) fn tick_to_duration(&self, t: u64) -> Duration {
+        Duration::from_millis(t)
+    }
+
+    pub(self) fn now(&self) -> u64 {
+        self.instant_to_tick(self.clock.now())
+    }
 }

 /// Timer state shared between `Driver`, `Handle`, and `Registration`.
-pub(crate) struct Inner {
-    /// The instant at which the timer started running.
-    start: Instant,
+pub(self) struct Inner {
+    /// Timing backend in use
+    time_source: ClockTime,

     /// The last published timer `elapsed` value.
-    elapsed: AtomicU64,
+    elapsed: u64,

-    /// Number of active timeouts
-    num: AtomicUsize,
+    /// The earliest time at which we promise to wake up without unparking
+    next_wake: Option<NonZeroU64>,

-    /// Head of the "process" linked list.
-    process: AtomicStack,
+    /// Timer wheel
+    wheel: wheel::Wheel,
+
+    /// True if the driver is being shutdown
+    is_shutdown: bool,

-    /// Unparks the timer thread.
+    /// Unparker that can be used to wake the time driver
     unpark: Box<dyn Unpark>,
 }

-/// Maximum number of timeouts the system can handle concurrently.
-const MAX_TIMEOUTS: usize = usize::MAX >> 1;
-
 // ===== impl Driver =====

-impl<T> Driver<T>
+impl<P> Driver<P>
 where
-    T: Park,
+    P: Park + 'static,
 {
     /// Creates a new `Driver` instance that uses `park` to block the current
-    /// thread and `clock` to get the current `Instant`.
+    /// thread and `time_source` to get the current time and convert to ticks.
     ///
     /// Specifying the source of time is useful when testing.
-    pub(crate) fn new(park: T, clock: Clock) -> Driver<T> {
-        let unpark = Box::new(park.unpark());
+    pub(crate) fn new(park: P, clock: Clock) -> Driver<P> {
+        let time_source = ClockTime::new(clock);
+
+        let inner = Inner::new(time_source.clone(), Box::new(park.unpark()));

         Driver {
-            inner: Arc::new(Inner::new(clock.now(), unpark)),
-            wheel: wheel::Wheel::new(),
+            time_source,
+            inner: Handle::new(Arc::new(Mutex::new(inner))),
             park,
-            clock,
-            is_shutdown: false,
         }
     }

@@ -145,189 +181,240 @@ where
     /// `with_default`, setting the timer as the default timer for the execution
     /// context.
     pub(crate) fn handle(&self) -> Handle {
-        Handle::new(Arc::downgrade(&self.inner))
+        self.inner.clone()
     }

-    /// Converts an `Expiration` to an `Instant`.
-    fn expiration_instant(&self, when: u64) -> Instant {
-        self.inner.start + Duration::from_millis(when)
-    }
+    fn park_internal(&mut self, limit: Option<Duration>) -> Result<(), P::Error> {
+        let clock = &self.time_source.clock;

-    /// Runs timer related logic
-    fn process(&mut self) {
-        let now = crate::time::ms(
-            self.clock.now() - self.inner.start,
-            crate::time::Round::Down,
-        );
+        let mut lock = self.inner.lock();

-        while let Some(entry) = self.wheel.poll(now) {
-            let when = entry.when_internal().expect("invalid internal entry state");
+        let next_wake = lock.wheel.next_expiration_time();
+        lock.next_wake =
+            next_wake.map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap()));

-            // Fire the entry
-            entry.fire(when);
+        drop(lock);

-            // Track that the entry has been fired
-            entry.set_when_internal(None);
-        }
+        match next_wake {
+            Some(when) => {
+                let now = self.time_source.now();
+                // Note that we effectively round up to 1ms here - this avoids
+                // very short-duration microsecond-resolution sleeps that the OS
+                // might treat as zero-length.
+                let mut duration = self.time_source.tick_to_duration(when.saturating_sub(now));
+
+                if duration > Duration::from_millis(0) {
+                    if let Some(limit) = limit {
+                        duration = std::cmp::min(limit, duration);
+                    }

-        // Update the elapsed cache
-        self.inner.elapsed.store(self.wheel.elapsed(), SeqCst);
-    }
+                    if clock.is_paused() {
+                        self.park.park_timeout(Duration::from_secs(0))?;

-    /// Processes the entry queue
-    ///
-    /// This handles adding and canceling timeouts.
-    fn process_queue(&mut self) {
-        for entry in self.inner.process.take() {
-            match (entry.when_internal(), entry.load_state()) {
-                (None, None) => {
-                    // Nothing to do
-                }
-                (Some(_), None) => {
-                    // Remove the entry
-                    self.clear_entry(&entry);
-                }
-                (None, Some(when)) => {
-                    // Add the entry to the timer wheel
-                    self.add_entry(entry, when);
+                        // Simulate advancing time
+                        clock.advance(duration);
+                    } else {
+                        self.park.park_timeout(duration)?;
+                    }
+                } else {
+                    self.park.park_timeout(Duration::from_secs(0))?;
                 }
-                (Some(_), Some(next)) => {
-                    self.clear_entry(&entry);
-                    self.add_entry(entry, next);
+            }
+            None => {
+                if let Some(duration) = limit {
+                    if clock.is_paused() {
+                        self.park.park_timeout(Duration::from_secs(0))?;
+                        clock.advance(duration);
+                    } else {
+                        self.park.park_timeout(duration)?;
+                    }
+                } else {
+                    self.park.park()?;
                 }
             }
         }
-    }

-    fn clear_entry(&mut self, entry: &Arc<Entry>) {
-        self.wheel.remove(entry);
-        entry.set_when_internal(None);
-    }

-    /// Fires the entry if it needs to, otherwise queue it to be processed later.
-    fn add_entry(&mut self, entry: Arc<Entry>, when: u64) {
-        use crate::time::error::InsertError;
+        // Process pending timers after waking up
+        self.inner.process();

-        entry.set_when_internal(Some(when));
-
-        match self.wheel.insert(when, entry) {
-            Ok(_) => {}
-            Err((entry, InsertError::Elapsed)) => {
-                // The entry's deadline has elapsed, so fire it and update the
-                // internal state accordingly.
-                entry.set_when_internal(None);
-                entry.fire(when);
-            }
-            Err((entry, InsertError::Invalid)) => {
-                // The entry's deadline is invalid, so error it and update the
-                // internal state accordingly.
-                entry.set_when_internal(None);
-                entry.error(Error::invalid());
-            }
-        }
+        Ok(())
     }
 }

-impl<T> Park for Driver<T>
-where
-    T: Park,
-{
-    type Unpark = T::Unpark;
-    type Error = T::Error;
+impl Handle {
+    /// Runs timer related logic, and returns the next wakeup time
+    pub(self) fn process(&self) {
+        let now = self.time_source().now();

-    fn unpark(&self) -> Self::Unpark {
-        self.park.unpark()
+        self.process_at_time(now)
     }

-    fn park(&mut self) -> Result<(), Self::Error> {
-        self.process_queue();
+    pub(self) fn process_at_time(&self, now: u64) {
+        let mut waker_list: [Option<Waker>; 32] = Default::default();
+        let mut waker_idx = 0;

-        match self.wheel.poll_at() {
-            Some(when) => {
-                let now = self.clock.now();
-                let deadline = self.expiration_instant(when);
+        let mut lock = self.lock();

-                if deadline > now {
-                    let dur = deadline - now;
+        assert!(now >= lock.elapsed);

-                    if self.clock.is_paused() {
-                        self.park.park_timeout(Duration::from_secs(0))?;
-                        self.clock.advance(dur);
-                    } else {
-                        self.park.park_timeout(dur)?;
+        while let Some(entry) = lock.wheel.poll(now) {
+            debug_assert!(unsafe { entry.is_pending() });
+
+            // SAFETY: We hold the driver lock, and just removed the entry from any linked lists.
+            if let Some(waker) = unsafe { entry.fire(Ok(())) } {
+                waker_list[waker_idx] = Some(waker);
+
+                waker_idx += 1;
+
+                if waker_idx == waker