summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgreenwoodcm <greenwd@amazon.com>2020-10-06 12:48:01 -0700
committerGitHub <noreply@github.com>2020-10-06 12:48:01 -0700
commitfcdf9345bf19e9a1e1664f01713f9eba54da27c5 (patch)
tree2d5d1a380e2e5bf29979335a553d19665aaaef29
parent4cf45c038b9691f24fac22df13594c2223b185f6 (diff)
time: clean time driver (#2905)
* remove unnecessary wheel::Poll the timer wheel uses the `wheel::Poll` struct as input when advancing the timer to the next time step. the `Poll` struct contains an instant representing the time step to advance to and also contains an optional and mutable reference to an `Expiration` struct. from what I can tell, the latter field is only used in the context of polling the wheel and does not need to be exposed outside of that method. without the expiration field the `Poll` struct is nothing more than a wrapper around the instant being polled. this change removes the `Poll` struct and updates integration points accordingly. * remove Stack trait in favor of concrete Stack implementation * remove timer Registration struct
-rw-r--r--tokio-util/src/time/delay_queue.rs8
-rw-r--r--tokio-util/src/time/wheel/mod.rs49
-rw-r--r--tokio/src/time/delay.rs44
-rw-r--r--tokio/src/time/driver/entry.rs20
-rw-r--r--tokio/src/time/driver/mod.rs29
-rw-r--r--tokio/src/time/driver/registration.rs56
-rw-r--r--tokio/src/time/driver/stack.rs121
-rw-r--r--tokio/src/time/tests/mod.rs6
-rw-r--r--tokio/src/time/wheel/level.rs174
-rw-r--r--tokio/src/time/wheel/mod.rs101
-rw-r--r--tokio/src/time/wheel/stack.rs120
11 files changed, 319 insertions, 409 deletions
diff --git a/tokio-util/src/time/delay_queue.rs b/tokio-util/src/time/delay_queue.rs
index b23c24e6..92c922b8 100644
--- a/tokio-util/src/time/delay_queue.rs
+++ b/tokio-util/src/time/delay_queue.rs
@@ -141,7 +141,7 @@ pub struct DelayQueue<T> {
delay: Option<Delay>,
/// Wheel polling state
- poll: wheel::Poll,
+ wheel_now: u64,
/// Instant at which the timer starts
start: Instant,
@@ -251,7 +251,7 @@ impl<T> DelayQueue<T> {
slab: Slab::with_capacity(capacity),
expired: Stack::default(),
delay: None,
- poll: wheel::Poll::new(0),
+ wheel_now: 0,
start: Instant::now(),
}
}
@@ -733,11 +733,11 @@ impl<T> DelayQueue<T> {
let now = crate::time::ms(delay.deadline() - self.start, crate::time::Round::Down);
- self.poll = wheel::Poll::new(now);
+ self.wheel_now = now;
}
// We poll the wheel to get the next value out before finding the next deadline.
- let wheel_idx = self.wheel.poll(&mut self.poll, &mut self.slab);
+ let wheel_idx = self.wheel.poll(self.wheel_now, &mut self.slab);
self.delay = self.next_deadline().map(sleep_until);
diff --git a/tokio-util/src/time/wheel/mod.rs b/tokio-util/src/time/wheel/mod.rs
index a2ef27fc..478037a3 100644
--- a/tokio-util/src/time/wheel/mod.rs
+++ b/tokio-util/src/time/wheel/mod.rs
@@ -51,13 +51,6 @@ pub(crate) enum InsertError {
Invalid,
}
-/// Poll expirations from the wheel
-#[derive(Debug, Default)]
-pub(crate) struct Poll {
- now: u64,
- expiration: Option<Expiration>,
-}
-
impl<T> Wheel<T>
where
T: Stack,
@@ -136,19 +129,18 @@ where
self.next_expiration().map(|expiration| expiration.deadline)
}
- pub(crate) fn poll(&mut self, poll: &mut Poll, store: &mut T::Store) -> Option<T::Owned> {
+ /// Advances the timer up to the instant represented by `now`.
+ pub(crate) fn poll(&mut self, now: u64, store: &mut T::Store) -> Option<T::Owned> {
loop {
- if poll.expiration.is_none() {
- poll.expiration = self.next_expiration().and_then(|expiration| {
- if expiration.deadline > poll.now {
- None
- } else {
- Some(expiration)
- }
- });
- }
+ let expiration = self.next_expiration().and_then(|expiration| {
+ if expiration.deadline > now {
+ None
+ } else {
+ Some(expiration)
+ }
+ });
- match poll.expiration {
+ match expiration {
Some(ref expiration) => {
if let Some(item) = self.poll_expiration(expiration, store) {
return Some(item);
@@ -157,12 +149,14 @@ where
self.set_elapsed(expiration.deadline);
}
None => {
- self.set_elapsed(poll.now);
+ // in this case the poll did not indicate an expiration
+ // _and_ we were not able to find a next expiration in
+ // the current list of timers. advance to the poll's
+ // current time and do nothing else.
+ self.set_elapsed(now);
return None;
}
}
-
- poll.expiration = None;
}
}
@@ -197,6 +191,10 @@ where
res
}
+ /// iteratively find entries that are between the wheel's current
+ /// time and the expiration time. for each in that population either
+ /// return it for notification (in the case of the last level) or tier
+ /// it down to the next level (in all other cases).
pub(crate) fn poll_expiration(
&mut self,
expiration: &Expiration,
@@ -251,15 +249,6 @@ fn level_for(elapsed: u64, when: u64) -> usize {
significant / 6
}
-impl Poll {
- pub(crate) fn new(now: u64) -> Poll {
- Poll {
- now,
- expiration: None,
- }
- }
-}
-
#[cfg(all(test, not(loom)))]
mod test {
use super::*;
diff --git a/tokio/src/time/delay.rs b/tokio/src/time/delay.rs
index 42ae4b08..9364860d 100644
--- a/tokio/src/time/delay.rs
+++ b/tokio/src/time/delay.rs
@@ -1,8 +1,9 @@
-use crate::time::driver::Registration;
-use crate::time::{Duration, Instant};
+use crate::time::driver::{Entry, Handle};
+use crate::time::{Duration, Error, Instant};
use std::future::Future;
use std::pin::Pin;
+use std::sync::Arc;
use std::task::{self, Poll};
/// Waits until `deadline` is reached.
@@ -16,8 +17,7 @@ use std::task::{self, Poll};
/// Canceling a delay is done by dropping the returned future. No additional
/// cleanup work is required.
pub fn sleep_until(deadline: Instant) -> Delay {
- let registration = Registration::new(deadline, Duration::from_millis(0));
- Delay { registration }
+ Delay::new_timeout(deadline, Duration::from_millis(0))
}
/// Waits until `duration` has elapsed.
@@ -63,25 +63,27 @@ pub struct Delay {
/// The link between the `Delay` instance and the timer that drives it.
///
/// This also stores the `deadline` value.
- registration: Registration,
+ entry: Arc<Entry>,
}
impl Delay {
pub(crate) fn new_timeout(deadline: Instant, duration: Duration) -> Delay {
- let registration = Registration::new(deadline, duration);
- Delay { registration }
+ let handle = Handle::current();
+ let entry = Entry::new(&handle, deadline, duration);
+
+ Delay { entry }
}
/// Returns the instant at which the future will complete.
pub fn deadline(&self) -> Instant {
- self.registration.deadline()
+ self.entry.time_ref().deadline
}
/// Returns `true` if the `Delay` has elapsed
///
/// A `Delay` is elapsed when the requested duration has elapsed.
pub fn is_elapsed(&self) -> bool {
- self.registration.is_elapsed()
+ self.entry.is_elapsed()
}
/// Resets the `Delay` instance to a new deadline.
@@ -92,7 +94,21 @@ impl Delay {
/// This function can be called both before and after the future has
/// completed.
pub fn reset(&mut self, deadline: Instant) {
- self.registration.reset(deadline);
+ unsafe {
+ self.entry.time_mut().deadline = deadline;
+ }
+
+ Entry::reset(&mut self.entry);
+ }
+
+ fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> {
+ // Keep track of task budget
+ let coop = ready!(crate::coop::poll_proceed(cx));
+
+ self.entry.poll_elapsed(cx).map(move |r| {
+ coop.made_progress();
+ r
+ })
}
}
@@ -109,9 +125,15 @@ impl Future for Delay {
// Both cases are extremely rare, and pretty accurately fit into
// "logic errors", so we just panic in this case. A user couldn't
// really do much better if we passed the error onwards.
- match ready!(self.registration.poll_elapsed(cx)) {
+ match ready!(self.poll_elapsed(cx)) {
Ok(()) => Poll::Ready(()),
Err(e) => panic!("timer error: {}", e),
}
}
}
+
+impl Drop for Delay {
+ fn drop(&mut self) {
+ Entry::cancel(&self.entry);
+ }
+}
diff --git a/tokio/src/time/driver/entry.rs b/tokio/src/time/driver/entry.rs
index 974465c1..20f8e1c6 100644
--- a/tokio/src/time/driver/entry.rs
+++ b/tokio/src/time/driver/entry.rs
@@ -83,7 +83,7 @@ pub(crate) struct Entry {
/// Next entry in the State's linked list.
///
/// This is only accessed by the timer
- pub(super) next_stack: UnsafeCell<Option<Arc<Entry>>>,
+ pub(crate) next_stack: UnsafeCell<Option<Arc<Entry>>>,
/// Previous entry in the State's linked list.
///
@@ -91,7 +91,7 @@ pub(crate) struct Entry {
/// entry.
///
/// This is a weak reference.
- pub(super) prev_stack: UnsafeCell<*const Entry>,
+ pub(crate) prev_stack: UnsafeCell<*const Entry>,
}
/// Stores the info for `Delay`.
@@ -112,12 +112,12 @@ const ERROR: u64 = u64::MAX;
impl Entry {
pub(crate) fn new(handle: &Handle, deadline: Instant, duration: Duration) -> Arc<Entry> {
let inner = handle.inner().unwrap();
- let entry: Entry;
- // Increment the number of active timeouts
- if let Err(err) = inner.increment() {
- entry = Entry::new2(deadline, duration, Weak::new(), ERROR);
+ // Attempt to increment the number of active timeouts
+ let entry = if let Err(err) = inner.increment() {
+ let entry = Entry::new2(deadline, duration, Weak::new(), ERROR);
entry.error(err);
+ entry
} else {
let when = inner.normalize_deadline(deadline);
let state = if when <= inner.elapsed() {
@@ -125,8 +125,8 @@ impl Entry {
} else {
when
};
- entry = Entry::new2(deadline, duration, Arc::downgrade(&inner), state);
- }
+ Entry::new2(deadline, duration, Arc::downgrade(&inner), state)
+ };
let entry = Arc::new(entry);
if let Err(err) = inner.queue(&entry) {
@@ -147,6 +147,10 @@ impl Entry {
&mut *self.time.0.get()
}
+ pub(crate) fn when(&self) -> u64 {
+ self.when_internal().expect("invalid internal state")
+ }
+
/// The current entry state as known by the timer. This is not the value of
/// `state`, but lets the timer know how to converge its state to `state`.
pub(crate) fn when_internal(&self) -> Option<u64> {
diff --git a/tokio/src/time/driver/mod.rs b/tokio/src/time/driver/mod.rs
index bb6c28b3..5ece7c72 100644
--- a/tokio/src/time/driver/mod.rs
+++ b/tokio/src/time/driver/mod.rs
@@ -9,12 +9,6 @@ pub(super) use self::entry::Entry;
mod handle;
pub(crate) use self::handle::Handle;
-mod registration;
-pub(crate) use self::registration::Registration;
-
-mod stack;
-use self::stack::Stack;
-
use crate::loom::sync::atomic::{AtomicU64, AtomicUsize};
use crate::park::{Park, Unpark};
use crate::time::{wheel, Error};
@@ -73,7 +67,7 @@ use std::{cmp, fmt};
/// When the timer processes entries at level zero, it will notify all the
/// `Delay` instances as their deadlines have been reached. For all higher
/// levels, all entries will be redistributed across the wheel at the next level
-/// down. Eventually, as time progresses, entries will [`Delay`][delay] instances will
+/// down. Eventually, as time progresses, entries with [`Delay`][delay] instances will
/// either be canceled (dropped) or their associated entries will reach level
/// zero and be notified.
///
@@ -87,7 +81,7 @@ pub(crate) struct Driver<T: Park> {
inner: Arc<Inner>,
/// Timer wheel
- wheel: wheel::Wheel<Stack>,
+ wheel: wheel::Wheel,
/// Thread parker. The `Driver` park implementation delegates to this.
park: T,
@@ -163,9 +157,8 @@ where
self.clock.now() - self.inner.start,
crate::time::Round::Down,
);
- let mut poll = wheel::Poll::new(now);
- while let Some(entry) = self.wheel.poll(&mut poll, &mut ()) {
+ while let Some(entry) = self.wheel.poll(now) {
let when = entry.when_internal().expect("invalid internal entry state");
// Fire the entry
@@ -193,7 +186,7 @@ where
self.clear_entry(&entry);
}
(None, Some(when)) => {
- // Queue the entry
+ // Add the entry to the timer wheel
self.add_entry(entry, when);
}
(Some(_), Some(next)) => {
@@ -205,19 +198,17 @@ where
}
fn clear_entry(&mut self, entry: &Arc<Entry>) {
- self.wheel.remove(entry, &mut ());
+ self.wheel.remove(entry);
entry.set_when_internal(None);
}
/// Fires the entry if it needs to, otherwise queue it to be processed later.
- ///
- /// Returns `None` if the entry was fired.
fn add_entry(&mut self, entry: Arc<Entry>, when: u64) {
use crate::time::wheel::InsertError;
entry.set_when_internal(Some(when));
- match self.wheel.insert(when, entry, &mut ()) {
+ match self.wheel.insert(when, entry) {
Ok(_) => {}
Err((entry, InsertError::Elapsed)) => {
// The entry's deadline has elapsed, so fire it and update the
@@ -320,9 +311,9 @@ where
self.inner.process.shutdown();
// Clear the wheel, using u64::MAX allows us to drain everything
- let mut poll = wheel::Poll::new(u64::MAX);
+ let end_of_time = u64::MAX;
- while let Some(entry) = self.wheel.poll(&mut poll, &mut ()) {
+ while let Some(entry) = self.wheel.poll(end_of_time) {
entry.error(Error::shutdown());
}
@@ -387,6 +378,10 @@ impl Inner {
debug_assert!(prev <= MAX_TIMEOUTS);
}
+ /// add the entry to the "process queue". entries are not immediately
+ /// pushed into the timer wheel but are instead pushed into the
+ /// process queue and then moved from the process queue into the timer
+ /// wheel on next `process`
fn queue(&self, entry: &Arc<Entry>) -> Result<(), Error> {
if self.process.push(entry)? {
// The timer is notified so that it can process the timeout
diff --git a/tokio/src/time/driver/registration.rs b/tokio/src/time/driver/registration.rs
deleted file mode 100644
index 3a0b3450..00000000
--- a/tokio/src/time/driver/registration.rs
+++ /dev/null
@@ -1,56 +0,0 @@
-use crate::time::driver::{Entry, Handle};
-use crate::time::{Duration, Error, Instant};
-
-use std::sync::Arc;
-use std::task::{self, Poll};
-
-/// Registration with a timer.
-///
-/// The association between a `Delay` instance and a timer is done lazily in
-/// `poll`
-#[derive(Debug)]
-pub(crate) struct Registration {
- entry: Arc<Entry>,
-}
-
-impl Registration {
- pub(crate) fn new(deadline: Instant, duration: Duration) -> Registration {
- let handle = Handle::current();
-
- Registration {
- entry: Entry::new(&handle, deadline, duration),
- }
- }
-
- pub(crate) fn deadline(&self) -> Instant {
- self.entry.time_ref().deadline
- }
-
- pub(crate) fn reset(&mut self, deadline: Instant) {
- unsafe {
- self.entry.time_mut().deadline = deadline;
- }
-
- Entry::reset(&mut self.entry);
- }
-
- pub(crate) fn is_elapsed(&self) -> bool {
- self.entry.is_elapsed()
- }
-
- pub(crate) fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> {
- // Keep track of task budget
- let coop = ready!(crate::coop::poll_proceed(cx));
-
- self.entry.poll_elapsed(cx).map(move |r| {
- coop.made_progress();
- r
- })
- }
-}
-
-impl Drop for Registration {
- fn drop(&mut self) {
- Entry::cancel(&self.entry);
- }
-}
diff --git a/tokio/src/time/driver/stack.rs b/tokio/src/time/driver/stack.rs
deleted file mode 100644
index 3e2924f2..00000000
--- a/tokio/src/time/driver/stack.rs
+++ /dev/null
@@ -1,121 +0,0 @@
-use crate::time::driver::Entry;
-use crate::time::wheel;
-
-use std::ptr;
-use std::sync::Arc;
-
-/// A doubly linked stack
-#[derive(Debug)]
-pub(crate) struct Stack {
- head: Option<Arc<Entry>>,
-}
-
-impl Default for Stack {
- fn default() -> Stack {
- Stack { head: None }
- }
-}
-
-impl wheel::Stack for Stack {
- type Owned = Arc<Entry>;
- type Borrowed = Entry;
- type Store = ();
-
- fn is_empty(&self) -> bool {
- self.head.is_none()
- }
-
- fn push(&mut self, entry: Self::Owned, _: &mut Self::Store) {
- // Get a pointer to the entry to for the prev link
- let ptr: *const Entry = &*entry as *const _;
-
- // Remove the old head entry
- let old = self.head.take();
-
- unsafe {
- // Ensure the entry is not already in a stack.
- debug_assert!((*entry.next_stack.get()).is_none());
- debug_assert!((*entry.prev_stack.get()).is_null());
-
- if let Some(ref entry) = old.as_ref() {
- debug_assert!({
- // The head is not already set to the entry
- ptr != &***entry as *const _
- });
-
- // Set the previous link on the old head
- *entry.prev_stack.get() = ptr;
- }
-
- // Set this entry's next pointer
- *entry.next_stack.get() = old;
- }
-
- // Update the head pointer
- self.head = Some(entry);
- }
-
- /// Pops an item from the stack
- fn pop(&mut self, _: &mut ()) -> Option<Arc<Entry>> {
- let entry = self.head.take();
-
- unsafe {
- if let Some(entry) = entry.as_ref() {
- self.head = (*entry.next_stack.get()).take();
-
- if let Some(entry) = self.head.as_ref() {
- *entry.prev_stack.get() = ptr::null();
- }
-
- *entry.prev_stack.get() = ptr::null();
- }
- }
-
- entry
- }
-
- fn remove(&mut self, entry: &Entry, _: &mut ()) {
- unsafe {
- // Ensure that the entry is in fact contained by the stack
- debug_assert!({
- // This walks the full linked list even if an entry is found.
- let mut next = self.head.as_ref();
- let mut contains = false;
-
- while let Some(n) = next {
- if entry as *const _ == &**n as *const _ {
- debug_assert!(!contains);
- contains = true;
- }
-
- next = (*n.next_stack.get()).as_ref();
- }
-
- contains
- });
-
- // Unlink `entry` from the next node
- let next = (*entry.next_stack.get()).take();
-
- if let Some(next) = next.as_ref() {
- (*next.prev_stack.get()) = *entry.prev_stack.get();
- }
-
- // Unlink `entry` from the prev node
-
- if let Some(prev) = (*entry.prev_stack.get()).as_ref() {
- *prev.next_stack.get() = next;
- } else {
- // It is the head
- self.head = next;
- }
-
- // Unset the prev pointer
- *entry.prev_stack.get() = ptr::null();
- }
- }
-
- fn when(item: &Entry, _: &()) -> u64 {
- item.when_internal().expect("invalid internal state")
- }
-}
diff --git a/tokio/src/time/tests/mod.rs b/tokio/src/time/tests/mod.rs
index e112b8e1..a043d65e 100644
--- a/tokio/src/time/tests/mod.rs
+++ b/tokio/src/time/tests/mod.rs
@@ -8,10 +8,10 @@ fn assert_sync<T: Sync>() {}
#[test]
fn registration_is_send_and_sync() {
- use crate::time::driver::Registration;
+ use crate::time::delay::Delay;
- assert_send::<Registration>();
- assert_sync::<Registration>();
+ assert_send::<Delay>();
+ assert_sync::<Delay>();
}
#[test]
diff --git a/tokio/src/time/wheel/level.rs b/tokio/src/time/wheel/level.rs
index 49f9bfb9..d51d26a0 100644
--- a/tokio/src/time/wheel/level.rs
+++ b/tokio/src/time/wheel/level.rs
@@ -1,9 +1,10 @@
+use super::{Item, OwnedItem};
use crate::time::wheel::Stack;
use std::fmt;
/// Wheel for a single level in the timer. This wheel contains 64 slots.
-pub(crate) struct Level<T> {
+pub(crate) struct Level {
level: usize,
/// Bit field tracking which slots currently contain entries.
@@ -16,7 +17,7 @@ pub(crate) struct Level<T> {
occupied: u64,
/// Slots
- slot: [T; LEVEL_MULT],
+ slot: [Stack; LEVEL_MULT],
}
/// Indicates when a slot must be processed next.
@@ -37,87 +38,90 @@ pub(crate) struct Expiration {
/// Being a power of 2 is very important.
const LEVEL_MULT: usize = 64;
-impl<T: Stack> Level<T> {
- pub(crate) fn new(level: usize) -> Level<T> {
- // Rust's derived implementations for arrays require that the value
- // contained by the array be `Copy`. So, here we have to manually
- // initialize every single slot.
- macro_rules! s {
- () => {
- T::default()
- };
- };
+impl Level {
+ pub(crate) fn new(level: usize) -> Level {
+ // A value has to be Copy in order to use syntax like:
+ // let stack = Stack::default();
+ // ...
+ // slots: [stack; 64],
+ //
+ // Alternatively, since Stack is Default one can
+ // use syntax like:
+ // let slots: [Stack; 64] = Default::default();
+ //
+ // However, that is only supported for arrays of size
+ // 32 or fewer. So in our case we have to explicitly
+ // invoke the constructor for each array element.
+ let ctor = Stack::default;
Level {
level,
occupied: 0,
slot: [
- // It does not look like the necessary traits are
- // derived for [T; 64].
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
- s!(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
+ ctor(),
],
}
}
@@ -173,17 +177,17 @@ impl<T: Stack> Level<T> {
Some(slot)
}
- pub(crate) fn add_entry(&mut self, when: u64, item: T::Owned, store: &mut T::Store) {
+ pub(crate) fn add_entry(&mut self, when: u64, item: OwnedItem) {
let slot = slot_for(when, self.level);
- self.slot[slot].push(item, store);
+ self.slot[slot].push(item);
self.occupied |= occupied_bit(slot);
}
- pub(crate) fn remove_entry(&mut self, when: u64, item: &T::Borrowed, store: &mut T::Store) {
+ pub(crate) fn remove_entry(&mut self, when: u64, item: &Item) {
let slot = slot_for(when, self.level);
- self.slot[slot].remove(item, store);
+ self.slot[slot].remove(item);
if self.slot[slot].is_empty() {
// The bit is currently set
@@ -194,8 +198,8 @@ impl<T: Stack> Level<T> {
}
}
- pub(crate) fn pop_entry_slot(&mut self, slot: usize, store: &mut T::Store) -> Option<T::Owned> {
- let ret = self.slot[slot].pop(store);
+ pub(crate) fn pop_entry_slot(&mut self, slot: usize) -> Option<OwnedItem> {
+ let ret = self.slot[slot].pop();
if ret.is_some() && self.slot[slot].is_empty() {
// The bit is currently set
@@ -208,7 +212,7 @@ impl<T: Stack> Level<T> {
}
}
-impl<T> fmt::Debug for Level<T> {
+impl fmt::Debug for Level {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Level")
.field("occupied", &self.occupied)
diff --git a/tokio/src/time/wheel/mod.rs b/tokio/src/time/wheel/mod.rs
index a2ef27fc..03861240 100644
--- a/tokio/src/time/wheel/mod.rs
+++ b/tokio/src/time/wheel/mod.rs
@@ -1,3 +1,5 @@
+use crate::time::driver::Entry;
+
mod level;
pub(crate) use self::level::Expiration;
use self::level::Level;
@@ -5,9 +7,12 @@ use self::level::Level;
mod stack;
pub(crate) use self::stack::Stack;
-use std::borrow::Borrow;
+use std::sync::Arc;
use std::usize;
+pub(super) type Item = Entry;
+pub(super) type OwnedItem = Arc<Item>;
+
/// Timing wheel implementation.
///
/// This type provides the hashed timing wheel implementation that backs `Timer`
@@ -20,7 +25,7 @@ use std::usize;
///
/// See `Timer` documentation for some implementation notes.
#[derive(Debug)]
-pub(crate) struct Wheel<T> {
+pub(crate) struct Wheel {
/// The number of milliseconds elapsed since the wheel started.
elapsed: u64,
@@ -34,7 +39,7 @@ pub(crate) struct Wheel<T> {
/// * ~ 4 min slots / ~ 4 hr range
/// * ~ 4 hr slots / ~ 12 day range
/// * ~ 12 day slots / ~ 2 yr range
- levels: Vec<Level<T>>,
+ levels: Vec<Level>,
}
/// Number of levels. Each level has 64 slots. By using 6 levels with 64 slots
@@ -51,19 +56,9 @@ pub(crate) enum InsertError {
Invalid,
}
-/// Poll expirations from the wheel
-#[derive(Debug, Default)]
-pub(crate) struct Poll {
- now: u64,
- expiration: Option<Expiration>,
-}
-
-impl<T> Wheel<T>
-where
- T: Stack,
-{
+impl Wheel {
/// Create a new timing wheel
- pub(crate) fn new() -> Wheel<T> {
+ pub(crate) fn new() -> Wheel {
let levels = (0..NUM_LEVELS).map(Level::new).collect();
Wheel { elapsed: 0, levels }
@@ -99,9 +94,8 @@ where
pub(crate) fn insert(
&mut self,
when: u64,
- item: T::Owned,
- store: &mut T::Store,
- ) -> Result<(), (T::Owned, InsertError)> {
+ item: OwnedItem,
+ ) -> Result<(), (OwnedItem, InsertError)> {
if when <= self.elapsed {
return Err((item, InsertError::Elapsed));
} else if when - self.elapsed > MAX_DURATION {
@@ -111,7 +105,7 @@ where
// Get the level at which the entry should be stored
let level = self.level_for(when);
- self.levels[level].add_entry(when, item, store);
+ self.levels[level].add_entry(when, item);
debug_assert!({
self.levels[level]
@@ -124,11 +118,11 @@ where
}
/// Remove `item` from thee timing wheel.
- pub(crate) fn remove(&mut self, item: &T::Borrowed, store: &mut T::Store) {
- let when = T::when(item, store);
+ pub(crate) fn remove(&mut self, item: &Item) {
+ let when = item.when();
let level = self.level_for(when);
- self.levels[level].remove_entry(when, item, store);
+ self.levels[level].remove_entry(when, item);
}
/// Instant at which to poll
@@ -136,33 +130,35 @@ where
self.next_expiration().map(|expiration| expiration.deadline)
}
- pub(crate) fn poll(&mut self, poll: &mut Poll, store: &mut T::Store) -> Option<T::Owned> {
+ /// Advances the timer up to the instant represented by `now`.
+ pub(crate) fn poll(&mut self, now: u64) -> Option<OwnedItem> {
loop {
- if poll.expiration.is_none() {
- poll.expiration = self.next_expiration().and_then(|expiration| {
- if expiration.deadline > poll.now {
- None
- } else {
- Some(expiration)
- }
- });
- }
+ // under what circumstances is poll.expiration Some vs. None?
+ let expiration = self.next_expiration().and_then(|expiration| {
+ if expiration.deadline > now {
+ None
+ } else {
+ Some(expiration)
+ }
+ });
- match poll.expiration {
+ match expiration {
Some(ref expiration) => {
- if let Some(item) = self.poll_expiration(expiration, store) {
+ if let Some(item) = self.poll_expiration(expiration) {
return Some(item);
}