summaryrefslogtreecommitdiffstats
path: root/tokio/src/time/driver/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/time/driver/mod.rs')
-rw-r--r--tokio/src/time/driver/mod.rs494
1 files changed, 264 insertions, 230 deletions
diff --git a/tokio/src/time/driver/mod.rs b/tokio/src/time/driver/mod.rs
index 8532c551..917078ef 100644
--- a/tokio/src/time/driver/mod.rs
+++ b/tokio/src/time/driver/mod.rs
@@ -1,26 +1,29 @@
+// Currently, rust warns when an unsafe fn contains an unsafe {} block. However,
+// in the future, this will change to the reverse. For now, suppress this
+// warning and generally stick with being explicit about unsafety.
+#![allow(unused_unsafe)]
#![cfg_attr(not(feature = "rt"), allow(dead_code))]
//! Time driver
-mod atomic_stack;
-use self::atomic_stack::AtomicStack;
-
mod entry;
-pub(super) use self::entry::Entry;
+pub(self) use self::entry::{EntryList, TimerEntry, TimerHandle, TimerShared};
mod handle;
pub(crate) use self::handle::Handle;
-use crate::loom::sync::atomic::{AtomicU64, AtomicUsize};
+mod wheel;
+
+pub(super) mod sleep;
+
+use crate::loom::sync::{Arc, Mutex};
use crate::park::{Park, Unpark};
-use crate::time::{error::Error, wheel};
+use crate::time::error::Error;
use crate::time::{Clock, Duration, Instant};
-use std::sync::atomic::Ordering::{Acquire, Relaxed, Release, SeqCst};
-
-use std::sync::Arc;
-use std::usize;
-use std::{cmp, fmt};
+use std::convert::TryInto;
+use std::fmt;
+use std::{num::NonZeroU64, ptr::NonNull, task::Waker};
/// Time implementation that drives [`Sleep`][sleep], [`Interval`][interval], and [`Timeout`][timeout].
///
@@ -78,63 +81,96 @@ use std::{cmp, fmt};
/// [timeout]: crate::time::Timeout
/// [interval]: crate::time::Interval
#[derive(Debug)]
-pub(crate) struct Driver<T: Park> {
+pub(crate) struct Driver<P: Park + 'static> {
+ /// Timing backend in use
+ time_source: ClockTime,
+
/// Shared state
- inner: Arc<Inner>,
+ inner: Handle,
- /// Timer wheel
- wheel: wheel::Wheel,
+ /// Parker to delegate to
+ park: P,
+}
+
+/// A structure which handles conversion from Instants to u64 timestamps.
+#[derive(Debug, Clone)]
+pub(self) struct ClockTime {
+ clock: super::clock::Clock,
+ start_time: Instant,
+}
- /// Thread parker. The `Driver` park implementation delegates to this.
- park: T,
+impl ClockTime {
+ pub(self) fn new(clock: Clock) -> Self {
+ Self {
+ clock,
+ start_time: super::clock::now(),
+ }
+ }
- /// Source of "now" instances
- clock: Clock,
+ pub(self) fn deadline_to_tick(&self, t: Instant) -> u64 {
+ // Round up to the end of a ms
+ self.instant_to_tick(t + Duration::from_nanos(999_999))
+ }
- /// True if the driver is being shutdown
- is_shutdown: bool,
+ pub(self) fn instant_to_tick(&self, t: Instant) -> u64 {
+ // round up
+ let dur: Duration = t
+ .checked_duration_since(self.start_time)
+ .unwrap_or_else(|| Duration::from_secs(0));
+ let ms = dur.as_millis();
+
+ ms.try_into().expect("Duration too far into the future")
+ }
+
+ pub(self) fn tick_to_duration(&self, t: u64) -> Duration {
+ Duration::from_millis(t)
+ }
+
+ pub(self) fn now(&self) -> u64 {
+ self.instant_to_tick(self.clock.now())
+ }
}
/// Timer state shared between `Driver`, `Handle`, and `Registration`.
-pub(crate) struct Inner {
- /// The instant at which the timer started running.
- start: Instant,
+pub(self) struct Inner {
+ /// Timing backend in use
+ time_source: ClockTime,
/// The last published timer `elapsed` value.
- elapsed: AtomicU64,
+ elapsed: u64,
- /// Number of active timeouts
- num: AtomicUsize,
+ /// The earliest time at which we promise to wake up without unparking
+ next_wake: Option<NonZeroU64>,
- /// Head of the "process" linked list.
- process: AtomicStack,
+ /// Timer wheel
+ wheel: wheel::Wheel,
+
+ /// True if the driver is being shutdown
+ is_shutdown: bool,
- /// Unparks the timer thread.
+ /// Unparker that can be used to wake the time driver
unpark: Box<dyn Unpark>,
}
-/// Maximum number of timeouts the system can handle concurrently.
-const MAX_TIMEOUTS: usize = usize::MAX >> 1;
-
// ===== impl Driver =====
-impl<T> Driver<T>
+impl<P> Driver<P>
where
- T: Park,
+ P: Park + 'static,
{
/// Creates a new `Driver` instance that uses `park` to block the current
- /// thread and `clock` to get the current `Instant`.
+ /// thread and `time_source` to get the current time and convert to ticks.
///
/// Specifying the source of time is useful when testing.
- pub(crate) fn new(park: T, clock: Clock) -> Driver<T> {
- let unpark = Box::new(park.unpark());
+ pub(crate) fn new(park: P, clock: Clock) -> Driver<P> {
+ let time_source = ClockTime::new(clock);
+
+ let inner = Inner::new(time_source.clone(), Box::new(park.unpark()));
Driver {
- inner: Arc::new(Inner::new(clock.now(), unpark)),
- wheel: wheel::Wheel::new(),
+ time_source,
+ inner: Handle::new(Arc::new(Mutex::new(inner))),
park,
- clock,
- is_shutdown: false,
}
}
@@ -145,189 +181,240 @@ where
/// `with_default`, setting the timer as the default timer for the execution
/// context.
pub(crate) fn handle(&self) -> Handle {
- Handle::new(Arc::downgrade(&self.inner))
+ self.inner.clone()
}
- /// Converts an `Expiration` to an `Instant`.
- fn expiration_instant(&self, when: u64) -> Instant {
- self.inner.start + Duration::from_millis(when)
- }
+ fn park_internal(&mut self, limit: Option<Duration>) -> Result<(), P::Error> {
+ let clock = &self.time_source.clock;
- /// Runs timer related logic
- fn process(&mut self) {
- let now = crate::time::ms(
- self.clock.now() - self.inner.start,
- crate::time::Round::Down,
- );
+ let mut lock = self.inner.lock();
- while let Some(entry) = self.wheel.poll(now) {
- let when = entry.when_internal().expect("invalid internal entry state");
+ let next_wake = lock.wheel.next_expiration_time();
+ lock.next_wake =
+ next_wake.map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap()));
- // Fire the entry
- entry.fire(when);
+ drop(lock);
- // Track that the entry has been fired
- entry.set_when_internal(None);
- }
+ match next_wake {
+ Some(when) => {
+ let now = self.time_source.now();
+ // Note that we effectively round up to 1ms here - this avoids
+ // very short-duration microsecond-resolution sleeps that the OS
+ // might treat as zero-length.
+ let mut duration = self.time_source.tick_to_duration(when.saturating_sub(now));
+
+ if duration > Duration::from_millis(0) {
+ if let Some(limit) = limit {
+ duration = std::cmp::min(limit, duration);
+ }
- // Update the elapsed cache
- self.inner.elapsed.store(self.wheel.elapsed(), SeqCst);
- }
+ if clock.is_paused() {
+ self.park.park_timeout(Duration::from_secs(0))?;
- /// Processes the entry queue
- ///
- /// This handles adding and canceling timeouts.
- fn process_queue(&mut self) {
- for entry in self.inner.process.take() {
- match (entry.when_internal(), entry.load_state()) {
- (None, None) => {
- // Nothing to do
- }
- (Some(_), None) => {
- // Remove the entry
- self.clear_entry(&entry);
- }
- (None, Some(when)) => {
- // Add the entry to the timer wheel
- self.add_entry(entry, when);
+ // Simulate advancing time
+ clock.advance(duration);
+ } else {
+ self.park.park_timeout(duration)?;
+ }
+ } else {
+ self.park.park_timeout(Duration::from_secs(0))?;
}
- (Some(_), Some(next)) => {
- self.clear_entry(&entry);
- self.add_entry(entry, next);
+ }
+ None => {
+ if let Some(duration) = limit {
+ if clock.is_paused() {
+ self.park.park_timeout(Duration::from_secs(0))?;
+ clock.advance(duration);
+ } else {
+ self.park.park_timeout(duration)?;
+ }
+ } else {
+ self.park.park()?;
}
}
}
- }
-
- fn clear_entry(&mut self, entry: &Arc<Entry>) {
- self.wheel.remove(entry);
- entry.set_when_internal(None);
- }
- /// Fires the entry if it needs to, otherwise queue it to be processed later.
- fn add_entry(&mut self, entry: Arc<Entry>, when: u64) {
- use crate::time::error::InsertError;
+ // Process pending timers after waking up
+ self.inner.process();
- entry.set_when_internal(Some(when));
-
- match self.wheel.insert(when, entry) {
- Ok(_) => {}
- Err((entry, InsertError::Elapsed)) => {
- // The entry's deadline has elapsed, so fire it and update the
- // internal state accordingly.
- entry.set_when_internal(None);
- entry.fire(when);
- }
- Err((entry, InsertError::Invalid)) => {
- // The entry's deadline is invalid, so error it and update the
- // internal state accordingly.
- entry.set_when_internal(None);
- entry.error(Error::invalid());
- }
- }
+ Ok(())
}
}
-impl<T> Park for Driver<T>
-where
- T: Park,
-{
- type Unpark = T::Unpark;
- type Error = T::Error;
+impl Handle {
+ /// Runs timer related logic, and returns the next wakeup time
+ pub(self) fn process(&self) {
+ let now = self.time_source().now();
- fn unpark(&self) -> Self::Unpark {
- self.park.unpark()
+ self.process_at_time(now)
}
- fn park(&mut self) -> Result<(), Self::Error> {
- self.process_queue();
+ pub(self) fn process_at_time(&self, now: u64) {
+ let mut waker_list: [Option<Waker>; 32] = Default::default();
+ let mut waker_idx = 0;
- match self.wheel.poll_at() {
- Some(when) => {
- let now = self.clock.now();
- let deadline = self.expiration_instant(when);
+ let mut lock = self.lock();
- if deadline > now {
- let dur = deadline - now;
+ assert!(now >= lock.elapsed);
- if self.clock.is_paused() {
- self.park.park_timeout(Duration::from_secs(0))?;
- self.clock.advance(dur);
- } else {
- self.park.park_timeout(dur)?;
+ while let Some(entry) = lock.wheel.poll(now) {
+ debug_assert!(unsafe { entry.is_pending() });
+
+ // SAFETY: We hold the driver lock, and just removed the entry from any linked lists.
+ if let Some(waker) = unsafe { entry.fire(Ok(())) } {
+ waker_list[waker_idx] = Some(waker);
+
+ waker_idx += 1;
+
+ if waker_idx == waker_list.len() {
+ // Wake a batch of wakers. To avoid deadlock, we must do this with the lock temporarily dropped.
+ drop(lock);
+
+ for waker in waker_list.iter_mut() {
+ waker.take().unwrap().wake();
}
- } else {
- self.park.park_timeout(Duration::from_secs(0))?;
+
+ waker_idx = 0;
+
+ lock = self.lock();
}
}
- None => {
- self.park.park()?;
- }
}
- self.process();
+ // Update the elapsed cache
+ lock.elapsed = lock.wheel.elapsed();
+ lock.next_wake = lock
+ .wheel
+ .poll_at()
+ .map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap()));
- Ok(())
+ drop(lock);
+
+ for waker in waker_list[0..waker_idx].iter_mut() {
+ waker.take().unwrap().wake();
+ }
}
- fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
- self.process_queue();
+ /// Removes a registered timer from the driver.
+ ///
+ /// The timer will be moved to the cancelled state. Wakers will _not_ be
+ /// invoked. If the timer is already completed, this function is a no-op.
+ ///
+ /// This function always acquires the driver lock, even if the entry does
+ /// not appear to be registered.
+ ///
+ /// SAFETY: The timer must not be registered with some other driver, and
+ /// `add_entry` must not be called concurrently.
+ pub(self) unsafe fn clear_entry(&self, entry: NonNull<TimerShared>) {
+ unsafe {
+ let mut lock = self.lock();
+
+ if entry.as_ref().might_be_registered() {
+ lock.wheel.remove(entry);
+ }
- match self.wheel.poll_at() {
- Some(when) => {
- let now = self.clock.now();
- let deadline = self.expiration_instant(when);
+ entry.as_ref().handle().fire(Ok(()));
+ }
+ }
- if deadline > now {
- let duration = cmp::min(deadline - now, duration);
+ /// Removes and re-adds an entry to the driver.
+ ///
+ /// SAFETY: The timer must be either unregistered, or registered with this
+ /// driver. No other threads are allowed to concurrently manipulate the
+ /// timer at all (the current thread should hold an exclusive reference to
+ /// the `TimerEntry`)
+ pub(self) unsafe fn reregister(&self, new_tick: u64, entry: NonNull<TimerShared>) {
+ let waker = unsafe {
+ let mut lock = self.lock();
+
+ // We may have raced with a firing/deregistration, so check before
+ // deregistering.
+ if unsafe { entry.as_ref().might_be_registered() } {
+ lock.wheel.remove(entry);
+ }
- if self.clock.is_paused() {
- self.park.park_timeout(Duration::from_secs(0))?;
- self.clock.advance(duration);
- } else {
- self.park.park_timeout(duration)?;
+ // Now that we have exclusive control of this entry, mint a handle to reinsert it.
+ let entry = entry.as_ref().handle();
+
+ if lock.is_shutdown {
+ unsafe { entry.fire(Err(crate::time::error::Error::shutdown())) }
+ } else {
+ entry.set_expiration(new_tick);
+
+ // Note: We don't have to worry about racing with some other resetting
+ // thread, because add_entry and reregister require exclusive control of
+ // the timer entry.
+ match unsafe { lock.wheel.insert(entry) } {
+ Ok(when) => {
+ if lock
+ .next_wake
+ .map(|next_wake| when < next_wake.get())
+ .unwrap_or(true)
+ {
+ lock.unpark.unpark();
+ }
+
+ None
}
- } else {
- self.park.park_timeout(Duration::from_secs(0))?;
+ Err((entry, super::error::InsertError::Elapsed)) => unsafe {
+ entry.fire(Ok(()))
+ },
}
}
- None => {
- self.park.park_timeout(duration)?;
- }
+
+ // Must release lock before invoking waker to avoid the risk of deadlock.
+ };
+
+ // The timer was fired synchronously as a result of the reregistration.
+ // Wake the waker; this is needed because we might reset _after_ a poll,
+ // and otherwise the task won't be awoken to poll again.
+ if let Some(waker) = waker {
+ waker.wake();
}
+ }
+}
- self.process();
+impl<P> Park for Driver<P>
+where
+ P: Park + 'static,
+{
+ type Unpark = P::Unpark;
+ type Error = P::Error;
- Ok(())
+ fn unpark(&self) -> Self::Unpark {
+ self.park.unpark()
+ }
+
+ fn park(&mut self) -> Result<(), Self::Error> {
+ self.park_internal(None)
+ }
+
+ fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
+ self.park_internal(Some(duration))
}
fn shutdown(&mut self) {
- if self.is_shutdown {
+ let mut lock = self.inner.lock();
+
+ if lock.is_shutdown {
return;
}
- use std::u64;
+ lock.is_shutdown = true;
- // Shutdown the stack of entries to process, preventing any new entries
- // from being pushed.
- self.inner.process.shutdown();
+ drop(lock);
- // Clear the wheel, using u64::MAX allows us to drain everything
- let end_of_time = u64::MAX;
+ // Advance time forward to the end of time.
- while let Some(entry) = self.wheel.poll(end_of_time) {
- entry.error(Error::shutdown());
- }
+ self.inner.process_at_time(u64::MAX);
self.park.shutdown();
-
- self.is_shutdown = true;
}
}
-impl<T> Drop for Driver<T>
+impl<P> Drop for Driver<P>
where
- T: Park,
+ P: Park + 'static,
{
fn drop(&mut self) {
self.shutdown();
@@ -337,69 +424,16 @@ where
// ===== impl Inner =====
impl Inner {
- fn new(start: Instant, unpark: Box<dyn Unpark>) -> Inner {
+ pub(self) fn new(time_source: ClockTime, unpark: Box<dyn Unpark>) -> Self {
Inner {
- num: AtomicUsize::new(0),
- elapsed: AtomicU64::new(0),
- process: AtomicStack::new(),
- start,
+ time_source,
+ elapsed: 0,
+ next_wake: None,
unpark,
+ wheel: wheel::Wheel::new(),
+ is_shutdown: false,
}
}
-
- fn elapsed(&self) -> u64 {
- self.elapsed.load(SeqCst)
- }
-
- #[cfg(all(test, loom))]
- fn num(&self, ordering: std::sync::atomic::Ordering) -> usize {
- self.num.load(ordering)
- }
-
- /// Increments the number of active timeouts
- fn increment(&self) -> Result<(), Error> {
- let mut curr = self.num.load(Relaxed);
- loop {
- if curr == MAX_TIMEOUTS {
- return Err(Error::at_capacity());
- }
-
- match self
- .num
- .compare_exchange_weak(curr, curr + 1, Release, Relaxed)
- {
- Ok(_) => return Ok(()),
- Err(next) => curr = next,
- }
- }
- }
-
- /// Decrements the number of active timeouts
- fn decrement(&self) {
- let prev = self.num.fetch_sub(1, Acquire);
- debug_assert!(prev <= MAX_TIMEOUTS);
- }
-
- /// add the entry to the "process queue". entries are not immediately
- /// pushed into the timer wheel but are instead pushed into the
- /// process queue and then moved from the process queue into the timer
- /// wheel on next `process`
- fn queue(&self, entry: &Arc<Entry>) -> Result<(), Error> {
- if self.process.push(entry)? {
- // The timer is notified so that it can process the timeout
- self.unpark.unpark();
- }
-
- Ok(())
- }
-
- fn normalize_deadline(&self, deadline: Instant) -> u64 {
- if deadline < self.start {
- return 0;
- }
-
- crate::time::ms(deadline - self.start, crate::time::Round::Up)
- }
}
impl fmt::Debug for Inner {
@@ -408,5 +442,5 @@ impl fmt::Debug for Inner {
}
}
-#[cfg(all(test, loom))]
+#[cfg(test)]
mod tests;