diff options
-rw-r--r-- | tokio-util/src/time/delay_queue.rs | 8 | ||||
-rw-r--r-- | tokio-util/src/time/wheel/mod.rs | 49 | ||||
-rw-r--r-- | tokio/src/time/delay.rs | 44 | ||||
-rw-r--r-- | tokio/src/time/driver/entry.rs | 20 | ||||
-rw-r--r-- | tokio/src/time/driver/mod.rs | 29 | ||||
-rw-r--r-- | tokio/src/time/driver/registration.rs | 56 | ||||
-rw-r--r-- | tokio/src/time/driver/stack.rs | 121 | ||||
-rw-r--r-- | tokio/src/time/tests/mod.rs | 6 | ||||
-rw-r--r-- | tokio/src/time/wheel/level.rs | 174 | ||||
-rw-r--r-- | tokio/src/time/wheel/mod.rs | 101 | ||||
-rw-r--r-- | tokio/src/time/wheel/stack.rs | 120 |
11 files changed, 319 insertions, 409 deletions
diff --git a/tokio-util/src/time/delay_queue.rs b/tokio-util/src/time/delay_queue.rs index b23c24e6..92c922b8 100644 --- a/tokio-util/src/time/delay_queue.rs +++ b/tokio-util/src/time/delay_queue.rs @@ -141,7 +141,7 @@ pub struct DelayQueue<T> { delay: Option<Delay>, /// Wheel polling state - poll: wheel::Poll, + wheel_now: u64, /// Instant at which the timer starts start: Instant, @@ -251,7 +251,7 @@ impl<T> DelayQueue<T> { slab: Slab::with_capacity(capacity), expired: Stack::default(), delay: None, - poll: wheel::Poll::new(0), + wheel_now: 0, start: Instant::now(), } } @@ -733,11 +733,11 @@ impl<T> DelayQueue<T> { let now = crate::time::ms(delay.deadline() - self.start, crate::time::Round::Down); - self.poll = wheel::Poll::new(now); + self.wheel_now = now; } // We poll the wheel to get the next value out before finding the next deadline. - let wheel_idx = self.wheel.poll(&mut self.poll, &mut self.slab); + let wheel_idx = self.wheel.poll(self.wheel_now, &mut self.slab); self.delay = self.next_deadline().map(sleep_until); diff --git a/tokio-util/src/time/wheel/mod.rs b/tokio-util/src/time/wheel/mod.rs index a2ef27fc..478037a3 100644 --- a/tokio-util/src/time/wheel/mod.rs +++ b/tokio-util/src/time/wheel/mod.rs @@ -51,13 +51,6 @@ pub(crate) enum InsertError { Invalid, } -/// Poll expirations from the wheel -#[derive(Debug, Default)] -pub(crate) struct Poll { - now: u64, - expiration: Option<Expiration>, -} - impl<T> Wheel<T> where T: Stack, @@ -136,19 +129,18 @@ where self.next_expiration().map(|expiration| expiration.deadline) } - pub(crate) fn poll(&mut self, poll: &mut Poll, store: &mut T::Store) -> Option<T::Owned> { + /// Advances the timer up to the instant represented by `now`. + pub(crate) fn poll(&mut self, now: u64, store: &mut T::Store) -> Option<T::Owned> { loop { - if poll.expiration.is_none() { - poll.expiration = self.next_expiration().and_then(|expiration| { - if expiration.deadline > poll.now { - None - } else { - Some(expiration) - } - }); - } + let expiration = self.next_expiration().and_then(|expiration| { + if expiration.deadline > now { + None + } else { + Some(expiration) + } + }); - match poll.expiration { + match expiration { Some(ref expiration) => { if let Some(item) = self.poll_expiration(expiration, store) { return Some(item); @@ -157,12 +149,14 @@ where self.set_elapsed(expiration.deadline); } None => { - self.set_elapsed(poll.now); + // in this case the poll did not indicate an expiration + // _and_ we were not able to find a next expiration in + // the current list of timers. advance to the poll's + // current time and do nothing else. + self.set_elapsed(now); return None; } } - - poll.expiration = None; } } @@ -197,6 +191,10 @@ where res } + /// iteratively find entries that are between the wheel's current + /// time and the expiration time. for each in that population either + /// return it for notification (in the case of the last level) or tier + /// it down to the next level (in all other cases). pub(crate) fn poll_expiration( &mut self, expiration: &Expiration, @@ -251,15 +249,6 @@ fn level_for(elapsed: u64, when: u64) -> usize { significant / 6 } -impl Poll { - pub(crate) fn new(now: u64) -> Poll { - Poll { - now, - expiration: None, - } - } -} - #[cfg(all(test, not(loom)))] mod test { use super::*; diff --git a/tokio/src/time/delay.rs b/tokio/src/time/delay.rs index 42ae4b08..9364860d 100644 --- a/tokio/src/time/delay.rs +++ b/tokio/src/time/delay.rs @@ -1,8 +1,9 @@ -use crate::time::driver::Registration; -use crate::time::{Duration, Instant}; +use crate::time::driver::{Entry, Handle}; +use crate::time::{Duration, Error, Instant}; use std::future::Future; use std::pin::Pin; +use std::sync::Arc; use std::task::{self, Poll}; /// Waits until `deadline` is reached. @@ -16,8 +17,7 @@ use std::task::{self, Poll}; /// Canceling a delay is done by dropping the returned future. No additional /// cleanup work is required. pub fn sleep_until(deadline: Instant) -> Delay { - let registration = Registration::new(deadline, Duration::from_millis(0)); - Delay { registration } + Delay::new_timeout(deadline, Duration::from_millis(0)) } /// Waits until `duration` has elapsed. @@ -63,25 +63,27 @@ pub struct Delay { /// The link between the `Delay` instance and the timer that drives it. /// /// This also stores the `deadline` value. - registration: Registration, + entry: Arc<Entry>, } impl Delay { pub(crate) fn new_timeout(deadline: Instant, duration: Duration) -> Delay { - let registration = Registration::new(deadline, duration); - Delay { registration } + let handle = Handle::current(); + let entry = Entry::new(&handle, deadline, duration); + + Delay { entry } } /// Returns the instant at which the future will complete. pub fn deadline(&self) -> Instant { - self.registration.deadline() + self.entry.time_ref().deadline } /// Returns `true` if the `Delay` has elapsed /// /// A `Delay` is elapsed when the requested duration has elapsed. pub fn is_elapsed(&self) -> bool { - self.registration.is_elapsed() + self.entry.is_elapsed() } /// Resets the `Delay` instance to a new deadline. @@ -92,7 +94,21 @@ impl Delay { /// This function can be called both before and after the future has /// completed. pub fn reset(&mut self, deadline: Instant) { - self.registration.reset(deadline); + unsafe { + self.entry.time_mut().deadline = deadline; + } + + Entry::reset(&mut self.entry); + } + + fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> { + // Keep track of task budget + let coop = ready!(crate::coop::poll_proceed(cx)); + + self.entry.poll_elapsed(cx).map(move |r| { + coop.made_progress(); + r + }) } } @@ -109,9 +125,15 @@ impl Future for Delay { // Both cases are extremely rare, and pretty accurately fit into // "logic errors", so we just panic in this case. A user couldn't // really do much better if we passed the error onwards. - match ready!(self.registration.poll_elapsed(cx)) { + match ready!(self.poll_elapsed(cx)) { Ok(()) => Poll::Ready(()), Err(e) => panic!("timer error: {}", e), } } } + +impl Drop for Delay { + fn drop(&mut self) { + Entry::cancel(&self.entry); + } +} diff --git a/tokio/src/time/driver/entry.rs b/tokio/src/time/driver/entry.rs index 974465c1..20f8e1c6 100644 --- a/tokio/src/time/driver/entry.rs +++ b/tokio/src/time/driver/entry.rs @@ -83,7 +83,7 @@ pub(crate) struct Entry { /// Next entry in the State's linked list. /// /// This is only accessed by the timer - pub(super) next_stack: UnsafeCell<Option<Arc<Entry>>>, + pub(crate) next_stack: UnsafeCell<Option<Arc<Entry>>>, /// Previous entry in the State's linked list. /// @@ -91,7 +91,7 @@ pub(crate) struct Entry { /// entry. /// /// This is a weak reference. - pub(super) prev_stack: UnsafeCell<*const Entry>, + pub(crate) prev_stack: UnsafeCell<*const Entry>, } /// Stores the info for `Delay`. @@ -112,12 +112,12 @@ const ERROR: u64 = u64::MAX; impl Entry { pub(crate) fn new(handle: &Handle, deadline: Instant, duration: Duration) -> Arc<Entry> { let inner = handle.inner().unwrap(); - let entry: Entry; - // Increment the number of active timeouts - if let Err(err) = inner.increment() { - entry = Entry::new2(deadline, duration, Weak::new(), ERROR); + // Attempt to increment the number of active timeouts + let entry = if let Err(err) = inner.increment() { + let entry = Entry::new2(deadline, duration, Weak::new(), ERROR); entry.error(err); + entry } else { let when = inner.normalize_deadline(deadline); let state = if when <= inner.elapsed() { @@ -125,8 +125,8 @@ impl Entry { } else { when }; - entry = Entry::new2(deadline, duration, Arc::downgrade(&inner), state); - } + Entry::new2(deadline, duration, Arc::downgrade(&inner), state) + }; let entry = Arc::new(entry); if let Err(err) = inner.queue(&entry) { @@ -147,6 +147,10 @@ impl Entry { &mut *self.time.0.get() } + pub(crate) fn when(&self) -> u64 { + self.when_internal().expect("invalid internal state") + } + /// The current entry state as known by the timer. This is not the value of /// `state`, but lets the timer know how to converge its state to `state`. pub(crate) fn when_internal(&self) -> Option<u64> { diff --git a/tokio/src/time/driver/mod.rs b/tokio/src/time/driver/mod.rs index bb6c28b3..5ece7c72 100644 --- a/tokio/src/time/driver/mod.rs +++ b/tokio/src/time/driver/mod.rs @@ -9,12 +9,6 @@ pub(super) use self::entry::Entry; mod handle; pub(crate) use self::handle::Handle; -mod registration; -pub(crate) use self::registration::Registration; - -mod stack; -use self::stack::Stack; - use crate::loom::sync::atomic::{AtomicU64, AtomicUsize}; use crate::park::{Park, Unpark}; use crate::time::{wheel, Error}; @@ -73,7 +67,7 @@ use std::{cmp, fmt}; /// When the timer processes entries at level zero, it will notify all the /// `Delay` instances as their deadlines have been reached. For all higher /// levels, all entries will be redistributed across the wheel at the next level -/// down. Eventually, as time progresses, entries will [`Delay`][delay] instances will +/// down. Eventually, as time progresses, entries with [`Delay`][delay] instances will /// either be canceled (dropped) or their associated entries will reach level /// zero and be notified. /// @@ -87,7 +81,7 @@ pub(crate) struct Driver<T: Park> { inner: Arc<Inner>, /// Timer wheel - wheel: wheel::Wheel<Stack>, + wheel: wheel::Wheel, /// Thread parker. The `Driver` park implementation delegates to this. park: T, @@ -163,9 +157,8 @@ where self.clock.now() - self.inner.start, crate::time::Round::Down, ); - let mut poll = wheel::Poll::new(now); - while let Some(entry) = self.wheel.poll(&mut poll, &mut ()) { + while let Some(entry) = self.wheel.poll(now) { let when = entry.when_internal().expect("invalid internal entry state"); // Fire the entry @@ -193,7 +186,7 @@ where self.clear_entry(&entry); } (None, Some(when)) => { - // Queue the entry + // Add the entry to the timer wheel self.add_entry(entry, when); } (Some(_), Some(next)) => { @@ -205,19 +198,17 @@ where } fn clear_entry(&mut self, entry: &Arc<Entry>) { - self.wheel.remove(entry, &mut ()); + self.wheel.remove(entry); entry.set_when_internal(None); } /// Fires the entry if it needs to, otherwise queue it to be processed later. - /// - /// Returns `None` if the entry was fired. fn add_entry(&mut self, entry: Arc<Entry>, when: u64) { use crate::time::wheel::InsertError; entry.set_when_internal(Some(when)); - match self.wheel.insert(when, entry, &mut ()) { + match self.wheel.insert(when, entry) { Ok(_) => {} Err((entry, InsertError::Elapsed)) => { // The entry's deadline has elapsed, so fire it and update the @@ -320,9 +311,9 @@ where self.inner.process.shutdown(); // Clear the wheel, using u64::MAX allows us to drain everything - let mut poll = wheel::Poll::new(u64::MAX); + let end_of_time = u64::MAX; - while let Some(entry) = self.wheel.poll(&mut poll, &mut ()) { + while let Some(entry) = self.wheel.poll(end_of_time) { entry.error(Error::shutdown()); } @@ -387,6 +378,10 @@ impl Inner { debug_assert!(prev <= MAX_TIMEOUTS); } + /// add the entry to the "process queue". entries are not immediately + /// pushed into the timer wheel but are instead pushed into the + /// process queue and then moved from the process queue into the timer + /// wheel on next `process` fn queue(&self, entry: &Arc<Entry>) -> Result<(), Error> { if self.process.push(entry)? { // The timer is notified so that it can process the timeout diff --git a/tokio/src/time/driver/registration.rs b/tokio/src/time/driver/registration.rs deleted file mode 100644 index 3a0b3450..00000000 --- a/tokio/src/time/driver/registration.rs +++ /dev/null @@ -1,56 +0,0 @@ -use crate::time::driver::{Entry, Handle}; -use crate::time::{Duration, Error, Instant}; - -use std::sync::Arc; -use std::task::{self, Poll}; - -/// Registration with a timer. -/// -/// The association between a `Delay` instance and a timer is done lazily in -/// `poll` -#[derive(Debug)] -pub(crate) struct Registration { - entry: Arc<Entry>, -} - -impl Registration { - pub(crate) fn new(deadline: Instant, duration: Duration) -> Registration { - let handle = Handle::current(); - - Registration { - entry: Entry::new(&handle, deadline, duration), - } - } - - pub(crate) fn deadline(&self) -> Instant { - self.entry.time_ref().deadline - } - - pub(crate) fn reset(&mut self, deadline: Instant) { - unsafe { - self.entry.time_mut().deadline = deadline; - } - - Entry::reset(&mut self.entry); - } - - pub(crate) fn is_elapsed(&self) -> bool { - self.entry.is_elapsed() - } - - pub(crate) fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> { - // Keep track of task budget - let coop = ready!(crate::coop::poll_proceed(cx)); - - self.entry.poll_elapsed(cx).map(move |r| { - coop.made_progress(); - r - }) - } -} - -impl Drop for Registration { - fn drop(&mut self) { - Entry::cancel(&self.entry); - } -} diff --git a/tokio/src/time/driver/stack.rs b/tokio/src/time/driver/stack.rs deleted file mode 100644 index 3e2924f2..00000000 --- a/tokio/src/time/driver/stack.rs +++ /dev/null @@ -1,121 +0,0 @@ -use crate::time::driver::Entry; -use crate::time::wheel; - -use std::ptr; -use std::sync::Arc; - -/// A doubly linked stack -#[derive(Debug)] -pub(crate) struct Stack { - head: Option<Arc<Entry>>, -} - -impl Default for Stack { - fn default() -> Stack { - Stack { head: None } - } -} - -impl wheel::Stack for Stack { - type Owned = Arc<Entry>; - type Borrowed = Entry; - type Store = (); - - fn is_empty(&self) -> bool { - self.head.is_none() - } - - fn push(&mut self, entry: Self::Owned, _: &mut Self::Store) { - // Get a pointer to the entry to for the prev link - let ptr: *const Entry = &*entry as *const _; - - // Remove the old head entry - let old = self.head.take(); - - unsafe { - // Ensure the entry is not already in a stack. - debug_assert!((*entry.next_stack.get()).is_none()); - debug_assert!((*entry.prev_stack.get()).is_null()); - - if let Some(ref entry) = old.as_ref() { - debug_assert!({ - // The head is not already set to the entry - ptr != &***entry as *const _ - }); - - // Set the previous link on the old head - *entry.prev_stack.get() = ptr; - } - - // Set this entry's next pointer - *entry.next_stack.get() = old; - } - - // Update the head pointer - self.head = Some(entry); - } - - /// Pops an item from the stack - fn pop(&mut self, _: &mut ()) -> Option<Arc<Entry>> { - let entry = self.head.take(); - - unsafe { - if let Some(entry) = entry.as_ref() { - self.head = (*entry.next_stack.get()).take(); - - if let Some(entry) = self.head.as_ref() { - *entry.prev_stack.get() = ptr::null(); - } - - *entry.prev_stack.get() = ptr::null(); - } - } - - entry - } - - fn remove(&mut self, entry: &Entry, _: &mut ()) { - unsafe { - // Ensure that the entry is in fact contained by the stack - debug_assert!({ - // This walks the full linked list even if an entry is found. - let mut next = self.head.as_ref(); - let mut contains = false; - - while let Some(n) = next { - if entry as *const _ == &**n as *const _ { - debug_assert!(!contains); - contains = true; - } - - next = (*n.next_stack.get()).as_ref(); - } - - contains - }); - - // Unlink `entry` from the next node - let next = (*entry.next_stack.get()).take(); - - if let Some(next) = next.as_ref() { - (*next.prev_stack.get()) = *entry.prev_stack.get(); - } - - // Unlink `entry` from the prev node - - if let Some(prev) = (*entry.prev_stack.get()).as_ref() { - *prev.next_stack.get() = next; - } else { - // It is the head - self.head = next; - } - - // Unset the prev pointer - *entry.prev_stack.get() = ptr::null(); - } - } - - fn when(item: &Entry, _: &()) -> u64 { - item.when_internal().expect("invalid internal state") - } -} diff --git a/tokio/src/time/tests/mod.rs b/tokio/src/time/tests/mod.rs index e112b8e1..a043d65e 100644 --- a/tokio/src/time/tests/mod.rs +++ b/tokio/src/time/tests/mod.rs @@ -8,10 +8,10 @@ fn assert_sync<T: Sync>() {} #[test] fn registration_is_send_and_sync() { - use crate::time::driver::Registration; + use crate::time::delay::Delay; - assert_send::<Registration>(); - assert_sync::<Registration>(); + assert_send::<Delay>(); + assert_sync::<Delay>(); } #[test] diff --git a/tokio/src/time/wheel/level.rs b/tokio/src/time/wheel/level.rs index 49f9bfb9..d51d26a0 100644 --- a/tokio/src/time/wheel/level.rs +++ b/tokio/src/time/wheel/level.rs @@ -1,9 +1,10 @@ +use super::{Item, OwnedItem}; use crate::time::wheel::Stack; use std::fmt; /// Wheel for a single level in the timer. This wheel contains 64 slots. -pub(crate) struct Level<T> { +pub(crate) struct Level { level: usize, /// Bit field tracking which slots currently contain entries. @@ -16,7 +17,7 @@ pub(crate) struct Level<T> { occupied: u64, /// Slots - slot: [T; LEVEL_MULT], + slot: [Stack; LEVEL_MULT], } /// Indicates when a slot must be processed next. @@ -37,87 +38,90 @@ pub(crate) struct Expiration { /// Being a power of 2 is very important. const LEVEL_MULT: usize = 64; -impl<T: Stack> Level<T> { - pub(crate) fn new(level: usize) -> Level<T> { - // Rust's derived implementations for arrays require that the value - // contained by the array be `Copy`. So, here we have to manually - // initialize every single slot. - macro_rules! s { - () => { - T::default() - }; - }; +impl Level { + pub(crate) fn new(level: usize) -> Level { + // A value has to be Copy in order to use syntax like: + // let stack = Stack::default(); + // ... + // slots: [stack; 64], + // + // Alternatively, since Stack is Default one can + // use syntax like: + // let slots: [Stack; 64] = Default::default(); + // + // However, that is only supported for arrays of size + // 32 or fewer. So in our case we have to explicitly + // invoke the constructor for each array element. + let ctor = Stack::default; Level { level, occupied: 0, slot: [ - // It does not look like the necessary traits are - // derived for [T; 64]. - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), ], } } @@ -173,17 +177,17 @@ impl<T: Stack> Level<T> { Some(slot) } - pub(crate) fn add_entry(&mut self, when: u64, item: T::Owned, store: &mut T::Store) { + pub(crate) fn add_entry(&mut self, when: u64, item: OwnedItem) { let slot = slot_for(when, self.level); - self.slot[slot].push(item, store); + self.slot[slot].push(item); self.occupied |= occupied_bit(slot); } - pub(crate) fn remove_entry(&mut self, when: u64, item: &T::Borrowed, store: &mut T::Store) { + pub(crate) fn remove_entry(&mut self, when: u64, item: &Item) { let slot = slot_for(when, self.level); - self.slot[slot].remove(item, store); + self.slot[slot].remove(item); if self.slot[slot].is_empty() { // The bit is currently set @@ -194,8 +198,8 @@ impl<T: Stack> Level<T> { } } - pub(crate) fn pop_entry_slot(&mut self, slot: usize, store: &mut T::Store) -> Option<T::Owned> { - let ret = self.slot[slot].pop(store); + pub(crate) fn pop_entry_slot(&mut self, slot: usize) -> Option<OwnedItem> { + let ret = self.slot[slot].pop(); if ret.is_some() && self.slot[slot].is_empty() { // The bit is currently set @@ -208,7 +212,7 @@ impl<T: Stack> Level<T> { } } -impl<T> fmt::Debug for Level<T> { +impl fmt::Debug for Level { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("Level") .field("occupied", &self.occupied) diff --git a/tokio/src/time/wheel/mod.rs b/tokio/src/time/wheel/mod.rs index a2ef27fc..03861240 100644 --- a/tokio/src/time/wheel/mod.rs +++ b/tokio/src/time/wheel/mod.rs @@ -1,3 +1,5 @@ +use crate::time::driver::Entry; + mod level; pub(crate) use self::level::Expiration; use self::level::Level; @@ -5,9 +7,12 @@ use self::level::Level; mod stack; pub(crate) use self::stack::Stack; -use std::borrow::Borrow; +use std::sync::Arc; use std::usize; +pub(super) type Item = Entry; +pub(super) type OwnedItem = Arc<Item>; + /// Timing wheel implementation. /// /// This type provides the hashed timing wheel implementation that backs `Timer` @@ -20,7 +25,7 @@ use std::usize; /// /// See `Timer` documentation for some implementation notes. #[derive(Debug)] -pub(crate) struct Wheel<T> { +pub(crate) struct Wheel { /// The number of milliseconds elapsed since the wheel started. elapsed: u64, @@ -34,7 +39,7 @@ pub(crate) struct Wheel<T> { /// * ~ 4 min slots / ~ 4 hr range /// * ~ 4 hr slots / ~ 12 day range /// * ~ 12 day slots / ~ 2 yr range - levels: Vec<Level<T>>, + levels: Vec<Level>, } /// Number of levels. Each level has 64 slots. By using 6 levels with 64 slots @@ -51,19 +56,9 @@ pub(crate) enum InsertError { Invalid, } -/// Poll expirations from the wheel -#[derive(Debug, Default)] -pub(crate) struct Poll { - now: u64, - expiration: Option<Expiration>, -} - -impl<T> Wheel<T> -where - T: Stack, -{ +impl Wheel { /// Create a new timing wheel - pub(crate) fn new() -> Wheel<T> { + pub(crate) fn new() -> Wheel { let levels = (0..NUM_LEVELS).map(Level::new).collect(); Wheel { elapsed: 0, levels } @@ -99,9 +94,8 @@ where pub(crate) fn insert( &mut self, when: u64, - item: T::Owned, - store: &mut T::Store, - ) -> Result<(), (T::Owned, InsertError)> { + item: OwnedItem, + ) -> Result<(), (OwnedItem, InsertError)> { if when <= self.elapsed { return Err((item, InsertError::Elapsed)); } else if when - self.elapsed > MAX_DURATION { @@ -111,7 +105,7 @@ where // Get the level at which the entry should be stored let level = self.level_for(when); - self.levels[level].add_entry(when, item, store); + self.levels[level].add_entry(when, item); debug_assert!({ self.levels[level] @@ -124,11 +118,11 @@ where } /// Remove `item` from thee timing wheel. - pub(crate) fn remove(&mut self, item: &T::Borrowed, store: &mut T::Store) { - let when = T::when(item, store); + pub(crate) fn remove(&mut self, item: &Item) { + let when = item.when(); let level = self.level_for(when); - self.levels[level].remove_entry(when, item, store); + self.levels[level].remove_entry(when, item); } /// Instant at which to poll @@ -136,33 +130,35 @@ where self.next_expiration().map(|expiration| expiration.deadline) } - pub(crate) fn poll(&mut self, poll: &mut Poll, store: &mut T::Store) -> Option<T::Owned> { + /// Advances the timer up to the instant represented by `now`. + pub(crate) fn poll(&mut self, now: u64) -> Option<OwnedItem> { loop { - if poll.expiration.is_none() { - poll.expiration = self.next_expiration().and_then(|expiration| { - if expiration.deadline > poll.now { - None - } else { - Some(expiration) - } - }); - } + // under what circumstances is poll.expiration Some vs. None? + let expiration = self.next_expiration().and_then(|expiration| { + if expiration.deadline > now { + None + } else { + Some(expiration) + } + }); - match poll.expiration { + match expiration { Some(ref expiration) => { - if let Some(item) = self.poll_expiration(expiration, store) { + if let Some(item) = self.poll_expiration(expiration) { return Some(item); } self.set_elapsed(expiration.deadline); } None => { - self.set_elapsed(poll.now); + // in this case the poll did not indicate an expiration + // _and_ we were not able to find a next expiration in + // the current list of timers. advance to the poll's + // current time and do nothing else. + self.set_elapsed(now); return None; } } - - poll.expiration = None; } } @@ -197,22 +193,22 @@ where res } - pub(crate) fn poll_expiration( - &mut self, - expiration: &Expiration, - store: &mut T::Store, - ) -> Option<T::Owned> { - while let Some(item) = self.pop_entry(expiration, store) { + /// iteratively find entries that are between the wheel's current + /// time and the expiration time. for each in that population either + /// return it for notification (in the case of the last level) or tier + /// it down to the next level (in all other cases). + pub(crate) fn poll_expiration(&mut self, expiration: &Expiration) -> Option<OwnedItem> { |