diff options
Diffstat (limited to 'tokio/src/timer/timer/entry.rs')
-rw-r--r-- | tokio/src/timer/timer/entry.rs | 393 |
1 files changed, 393 insertions, 0 deletions
diff --git a/tokio/src/timer/timer/entry.rs b/tokio/src/timer/timer/entry.rs new file mode 100644 index 00000000..a31dda4a --- /dev/null +++ b/tokio/src/timer/timer/entry.rs @@ -0,0 +1,393 @@ +use crate::timer::atomic::AtomicU64; +use crate::timer::timer::{HandlePriv, Inner}; +use crate::timer::Error; + +use tokio_sync::AtomicWaker; + +use crossbeam_utils::CachePadded; +use std::cell::UnsafeCell; +use std::ptr; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering::{Relaxed, SeqCst}; +use std::sync::{Arc, Weak}; +use std::task::{self, Poll}; +use std::time::{Duration, Instant}; +use std::u64; + +/// Internal state shared between a `Delay` instance and the timer. +/// +/// This struct is used as a node in two intrusive data structures: +/// +/// * An atomic stack used to signal to the timer thread that the entry state +/// has changed. The timer thread will observe the entry on this stack and +/// perform any actions as necessary. +/// +/// * A doubly linked list used **only** by the timer thread. Each slot in the +/// timer wheel is a head pointer to the list of entries that must be +/// processed during that timer tick. +#[derive(Debug)] +pub(crate) struct Entry { + /// Only accessed from `Registration`. + time: CachePadded<UnsafeCell<Time>>, + + /// Timer internals. Using a weak pointer allows the timer to shutdown + /// without all `Delay` instances having completed. + /// + /// When `None`, the entry has not yet been linked with a timer instance. + inner: Option<Weak<Inner>>, + + /// Tracks the entry state. This value contains the following information: + /// + /// * The deadline at which the entry must be "fired". + /// * A flag indicating if the entry has already been fired. + /// * Whether or not the entry transitioned to the error state. + /// + /// When an `Entry` is created, `state` is initialized to the instant at + /// which the entry must be fired. When a timer is reset to a different + /// instant, this value is changed. + state: AtomicU64, + + /// Task to notify once the deadline is reached. + waker: AtomicWaker, + + /// True when the entry is queued in the "process" stack. This value + /// is set before pushing the value and unset after popping the value. + /// + /// TODO: This could possibly be rolled up into `state`. + pub(super) queued: AtomicBool, + + /// Next entry in the "process" linked list. + /// + /// Access to this field is coordinated by the `queued` flag. + /// + /// Represents a strong Arc ref. + pub(super) next_atomic: UnsafeCell<*mut Entry>, + + /// When the entry expires, relative to the `start` of the timer + /// (Inner::start). This is only used by the timer. + /// + /// A `Delay` instance can be reset to a different deadline by the thread + /// that owns the `Delay` instance. In this case, the timer thread will not + /// immediately know that this has happened. The timer thread must know the + /// last deadline that it saw as it uses this value to locate the entry in + /// its wheel. + /// + /// Once the timer thread observes that the instant has changed, it updates + /// the wheel and sets this value. The idea is that this value eventually + /// converges to the value of `state` as the timer thread makes updates. + when: UnsafeCell<Option<u64>>, + + /// Next entry in the State's linked list. + /// + /// This is only accessed by the timer + pub(super) next_stack: UnsafeCell<Option<Arc<Entry>>>, + + /// Previous entry in the State's linked list. + /// + /// This is only accessed by the timer and is used to unlink a canceled + /// entry. + /// + /// This is a weak reference. + pub(super) prev_stack: UnsafeCell<*const Entry>, +} + +/// Stores the info for `Delay`. +#[derive(Debug)] +pub(crate) struct Time { + pub(crate) deadline: Instant, + pub(crate) duration: Duration, +} + +/// Flag indicating a timer entry has elapsed +const ELAPSED: u64 = 1 << 63; + +/// Flag indicating a timer entry has reached an error state +const ERROR: u64 = u64::MAX; + +// ===== impl Entry ===== + +impl Entry { + pub(crate) fn new(deadline: Instant, duration: Duration) -> Entry { + Entry { + time: CachePadded::new(UnsafeCell::new(Time { deadline, duration })), + inner: None, + waker: AtomicWaker::new(), + state: AtomicU64::new(0), + queued: AtomicBool::new(false), + next_atomic: UnsafeCell::new(ptr::null_mut()), + when: UnsafeCell::new(None), + next_stack: UnsafeCell::new(None), + prev_stack: UnsafeCell::new(ptr::null_mut()), + } + } + + /// Only called by `Registration` + pub(crate) fn time_ref(&self) -> &Time { + unsafe { &*self.time.get() } + } + + /// Only called by `Registration` + #[allow(clippy::mut_from_ref)] // https://github.com/rust-lang/rust-clippy/issues/4281 + pub(crate) unsafe fn time_mut(&self) -> &mut Time { + &mut *self.time.get() + } + + /// Returns `true` if the `Entry` is currently associated with a timer + /// instance. + pub(crate) fn is_registered(&self) -> bool { + self.inner.is_some() + } + + /// Only called by `Registration` + pub(crate) fn register(me: &mut Arc<Self>) { + let handle = match HandlePriv::try_current() { + Ok(handle) => handle, + Err(_) => { + // Could not associate the entry with a timer, transition the + // state to error + Arc::get_mut(me).unwrap().transition_to_error(); + + return; + } + }; + + Entry::register_with(me, handle) + } + + /// Only called by `Registration` + pub(crate) fn register_with(me: &mut Arc<Self>, handle: HandlePriv) { + assert!(!me.is_registered(), "only register an entry once"); + + let deadline = me.time_ref().deadline; + + let inner = match handle.inner() { + Some(inner) => inner, + None => { + // Could not associate the entry with a timer, transition the + // state to error + Arc::get_mut(me).unwrap().transition_to_error(); + + return; + } + }; + + // Increment the number of active timeouts + if inner.increment().is_err() { + Arc::get_mut(me).unwrap().transition_to_error(); + + return; + } + + // Associate the entry with the timer + Arc::get_mut(me).unwrap().inner = Some(handle.into_inner()); + + let when = inner.normalize_deadline(deadline); + + // Relaxed OK: At this point, there are no other threads that have + // access to this entry. + if when <= inner.elapsed() { + me.state.store(ELAPSED, Relaxed); + return; + } else { + me.state.store(when, Relaxed); + } + + if inner.queue(me).is_err() { + // The timer has shutdown, transition the entry to the error state. + me.error(); + } + } + + fn transition_to_error(&mut self) { + self.inner = Some(Weak::new()); + self.state = AtomicU64::new(ERROR); + } + + /// The current entry state as known by the timer. This is not the value of + /// `state`, but lets the timer know how to converge its state to `state`. + pub(crate) fn when_internal(&self) -> Option<u64> { + unsafe { (*self.when.get()) } + } + + pub(crate) fn set_when_internal(&self, when: Option<u64>) { + unsafe { + (*self.when.get()) = when; + } + } + + /// Called by `Timer` to load the current value of `state` for processing + pub(crate) fn load_state(&self) -> Option<u64> { + let state = self.state.load(SeqCst); + + if is_elapsed(state) { + None + } else { + Some(state) + } + } + + pub(crate) fn is_elapsed(&self) -> bool { + let state = self.state.load(SeqCst); + is_elapsed(state) + } + + pub(crate) fn fire(&self, when: u64) { + let mut curr = self.state.load(SeqCst); + + loop { + if is_elapsed(curr) || curr > when { + return; + } + + let next = ELAPSED | curr; + let actual = self.state.compare_and_swap(curr, next, SeqCst); + + if curr == actual { + break; + } + + curr = actual; + } + + self.waker.wake(); + } + + pub(crate) fn error(&self) { + // Only transition to the error state if not currently elapsed + let mut curr = self.state.load(SeqCst); + + loop { + if is_elapsed(curr) { + return; + } + + let next = ERROR; + + let actual = self.state.compare_and_swap(curr, next, SeqCst); + + if curr == actual { + break; + } + + curr = actual; + } + + self.waker.wake(); + } + + pub(crate) fn cancel(entry: &Arc<Entry>) { + let state = entry.state.fetch_or(ELAPSED, SeqCst); + + if is_elapsed(state) { + // Nothing more to do + return; + } + + // If registered with a timer instance, try to upgrade the Arc. + let inner = match entry.upgrade_inner() { + Some(inner) => inner, + None => return, + }; + + let _ = inner.queue(entry); + } + + pub(crate) fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> { + let mut curr = self.state.load(SeqCst); + + if is_elapsed(curr) { + return Poll::Ready(if curr == ERROR { + Err(Error::shutdown()) + } else { + Ok(()) + }); + } + + self.waker.register_by_ref(cx.waker()); + + curr = self.state.load(SeqCst); + + if is_elapsed(curr) { + return Poll::Ready(if curr == ERROR { + Err(Error::shutdown()) + } else { + Ok(()) + }); + } + + Poll::Pending + } + + /// Only called by `Registration` + pub(crate) fn reset(entry: &mut Arc<Entry>) { + if !entry.is_registered() { + return; + } + + let inner = match entry.upgrade_inner() { + Some(inner) => inner, + None => return, + }; + + let deadline = entry.time_ref().deadline; + let when = inner.normalize_deadline(deadline); + let elapsed = inner.elapsed(); + + let mut curr = entry.state.load(SeqCst); + let mut notify; + + loop { + // In these two cases, there is no work to do when resetting the + // timer. If the `Entry` is in an error state, then it cannot be + // used anymore. If resetting the entry to the current value, then + // the reset is a noop. + if curr == ERROR || curr == when { + return; + } + + let next; + + if when <= elapsed { + next = ELAPSED; + notify = !is_elapsed(curr); + } else { + next = when; + notify = true; + } + + let actual = entry.state.compare_and_swap(curr, next, SeqCst); + + if curr == actual { + break; + } + + curr = actual; + } + + if notify { + let _ = inner.queue(entry); + } + } + + fn upgrade_inner(&self) -> Option<Arc<Inner>> { + self.inner.as_ref().and_then(|inner| inner.upgrade()) + } +} + +fn is_elapsed(state: u64) -> bool { + state & ELAPSED == ELAPSED +} + +impl Drop for Entry { + fn drop(&mut self) { + let inner = match self.upgrade_inner() { + Some(inner) => inner, + None => return, + }; + + inner.decrement(); + } +} + +unsafe impl Send for Entry {} +unsafe impl Sync for Entry {} |