summaryrefslogtreecommitdiffstats
path: root/tokio/src/time/driver/wheel/level.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/time/driver/wheel/level.rs')
-rw-r--r--tokio/src/time/driver/wheel/level.rs277
1 files changed, 277 insertions, 0 deletions
diff --git a/tokio/src/time/driver/wheel/level.rs b/tokio/src/time/driver/wheel/level.rs
new file mode 100644
index 00000000..58280b10
--- /dev/null
+++ b/tokio/src/time/driver/wheel/level.rs
@@ -0,0 +1,277 @@
+use crate::time::driver::TimerHandle;
+
+use crate::time::driver::{EntryList, TimerShared};
+
+use std::{fmt, ptr::NonNull};
+
+/// Wheel for a single level in the timer. This wheel contains 64 slots.
+pub(crate) struct Level {
+ level: usize,
+
+ /// Bit field tracking which slots currently contain entries.
+ ///
+ /// Using a bit field to track slots that contain entries allows avoiding a
+ /// scan to find entries. This field is updated when entries are added or
+ /// removed from a slot.
+ ///
+ /// The least-significant bit represents slot zero.
+ occupied: u64,
+
+ /// Slots. We access these via the EntryInner `current_list` as well, so this needs to be an UnsafeCell.
+ slot: [EntryList; LEVEL_MULT],
+}
+
+/// Indicates when a slot must be processed next.
+#[derive(Debug)]
+pub(crate) struct Expiration {
+ /// The level containing the slot.
+ pub(crate) level: usize,
+
+ /// The slot index.
+ pub(crate) slot: usize,
+
+ /// The instant at which the slot needs to be processed.
+ pub(crate) deadline: u64,
+}
+
+/// Level multiplier.
+///
+/// Being a power of 2 is very important.
+const LEVEL_MULT: usize = 64;
+
+impl Level {
+ pub(crate) fn new(level: usize) -> Level {
+ // A value has to be Copy in order to use syntax like:
+ // let stack = Stack::default();
+ // ...
+ // slots: [stack; 64],
+ //
+ // Alternatively, since Stack is Default one can
+ // use syntax like:
+ // let slots: [Stack; 64] = Default::default();
+ //
+ // However, that is only supported for arrays of size
+ // 32 or fewer. So in our case we have to explicitly
+ // invoke the constructor for each array element.
+ let ctor = || EntryList::default();
+
+ Level {
+ level,
+ occupied: 0,
+ slot: [
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ],
+ }
+ }
+
+ /// Finds the slot that needs to be processed next and returns the slot and
+ /// `Instant` at which this slot must be processed.
+ pub(crate) fn next_expiration(&self, now: u64) -> Option<Expiration> {
+ // Use the `occupied` bit field to get the index of the next slot that
+ // needs to be processed.
+ let slot = match self.next_occupied_slot(now) {
+ Some(slot) => slot,
+ None => return None,
+ };
+
+ // From the slot index, calculate the `Instant` at which it needs to be
+ // processed. This value *must* be in the future with respect to `now`.
+
+ let level_range = level_range(self.level);
+ let slot_range = slot_range(self.level);
+
+ // TODO: This can probably be simplified w/ power of 2 math
+ let level_start = now - (now % level_range);
+ let mut deadline = level_start + slot as u64 * slot_range;
+
+ if deadline <= now {
+ // A timer is in a slot "prior" to the current time. This can occur
+ // because we do not have an infinite hierarchy of timer levels, and
+ // eventually a timer scheduled for a very distant time might end up
+ // being placed in a slot that is beyond the end of all of the
+ // arrays.
+ //
+ // To deal with this, we first limit timers to being scheduled no
+ // more than MAX_DURATION ticks in the future; that is, they're at
+ // most one rotation of the top level away. Then, we force timers
+ // that logically would go into the top+1 level, to instead go into
+ // the top level's slots.
+ //
+ // What this means is that the top level's slots act as a
+ // pseudo-ring buffer, and we rotate around them indefinitely. If we
+ // compute a deadline before now, and it's the top level, it
+ // therefore means we're actually looking at a slot in the future.
+ debug_assert_eq!(self.level, super::NUM_LEVELS - 1);
+
+ deadline += level_range;
+ }
+
+ debug_assert!(
+ deadline >= now,
+ "deadline={:016X}; now={:016X}; level={}; lr={:016X}, sr={:016X}, slot={}; occupied={:b}",
+ deadline,
+ now,
+ self.level,
+ level_range,
+ slot_range,
+ slot,
+ self.occupied
+ );
+
+ Some(Expiration {
+ level: self.level,
+ slot,
+ deadline,
+ })
+ }
+
+ fn next_occupied_slot(&self, now: u64) -> Option<usize> {
+ if self.occupied == 0 {
+ return None;
+ }
+
+ // Get the slot for now using Maths
+ let now_slot = (now / slot_range(self.level)) as usize;
+ let occupied = self.occupied.rotate_right(now_slot as u32);
+ let zeros = occupied.trailing_zeros() as usize;
+ let slot = (zeros + now_slot) % 64;
+
+ Some(slot)
+ }
+
+ pub(crate) unsafe fn add_entry(&mut self, item: TimerHandle) {
+ let slot = slot_for(item.cached_when(), self.level);
+
+ self.slot[slot].push_front(item);
+
+ self.occupied |= occupied_bit(slot);
+ }
+
+ pub(crate) unsafe fn remove_entry(&mut self, item: NonNull<TimerShared>) {
+ let slot = slot_for(unsafe { item.as_ref().cached_when() }, self.level);
+
+ unsafe { self.slot[slot].remove(item) };
+ if self.slot[slot].is_empty() {
+ // The bit is currently set
+ debug_assert!(self.occupied & occupied_bit(slot) != 0);
+
+ // Unset the bit
+ self.occupied ^= occupied_bit(slot);
+ }
+ }
+
+ pub(crate) fn take_slot(&mut self, slot: usize) -> EntryList {
+ self.occupied &= !occupied_bit(slot);
+
+ std::mem::take(&mut self.slot[slot])
+ }
+}
+
+impl fmt::Debug for Level {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Level")
+ .field("occupied", &self.occupied)
+ .finish()
+ }
+}
+
+fn occupied_bit(slot: usize) -> u64 {
+ 1 << slot
+}
+
+fn slot_range(level: usize) -> u64 {
+ LEVEL_MULT.pow(level as u32) as u64
+}
+
+fn level_range(level: usize) -> u64 {
+ LEVEL_MULT as u64 * slot_range(level)
+}
+
+/// Convert a duration (milliseconds) and a level to a slot position
+fn slot_for(duration: u64, level: usize) -> usize {
+ ((duration >> (level * 6)) % LEVEL_MULT as u64) as usize
+}
+
+/*
+#[cfg(all(test, not(loom)))]
+mod test {
+ use super::*;
+
+ #[test]
+ fn test_slot_for() {
+ for pos in 1..64 {
+ assert_eq!(pos as usize, slot_for(pos, 0));
+ }
+
+ for level in 1..5 {
+ for pos in level..64 {
+ let a = pos * 64_usize.pow(level as u32);
+ assert_eq!(pos as usize, slot_for(a as u64, level));
+ }
+ }
+ }
+}
+*/