diff options
author | greenwoodcm <greenwd@amazon.com> | 2020-10-06 12:48:01 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-10-06 12:48:01 -0700 |
commit | fcdf9345bf19e9a1e1664f01713f9eba54da27c5 (patch) | |
tree | 2d5d1a380e2e5bf29979335a553d19665aaaef29 /tokio/src/time/driver | |
parent | 4cf45c038b9691f24fac22df13594c2223b185f6 (diff) |
time: clean time driver (#2905)
* remove unnecessary wheel::Poll
the timer wheel uses the `wheel::Poll` struct as input when
advancing the timer to the next time step. the `Poll` struct
contains an instant representing the time step to advance to
and also contains an optional and mutable reference to an
`Expiration` struct. from what I can tell, the latter field
is only used in the context of polling the wheel and does not
need to be exposed outside of that method. without the
expiration field the `Poll` struct is nothing more than a
wrapper around the instant being polled. this change removes
the `Poll` struct and updates integration points accordingly.
* remove Stack trait in favor of concrete Stack implementation
* remove timer Registration struct
Diffstat (limited to 'tokio/src/time/driver')
-rw-r--r-- | tokio/src/time/driver/entry.rs | 20 | ||||
-rw-r--r-- | tokio/src/time/driver/mod.rs | 29 | ||||
-rw-r--r-- | tokio/src/time/driver/registration.rs | 56 | ||||
-rw-r--r-- | tokio/src/time/driver/stack.rs | 121 |
4 files changed, 24 insertions, 202 deletions
diff --git a/tokio/src/time/driver/entry.rs b/tokio/src/time/driver/entry.rs index 974465c1..20f8e1c6 100644 --- a/tokio/src/time/driver/entry.rs +++ b/tokio/src/time/driver/entry.rs @@ -83,7 +83,7 @@ pub(crate) struct Entry { /// Next entry in the State's linked list. /// /// This is only accessed by the timer - pub(super) next_stack: UnsafeCell<Option<Arc<Entry>>>, + pub(crate) next_stack: UnsafeCell<Option<Arc<Entry>>>, /// Previous entry in the State's linked list. /// @@ -91,7 +91,7 @@ pub(crate) struct Entry { /// entry. /// /// This is a weak reference. - pub(super) prev_stack: UnsafeCell<*const Entry>, + pub(crate) prev_stack: UnsafeCell<*const Entry>, } /// Stores the info for `Delay`. @@ -112,12 +112,12 @@ const ERROR: u64 = u64::MAX; impl Entry { pub(crate) fn new(handle: &Handle, deadline: Instant, duration: Duration) -> Arc<Entry> { let inner = handle.inner().unwrap(); - let entry: Entry; - // Increment the number of active timeouts - if let Err(err) = inner.increment() { - entry = Entry::new2(deadline, duration, Weak::new(), ERROR); + // Attempt to increment the number of active timeouts + let entry = if let Err(err) = inner.increment() { + let entry = Entry::new2(deadline, duration, Weak::new(), ERROR); entry.error(err); + entry } else { let when = inner.normalize_deadline(deadline); let state = if when <= inner.elapsed() { @@ -125,8 +125,8 @@ impl Entry { } else { when }; - entry = Entry::new2(deadline, duration, Arc::downgrade(&inner), state); - } + Entry::new2(deadline, duration, Arc::downgrade(&inner), state) + }; let entry = Arc::new(entry); if let Err(err) = inner.queue(&entry) { @@ -147,6 +147,10 @@ impl Entry { &mut *self.time.0.get() } + pub(crate) fn when(&self) -> u64 { + self.when_internal().expect("invalid internal state") + } + /// The current entry state as known by the timer. This is not the value of /// `state`, but lets the timer know how to converge its state to `state`. pub(crate) fn when_internal(&self) -> Option<u64> { diff --git a/tokio/src/time/driver/mod.rs b/tokio/src/time/driver/mod.rs index bb6c28b3..5ece7c72 100644 --- a/tokio/src/time/driver/mod.rs +++ b/tokio/src/time/driver/mod.rs @@ -9,12 +9,6 @@ pub(super) use self::entry::Entry; mod handle; pub(crate) use self::handle::Handle; -mod registration; -pub(crate) use self::registration::Registration; - -mod stack; -use self::stack::Stack; - use crate::loom::sync::atomic::{AtomicU64, AtomicUsize}; use crate::park::{Park, Unpark}; use crate::time::{wheel, Error}; @@ -73,7 +67,7 @@ use std::{cmp, fmt}; /// When the timer processes entries at level zero, it will notify all the /// `Delay` instances as their deadlines have been reached. For all higher /// levels, all entries will be redistributed across the wheel at the next level -/// down. Eventually, as time progresses, entries will [`Delay`][delay] instances will +/// down. Eventually, as time progresses, entries with [`Delay`][delay] instances will /// either be canceled (dropped) or their associated entries will reach level /// zero and be notified. /// @@ -87,7 +81,7 @@ pub(crate) struct Driver<T: Park> { inner: Arc<Inner>, /// Timer wheel - wheel: wheel::Wheel<Stack>, + wheel: wheel::Wheel, /// Thread parker. The `Driver` park implementation delegates to this. park: T, @@ -163,9 +157,8 @@ where self.clock.now() - self.inner.start, crate::time::Round::Down, ); - let mut poll = wheel::Poll::new(now); - while let Some(entry) = self.wheel.poll(&mut poll, &mut ()) { + while let Some(entry) = self.wheel.poll(now) { let when = entry.when_internal().expect("invalid internal entry state"); // Fire the entry @@ -193,7 +186,7 @@ where self.clear_entry(&entry); } (None, Some(when)) => { - // Queue the entry + // Add the entry to the timer wheel self.add_entry(entry, when); } (Some(_), Some(next)) => { @@ -205,19 +198,17 @@ where } fn clear_entry(&mut self, entry: &Arc<Entry>) { - self.wheel.remove(entry, &mut ()); + self.wheel.remove(entry); entry.set_when_internal(None); } /// Fires the entry if it needs to, otherwise queue it to be processed later. - /// - /// Returns `None` if the entry was fired. fn add_entry(&mut self, entry: Arc<Entry>, when: u64) { use crate::time::wheel::InsertError; entry.set_when_internal(Some(when)); - match self.wheel.insert(when, entry, &mut ()) { + match self.wheel.insert(when, entry) { Ok(_) => {} Err((entry, InsertError::Elapsed)) => { // The entry's deadline has elapsed, so fire it and update the @@ -320,9 +311,9 @@ where self.inner.process.shutdown(); // Clear the wheel, using u64::MAX allows us to drain everything - let mut poll = wheel::Poll::new(u64::MAX); + let end_of_time = u64::MAX; - while let Some(entry) = self.wheel.poll(&mut poll, &mut ()) { + while let Some(entry) = self.wheel.poll(end_of_time) { entry.error(Error::shutdown()); } @@ -387,6 +378,10 @@ impl Inner { debug_assert!(prev <= MAX_TIMEOUTS); } + /// add the entry to the "process queue". entries are not immediately + /// pushed into the timer wheel but are instead pushed into the + /// process queue and then moved from the process queue into the timer + /// wheel on next `process` fn queue(&self, entry: &Arc<Entry>) -> Result<(), Error> { if self.process.push(entry)? { // The timer is notified so that it can process the timeout diff --git a/tokio/src/time/driver/registration.rs b/tokio/src/time/driver/registration.rs deleted file mode 100644 index 3a0b3450..00000000 --- a/tokio/src/time/driver/registration.rs +++ /dev/null @@ -1,56 +0,0 @@ -use crate::time::driver::{Entry, Handle}; -use crate::time::{Duration, Error, Instant}; - -use std::sync::Arc; -use std::task::{self, Poll}; - -/// Registration with a timer. -/// -/// The association between a `Delay` instance and a timer is done lazily in -/// `poll` -#[derive(Debug)] -pub(crate) struct Registration { - entry: Arc<Entry>, -} - -impl Registration { - pub(crate) fn new(deadline: Instant, duration: Duration) -> Registration { - let handle = Handle::current(); - - Registration { - entry: Entry::new(&handle, deadline, duration), - } - } - - pub(crate) fn deadline(&self) -> Instant { - self.entry.time_ref().deadline - } - - pub(crate) fn reset(&mut self, deadline: Instant) { - unsafe { - self.entry.time_mut().deadline = deadline; - } - - Entry::reset(&mut self.entry); - } - - pub(crate) fn is_elapsed(&self) -> bool { - self.entry.is_elapsed() - } - - pub(crate) fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> { - // Keep track of task budget - let coop = ready!(crate::coop::poll_proceed(cx)); - - self.entry.poll_elapsed(cx).map(move |r| { - coop.made_progress(); - r - }) - } -} - -impl Drop for Registration { - fn drop(&mut self) { - Entry::cancel(&self.entry); - } -} diff --git a/tokio/src/time/driver/stack.rs b/tokio/src/time/driver/stack.rs deleted file mode 100644 index 3e2924f2..00000000 --- a/tokio/src/time/driver/stack.rs +++ /dev/null @@ -1,121 +0,0 @@ -use crate::time::driver::Entry; -use crate::time::wheel; - -use std::ptr; -use std::sync::Arc; - -/// A doubly linked stack -#[derive(Debug)] -pub(crate) struct Stack { - head: Option<Arc<Entry>>, -} - -impl Default for Stack { - fn default() -> Stack { - Stack { head: None } - } -} - -impl wheel::Stack for Stack { - type Owned = Arc<Entry>; - type Borrowed = Entry; - type Store = (); - - fn is_empty(&self) -> bool { - self.head.is_none() - } - - fn push(&mut self, entry: Self::Owned, _: &mut Self::Store) { - // Get a pointer to the entry to for the prev link - let ptr: *const Entry = &*entry as *const _; - - // Remove the old head entry - let old = self.head.take(); - - unsafe { - // Ensure the entry is not already in a stack. - debug_assert!((*entry.next_stack.get()).is_none()); - debug_assert!((*entry.prev_stack.get()).is_null()); - - if let Some(ref entry) = old.as_ref() { - debug_assert!({ - // The head is not already set to the entry - ptr != &***entry as *const _ - }); - - // Set the previous link on the old head - *entry.prev_stack.get() = ptr; - } - - // Set this entry's next pointer - *entry.next_stack.get() = old; - } - - // Update the head pointer - self.head = Some(entry); - } - - /// Pops an item from the stack - fn pop(&mut self, _: &mut ()) -> Option<Arc<Entry>> { - let entry = self.head.take(); - - unsafe { - if let Some(entry) = entry.as_ref() { - self.head = (*entry.next_stack.get()).take(); - - if let Some(entry) = self.head.as_ref() { - *entry.prev_stack.get() = ptr::null(); - } - - *entry.prev_stack.get() = ptr::null(); - } - } - - entry - } - - fn remove(&mut self, entry: &Entry, _: &mut ()) { - unsafe { - // Ensure that the entry is in fact contained by the stack - debug_assert!({ - // This walks the full linked list even if an entry is found. - let mut next = self.head.as_ref(); - let mut contains = false; - - while let Some(n) = next { - if entry as *const _ == &**n as *const _ { - debug_assert!(!contains); - contains = true; - } - - next = (*n.next_stack.get()).as_ref(); - } - - contains - }); - - // Unlink `entry` from the next node - let next = (*entry.next_stack.get()).take(); - - if let Some(next) = next.as_ref() { - (*next.prev_stack.get()) = *entry.prev_stack.get(); - } - - // Unlink `entry` from the prev node - - if let Some(prev) = (*entry.prev_stack.get()).as_ref() { - *prev.next_stack.get() = next; - } else { - // It is the head - self.head = next; - } - - // Unset the prev pointer - *entry.prev_stack.get() = ptr::null(); - } - } - - fn when(item: &Entry, _: &()) -> u64 { - item.when_internal().expect("invalid internal state") - } -} |