summaryrefslogtreecommitdiffstats
path: root/tokio/src/time/driver
diff options
context:
space:
mode:
authorgreenwoodcm <greenwd@amazon.com>2020-10-06 12:48:01 -0700
committerGitHub <noreply@github.com>2020-10-06 12:48:01 -0700
commitfcdf9345bf19e9a1e1664f01713f9eba54da27c5 (patch)
tree2d5d1a380e2e5bf29979335a553d19665aaaef29 /tokio/src/time/driver
parent4cf45c038b9691f24fac22df13594c2223b185f6 (diff)
time: clean time driver (#2905)
* remove unnecessary wheel::Poll the timer wheel uses the `wheel::Poll` struct as input when advancing the timer to the next time step. the `Poll` struct contains an instant representing the time step to advance to and also contains an optional and mutable reference to an `Expiration` struct. from what I can tell, the latter field is only used in the context of polling the wheel and does not need to be exposed outside of that method. without the expiration field the `Poll` struct is nothing more than a wrapper around the instant being polled. this change removes the `Poll` struct and updates integration points accordingly. * remove Stack trait in favor of concrete Stack implementation * remove timer Registration struct
Diffstat (limited to 'tokio/src/time/driver')
-rw-r--r--tokio/src/time/driver/entry.rs20
-rw-r--r--tokio/src/time/driver/mod.rs29
-rw-r--r--tokio/src/time/driver/registration.rs56
-rw-r--r--tokio/src/time/driver/stack.rs121
4 files changed, 24 insertions, 202 deletions
diff --git a/tokio/src/time/driver/entry.rs b/tokio/src/time/driver/entry.rs
index 974465c1..20f8e1c6 100644
--- a/tokio/src/time/driver/entry.rs
+++ b/tokio/src/time/driver/entry.rs
@@ -83,7 +83,7 @@ pub(crate) struct Entry {
/// Next entry in the State's linked list.
///
/// This is only accessed by the timer
- pub(super) next_stack: UnsafeCell<Option<Arc<Entry>>>,
+ pub(crate) next_stack: UnsafeCell<Option<Arc<Entry>>>,
/// Previous entry in the State's linked list.
///
@@ -91,7 +91,7 @@ pub(crate) struct Entry {
/// entry.
///
/// This is a weak reference.
- pub(super) prev_stack: UnsafeCell<*const Entry>,
+ pub(crate) prev_stack: UnsafeCell<*const Entry>,
}
/// Stores the info for `Delay`.
@@ -112,12 +112,12 @@ const ERROR: u64 = u64::MAX;
impl Entry {
pub(crate) fn new(handle: &Handle, deadline: Instant, duration: Duration) -> Arc<Entry> {
let inner = handle.inner().unwrap();
- let entry: Entry;
- // Increment the number of active timeouts
- if let Err(err) = inner.increment() {
- entry = Entry::new2(deadline, duration, Weak::new(), ERROR);
+ // Attempt to increment the number of active timeouts
+ let entry = if let Err(err) = inner.increment() {
+ let entry = Entry::new2(deadline, duration, Weak::new(), ERROR);
entry.error(err);
+ entry
} else {
let when = inner.normalize_deadline(deadline);
let state = if when <= inner.elapsed() {
@@ -125,8 +125,8 @@ impl Entry {
} else {
when
};
- entry = Entry::new2(deadline, duration, Arc::downgrade(&inner), state);
- }
+ Entry::new2(deadline, duration, Arc::downgrade(&inner), state)
+ };
let entry = Arc::new(entry);
if let Err(err) = inner.queue(&entry) {
@@ -147,6 +147,10 @@ impl Entry {
&mut *self.time.0.get()
}
+ pub(crate) fn when(&self) -> u64 {
+ self.when_internal().expect("invalid internal state")
+ }
+
/// The current entry state as known by the timer. This is not the value of
/// `state`, but lets the timer know how to converge its state to `state`.
pub(crate) fn when_internal(&self) -> Option<u64> {
diff --git a/tokio/src/time/driver/mod.rs b/tokio/src/time/driver/mod.rs
index bb6c28b3..5ece7c72 100644
--- a/tokio/src/time/driver/mod.rs
+++ b/tokio/src/time/driver/mod.rs
@@ -9,12 +9,6 @@ pub(super) use self::entry::Entry;
mod handle;
pub(crate) use self::handle::Handle;
-mod registration;
-pub(crate) use self::registration::Registration;
-
-mod stack;
-use self::stack::Stack;
-
use crate::loom::sync::atomic::{AtomicU64, AtomicUsize};
use crate::park::{Park, Unpark};
use crate::time::{wheel, Error};
@@ -73,7 +67,7 @@ use std::{cmp, fmt};
/// When the timer processes entries at level zero, it will notify all the
/// `Delay` instances as their deadlines have been reached. For all higher
/// levels, all entries will be redistributed across the wheel at the next level
-/// down. Eventually, as time progresses, entries will [`Delay`][delay] instances will
+/// down. Eventually, as time progresses, entries with [`Delay`][delay] instances will
/// either be canceled (dropped) or their associated entries will reach level
/// zero and be notified.
///
@@ -87,7 +81,7 @@ pub(crate) struct Driver<T: Park> {
inner: Arc<Inner>,
/// Timer wheel
- wheel: wheel::Wheel<Stack>,
+ wheel: wheel::Wheel,
/// Thread parker. The `Driver` park implementation delegates to this.
park: T,
@@ -163,9 +157,8 @@ where
self.clock.now() - self.inner.start,
crate::time::Round::Down,
);
- let mut poll = wheel::Poll::new(now);
- while let Some(entry) = self.wheel.poll(&mut poll, &mut ()) {
+ while let Some(entry) = self.wheel.poll(now) {
let when = entry.when_internal().expect("invalid internal entry state");
// Fire the entry
@@ -193,7 +186,7 @@ where
self.clear_entry(&entry);
}
(None, Some(when)) => {
- // Queue the entry
+ // Add the entry to the timer wheel
self.add_entry(entry, when);
}
(Some(_), Some(next)) => {
@@ -205,19 +198,17 @@ where
}
fn clear_entry(&mut self, entry: &Arc<Entry>) {
- self.wheel.remove(entry, &mut ());
+ self.wheel.remove(entry);
entry.set_when_internal(None);
}
/// Fires the entry if it needs to, otherwise queue it to be processed later.
- ///
- /// Returns `None` if the entry was fired.
fn add_entry(&mut self, entry: Arc<Entry>, when: u64) {
use crate::time::wheel::InsertError;
entry.set_when_internal(Some(when));
- match self.wheel.insert(when, entry, &mut ()) {
+ match self.wheel.insert(when, entry) {
Ok(_) => {}
Err((entry, InsertError::Elapsed)) => {
// The entry's deadline has elapsed, so fire it and update the
@@ -320,9 +311,9 @@ where
self.inner.process.shutdown();
// Clear the wheel, using u64::MAX allows us to drain everything
- let mut poll = wheel::Poll::new(u64::MAX);
+ let end_of_time = u64::MAX;
- while let Some(entry) = self.wheel.poll(&mut poll, &mut ()) {
+ while let Some(entry) = self.wheel.poll(end_of_time) {
entry.error(Error::shutdown());
}
@@ -387,6 +378,10 @@ impl Inner {
debug_assert!(prev <= MAX_TIMEOUTS);
}
+ /// add the entry to the "process queue". entries are not immediately
+ /// pushed into the timer wheel but are instead pushed into the
+ /// process queue and then moved from the process queue into the timer
+ /// wheel on next `process`
fn queue(&self, entry: &Arc<Entry>) -> Result<(), Error> {
if self.process.push(entry)? {
// The timer is notified so that it can process the timeout
diff --git a/tokio/src/time/driver/registration.rs b/tokio/src/time/driver/registration.rs
deleted file mode 100644
index 3a0b3450..00000000
--- a/tokio/src/time/driver/registration.rs
+++ /dev/null
@@ -1,56 +0,0 @@
-use crate::time::driver::{Entry, Handle};
-use crate::time::{Duration, Error, Instant};
-
-use std::sync::Arc;
-use std::task::{self, Poll};
-
-/// Registration with a timer.
-///
-/// The association between a `Delay` instance and a timer is done lazily in
-/// `poll`
-#[derive(Debug)]
-pub(crate) struct Registration {
- entry: Arc<Entry>,
-}
-
-impl Registration {
- pub(crate) fn new(deadline: Instant, duration: Duration) -> Registration {
- let handle = Handle::current();
-
- Registration {
- entry: Entry::new(&handle, deadline, duration),
- }
- }
-
- pub(crate) fn deadline(&self) -> Instant {
- self.entry.time_ref().deadline
- }
-
- pub(crate) fn reset(&mut self, deadline: Instant) {
- unsafe {
- self.entry.time_mut().deadline = deadline;
- }
-
- Entry::reset(&mut self.entry);
- }
-
- pub(crate) fn is_elapsed(&self) -> bool {
- self.entry.is_elapsed()
- }
-
- pub(crate) fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> {
- // Keep track of task budget
- let coop = ready!(crate::coop::poll_proceed(cx));
-
- self.entry.poll_elapsed(cx).map(move |r| {
- coop.made_progress();
- r
- })
- }
-}
-
-impl Drop for Registration {
- fn drop(&mut self) {
- Entry::cancel(&self.entry);
- }
-}
diff --git a/tokio/src/time/driver/stack.rs b/tokio/src/time/driver/stack.rs
deleted file mode 100644
index 3e2924f2..00000000
--- a/tokio/src/time/driver/stack.rs
+++ /dev/null
@@ -1,121 +0,0 @@
-use crate::time::driver::Entry;
-use crate::time::wheel;
-
-use std::ptr;
-use std::sync::Arc;
-
-/// A doubly linked stack
-#[derive(Debug)]
-pub(crate) struct Stack {
- head: Option<Arc<Entry>>,
-}
-
-impl Default for Stack {
- fn default() -> Stack {
- Stack { head: None }
- }
-}
-
-impl wheel::Stack for Stack {
- type Owned = Arc<Entry>;
- type Borrowed = Entry;
- type Store = ();
-
- fn is_empty(&self) -> bool {
- self.head.is_none()
- }
-
- fn push(&mut self, entry: Self::Owned, _: &mut Self::Store) {
- // Get a pointer to the entry to for the prev link
- let ptr: *const Entry = &*entry as *const _;
-
- // Remove the old head entry
- let old = self.head.take();
-
- unsafe {
- // Ensure the entry is not already in a stack.
- debug_assert!((*entry.next_stack.get()).is_none());
- debug_assert!((*entry.prev_stack.get()).is_null());
-
- if let Some(ref entry) = old.as_ref() {
- debug_assert!({
- // The head is not already set to the entry
- ptr != &***entry as *const _
- });
-
- // Set the previous link on the old head
- *entry.prev_stack.get() = ptr;
- }
-
- // Set this entry's next pointer
- *entry.next_stack.get() = old;
- }
-
- // Update the head pointer
- self.head = Some(entry);
- }
-
- /// Pops an item from the stack
- fn pop(&mut self, _: &mut ()) -> Option<Arc<Entry>> {
- let entry = self.head.take();
-
- unsafe {
- if let Some(entry) = entry.as_ref() {
- self.head = (*entry.next_stack.get()).take();
-
- if let Some(entry) = self.head.as_ref() {
- *entry.prev_stack.get() = ptr::null();
- }
-
- *entry.prev_stack.get() = ptr::null();
- }
- }
-
- entry
- }
-
- fn remove(&mut self, entry: &Entry, _: &mut ()) {
- unsafe {
- // Ensure that the entry is in fact contained by the stack
- debug_assert!({
- // This walks the full linked list even if an entry is found.
- let mut next = self.head.as_ref();
- let mut contains = false;
-
- while let Some(n) = next {
- if entry as *const _ == &**n as *const _ {
- debug_assert!(!contains);
- contains = true;
- }
-
- next = (*n.next_stack.get()).as_ref();
- }
-
- contains
- });
-
- // Unlink `entry` from the next node
- let next = (*entry.next_stack.get()).take();
-
- if let Some(next) = next.as_ref() {
- (*next.prev_stack.get()) = *entry.prev_stack.get();
- }
-
- // Unlink `entry` from the prev node
-
- if let Some(prev) = (*entry.prev_stack.get()).as_ref() {
- *prev.next_stack.get() = next;
- } else {
- // It is the head
- self.head = next;
- }
-
- // Unset the prev pointer
- *entry.prev_stack.get() = ptr::null();
- }
- }
-
- fn when(item: &Entry, _: &()) -> u64 {
- item.when_internal().expect("invalid internal state")
- }
-}