summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbdonlan <bdonlan@gmail.com>2020-10-05 14:25:04 -0700
committerGitHub <noreply@github.com>2020-10-05 14:25:04 -0700
commit9730317e94cd5bfca237376549405a6feb815223 (patch)
tree333c03937b5c804b4c79335cdb8d7d68cf665e44
parent02311dcfa13d719c3f591c922f8a719d7c954ef0 (diff)
time: move DelayQueue to tokio-util (#2897)
This change is intended to do the minimum to unblock 0.3; as such, for now, we duplicate the internal `time::wheel` structures in tokio-util, rather than trying to refactor things at this stage. Co-authored-by: Bryan Donlan <bdonlan@amazon.com>
-rw-r--r--tokio-util/Cargo.toml4
-rw-r--r--tokio-util/src/lib.rs3
-rw-r--r--tokio-util/src/time/delay_queue.rs (renamed from tokio/src/time/delay_queue.rs)39
-rw-r--r--tokio-util/src/time/mod.rs47
-rw-r--r--tokio-util/src/time/wheel/level.rs255
-rw-r--r--tokio-util/src/time/wheel/mod.rs314
-rw-r--r--tokio-util/src/time/wheel/stack.rs26
-rw-r--r--tokio-util/tests/time_delay_queue.rs (renamed from tokio/tests/time_delay_queue.rs)3
-rw-r--r--tokio/Cargo.toml4
-rw-r--r--tokio/src/time/mod.rs7
10 files changed, 674 insertions, 28 deletions
diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml
index 45daa2b1..8c54f27b 100644
--- a/tokio-util/Cargo.toml
+++ b/tokio-util/Cargo.toml
@@ -25,10 +25,11 @@ publish = false
default = []
# Shorthand for enabling everything
-full = ["codec", "compat", "io"]
+full = ["codec", "compat", "io", "time"]
compat = ["futures-io",]
codec = ["tokio/stream"]
+time = ["tokio/time","slab"]
io = []
[dependencies]
@@ -40,6 +41,7 @@ futures-sink = "0.3.0"
futures-io = { version = "0.3.0", optional = true }
log = "0.4"
pin-project-lite = "0.1.4"
+slab = { version = "0.4.1", optional = true } # Backs `DelayQueue`
[dev-dependencies]
tokio = { version = "0.3.0", path = "../tokio", features = ["full"] }
diff --git a/tokio-util/src/lib.rs b/tokio-util/src/lib.rs
index eb35345e..31a16d05 100644
--- a/tokio-util/src/lib.rs
+++ b/tokio-util/src/lib.rs
@@ -53,6 +53,9 @@ pub mod sync;
pub mod either;
+#[cfg(feature = "time")]
+pub mod time;
+
#[cfg(any(feature = "io", feature = "codec"))]
mod util {
use tokio::io::{AsyncRead, ReadBuf};
diff --git a/tokio/src/time/delay_queue.rs b/tokio-util/src/time/delay_queue.rs
index 910f75fb..b23c24e6 100644
--- a/tokio/src/time/delay_queue.rs
+++ b/tokio-util/src/time/delay_queue.rs
@@ -5,7 +5,9 @@
//! [`DelayQueue`]: struct@DelayQueue
use crate::time::wheel::{self, Wheel};
-use crate::time::{sleep_until, Delay, Duration, Error, Instant};
+
+use futures_core::ready;
+use tokio::time::{sleep_until, Delay, Duration, Error, Instant};
use slab::Slab;
use std::cmp;
@@ -50,8 +52,8 @@ use std::task::{self, Poll};
///
/// # Implementation
///
-/// The [`DelayQueue`] is backed by a separate instance of the same timer wheel used internally by
-/// Tokio's standalone timer utilities such as [`sleep`]. Because of this, it offers the same
+/// The [`DelayQueue`] is backed by a separate instance of a timer wheel similar to that used internally
+/// by Tokio's standalone timer utilities such as [`sleep`]. Because of this, it offers the same
/// performance and scalability benefits.
///
/// State associated with each entry is stored in a [`slab`]. This amortizes the cost of allocation,
@@ -65,7 +67,8 @@ use std::task::{self, Poll};
/// Using `DelayQueue` to manage cache entries.
///
/// ```rust,no_run
-/// use tokio::time::{delay_queue, DelayQueue, Error};
+/// use tokio::time::Error;
+/// use tokio_util::time::{DelayQueue, delay_queue};
///
/// use futures::ready;
/// use std::collections::HashMap;
@@ -118,7 +121,7 @@ use std::task::{self, Poll};
/// [`poll_expired`]: method@Self::poll_expired
/// [`Stream::poll_expired`]: method@Self::poll_expired
/// [`DelayQueue`]: struct@DelayQueue
-/// [`sleep`]: fn@super::sleep
+/// [`sleep`]: fn@tokio::time::sleep
/// [`slab`]: slab
/// [`capacity`]: method@Self::capacity
/// [`reserve`]: method@Self::reserve
@@ -210,7 +213,7 @@ impl<T> DelayQueue<T> {
/// # Examples
///
/// ```rust
- /// # use tokio::time::DelayQueue;
+ /// # use tokio_util::time::DelayQueue;
/// let delay_queue: DelayQueue<u32> = DelayQueue::new();
/// ```
pub fn new() -> DelayQueue<T> {
@@ -226,7 +229,7 @@ impl<T> DelayQueue<T> {
/// # Examples
///
/// ```rust
- /// # use tokio::time::DelayQueue;
+ /// # use tokio_util::time::DelayQueue;
/// # use std::time::Duration;
///
/// # #[tokio::main]
@@ -281,7 +284,8 @@ impl<T> DelayQueue<T> {
/// Basic usage
///
/// ```rust
- /// use tokio::time::{DelayQueue, Duration, Instant};
+ /// use tokio::time::{Duration, Instant};
+ /// use tokio_util::time::DelayQueue;
///
/// # #[tokio::main]
/// # async fn main() {
@@ -391,7 +395,7 @@ impl<T> DelayQueue<T> {
/// Basic usage
///
/// ```rust
- /// use tokio::time::DelayQueue;
+ /// use tokio_util::time::DelayQueue;
/// use std::time::Duration;
///
/// # #[tokio::main]
@@ -460,7 +464,7 @@ impl<T> DelayQueue<T> {
/// Basic usage
///
/// ```rust
- /// use tokio::time::DelayQueue;
+ /// use tokio_util::time::DelayQueue;
/// use std::time::Duration;
///
/// # #[tokio::main]
@@ -503,7 +507,8 @@ impl<T> DelayQueue<T> {
/// Basic usage
///
/// ```rust
- /// use tokio::time::{DelayQueue, Duration, Instant};
+ /// use tokio::time::{Duration, Instant};
+ /// use tokio_util::time::DelayQueue;
///
/// # #[tokio::main]
/// # async fn main() {
@@ -559,7 +564,7 @@ impl<T> DelayQueue<T> {
/// Basic usage
///
/// ```rust
- /// use tokio::time::DelayQueue;
+ /// use tokio_util::time::DelayQueue;
/// use std::time::Duration;
///
/// # #[tokio::main]
@@ -589,7 +594,7 @@ impl<T> DelayQueue<T> {
/// # Examples
///
/// ```rust
- /// use tokio::time::DelayQueue;
+ /// use tokio_util::time::DelayQueue;
/// use std::time::Duration;
///
/// # #[tokio::main]
@@ -617,7 +622,7 @@ impl<T> DelayQueue<T> {
/// # Examples
///
/// ```rust
- /// use tokio::time::DelayQueue;
+ /// use tokio_util::time::DelayQueue;
///
/// let delay_queue: DelayQueue<i32> = DelayQueue::with_capacity(10);
/// assert_eq!(delay_queue.capacity(), 10);
@@ -631,7 +636,7 @@ impl<T> DelayQueue<T> {
/// # Examples
///
/// ```rust
- /// use tokio::time::DelayQueue;
+ /// use tokio_util::time::DelayQueue;
/// use std::time::Duration;
///
/// # #[tokio::main]
@@ -666,7 +671,7 @@ impl<T> DelayQueue<T> {
/// # Examples
///
/// ```
- /// use tokio::time::DelayQueue;
+ /// use tokio_util::time::DelayQueue;
/// use std::time::Duration;
///
/// # #[tokio::main]
@@ -691,7 +696,7 @@ impl<T> DelayQueue<T> {
/// # Examples
///
/// ```
- /// use tokio::time::DelayQueue;
+ /// use tokio_util::time::DelayQueue;
/// use std::time::Duration;
///
/// # #[tokio::main]
diff --git a/tokio-util/src/time/mod.rs b/tokio-util/src/time/mod.rs
new file mode 100644
index 00000000..c6c8799d
--- /dev/null
+++ b/tokio-util/src/time/mod.rs
@@ -0,0 +1,47 @@
+//! Additional utilities for tracking time.
+//!
+//! This module provides additional utilities for executing code after a set period
+//! of time. Currently there is only one:
+//!
+//! * `DelayQueue`: A queue where items are returned once the requested delay
+//! has expired.
+//!
+//! This type must be used from within the context of the `Runtime`.
+
+use std::time::Duration;
+
+mod wheel;
+
+#[doc(inline)]
+pub mod delay_queue;
+
+pub use delay_queue::DelayQueue;
+
+// ===== Internal utils =====
+
+enum Round {
+ Up,
+ Down,
+}
+
+/// Convert a `Duration` to milliseconds, rounding up and saturating at
+/// `u64::MAX`.
+///
+/// The saturating is fine because `u64::MAX` milliseconds are still many
+/// million years.
+#[inline]
+fn ms(duration: Duration, round: Round) -> u64 {
+ const NANOS_PER_MILLI: u32 = 1_000_000;
+ const MILLIS_PER_SEC: u64 = 1_000;
+
+ // Round up.
+ let millis = match round {
+ Round::Up => (duration.subsec_nanos() + NANOS_PER_MILLI - 1) / NANOS_PER_MILLI,
+ Round::Down => duration.subsec_millis(),
+ };
+
+ duration
+ .as_secs()
+ .saturating_mul(MILLIS_PER_SEC)
+ .saturating_add(u64::from(millis))
+}
diff --git a/tokio-util/src/time/wheel/level.rs b/tokio-util/src/time/wheel/level.rs
new file mode 100644
index 00000000..49f9bfb9
--- /dev/null
+++ b/tokio-util/src/time/wheel/level.rs
@@ -0,0 +1,255 @@
+use crate::time::wheel::Stack;
+
+use std::fmt;
+
+/// Wheel for a single level in the timer. This wheel contains 64 slots.
+pub(crate) struct Level<T> {
+ level: usize,
+
+ /// Bit field tracking which slots currently contain entries.
+ ///
+ /// Using a bit field to track slots that contain entries allows avoiding a
+ /// scan to find entries. This field is updated when entries are added or
+ /// removed from a slot.
+ ///
+ /// The least-significant bit represents slot zero.
+ occupied: u64,
+
+ /// Slots
+ slot: [T; LEVEL_MULT],
+}
+
+/// Indicates when a slot must be processed next.
+#[derive(Debug)]
+pub(crate) struct Expiration {
+ /// The level containing the slot.
+ pub(crate) level: usize,
+
+ /// The slot index.
+ pub(crate) slot: usize,
+
+ /// The instant at which the slot needs to be processed.
+ pub(crate) deadline: u64,
+}
+
+/// Level multiplier.
+///
+/// Being a power of 2 is very important.
+const LEVEL_MULT: usize = 64;
+
+impl<T: Stack> Level<T> {
+ pub(crate) fn new(level: usize) -> Level<T> {
+ // Rust's derived implementations for arrays require that the value
+ // contained by the array be `Copy`. So, here we have to manually
+ // initialize every single slot.
+ macro_rules! s {
+ () => {
+ T::default()
+ };
+ };
+
+ Level {
+ level,
+ occupied: 0,
+ slot: [
+ // It does not look like the necessary traits are
+ // derived for [T; 64].
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ s!(),
+ ],
+ }
+ }
+
+ /// Finds the slot that needs to be processed next and returns the slot and
+ /// `Instant` at which this slot must be processed.
+ pub(crate) fn next_expiration(&self, now: u64) -> Option<Expiration> {
+ // Use the `occupied` bit field to get the index of the next slot that
+ // needs to be processed.
+ let slot = match self.next_occupied_slot(now) {
+ Some(slot) => slot,
+ None => return None,
+ };
+
+ // From the slot index, calculate the `Instant` at which it needs to be
+ // processed. This value *must* be in the future with respect to `now`.
+
+ let level_range = level_range(self.level);
+ let slot_range = slot_range(self.level);
+
+ // TODO: This can probably be simplified w/ power of 2 math
+ let level_start = now - (now % level_range);
+ let deadline = level_start + slot as u64 * slot_range;
+
+ debug_assert!(
+ deadline >= now,
+ "deadline={}; now={}; level={}; slot={}; occupied={:b}",
+ deadline,
+ now,
+ self.level,
+ slot,
+ self.occupied
+ );
+
+ Some(Expiration {
+ level: self.level,
+ slot,
+ deadline,
+ })
+ }
+
+ fn next_occupied_slot(&self, now: u64) -> Option<usize> {
+ if self.occupied == 0 {
+ return None;
+ }
+
+ // Get the slot for now using Maths
+ let now_slot = (now / slot_range(self.level)) as usize;
+ let occupied = self.occupied.rotate_right(now_slot as u32);
+ let zeros = occupied.trailing_zeros() as usize;
+ let slot = (zeros + now_slot) % 64;
+
+ Some(slot)
+ }
+
+ pub(crate) fn add_entry(&mut self, when: u64, item: T::Owned, store: &mut T::Store) {
+ let slot = slot_for(when, self.level);
+
+ self.slot[slot].push(item, store);
+ self.occupied |= occupied_bit(slot);
+ }
+
+ pub(crate) fn remove_entry(&mut self, when: u64, item: &T::Borrowed, store: &mut T::Store) {
+ let slot = slot_for(when, self.level);
+
+ self.slot[slot].remove(item, store);
+
+ if self.slot[slot].is_empty() {
+ // The bit is currently set
+ debug_assert!(self.occupied & occupied_bit(slot) != 0);
+
+ // Unset the bit
+ self.occupied ^= occupied_bit(slot);
+ }
+ }
+
+ pub(crate) fn pop_entry_slot(&mut self, slot: usize, store: &mut T::Store) -> Option<T::Owned> {
+ let ret = self.slot[slot].pop(store);
+
+ if ret.is_some() && self.slot[slot].is_empty() {
+ // The bit is currently set
+ debug_assert!(self.occupied & occupied_bit(slot) != 0);
+
+ self.occupied ^= occupied_bit(slot);
+ }
+
+ ret
+ }
+}
+
+impl<T> fmt::Debug for Level<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Level")
+ .field("occupied", &self.occupied)
+ .finish()
+ }
+}
+
+fn occupied_bit(slot: usize) -> u64 {
+ 1 << slot
+}
+
+fn slot_range(level: usize) -> u64 {
+ LEVEL_MULT.pow(level as u32) as u64
+}
+
+fn level_range(level: usize) -> u64 {
+ LEVEL_MULT as u64 * slot_range(level)
+}
+
+/// Convert a duration (milliseconds) and a level to a slot position
+fn slot_for(duration: u64, level: usize) -> usize {
+ ((duration >> (level * 6)) % LEVEL_MULT as u64) as usize
+}
+
+/*
+#[cfg(all(test, not(loom)))]
+mod test {
+ use super::*;
+
+ #[test]
+ fn test_slot_for() {
+ for pos in 1..64 {
+ assert_eq!(pos as usize, slot_for(pos, 0));
+ }
+
+ for level in 1..5 {
+ for pos in level..64 {
+ let a = pos * 64_usize.pow(level as u32);
+ assert_eq!(pos as usize, slot_for(a as u64, level));
+ }
+ }
+ }
+}
+*/
diff --git a/tokio-util/src/time/wheel/mod.rs b/tokio-util/src/time/wheel/mod.rs
new file mode 100644
index 00000000..a2ef27fc
--- /dev/null
+++ b/tokio-util/src/time/wheel/mod.rs
@@ -0,0 +1,314 @@
+mod level;
+pub(crate) use self::level::Expiration;
+use self::level::Level;
+
+mod stack;
+pub(crate) use self::stack::Stack;
+
+use std::borrow::Borrow;
+use std::usize;
+
+/// Timing wheel implementation.
+///
+/// This type provides the hashed timing wheel implementation that backs `Timer`
+/// and `DelayQueue`.
+///
+/// The structure is generic over `T: Stack`. This allows handling timeout data
+/// being stored on the heap or in a slab. In order to support the latter case,
+/// the slab must be passed into each function allowing the implementation to
+/// lookup timer entries.
+///
+/// See `Timer` documentation for some implementation notes.
+#[derive(Debug)]
+pub(crate) struct Wheel<T> {
+ /// The number of milliseconds elapsed since the wheel started.
+ elapsed: u64,
+
+ /// Timer wheel.
+ ///
+ /// Levels:
+ ///
+ /// * 1 ms slots / 64 ms range
+ /// * 64 ms slots / ~ 4 sec range
+ /// * ~ 4 sec slots / ~ 4 min range
+ /// * ~ 4 min slots / ~ 4 hr range
+ /// * ~ 4 hr slots / ~ 12 day range
+ /// * ~ 12 day slots / ~ 2 yr range
+ levels: Vec<Level<T>>,
+}
+
+/// Number of levels. Each level has 64 slots. By using 6 levels with 64 slots
+/// each, the timer is able to track time up to 2 years into the future with a
+/// precision of 1 millisecond.
+const NUM_LEVELS: usize = 6;
+
+/// The maximum duration of a delay
+const MAX_DURATION: u64 = (1 << (6 * NUM_LEVELS)) - 1;
+
+#[derive(Debug)]
+pub(crate) enum InsertError {
+ Elapsed,
+ Invalid,
+}
+
+/// Poll expirations from the wheel
+#[derive(Debug, Default)]
+pub(crate) struct Poll {
+ now: u64,
+ expiration: Option<Expiration>,
+}
+
+impl<T> Wheel<T>
+where
+ T: Stack,
+{
+ /// Create a new timing wheel
+ pub(crate) fn new() -> Wheel<T> {
+ let levels = (0..NUM_LEVELS).map(Level::new).collect();
+
+ Wheel { elapsed: 0, levels }
+ }
+
+ /// Return the number of milliseconds that have elapsed since the timing
+ /// wheel's creation.
+ pub(crate) fn elapsed(&self) -> u64 {
+ self.elapsed
+ }
+
+ /// Insert an entry into the timing wheel.
+ ///
+ /// # Arguments
+ ///
+ /// * `when`: is the instant at which the entry should be fired. It is
+ /// represented as the number of milliseconds since the creation
+ /// of the timing wheel.
+ ///
+ /// * `item`: The item to insert into the wheel.
+ ///
+ /// * `store`: The slab or `()` when using heap storage.
+ ///
+ /// # Return
+ ///
+ /// Returns `Ok` when the item is successfully inserted, `Err` otherwise.
+ ///
+ /// `Err(Elapsed)` indicates that `when` represents an instant that has
+ /// already passed. In this case, the caller should fire the timeout
+ /// immediately.
+ ///
+ /// `Err(Invalid)` indicates an invalid `when` argument as been supplied.
+ pub(crate) fn insert(
+ &mut self,
+ when: u64,
+ item: T::Owned,
+ store: &mut T::Store,
+ ) -> Result<(), (T::Owned, InsertError)> {
+ if when <= self.elapsed {
+ return Err((item, InsertError::Elapsed));
+ } else if when - self.elapsed > MAX_DURATION {
+ return Err((item, InsertError::Invalid));
+ }
+
+ // Get the level at which the entry should be stored
+ let level = self.level_for(when);
+
+ self.levels[level].add_entry(when, item, store);
+
+ debug_assert!({
+ self.levels[level]
+ .next_expiration(self.elapsed)
+ .map(|e| e.deadline >= self.elapsed)
+ .unwrap_or(true)
+ });
+
+ Ok(())
+ }
+
+ /// Remove `item` from thee timing wheel.
+ pub(crate) fn remove(&mut self, item: &T::Borrowed, store: &mut T::Store) {
+ let when = T::when(item, store);
+ let level = self.level_for(when);
+
+ self.levels[level].remove_entry(when, item, store);
+ }
+
+ /// Instant at which to poll
+ pub(crate) fn poll_at(&self) -> Option<u64> {
+ self.next_expiration().map(|expiration| expiration.deadline)
+ }
+
+ pub(crate) fn poll(&mut self, poll: &mut Poll, store: &mut T::Store) -> Option<T::Owned> {
+ loop {
+ if poll.expiration.is_none() {
+ poll.expiration = self.next_expiration().and_then(|expiration| {
+ if expiration.deadline > poll.now {
+ None
+ } else {
+ Some(expiration)
+ }
+ });
+ }
+
+ match poll.expiration {
+ Some(ref expiration) => {
+ if let Some(item) = self.poll_expiration(expiration, store) {
+ return Some(item);
+ }
+
+ self.set_elapsed(expiration.deadline);
+ }
+ None => {
+ self.set_elapsed(poll.now);
+ return None;
+ }
+ }
+
+ poll.expiration = None;
+ }
+ }
+
+ /// Returns the instant at which the next timeout expires.
+ fn next_expiration(&self) -> Option<Expiration> {
+ // Check all levels
+ for level in 0..NUM_LEVELS {
+ if let Some(expiration) = self.levels[level].next_expiration(self.elapsed) {
+ // There cannot be any expirations at a higher level that happen
+ // before this one.
+ debug_assert!(self.no_expirations_before(level + 1, expiration.deadline));
+
+ return Some(expiration);
+ }
+ }
+
+ None
+ }
+
+ /// Used for debug assertions
+ fn no_expirations_before(&self, start_level: usize, before: u64) -> bool {
+ let mut res = true;
+
+ for l2 in start_level..NUM_LEVELS {
+ if let Some(e2) = self.levels[l2].next_expiration(self.elapsed) {
+ if e2.deadline < before {
+ res = false;
+ }
+ }
+ }
+
+ res
+ }
+
+ pub(crate) fn poll_expiration(
+ &mut self,
+ expiration: &Expiration,
+ store: &mut T::Store,
+ ) -> Option<T::Owned> {
+ while let Some(item) = self.pop_entry(expiration, store) {
+ if expiration.level == 0 {
+ debug_assert_eq!(T::when(item.borrow(), store), expiration.deadline);
+
+ return Some(item);
+ } else {
+ let when = T::when(item.borrow(), store);
+
+ let next_level = expiration.level - 1;
+
+ self.levels[next_level].add_entry(when, item, store);
+ }
+ }
+
+ None
+ }
+
+ fn set_elapsed(&mut self, when: u64) {
+ assert!(
+ self.elapsed <= when,
+ "elapsed={:?}; when={:?}",
+ self.elapsed,
+ when
+ );
+
+ if when > self.elapsed {
+ self.elapsed = when;
+ }
+ }
+
+ fn pop_entry(&mut self, expiration: &Expiration, store: &mut T::Store) -> Option<T::Owned> {
+ self.levels[expiration.level].pop_entry_slot(expiration.slot, store)
+ }
+
+ fn level_for(&self, when: u64) -> usize {
+ level_for(self.elapsed, when)
+ }
+}
+
+fn level_for(elapsed: u64, when: u64) -> usize {
+ let masked = elapsed ^ when;
+
+ assert!(masked != 0, "elapsed={}; when={}", elapsed, when);
+
+ let leading_zeros = masked.leading_zeros() as usize;
+ let significant = 63 - leading_zeros;
+ significant / 6
+}
+
+impl Poll {
+ pub(crate) fn new(now: u64) -> Poll {
+ Poll {
+ now,
+ expiration: None,
+ }
+ }
+}
+
+#[cfg(all(test, not(loom)))]
+mod test {
+ use super::*;
+
+ #[test]
+ fn test_level_for() {
+ for pos in 1..64 {
+ assert_eq!(
+ 0,
+ level_for(0, pos),
+ "level_for({}) -- binary = {:b}",
+ pos,
+ pos
+ );
+ }
+
+ for level in 1..5 {
+ for pos in level..64 {
+ let a = pos * 64_usize.pow(level as u32);
+ assert_eq!(
+ level,
+ level_for(0, a as u64),
+ "level_for({}) -- binary = {:b}",
+ a,
+ a
+ );
+
+ if pos > level {
+ let a = a - 1;
+ assert_eq!(
+ level,
+ level_for(0, a as u64),
+ "level_for({}) -- binary = {:b}",
+ a,
+ a
+ );
+ }
+
+ if pos < 64 {
+ let a = a + 1;
+ assert_eq!(
+ level,
+ level_for(0, a as u64),
+ "level_for({}) -- binary = {:b}",
+ a,
+ a
+ );
+ }
+ }
+ }
+ }
+}
diff --git a/tokio-util/src/time/wheel/stack.rs b/tokio-util/src/time/wheel/stack.rs
new file mode 100644
index 00000000..6e55c38c
--- /dev/null
+++ b/tokio-util/src/time/wheel/stack.rs
@@ -0,0 +1,26 @@
+use std::borrow::Borrow;
+
+/// Abstracts the stack operations needed to track timeouts.
+pub(crate) trait Stack: Default {
+ /// Type of the item stored in the stack
+ type Owned: Borrow<Self::Borrowed>;
+
+ /// Borrowed item
+ type Borrowed;
+
+ /// Item storage, this allows a slab to be used instead of just the heap
+ type Store;
+
+ /// Returns `true` if the stack is empty
+ fn is_empty(&self) -> bool;
+
+ /// Push an item onto the stack
+ fn push(&mut self, item: Self::Owned, store: &mut Self::Store);
+
+ /// Pop an item from the stack
+ fn pop(&mut self, store: &mut Self::Store) -> Option<Self::Owned>;
+
+ fn remove(&mut self, item: &Self::Borrowed, store: &mut Self::Store);
+
+ fn when(item: &Self::Borrowed, store: &Self::Store) -> u64;
+}
diff --git a/tokio/tests/time_delay_queue.rs b/tokio-util/tests/time_delay_queue.rs
index d4878b91..73951e7c 100644
--- a/tokio/tests/time_delay_queue.rs
+++ b/tokio-util/tests/time_delay_queue.rs
@@ -2,8 +2,9 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
-use tokio::time::{self, sleep, DelayQueue, Duration, Instant};
+use tokio::time::{self, sleep, Duration, Instant};
use tokio_test::{assert_ok, assert_pending, assert_ready, task};
+use tokio_util::time::DelayQueue;
macro_rules! poll {
($queue:ident) => {
diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml
index fdce440f..6d8377c2 100644
--- a/tokio/Cargo.toml
+++ b/tokio/Cargo.toml
@@ -83,7 +83,7 @@ stream = ["futures-core"]
sync = ["fnv"]
test-util = []
tcp = ["lazy_static", "mio/tcp", "mio/os-poll"]
-time = ["slab"]
+time = []
udp = ["lazy_static", "mio/udp", "mio/os-poll"]
uds = ["lazy_static", "libc", "mio/uds", "mio/os-poll"]
@@ -101,7 +101,7 @@ memchr = { version = "2.2", optional = true }
mio = { version = "0.7.2", optional = true }
num_cpus = { version = "1.8.0", optional = true }
parking_lot = { version = "0.11.0", optional = true } # Not in full
-slab = { version = "0.4.1", optional = true } # Backs `DelayQueue`
+slab = { version = "0.4.1", optional = true }
tracing = { version = "0.1.16", default-features = false, features = ["std"], optional = true } # Not in full
[target.'cfg(unix)'.dependencies]
diff --git a/tokio/src/time/mod.rs b/tokio/src/time/mod.rs
index c8c797d9..a68e11b5 100644
--- a/tokio/src/time/mod.rs
+++ b/tokio/src/time/mod.rs
@@ -14,9 +14,6 @@
//! of time it is allowed to execute. If the future or stream does not
//! complete in time, then it is canceled and an error is returned.
//!
-//! * `DelayQueue`: A queue where items are returned once the requested delay
-//! has expired.
-//!
//! These types are sufficient for handling a large number of scenarios
//! involving time.
//!
@@ -96,10 +93,6 @@ pub(crate) use self::clock::Clock;
#[cfg(feature = "test-util")]
pub use clock::{advance, pause, resume};
-pub mod delay_queue;
-#[doc(inline)]
-pub use delay_queue::DelayQueue;
-
mod delay;
pub use delay::{sleep, sleep_until, Delay};