summaryrefslogtreecommitdiffstats
path: root/tokio/src/timer/timer/entry.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/timer/timer/entry.rs')
-rw-r--r--tokio/src/timer/timer/entry.rs393
1 files changed, 393 insertions, 0 deletions
diff --git a/tokio/src/timer/timer/entry.rs b/tokio/src/timer/timer/entry.rs
new file mode 100644
index 00000000..a31dda4a
--- /dev/null
+++ b/tokio/src/timer/timer/entry.rs
@@ -0,0 +1,393 @@
+use crate::timer::atomic::AtomicU64;
+use crate::timer::timer::{HandlePriv, Inner};
+use crate::timer::Error;
+
+use tokio_sync::AtomicWaker;
+
+use crossbeam_utils::CachePadded;
+use std::cell::UnsafeCell;
+use std::ptr;
+use std::sync::atomic::AtomicBool;
+use std::sync::atomic::Ordering::{Relaxed, SeqCst};
+use std::sync::{Arc, Weak};
+use std::task::{self, Poll};
+use std::time::{Duration, Instant};
+use std::u64;
+
+/// Internal state shared between a `Delay` instance and the timer.
+///
+/// This struct is used as a node in two intrusive data structures:
+///
+/// * An atomic stack used to signal to the timer thread that the entry state
+/// has changed. The timer thread will observe the entry on this stack and
+/// perform any actions as necessary.
+///
+/// * A doubly linked list used **only** by the timer thread. Each slot in the
+/// timer wheel is a head pointer to the list of entries that must be
+/// processed during that timer tick.
+#[derive(Debug)]
+pub(crate) struct Entry {
+ /// Only accessed from `Registration`.
+ time: CachePadded<UnsafeCell<Time>>,
+
+ /// Timer internals. Using a weak pointer allows the timer to shutdown
+ /// without all `Delay` instances having completed.
+ ///
+ /// When `None`, the entry has not yet been linked with a timer instance.
+ inner: Option<Weak<Inner>>,
+
+ /// Tracks the entry state. This value contains the following information:
+ ///
+ /// * The deadline at which the entry must be "fired".
+ /// * A flag indicating if the entry has already been fired.
+ /// * Whether or not the entry transitioned to the error state.
+ ///
+ /// When an `Entry` is created, `state` is initialized to the instant at
+ /// which the entry must be fired. When a timer is reset to a different
+ /// instant, this value is changed.
+ state: AtomicU64,
+
+ /// Task to notify once the deadline is reached.
+ waker: AtomicWaker,
+
+ /// True when the entry is queued in the "process" stack. This value
+ /// is set before pushing the value and unset after popping the value.
+ ///
+ /// TODO: This could possibly be rolled up into `state`.
+ pub(super) queued: AtomicBool,
+
+ /// Next entry in the "process" linked list.
+ ///
+ /// Access to this field is coordinated by the `queued` flag.
+ ///
+ /// Represents a strong Arc ref.
+ pub(super) next_atomic: UnsafeCell<*mut Entry>,
+
+ /// When the entry expires, relative to the `start` of the timer
+ /// (Inner::start). This is only used by the timer.
+ ///
+ /// A `Delay` instance can be reset to a different deadline by the thread
+ /// that owns the `Delay` instance. In this case, the timer thread will not
+ /// immediately know that this has happened. The timer thread must know the
+ /// last deadline that it saw as it uses this value to locate the entry in
+ /// its wheel.
+ ///
+ /// Once the timer thread observes that the instant has changed, it updates
+ /// the wheel and sets this value. The idea is that this value eventually
+ /// converges to the value of `state` as the timer thread makes updates.
+ when: UnsafeCell<Option<u64>>,
+
+ /// Next entry in the State's linked list.
+ ///
+ /// This is only accessed by the timer
+ pub(super) next_stack: UnsafeCell<Option<Arc<Entry>>>,
+
+ /// Previous entry in the State's linked list.
+ ///
+ /// This is only accessed by the timer and is used to unlink a canceled
+ /// entry.
+ ///
+ /// This is a weak reference.
+ pub(super) prev_stack: UnsafeCell<*const Entry>,
+}
+
+/// Stores the info for `Delay`.
+#[derive(Debug)]
+pub(crate) struct Time {
+ pub(crate) deadline: Instant,
+ pub(crate) duration: Duration,
+}
+
+/// Flag indicating a timer entry has elapsed
+const ELAPSED: u64 = 1 << 63;
+
+/// Flag indicating a timer entry has reached an error state
+const ERROR: u64 = u64::MAX;
+
+// ===== impl Entry =====
+
+impl Entry {
+ pub(crate) fn new(deadline: Instant, duration: Duration) -> Entry {
+ Entry {
+ time: CachePadded::new(UnsafeCell::new(Time { deadline, duration })),
+ inner: None,
+ waker: AtomicWaker::new(),
+ state: AtomicU64::new(0),
+ queued: AtomicBool::new(false),
+ next_atomic: UnsafeCell::new(ptr::null_mut()),
+ when: UnsafeCell::new(None),
+ next_stack: UnsafeCell::new(None),
+ prev_stack: UnsafeCell::new(ptr::null_mut()),
+ }
+ }
+
+ /// Only called by `Registration`
+ pub(crate) fn time_ref(&self) -> &Time {
+ unsafe { &*self.time.get() }
+ }
+
+ /// Only called by `Registration`
+ #[allow(clippy::mut_from_ref)] // https://github.com/rust-lang/rust-clippy/issues/4281
+ pub(crate) unsafe fn time_mut(&self) -> &mut Time {
+ &mut *self.time.get()
+ }
+
+ /// Returns `true` if the `Entry` is currently associated with a timer
+ /// instance.
+ pub(crate) fn is_registered(&self) -> bool {
+ self.inner.is_some()
+ }
+
+ /// Only called by `Registration`
+ pub(crate) fn register(me: &mut Arc<Self>) {
+ let handle = match HandlePriv::try_current() {
+ Ok(handle) => handle,
+ Err(_) => {
+ // Could not associate the entry with a timer, transition the
+ // state to error
+ Arc::get_mut(me).unwrap().transition_to_error();
+
+ return;
+ }
+ };
+
+ Entry::register_with(me, handle)
+ }
+
+ /// Only called by `Registration`
+ pub(crate) fn register_with(me: &mut Arc<Self>, handle: HandlePriv) {
+ assert!(!me.is_registered(), "only register an entry once");
+
+ let deadline = me.time_ref().deadline;
+
+ let inner = match handle.inner() {
+ Some(inner) => inner,
+ None => {
+ // Could not associate the entry with a timer, transition the
+ // state to error
+ Arc::get_mut(me).unwrap().transition_to_error();
+
+ return;
+ }
+ };
+
+ // Increment the number of active timeouts
+ if inner.increment().is_err() {
+ Arc::get_mut(me).unwrap().transition_to_error();
+
+ return;
+ }
+
+ // Associate the entry with the timer
+ Arc::get_mut(me).unwrap().inner = Some(handle.into_inner());
+
+ let when = inner.normalize_deadline(deadline);
+
+ // Relaxed OK: At this point, there are no other threads that have
+ // access to this entry.
+ if when <= inner.elapsed() {
+ me.state.store(ELAPSED, Relaxed);
+ return;
+ } else {
+ me.state.store(when, Relaxed);
+ }
+
+ if inner.queue(me).is_err() {
+ // The timer has shutdown, transition the entry to the error state.
+ me.error();
+ }
+ }
+
+ fn transition_to_error(&mut self) {
+ self.inner = Some(Weak::new());
+ self.state = AtomicU64::new(ERROR);
+ }
+
+ /// The current entry state as known by the timer. This is not the value of
+ /// `state`, but lets the timer know how to converge its state to `state`.
+ pub(crate) fn when_internal(&self) -> Option<u64> {
+ unsafe { (*self.when.get()) }
+ }
+
+ pub(crate) fn set_when_internal(&self, when: Option<u64>) {
+ unsafe {
+ (*self.when.get()) = when;
+ }
+ }
+
+ /// Called by `Timer` to load the current value of `state` for processing
+ pub(crate) fn load_state(&self) -> Option<u64> {
+ let state = self.state.load(SeqCst);
+
+ if is_elapsed(state) {
+ None
+ } else {
+ Some(state)
+ }
+ }
+
+ pub(crate) fn is_elapsed(&self) -> bool {
+ let state = self.state.load(SeqCst);
+ is_elapsed(state)
+ }
+
+ pub(crate) fn fire(&self, when: u64) {
+ let mut curr = self.state.load(SeqCst);
+
+ loop {
+ if is_elapsed(curr) || curr > when {
+ return;
+ }
+
+ let next = ELAPSED | curr;
+ let actual = self.state.compare_and_swap(curr, next, SeqCst);
+
+ if curr == actual {
+ break;
+ }
+
+ curr = actual;
+ }
+
+ self.waker.wake();
+ }
+
+ pub(crate) fn error(&self) {
+ // Only transition to the error state if not currently elapsed
+ let mut curr = self.state.load(SeqCst);
+
+ loop {
+ if is_elapsed(curr) {
+ return;
+ }
+
+ let next = ERROR;
+
+ let actual = self.state.compare_and_swap(curr, next, SeqCst);
+
+ if curr == actual {
+ break;
+ }
+
+ curr = actual;
+ }
+
+ self.waker.wake();
+ }
+
+ pub(crate) fn cancel(entry: &Arc<Entry>) {
+ let state = entry.state.fetch_or(ELAPSED, SeqCst);
+
+ if is_elapsed(state) {
+ // Nothing more to do
+ return;
+ }
+
+ // If registered with a timer instance, try to upgrade the Arc.
+ let inner = match entry.upgrade_inner() {
+ Some(inner) => inner,
+ None => return,
+ };
+
+ let _ = inner.queue(entry);
+ }
+
+ pub(crate) fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> {
+ let mut curr = self.state.load(SeqCst);
+
+ if is_elapsed(curr) {
+ return Poll::Ready(if curr == ERROR {
+ Err(Error::shutdown())
+ } else {
+ Ok(())
+ });
+ }
+
+ self.waker.register_by_ref(cx.waker());
+
+ curr = self.state.load(SeqCst);
+
+ if is_elapsed(curr) {
+ return Poll::Ready(if curr == ERROR {
+ Err(Error::shutdown())
+ } else {
+ Ok(())
+ });
+ }
+
+ Poll::Pending
+ }
+
+ /// Only called by `Registration`
+ pub(crate) fn reset(entry: &mut Arc<Entry>) {
+ if !entry.is_registered() {
+ return;
+ }
+
+ let inner = match entry.upgrade_inner() {
+ Some(inner) => inner,
+ None => return,
+ };
+
+ let deadline = entry.time_ref().deadline;
+ let when = inner.normalize_deadline(deadline);
+ let elapsed = inner.elapsed();
+
+ let mut curr = entry.state.load(SeqCst);
+ let mut notify;
+
+ loop {
+ // In these two cases, there is no work to do when resetting the
+ // timer. If the `Entry` is in an error state, then it cannot be
+ // used anymore. If resetting the entry to the current value, then
+ // the reset is a noop.
+ if curr == ERROR || curr == when {
+ return;
+ }
+
+ let next;
+
+ if when <= elapsed {
+ next = ELAPSED;
+ notify = !is_elapsed(curr);
+ } else {
+ next = when;
+ notify = true;
+ }
+
+ let actual = entry.state.compare_and_swap(curr, next, SeqCst);
+
+ if curr == actual {
+ break;
+ }
+
+ curr = actual;
+ }
+
+ if notify {
+ let _ = inner.queue(entry);
+ }
+ }
+
+ fn upgrade_inner(&self) -> Option<Arc<Inner>> {
+ self.inner.as_ref().and_then(|inner| inner.upgrade())
+ }
+}
+
+fn is_elapsed(state: u64) -> bool {
+ state & ELAPSED == ELAPSED
+}
+
+impl Drop for Entry {
+ fn drop(&mut self) {
+ let inner = match self.upgrade_inner() {
+ Some(inner) => inner,
+ None => return,
+ };
+
+ inner.decrement();
+ }
+}
+
+unsafe impl Send for Entry {}
+unsafe impl Sync for Entry {}