summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--tokio-util/src/time/delay_queue.rs8
-rw-r--r--tokio-util/src/time/wheel/mod.rs49
-rw-r--r--tokio/src/time/delay.rs44
-rw-r--r--tokio/src/time/driver/entry.rs20
-rw-r--r--tokio/src/time/driver/mod.rs29
-rw-r--r--tokio/src/time/driver/registration.rs56
-rw-r--r--tokio/src/time/driver/stack.rs121
-rw-r--r--tokio/src/time/tests/mod.rs6
-rw-r--r--tokio/src/time/wheel/level.rs174
-rw-r--r--tokio/src/time/wheel/mod.rs101
-rw-r--r--tokio/src/time/wheel/stack.rs120
11 files changed, 319 insertions, 409 deletions
diff --git a/tokio-util/src/time/delay_queue.rs b/tokio-util/src/time/delay_queue.rs
index b23c24e6..92c922b8 100644
--- a/tokio-util/src/time/delay_queue.rs
+++ b/tokio-util/src/time/delay_queue.rs
@@ -141,7 +141,7 @@ pub struct DelayQueue<T> {
delay: Option<Delay>,
/// Wheel polling state
- poll: wheel::Poll,
+ wheel_now: u64,
/// Instant at which the timer starts
start: Instant,
@@ -251,7 +251,7 @@ impl<T> DelayQueue<T> {
slab: Slab::with_capacity(capacity),
expired: Stack::default(),
delay: None,
- poll: wheel::Poll::new(0),
+ wheel_now: 0,
start: Instant::now(),
}
}
@@ -733,11 +733,11 @@ impl<T> DelayQueue<T> {
let now = crate::time::ms(delay.deadline() - self.start, crate::time::Round::Down);
- self.poll = wheel::Poll::new(now);
+ self.wheel_now = now;
}
// We poll the wheel to get the next value out before finding the next deadline.
- let wheel_idx = self.wheel.poll(&mut self.poll, &mut self.slab);
+ let wheel_idx = self.wheel.poll(self.wheel_now, &mut self.slab);
self.delay = self.next_deadline().map(sleep_until);
diff --git a/tokio-util/src/time/wheel/mod.rs b/tokio-util/src/time/wheel/mod.rs
index a2ef27fc..478037a3 100644
--- a/tokio-util/src/time/wheel/mod.rs
+++ b/tokio-util/src/time/wheel/mod.rs
@@ -51,13 +51,6 @@ pub(crate) enum InsertError {
Invalid,
}
-/// Poll expirations from the wheel
-#[derive(Debug, Default)]
-pub(crate) struct Poll {
- now: u64,
- expiration: Option<Expiration>,
-}
-
impl<T> Wheel<T>
where
T: Stack,
@@ -136,19 +129,18 @@ where
self.next_expiration().map(|expiration| expiration.deadline)
}
- pub(crate) fn poll(&mut self, poll: &mut Poll, store: &mut T::Store) -> Option<T::Owned> {
+ /// Advances the timer up to the instant represented by `now`.
+ pub(crate) fn poll(&mut self, now: u64, store: &mut T::Store) -> Option<T::Owned> {
loop {
- if poll.expiration.is_none() {
- poll.expiration = self.next_expiration().and_then(|expiration| {
- if expiration.deadline > poll.now {
- None
- } else {
- Some(expiration)
- }
- });
- }
+ let expiration = self.next_expiration().and_then(|expiration| {
+ if expiration.deadline > now {
+ None
+ } else {
+ Some(expiration)
+ }
+ });
- match poll.expiration {
+ match expiration {
Some(ref expiration) => {
if let Some(item) = self.poll_expiration(expiration, store) {
return Some(item);
@@ -157,12 +149,14 @@ where
self.set_elapsed(expiration.deadline);
}
None => {
- self.set_elapsed(poll.now);
+ // in this case the poll did not indicate an expiration
+ // _and_ we were not able to find a next expiration in
+ // the current list of timers. advance to the poll's
+ // current time and do nothing else.
+ self.set_elapsed(now);
return None;
}
}
-
- poll.expiration = None;
}
}
@@ -197,6 +191,10 @@ where
res
}
+ /// iteratively find entries that are between the wheel's current
+ /// time and the expiration time. for each in that population either
+ /// return it for notification (in the case of the last level) or tier
+ /// it down to the next level (in all other cases).
pub(crate) fn poll_expiration(
&mut self,
expiration: &Expiration,
@@ -251,15 +249,6 @@ fn level_for(elapsed: u64, when: u64) -> usize {
significant / 6
}
-impl Poll {
- pub(crate) fn new(now: u64) -> Poll {
- Poll {
- now,
- expiration: None,
- }
- }
-}
-
#[cfg(all(test, not(loom)))]
mod test {
use super::*;
diff --git a/tokio/src/time/delay.rs b/tokio/src/time/delay.rs
index 42ae4b08..9364860d 100644
--- a/tokio/src/time/delay.rs
+++ b/tokio/src/time/delay.rs
@@ -1,8 +1,9 @@
-use crate::time::driver::Registration;
-use crate::time::{Duration, Instant};
+use crate::time::driver::{Entry, Handle};
+use crate::time::{Duration, Error, Instant};
use std::future::Future;
use std::pin::Pin;
+use std::sync::Arc;
use std::task::{self, Poll};
/// Waits until `deadline` is reached.
@@ -16,8 +17,7 @@ use std::task::{self, Poll};
/// Canceling a delay is done by dropping the returned future. No additional
/// cleanup work is required.
pub fn sleep_until(deadline: Instant) -> Delay {
- let registration = Registration::new(deadline, Duration::from_millis(0));
- Delay { registration }
+ Delay::new_timeout(deadline, Duration::from_millis(0))
}
/// Waits until `duration` has elapsed.
@@ -63,25 +63,27 @@ pub struct Delay {
/// The link between the `Delay` instance and the timer that drives it.
///
/// This also stores the `deadline` value.
- registration: Registration,
+ entry: Arc<Entry>,
}
impl Delay {
pub(crate) fn new_timeout(deadline: Instant, duration: Duration) -> Delay {
- let registration = Registration::new(deadline, duration);
- Delay { registration }
+ let handle = Handle::current();
+ let entry = Entry::new(&handle, deadline, duration);
+
+ Delay { entry }
}
/// Returns the instant at which the future will complete.
pub fn deadline(&self) -> Instant {
- self.registration.deadline()
+ self.entry.time_ref().deadline
}
/// Returns `true` if the `Delay` has elapsed
///
/// A `Delay` is elapsed when the requested duration has elapsed.
pub fn is_elapsed(&self) -> bool {
- self.registration.is_elapsed()
+ self.entry.is_elapsed()
}
/// Resets the `Delay` instance to a new deadline.
@@ -92,7 +94,21 @@ impl Delay {
/// This function can be called both before and after the future has
/// completed.
pub fn reset(&mut self, deadline: Instant) {
- self.registration.reset(deadline);
+ unsafe {
+ self.entry.time_mut().deadline = deadline;
+ }
+
+ Entry::reset(&mut self.entry);
+ }
+
+ fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> {
+ // Keep track of task budget
+ let coop = ready!(crate::coop::poll_proceed(cx));
+
+ self.entry.poll_elapsed(cx).map(move |r| {
+ coop.made_progress();
+ r
+ })
}
}
@@ -109,9 +125,15 @@ impl Future for Delay {
// Both cases are extremely rare, and pretty accurately fit into
// "logic errors", so we just panic in this case. A user couldn't
// really do much better if we passed the error onwards.
- match ready!(self.registration.poll_elapsed(cx)) {
+ match ready!(self.poll_elapsed(cx)) {
Ok(()) => Poll::Ready(()),
Err(e) => panic!("timer error: {}", e),
}
}
}
+
+impl Drop for Delay {
+ fn drop(&mut self) {
+ Entry::cancel(&self.entry);
+ }
+}
diff --git a/tokio/src/time/driver/entry.rs b/tokio/src/time/driver/entry.rs
index 974465c1..20f8e1c6 100644
--- a/tokio/src/time/driver/entry.rs
+++ b/tokio/src/time/driver/entry.rs
@@ -83,7 +83,7 @@ pub(crate) struct Entry {
/// Next entry in the State's linked list.
///
/// This is only accessed by the timer
- pub(super) next_stack: UnsafeCell<Option<Arc<Entry>>>,
+ pub(crate) next_stack: UnsafeCell<Option<Arc<Entry>>>,
/// Previous entry in the State's linked list.
///
@@ -91,7 +91,7 @@ pub(crate) struct Entry {
/// entry.
///
/// This is a weak reference.
- pub(super) prev_stack: UnsafeCell<*const Entry>,
+ pub(crate) prev_stack: UnsafeCell<*const Entry>,
}
/// Stores the info for `Delay`.
@@ -112,12 +112,12 @@ const ERROR: u64 = u64::MAX;
impl Entry {
pub(crate) fn new(handle: &Handle, deadline: Instant, duration: Duration) -> Arc<Entry> {
let inner = handle.inner().unwrap();
- let entry: Entry;
- // Increment the number of active timeouts
- if let Err(err) = inner.increment() {
- entry = Entry::new2(deadline, duration, Weak::new(), ERROR);
+ // Attempt to increment the number of active timeouts
+ let entry = if let Err(err) = inner.increment() {
+ let entry = Entry::new2(deadline, duration, Weak::new(), ERROR);
entry.error(err);
+ entry
} else {
let when = inner.normalize_deadline(deadline);
let state = if when <= inner.elapsed() {
@@ -125,8 +125,8 @@ impl Entry {
} else {
when
};
- entry = Entry::new2(deadline, duration, Arc::downgrade(&inner), state);
- }
+ Entry::new2(deadline, duration, Arc::downgrade(&inner), state)
+ };
let entry = Arc::new(entry);
if let Err(err) = inner.queue(&entry) {
@@ -147,6 +147,10 @@ impl Entry {
&mut *self.time.0.get()
}
+ pub(crate) fn when(&self) -> u64 {
+ self.when_internal().expect("invalid internal state")
+ }
+
/// The current entry state as known by the timer. This is not the value of
/// `state`, but lets the timer know how to converge its state to `state`.
pub(crate) fn when_internal(&self) -> Option<u64> {
diff --git a/tokio/src/time/driver/mod.rs b/tokio/src/time/driver/mod.rs
index bb6c28b3..5ece7c72 100644
--- a/tokio/src/time/driver/mod.rs
+++ b/tokio/src/time/driver/mod.rs
@@ -9,12 +9,6 @@ pub(super) use self::entry::Entry;
mod handle;
pub(crate) use self::handle::Handle;
-mod registration;
-pub(crate) use self::registration::Registration;
-
-mod stack;
-use self::stack::Stack;
-
use crate::loom::sync::atomic::{AtomicU64, AtomicUsize};
use crate::park::{Park, Unpark};
use crate::time::{wheel, Error};
@@ -73,7 +67,7 @@ use std::{cmp, fmt};
/// When the timer processes entries at level zero, it will notify all the
/// `Delay` instances as their deadlines have been reached. For all higher
/// levels, all entries will be redistributed across the wheel at the next level
-/// down. Eventually, as time progresses, entries will [`Delay`][delay] instances will
+/// down. Eventually, as time progresses, entries with [`Delay`][delay] instances will
/// either be canceled (dropped) or their associated entries will reach level
/// zero and be notified.
///
@@ -87,7 +81,7 @@ pub(crate) struct Driver<T: Park> {
inner: Arc<Inner>,
/// Timer wheel
- wheel: wheel::Wheel<Stack>,
+ wheel: wheel::Wheel,
/// Thread parker. The `Driver` park implementation delegates to this.
park: T,
@@ -163,9 +157,8 @@ where
self.clock.now() - self.inner.start,
crate::time::Round::Down,
);
- let mut poll = wheel::Poll::new(now);
- while let Some(entry) = self.wheel.poll(&mut poll, &mut ()) {
+ while let Some(entry) = self.wheel.poll(now) {
let when = entry.when_internal().expect("invalid internal entry state");
// Fire the entry
@@ -193,7 +186,7 @@ where
self.clear_entry(&entry);
}
(None, Some(when)) => {
- // Queue the entry
+ // Add the entry to the timer wheel
self.add_entry(entry, when);
}
(Some(_), Some(next)) => {
@@ -205,19 +198,17 @@ where
}
fn clear_entry(&mut self, entry: &Arc<Entry>) {
- self.wheel.remove(entry, &mut ());
+ self.wheel.remove(entry);
entry.set_when_internal(None);
}
/// Fires the entry if it needs to, otherwise queue it to be processed later.
- ///
- /// Returns `None` if the entry was fired.
fn add_entry(&mut self, entry: Arc<Entry>, when: u64) {
use crate::time::wheel::InsertError;
entry.set_when_internal(Some(when));
- match self.wheel.insert(when, entry, &mut ()) {
+ match self.wheel.insert(when, entry) {
Ok(_) => {}
Err((entry, InsertError::Elapsed)) => {
// The entry's deadline has elapsed, so fire it and update the
@@ -320,9 +311,9 @@ where
self.inner.process.shutdown();
// Clear the wheel, using u64::MAX allows us to drain everything
- let mut poll = wheel::Poll::new(u64::MAX);
+ let end_of_time = u64::MAX;
- while let Some(entry) = self.wheel.poll(&mut poll, &mut ()) {
+ while let Some(entry) = self.wheel.poll(end_of_time) {
entry.error(Error::shutdown());
}
@@ -387,6 +378,10 @@ impl Inner {
debug_assert!(prev <= MAX_TIMEOUTS);
}
+ /// add the entry to the "process queue". entries are not immediately
+ /// pushed into the timer wheel but are instead pushed into the
+ /// process queue and then moved from the process queue into the timer
+ /// wheel on next `process`
fn queue(&self, entry: &Arc<Entry>) -> Result<(), Error> {
if self.process.push(entry)? {
// The timer is notified so that it can process the timeout
diff --git a/tokio/src/time/driver/registration.rs b/tokio/src/time/driver/registration.rs
deleted file mode 100644
index 3a0b3450..00000000
--- a/tokio/src/time/driver/registration.rs
+++ /dev/null
@@ -1,56 +0,0 @@
-use crate::time::driver::{Entry, Handle};
-use crate::time::{Duration, Error, Instant};
-
-use std::sync::Arc;
-use std::task::{self, Poll};
-
-/// Registration with a timer.
-///
-/// The association between a `Delay` instance and a timer is done lazily in
-/// `poll`
-#[derive(Debug)]
-pub(crate) struct Registration {
- entry: Arc<Entry>,
-}
-
-impl Registration {
- pub(crate) fn new(deadline: Instant, duration: Duration) -> Registration {
- let handle = Handle::current();
-
- Registration {
- entry: Entry::new(&handle, deadline, duration),
- }
- }
-
- pub(crate) fn deadline(&self) -> Instant {
- self.entry.time_ref().deadline
- }
-
- pub(crate) fn reset(&mut self, deadline: Instant) {
- unsafe {
- self.entry.time_mut().deadline = deadline;
- }
-
- Entry::reset(&mut self.entry);
- }
-
- pub(crate) fn is_elapsed(&self) -> bool {
- self.entry.is_elapsed()
- }
-
- pub(crate) fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> {
- // Keep track of task budget
- let coop = ready!(crate::coop::poll_proceed(cx));
-
- self.entry.poll_elapsed(cx).map(move |r| {
- coop.made_progress();
- r
- })
- }
-}
-
-impl Drop for Registration {
- fn drop(&mut self) {
- Entry::cancel(&self.entry);
- }
-}
diff --git a/tokio/src/time/driver/stack.rs b/tokio/src/time/driver/stack.rs
deleted file mode 100644
index 3e2924f2..00000000
--- a/tokio/src/time/driver/stack.rs
+++ /dev/null
@@ -1,121 +0,0 @@
-use crate::time::driver::Entry;
-use crate::time::wheel;
-
-use std::ptr;
-use std::sync::Arc;
-
-/// A doubly linked stack
-#[derive(Debug)]
-pub(crate) struct Stack {
- head: Option<Arc<Entry>>,
-}
-
-impl Default for Stack {
- fn default() -> Stack {
- Stack { head: None }
- }
-}
-
-impl wheel::Stack for Stack {
- type Owned = Arc<Entry>;
- type Borrowed = Entry;
- type Store = ();
-
- fn is_empty(&self) -> bool {
- self.head.is_none()
- }
-
- fn push(&mut self, entry: Self::Owned, _: &mut Self::Store) {
- // Get a pointer to the entry to for the prev link
- let ptr: *const Entry = &*entry as *const _;
-
- // Remove the old head entry
- let old = self.head.take();
-
- unsafe {
- // Ensure the entry is not already in a stack.
- debug_assert!((*entry.next_stack.get()).is_none());
- debug_assert!((*entry.prev_stack.get()).is_null());
-
- if let Some(ref entry) = old.as_ref() {
- debug_assert!({
- // The head is not already set to the entry
- ptr != &***entry as *const _
- });
-
- // Set the previous link on the old head
- *entry.prev_stack.get() = ptr;
- }
-
- // Set this entry's next pointer
- *entry.next_stack.get() = old;
- }
-
- // Update the head pointer
- self.head = Some(entry);
- }
-
- /// Pops an item from the stack
- fn pop(&mut self, _: &mut ()) -> Option<Arc<Entry>> {
- let entry = self.head.take();
-
- unsafe {
- if let Some(entry) = entry.as_ref() {
- self.head = (*entry.next_stack.get()).take();
-
- if let Some(entry) = self.head.as_ref() {
- *entry.prev_stack.get() = ptr::null();
- }
-
- *entry.prev_stack.get() = ptr::null();
- }
- }
-
- entry
- }
-
- fn remove(&mut self, entry: &Entry, _: &mut ()) {
- unsafe {
- // Ensure that the entry is in fact contained by the stack
- debug_assert!({
- // This walks the full linked list even if an entry is found.
- let mut next = self.head.as_ref();
- let mut contains = false;
-
- while let Some(n) = next {
- if entry as *const _ == &**n as *const _ {
- debug_assert!(!contains);
- contains = true;
- }
-
- next = (*n.next_stack.get()).as_ref();
- }
-
- contains
- });
-
- // Unlink `entry` from the next node
- let next = (*entry.next_stack.get()).take();
-
- if let Some(next) = next.as_ref() {
- (*next.prev_stack.get()) = *entry.prev_stack.get();
- }
-
- // Unlink `entry` from the prev node
-
- if let Some(prev) = (*entry.prev_stack.get()).as_ref() {
- *prev.next_stack.get() = next;
- } else {
- // It is the head
- self.head = next;
- }
-
- // Unset the prev pointer
- *entry.prev_stack.get() = ptr::null();
- }
- }
-
- fn when(item: &Entry, _: &()) -> u64 {
- item.when_internal().expect("invalid internal state")
- }
-}
diff --git a/tokio/src/time/tests/mod.rs b/tokio/src/time/tests/mod.rs
index e112b8e1..a043d65e 100644
--- a/tokio/src/time/tests/mod.rs
+++ b/tokio/src/time/tests/mod.rs
@@ -8,10 +8,10 @@ fn assert_sync<T: Sync>() {}
#[test]
fn registration_is_send_and_sync() {
- use crate::time::driver::Registration;
+ use crate::time::delay::Delay;
- assert_send::<Registration>();
- assert_sync::<Registration>();
+ assert_send::<Delay>();
+ assert_sync::<Delay>();
}
#[test]
diff --git a/tokio/src/time/wheel/level.rs b/tokio/src/time/wheel/level.rs
index 49f9bfb9..d51d26a0 100644
--- a/tokio/src/time/wheel/level.rs
+++ b/tokio/src/time/wheel/level.rs
@@ -1,9 +1,10 @@
+use super::{Item, OwnedItem};
use crate::time::wheel::Stack;
use std::fmt;
/// Wheel for a single level in the timer. This wheel contains 64 slots.
-pub(crate) struct Level<T> {
+pub(crate) struct Level {
level: usize,
/// Bit field tracking which slots currently contain entries.
@@ -16,7 +17,7 @@ pub(crate) struct Level<T> {
occupied: u64,
/// Slots
- slot: [T; LEVEL_MULT],
+ slot: [Stack; LEVEL_MULT],
}
/// Indicates when a slot must be processed next.
@@ -37,87 +38,90 @@ pub(crate) struct Expiration {
/// Being a power of 2 is very important.
const LEVEL_MULT: usize = 64;
-impl<T: Stack> Level<T> {
- pub(crate) fn new(level: usize) -> Level<T> {
- // Rust's derived implementations for arrays require that the value
- // contained by the array be `Copy`. So, here we have to manually
- // initialize every single slot.
- macro_rules! s {
- () => {
- T::default()
- };
- };
+impl Level {
+ pub(crate) fn new(level: usize) -> Level {
+ // A value has to be Copy in order to use syntax like:
+ // let stack = Stack::default();
+ // ...
+ // slots: [stack; 64],
+ //
+ // Alternatively, since Stack is Default one can
+ // use syntax like:
+ // let slots: [Stack; 64] = Default::default();
+ //
+ // However, that is only supported for arrays of size
+ // 32 or fewer. So in our case we have to explicitly
+ // invoke the constructor for each array element.
+ let ctor = Stack::default;
Level {
level,
occupied: 0,
slot: [
- // It does not look like the necessary traits are
- // derived for [T; 64].
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
],
}
}
@@ -173,17 +177,17 @@ impl<T: Stack> Level<T> {
Some(slot)
}
- pub(crate) fn add_entry(&mut self, when: u64, item: T::Owned, store: &mut T::Store) {
+ pub(crate) fn add_entry(&mut self, when: u64, item: OwnedItem) {
let slot = slot_for(when, self.level);
- self.slot[slot].push(item, store);
+ self.slot[slot].push(item);
self.occupied |= occupied_bit(slot);
}
- pub(crate) fn remove_entry(&mut self, when: u64, item: &T::Borrowed, store: &mut T::Store) {
+ pub(crate) fn remove_entry(&mut self, when: u64, item: &Item) {
let slot = slot_for(when, self.level);
- self.slot[slot].remove(item, store);
+ self.slot[slot].remove(item);
if self.slot[slot].is_empty() {
// The bit is currently set
@@ -194,8 +198,8 @@ impl<T: Stack> Level<T> {
}
}
- pub(crate) fn pop_entry_slot(&mut self, slot: usize, store: &mut T::Store) -> Option<T::Owned> {
- let ret = self.slot[slot].pop(store);
+ pub(crate) fn pop_entry_slot(&mut self, slot: usize) -> Option<OwnedItem> {
+ let ret = self.slot[slot].pop();
if ret.is_some() && self.slot[slot].is_empty() {
// The bit is currently set
@@ -208,7 +212,7 @@ impl<T: Stack> Level<T> {
}
}
-impl<T> fmt::Debug for Level<T> {
+impl fmt::Debug for Level {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Level")
.field("occupied", &self.occupied)
diff --git a/tokio/src/time/wheel/mod.rs b/tokio/src/time/wheel/mod.rs
index a2ef27fc..03861240 100644
--- a/tokio/src/time/wheel/mod.rs
+++ b/tokio/src/time/wheel/mod.rs
@@ -1,3 +1,5 @@
+use crate::time::driver::Entry;
+
mod level;
pub(crate) use self::level::Expiration;
use self::level::Level;
@@ -5,9 +7,12 @@ use self::level::Level;
mod stack;
pub(crate) use self::stack::Stack;
-use std::borrow::Borrow;
+use std::sync::Arc;
use std::usize;
+pub(super) type Item = Entry;
+pub(super) type OwnedItem = Arc<Item>;
+
/// Timing wheel implementation.
///
/// This type provides the hashed timing wheel implementation that backs `Timer`
@@ -20,7 +25,7 @@ use std::usize;
///
/// See `Timer` documentation for some implementation notes.
#[derive(Debug)]
-pub(crate) struct Wheel<T> {
+pub(crate) struct Wheel {
/// The number of milliseconds elapsed since the wheel started.
elapsed: u64,
@@ -34,7 +39,7 @@ pub(crate) struct Wheel<T> {
/// * ~ 4 min slots / ~ 4 hr range
/// * ~ 4 hr slots / ~ 12 day range
/// * ~ 12 day slots / ~ 2 yr range
- levels: Vec<Level<T>>,
+ levels: Vec<Level>,
}
/// Number of levels. Each level has 64 slots. By using 6 levels with 64 slots
@@ -51,19 +56,9 @@ pub(crate) enum InsertError {
Invalid,
}
-/// Poll expirations from the wheel
-#[derive(Debug, Default)]
-pub(crate) struct Poll {
- now: u64,
- expiration: Option<Expiration>,
-}
-
-impl<T> Wheel<T>
-where
- T: Stack,
-{
+impl Wheel {
/// Create a new timing wheel
- pub(crate) fn new() -> Wheel<T> {
+ pub(crate) fn new() -> Wheel {
let levels = (0..NUM_LEVELS).map(Level::new).collect();
Wheel { elapsed: 0, levels }
@@ -99,9 +94,8 @@ where
pub(crate) fn insert(
&mut self,
when: u64,
- item: T::Owned,
- store: &mut T::Store,
- ) -> Result<(), (T::Owned, InsertError)> {
+ item: OwnedItem,
+ ) -> Result<(), (OwnedItem, InsertError)> {
if when <= self.elapsed {
return Err((item, InsertError::Elapsed));
} else if when - self.elapsed > MAX_DURATION {
@@ -111,7 +105,7 @@ where
// Get the level at which the entry should be stored
let level = self.level_for(when);
- self.levels[level].add_entry(when, item, store);
+ self.levels[level].add_entry(when, item);
debug_assert!({
self.levels[level]
@@ -124,11 +118,11 @@ where
}
/// Remove `item` from thee timing wheel.
- pub(crate) fn remove(&mut self, item: &T::Borrowed, store: &mut T::Store) {
- let when = T::when(item, store);
+ pub(crate) fn remove(&mut self, item: &Item) {
+ let when = item.when();
let level = self.level_for(when);
- self.levels[level].remove_entry(when, item, store);
+ self.levels[level].remove_entry(when, item);
}
/// Instant at which to poll
@@ -136,33 +130,35 @@ where
self.next_expiration().map(|expiration| expiration.deadline)
}
- pub(crate) fn poll(&mut self, poll: &mut Poll, store: &mut T::Store) -> Option<T::Owned> {
+ /// Advances the timer up to the instant represented by `now`.
+ pub(crate) fn poll(&mut self, now: u64) -> Option<OwnedItem> {
loop {
- if poll.expiration.is_none() {
- poll.expiration = self.next_expiration().and_then(|expiration| {
- if expiration.deadline > poll.now {
- None
- } else {
- Some(expiration)
- }
- });
- }
+ // under what circumstances is poll.expiration Some vs. None?
+ let expiration = self.next_expiration().and_then(|expiration| {
+ if expiration.deadline > now {
+ None
+ } else {
+ Some(expiration)
+ }
+ });
- match poll.expiration {
+ match expiration {
Some(ref expiration) => {
- if let Some(item) = self.poll_expiration(expiration, store) {
+ if let Some(item) = self.poll_expiration(expiration) {
return Some(item);
}
self.set_elapsed(expiration.deadline);
}
None => {
- self.set_elapsed(poll.now);
+ // in this case the poll did not indicate an expiration
+ // _and_ we were not able to find a next expiration in
+ // the current list of timers. advance to the poll's
+ // current time and do nothing else.
+ self.set_elapsed(now);
return None;
}
}
-
- poll.expiration = None;
}
}
@@ -197,22 +193,22 @@ where
res
}
- pub(crate) fn poll_expiration(
- &mut self,
- expiration: &Expiration,
- store: &mut T::Store,
- ) -> Option<T::Owned> {
- while let Some(item) = self.pop_entry(expiration, store) {
+ /// iteratively find entries that are between the wheel's current
+ /// time and the expiration time. for each in that population either
+ /// return it for notification (in the case of the last level) or tier
+ /// it down to the next level (in all other cases).
+ pub(crate) fn poll_expiration(&mut self, expiration: &Expiration) -> Option<OwnedItem> {