From 7e35922a1d282b1e3dadf037cd237be336b331fb Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 6 Nov 2019 23:53:46 -0800 Subject: time: rename `tokio::timer` -> `tokio::time` (#1745) --- tokio-test/src/clock.rs | 13 +- tokio-test/src/io.rs | 2 +- tokio-test/tests/block_on.rs | 2 +- tokio-test/tests/clock.rs | 2 +- tokio/Cargo.toml | 8 +- tokio/src/clock.rs | 14 - tokio/src/future.rs | 8 +- tokio/src/lib.rs | 7 +- tokio/src/runtime/builder.rs | 4 +- tokio/src/runtime/timer.rs | 6 +- tokio/src/stream.rs | 10 +- tokio/src/time/atomic.rs | 60 +++ tokio/src/time/clock/mod.rs | 149 ++++++ tokio/src/time/clock/now.rs | 15 + tokio/src/time/deadline.rs | 162 +++++++ tokio/src/time/delay.rs | 115 +++++ tokio/src/time/delay_queue.rs | 854 +++++++++++++++++++++++++++++++++ tokio/src/time/error.rs | 72 +++ tokio/src/time/interval.rs | 112 +++++ tokio/src/time/mod.rs | 143 ++++++ tokio/src/time/throttle.rs | 85 ++++ tokio/src/time/timeout.rs | 232 +++++++++ tokio/src/time/timer/atomic_stack.rs | 124 +++++ tokio/src/time/timer/entry.rs | 396 ++++++++++++++++ tokio/src/time/timer/handle.rs | 187 ++++++++ tokio/src/time/timer/mod.rs | 477 +++++++++++++++++++ tokio/src/time/timer/now.rs | 11 + tokio/src/time/timer/registration.rs | 70 +++ tokio/src/time/timer/stack.rs | 121 +++++ tokio/src/time/wheel/level.rs | 255 ++++++++++ tokio/src/time/wheel/mod.rs | 314 +++++++++++++ tokio/src/time/wheel/stack.rs | 26 ++ tokio/src/timer/atomic.rs | 60 --- tokio/src/timer/clock/mod.rs | 149 ------ tokio/src/timer/clock/now.rs | 15 - tokio/src/timer/deadline.rs | 162 ------- tokio/src/timer/delay.rs | 115 ----- tokio/src/timer/delay_queue.rs | 855 ---------------------------------- tokio/src/timer/error.rs | 72 --- tokio/src/timer/interval.rs | 112 ----- tokio/src/timer/mod.rs | 147 ------ tokio/src/timer/throttle.rs | 85 ---- tokio/src/timer/timeout.rs | 232 --------- tokio/src/timer/timer/atomic_stack.rs | 124 ----- tokio/src/timer/timer/entry.rs | 396 ---------------- tokio/src/timer/timer/handle.rs | 187 -------- tokio/src/timer/timer/mod.rs | 477 ------------------- tokio/src/timer/timer/now.rs | 11 - tokio/src/timer/timer/registration.rs | 70 --- tokio/src/timer/timer/stack.rs | 121 ----- tokio/src/timer/wheel/level.rs | 255 ---------- tokio/src/timer/wheel/mod.rs | 314 ------------- tokio/src/timer/wheel/stack.rs | 26 -- tokio/tests/clock.rs | 6 +- tokio/tests/rt_common.rs | 6 +- tokio/tests/timer_clock.rs | 4 +- tokio/tests/timer_delay.rs | 4 +- tokio/tests/timer_hammer.rs | 2 +- tokio/tests/timer_interval.rs | 2 +- tokio/tests/timer_queue.rs | 2 +- tokio/tests/timer_rt.rs | 4 +- tokio/tests/timer_throttle.rs | 2 +- tokio/tests/timer_timeout.rs | 2 +- 63 files changed, 4026 insertions(+), 4049 deletions(-) delete mode 100644 tokio/src/clock.rs create mode 100644 tokio/src/time/atomic.rs create mode 100644 tokio/src/time/clock/mod.rs create mode 100644 tokio/src/time/clock/now.rs create mode 100644 tokio/src/time/deadline.rs create mode 100644 tokio/src/time/delay.rs create mode 100644 tokio/src/time/delay_queue.rs create mode 100644 tokio/src/time/error.rs create mode 100644 tokio/src/time/interval.rs create mode 100644 tokio/src/time/mod.rs create mode 100644 tokio/src/time/throttle.rs create mode 100644 tokio/src/time/timeout.rs create mode 100644 tokio/src/time/timer/atomic_stack.rs create mode 100644 tokio/src/time/timer/entry.rs create mode 100644 tokio/src/time/timer/handle.rs create mode 100644 tokio/src/time/timer/mod.rs create mode 100644 tokio/src/time/timer/now.rs create mode 100644 tokio/src/time/timer/registration.rs create mode 100644 tokio/src/time/timer/stack.rs create mode 100644 tokio/src/time/wheel/level.rs create mode 100644 tokio/src/time/wheel/mod.rs create mode 100644 tokio/src/time/wheel/stack.rs delete mode 100644 tokio/src/timer/atomic.rs delete mode 100644 tokio/src/timer/clock/mod.rs delete mode 100644 tokio/src/timer/clock/now.rs delete mode 100644 tokio/src/timer/deadline.rs delete mode 100644 tokio/src/timer/delay.rs delete mode 100644 tokio/src/timer/delay_queue.rs delete mode 100644 tokio/src/timer/error.rs delete mode 100644 tokio/src/timer/interval.rs delete mode 100644 tokio/src/timer/mod.rs delete mode 100644 tokio/src/timer/throttle.rs delete mode 100644 tokio/src/timer/timeout.rs delete mode 100644 tokio/src/timer/timer/atomic_stack.rs delete mode 100644 tokio/src/timer/timer/entry.rs delete mode 100644 tokio/src/timer/timer/handle.rs delete mode 100644 tokio/src/timer/timer/mod.rs delete mode 100644 tokio/src/timer/timer/now.rs delete mode 100644 tokio/src/timer/timer/registration.rs delete mode 100644 tokio/src/timer/timer/stack.rs delete mode 100644 tokio/src/timer/wheel/level.rs delete mode 100644 tokio/src/timer/wheel/mod.rs delete mode 100644 tokio/src/timer/wheel/stack.rs diff --git a/tokio-test/src/clock.rs b/tokio-test/src/clock.rs index 38417539..d2f29249 100644 --- a/tokio-test/src/clock.rs +++ b/tokio-test/src/clock.rs @@ -1,10 +1,9 @@ -//! A mocked clock for use with `tokio::timer` based futures. +//! A mocked clock for use with `tokio::time` based futures. //! //! # Example //! //! ``` -//! use tokio::clock; -//! use tokio::timer::delay; +//! use tokio::time::{clock, delay}; //! use tokio_test::{assert_ready, assert_pending, task}; //! //! use std::time::Duration; @@ -23,8 +22,8 @@ //! ``` use tokio::runtime::{Park, Unpark}; -use tokio::timer::clock::{Clock, Now}; -use tokio::timer::Timer; +use tokio::time::clock::{Clock, Now}; +use tokio::time::Timer; use std::marker::PhantomData; use std::rc::Rc; @@ -125,13 +124,13 @@ impl MockClock { where F: FnOnce(&mut Handle) -> R, { - tokio::timer::clock::with_default(&self.clock, || { + tokio::time::clock::with_default(&self.clock, || { let park = self.time.mock_park(); let timer = Timer::new(park); let handle = timer.handle(); let time = self.time.clone(); - let _timer = tokio::timer::set_default(&handle); + let _timer = tokio::time::set_default(&handle); let mut handle = Handle::new(timer, time); f(&mut handle) // lazy(|| Ok::<_, ()>(f(&mut handle))).wait().unwrap() diff --git a/tokio-test/src/io.rs b/tokio-test/src/io.rs index afd1c423..a073193c 100644 --- a/tokio-test/src/io.rs +++ b/tokio-test/src/io.rs @@ -18,7 +18,7 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tokio::sync::mpsc; -use tokio::timer::{clock, timer, Delay}; +use tokio::time::{clock, timer, Delay}; use bytes::Buf; use futures_core::ready; diff --git a/tokio-test/tests/block_on.rs b/tokio-test/tests/block_on.rs index 6d5f481a..c361d500 100644 --- a/tokio-test/tests/block_on.rs +++ b/tokio-test/tests/block_on.rs @@ -1,6 +1,6 @@ #![warn(rust_2018_idioms)] -use tokio::timer::delay; +use tokio::time::delay; use tokio_test::block_on; use std::time::{Duration, Instant}; diff --git a/tokio-test/tests/clock.rs b/tokio-test/tests/clock.rs index abb61e23..d9d2fcfc 100644 --- a/tokio-test/tests/clock.rs +++ b/tokio-test/tests/clock.rs @@ -1,6 +1,6 @@ #![warn(rust_2018_idioms)] -use tokio::timer::delay; +use tokio::time::delay; use tokio_test::clock::MockClock; use tokio_test::task; use tokio_test::{assert_pending, assert_ready}; diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 58d7cecc..df284172 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -33,7 +33,7 @@ default = [ "rt-full", "signal", "sync", - "timer", + "time", ] executor-core = [] @@ -47,7 +47,7 @@ net-full = ["tcp", "udp", "uds"] net-driver = ["io-traits", "mio", "blocking", "lazy_static"] rt-current-thread = [ "executor-core", - "timer", + "time", "sync", "net-driver", ] @@ -58,7 +58,7 @@ rt-full = [ "net-full", "rt-current-thread", "sync", - "timer", + "time", ] signal = [ "lazy_static", @@ -71,7 +71,7 @@ signal = [ ] sync = ["fnv"] tcp = ["io", "net-driver"] -timer = ["executor-core", "sync", "slab"] +time = ["executor-core", "sync", "slab"] udp = ["io", "net-driver"] uds = ["io", "net-driver", "mio-uds", "libc"] process = [ diff --git a/tokio/src/clock.rs b/tokio/src/clock.rs deleted file mode 100644 index d574af85..00000000 --- a/tokio/src/clock.rs +++ /dev/null @@ -1,14 +0,0 @@ -//! A configurable source of time. -//! -//! This module provides the [`now`][n] function, which returns an `Instant` -//! representing "now". The source of time used by this function is configurable -//! and allows mocking out the source of time in tests or performing caching -//! operations to reduce the number of syscalls. -//! -//! Note that, because the source of time is configurable, it is possible to -//! observe non-monotonic behavior when calling [`now`][n] from different -//! executors. -//! -//! [n]: fn.now.html - -pub use crate::timer::clock::now; diff --git a/tokio/src/future.rs b/tokio/src/future.rs index 2a714a3e..f6b7e4a7 100644 --- a/tokio/src/future.rs +++ b/tokio/src/future.rs @@ -1,9 +1,9 @@ //! Asynchronous values. -#[cfg(feature = "timer")] -use crate::timer::Timeout; +#[cfg(feature = "time")] +use crate::time::Timeout; -#[cfg(feature = "timer")] +#[cfg(feature = "time")] use std::time::Duration; #[doc(inline)] @@ -57,7 +57,7 @@ pub trait FutureExt: Future { /// } /// # } /// ``` - #[cfg(feature = "timer")] + #[cfg(feature = "time")] fn timeout(self, timeout: Duration) -> Timeout where Self: Sized, diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index b7b99ac6..8f9736ea 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -85,9 +85,6 @@ macro_rules! thread_local { ($($tts:tt)+) => { loom::thread_local!{ $($tts)+ } } } -#[cfg(feature = "timer")] -pub mod clock; - #[cfg(feature = "fs")] pub mod fs; @@ -117,8 +114,8 @@ pub mod stream; #[cfg(feature = "sync")] pub mod sync; -#[cfg(feature = "timer")] -pub mod timer; +#[cfg(feature = "time")] +pub mod time; #[cfg(feature = "rt-full")] mod util; diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 6081c10e..3a81af3e 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -22,7 +22,7 @@ use std::fmt; /// /// ``` /// use tokio::runtime::Builder; -/// use tokio::timer::clock::Clock; +/// use tokio::time::clock::Clock; /// /// fn main() { /// // build Runtime @@ -324,7 +324,7 @@ impl Builder { #[cfg(feature = "rt-full")] fn build_threadpool(&mut self) -> io::Result { use crate::runtime::{Kind, ThreadPool}; - use crate::timer::clock; + use crate::time::clock; use std::sync::Mutex; let mut net_handles = Vec::new(); diff --git a/tokio/src/runtime/timer.rs b/tokio/src/runtime/timer.rs index 987324cd..03e501ce 100644 --- a/tokio/src/runtime/timer.rs +++ b/tokio/src/runtime/timer.rs @@ -1,9 +1,9 @@ pub(crate) use self::variant::*; -#[cfg(feature = "timer")] +#[cfg(feature = "time")] mod variant { use crate::runtime::io; - use crate::timer::{clock, timer}; + use crate::time::{clock, timer}; pub(crate) type Clock = clock::Clock; pub(crate) type Driver = timer::Timer; @@ -23,7 +23,7 @@ mod variant { } } -#[cfg(not(feature = "timer"))] +#[cfg(not(feature = "time"))] mod variant { use crate::runtime::io; diff --git a/tokio/src/stream.rs b/tokio/src/stream.rs index e4f5a1b1..0a597a58 100644 --- a/tokio/src/stream.rs +++ b/tokio/src/stream.rs @@ -1,10 +1,10 @@ //! A sequence of asynchronous values. -#[cfg(feature = "timer")] +#[cfg(feature = "time")] use std::time::Duration; -#[cfg(feature = "timer")] -use crate::timer::{throttle::Throttle, Timeout}; +#[cfg(feature = "time")] +use crate::time::{throttle::Throttle, Timeout}; #[doc(inline)] pub use futures_core::Stream; @@ -29,7 +29,7 @@ pub trait StreamExt: Stream { /// Throttle down the stream by enforcing a fixed delay between items. /// /// Errors are also delayed. - #[cfg(feature = "timer")] + #[cfg(feature = "time")] fn throttle(self, duration: Duration) -> Throttle where Self: Sized, @@ -66,7 +66,7 @@ pub trait StreamExt: Stream { /// } /// # } /// ``` - #[cfg(feature = "timer")] + #[cfg(feature = "time")] fn timeout(self, timeout: Duration) -> Timeout where Self: Sized, diff --git a/tokio/src/time/atomic.rs b/tokio/src/time/atomic.rs new file mode 100644 index 00000000..206954fc --- /dev/null +++ b/tokio/src/time/atomic.rs @@ -0,0 +1,60 @@ +//! Implementation of an atomic u64 cell. On 64 bit platforms, this is a +//! re-export of `AtomicU64`. On 32 bit platforms, this is implemented using a +//! `Mutex`. + +pub(crate) use self::imp::AtomicU64; + +// `AtomicU64` can only be used on targets with `target_has_atomic` is 64 or greater. +// Once `cfg_target_has_atomic` feature is stable, we can replace it with +// `#[cfg(target_has_atomic = "64")]`. +// Refs: https://github.com/rust-lang/rust/tree/master/src/librustc_target +#[cfg(not(any(target_arch = "arm", target_arch = "mips", target_arch = "powerpc")))] +mod imp { + pub(crate) use std::sync::atomic::AtomicU64; +} + +#[cfg(any(target_arch = "arm", target_arch = "mips", target_arch = "powerpc"))] +mod imp { + use std::sync::atomic::Ordering; + use std::sync::Mutex; + + #[derive(Debug)] + pub(crate) struct AtomicU64 { + inner: Mutex, + } + + impl AtomicU64 { + pub(crate) fn new(val: u64) -> AtomicU64 { + AtomicU64 { + inner: Mutex::new(val), + } + } + + pub(crate) fn load(&self, _: Ordering) -> u64 { + *self.inner.lock().unwrap() + } + + pub(crate) fn store(&self, val: u64, _: Ordering) { + *self.inner.lock().unwrap() = val; + } + + pub(crate) fn fetch_or(&self, val: u64, _: Ordering) -> u64 { + let mut lock = self.inner.lock().unwrap(); + let prev = *lock; + *lock = prev | val; + prev + } + + pub(crate) fn compare_and_swap(&self, old: u64, new: u64, _: Ordering) -> u64 { + let mut lock = self.inner.lock().unwrap(); + let prev = *lock; + + if prev != old { + return prev; + } + + *lock = new; + prev + } + } +} diff --git a/tokio/src/time/clock/mod.rs b/tokio/src/time/clock/mod.rs new file mode 100644 index 00000000..17cbe2f9 --- /dev/null +++ b/tokio/src/time/clock/mod.rs @@ -0,0 +1,149 @@ +//! A configurable source of time. +//! +//! This module provides an API to get the current instant in such a way that +//! the source of time may be configured. This allows mocking out the source of +//! time in tests. +//! +//! The [`now`][n] function returns the current [`Instant`]. By default, it delegates +//! to [`Instant::now`]. +//! +//! The source of time used by [`now`][n] can be configured by implementing the +//! [`Now`] trait and passing an instance to [`with_default`]. +//! +//! [n]: fn.now.html +//! [`Now`]: trait.Now.html +//! [`Instant`]: std::time::Instant +//! [`Instant::now`]: std::time::Instant::now +//! [`with_default`]: fn.with_default.html + +mod now; + +pub use self::now::Now; + +use std::cell::Cell; +use std::fmt; +use std::sync::Arc; +use std::time::Instant; + +/// A handle to a source of time. +/// +/// `Clock` instances return [`Instant`] values corresponding to "now". The source +/// of these values is configurable. The default source is [`Instant::now`]. +/// +/// [`Instant`]: std::time::Instant +/// [`Instant::now`]: std::time::Instant::now +#[derive(Default, Clone)] +pub struct Clock { + now: Option>, +} + +thread_local! { + /// Thread-local tracking the current clock + static CLOCK: Cell> = Cell::new(None) +} + +/// Returns an `Instant` corresponding to "now". +/// +/// This function delegates to the source of time configured for the current +/// execution context. By default, this is `Instant::now()`. +/// +/// Note that, because the source of time is configurable, it is possible to +/// observe non-monotonic behavior when calling `now` from different +/// executors. +/// +/// See [module](index.html) level documentation for more details. +/// +/// # Examples +/// +/// ``` +/// # use tokio::time::clock; +/// let now = clock::now(); +/// ``` +pub fn now() -> Instant { + CLOCK.with(|current| match current.get() { + Some(ptr) => unsafe { (*ptr).now() }, + None => Instant::now(), + }) +} + +impl Clock { + /// Return a new `Clock` instance that uses the current execution context's + /// source of time. + pub fn new() -> Clock { + CLOCK.with(|current| match current.get() { + Some(ptr) => unsafe { (*ptr).clone() }, + None => Clock::system(), + }) + } + + /// Return a new `Clock` instance that uses `now` as the source of time. + pub fn new_with_now(now: impl Now) -> Clock { + Clock { + now: Some(Arc::new(now)), + } + } + + /// Return a new `Clock` instance that uses [`Instant::now`] as the source + /// of time. + /// + /// [`Instant::now`]: std::time::Instant::now + pub fn system() -> Clock { + Clock { now: None } + } + + /// Returns an instant corresponding to "now" by using the instance's source + /// of time. + pub fn now(&self) -> Instant { + match self.now { + Some(ref now) => now.now(), + None => Instant::now(), + } + } +} + +impl fmt::Debug for Clock { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Clock") + .field("now", { + if self.now.is_some() { + &"Some(Arc)" + } else { + &"None" + } + }) + .finish() + } +} + +/// Set the default clock for the duration of the closure. +/// +/// # Panics +/// +/// This function panics if there already is a default clock set. +pub fn with_default(clock: &Clock, f: F) -> R +where + F: FnOnce() -> R, +{ + CLOCK.with(|cell| { + assert!( + cell.get().is_none(), + "default clock already set for execution context" + ); + + // Ensure that the clock is removed from the thread-local context + // when leaving the scope. This handles cases that involve panicking. + struct Reset<'a>(&'a Cell>); + + impl Drop for Reset<'_> { + fn drop(&mut self) { + self.0.set(None); + } + } + + let _reset = Reset(cell); + + cell.set(Some(clock as *const Clock)); + + f() + }) +} diff --git a/tokio/src/time/clock/now.rs b/tokio/src/time/clock/now.rs new file mode 100644 index 00000000..f6b11b70 --- /dev/null +++ b/tokio/src/time/clock/now.rs @@ -0,0 +1,15 @@ +use std::time::Instant; + +/// Returns [`Instant`] values representing the current instant in time. +/// +/// This allows customizing the source of time which is especially useful for +/// testing. +/// +/// Implementations must ensure that calls to `now` return monotonically +/// increasing [`Instant`] values. +/// +/// [`Instant`]: std::time::Instant +pub trait Now: Send + Sync + 'static { + /// Returns an instant corresponding to "now". + fn now(&self) -> Instant; +} diff --git a/tokio/src/time/deadline.rs b/tokio/src/time/deadline.rs new file mode 100644 index 00000000..9df67da6 --- /dev/null +++ b/tokio/src/time/deadline.rs @@ -0,0 +1,162 @@ +#![allow(deprecated)] + +use crate::Delay; +use futures::{Async, Future, Poll}; +use std::error; +use std::fmt; +use std::time::Instant; + +#[deprecated(since = "0.2.6", note = "use Timeout instead")] +#[doc(hidden)] +#[derive(Debug)] +pub struct Deadline { + future: T, + delay: Delay, +} + +#[deprecated(since = "0.2.6", note = "use Timeout instead")] +#[doc(hidden)] +#[derive(Debug)] +pub struct DeadlineError(Kind); + +/// Deadline error variants +#[derive(Debug)] +enum Kind { + /// Inner future returned an error + Inner(T), + + /// The deadline elapsed. + Elapsed, + + /// Timer returned an error. + Timer(crate::Error), +} + +impl Deadline { + /// Create a new `Deadline` that completes when `future` completes or when + /// `deadline` is reached. + pub fn new(future: T, deadline: Instant) -> Deadline { + Deadline::new_with_delay(future, Delay::new(deadline)) + } + + pub(crate) fn new_with_delay(future: T, delay: Delay) -> Deadline { + Deadline { future, delay } + } + + /// Gets a reference to the underlying future in this deadline. + pub fn get_ref(&self) -> &T { + &self.future + } + + /// Gets a mutable reference to the underlying future in this deadline. + pub fn get_mut(&mut self) -> &mut T { + &mut self.future + } + + /// Consumes this deadline, returning the underlying future. + pub fn into_inner(self) -> T { + self.future + } +} + +impl Future for Deadline +where + T: Future, +{ + type Item = T::Item; + type Error = DeadlineError; + + fn poll(&mut self) -> Poll { + // First, try polling the future + match self.future.poll() { + Ok(Async::Ready(v)) => return Ok(Async::Ready(v)), + Ok(Async::NotReady) => {} + Err(e) => return Err(DeadlineError::inner(e)), + } + + // Now check the timer + match self.delay.poll() { + Ok(Async::NotReady) => Ok(Async::NotReady), + Ok(Async::Ready(_)) => Err(DeadlineError::elapsed()), + Err(e) => Err(DeadlineError::timer(e)), + } + } +} + +// ===== impl DeadlineError ===== + +impl DeadlineError { + /// Create a new `DeadlineError` representing the inner future completing + /// with `Err`. + pub fn inner(err: T) -> DeadlineError { + DeadlineError(Kind::Inner(err)) + } + + /// Returns `true` if the error was caused by the inner future completing + /// with `Err`. + pub fn is_inner(&self) -> bool { + match self.0 { + Kind::Inner(_) => true, + _ => false, + } + } + + /// Consumes `self`, returning the inner future error. + pub fn into_inner(self) -> Option { + match self.0 { + Kind::Inner(err) => Some(err), + _ => None, + } + } + + /// Create a new `DeadlineError` representing the inner future not + /// completing before the deadline is reached. + pub fn elapsed() -> DeadlineError { + DeadlineError(Kind::Elapsed) + } + + /// Returns `true` if the error was caused by the inner future not + /// completing before the deadline is reached. + pub fn is_elapsed(&self) -> bool { + match self.0 { + Kind::Elapsed => true, + _ => false, + } + } + + /// Creates a new `DeadlineError` representing an error encountered by the + /// timer implementation + pub fn timer(err: crate::Error) -> DeadlineError { + DeadlineError(Kind::Timer(err)) + } + + /// Returns `true` if the error was caused by the timer. + pub fn is_timer(&self) -> bool { + match self.0 { + Kind::Timer(_) => true, + _ => false, + } + } + + /// Consumes `self`, returning the error raised by the timer implementation. + pub fn into_timer(self) -> Option { + match self.0 { + Kind::Timer(err) => Some(err), + _ => None, + } + } +} + +impl error::Error for DeadlineError {} + +impl fmt::Display for DeadlineError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + use self::Kind::*; + + match self.0 { + Inner(ref e) => e.fmt(fmt), + Elapsed => "deadline has elapsed".fmt(fmt), + Timer(ref e) => e.fmt(fmt), + } + } +} diff --git a/tokio/src/time/delay.rs b/tokio/src/time/delay.rs new file mode 100644 index 00000000..e79f9c34 --- /dev/null +++ b/tokio/src/time/delay.rs @@ -0,0 +1,115 @@ +use crate::time::timer::{HandlePriv, Registration}; + +use futures_core::ready; +use std::future::Future; +use std::pin::Pin; +use std::task::{self, Poll}; +use std::time::{Duration, Instant}; + +/// A future that completes at a specified instant in time. +/// +/// Instances of `Delay` perform no work and complete with `()` once the +/// specified deadline has been reached. +/// +/// `Delay` has a resolution of one millisecond and should not be used for tasks +/// that require high-resolution timers. +/// +/// # Cancellation +/// +/// Canceling a `Delay` is done by dropping the value. No additional cleanup or +/// other work is required. +/// +/// [`new`]: #method.new +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Delay { + /// The link between the `Delay` instance at the timer that drives it. + /// + /// This also stores the `deadline` value. + registration: Registration, +} + +impl Delay { + /// Create a new `Delay` instance that elapses at `deadline`. + /// + /// Only millisecond level resolution is guaranteed. There is no guarantee + /// as to how the sub-millisecond portion of `deadline` will be handled. + /// `Delay` should not be used for high-resolution timer use cases. + pub(crate) fn new(deadline: Instant) -> Delay { + let registration = Registration::new(deadline, Duration::from_millis(0)); + + Delay { registration } + } + + pub(crate) fn new_timeout(deadline: Instant, duration: Duration) -> Delay { + let registration = Registration::new(deadline, duration); + Delay { registration } + } + + pub(crate) fn new_with_handle( + deadline: Instant, + duration: Duration, + handle: HandlePriv, + ) -> Delay { + let mut registration = Registration::new(deadline, duration); + registration.register_with(handle); + + Delay { registration } + } + + /// Returns the instant at which the future will complete. + pub fn deadline(&self) -> Instant { + self.registration.deadline() + } + + /// Returns true if the `Delay` has elapsed + /// + /// A `Delay` is elapsed when the requested duration has elapsed. + pub fn is_elapsed(&self) -> bool { + self.registration.is_elapsed() + } + + /// Reset the `Delay` instance to a new deadline. + /// + /// Calling this function allows changing the instant at which the `Delay` + /// future completes without having to create new associated state. + /// + /// This function can be called both before and after the future has + /// completed. + pub fn reset(&mut self, deadline: Instant) { + self.registration.reset(deadline); + } + + pub(crate) fn reset_timeout(&mut self) { + self.registration.reset_timeout(); + } + + /// Register the delay with the timer instance for the current execution + /// context. + fn register(&mut self) { + self.registration.register(); + } +} + +impl Future for Delay { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + // Ensure the `Delay` instance is associated with a timer. + self.register(); + + // `poll_elapsed` can return an error in two cases: + // + // - AtCapacity: this is a pathlogical case where far too many + // delays have been scheduled. + // - Shutdown: No timer has been setup, which is a mis-use error. + // + // Both cases are extremely rare, and pretty accurately fit into + // "logic errors", so we just panic in this case. A user couldn't + // really do much better if we passed the error onwards. + match ready!(self.registration.poll_elapsed(cx)) { + Ok(()) => Poll::Ready(()), + Err(e) => panic!("timer error: {}", e), + } + } +} diff --git a/tokio/src/time/delay_queue.rs b/tokio/src/time/delay_queue.rs new file mode 100644 index 00000000..38d04940 --- /dev/null +++ b/tokio/src/time/delay_queue.rs @@ -0,0 +1,854 @@ +//! A queue of delayed elements. +//! +//! See [`DelayQueue`] for more details. +//! +//! [`DelayQueue`]: struct.DelayQueue.html + +use crate::time::clock::now; +use crate::time::timer::Handle; +use crate::time::wheel::{self, Wheel}; +use crate::time::{Delay, Error}; + +use futures_core::ready; +use slab::Slab; +use std::cmp; +use std::future::Future; +use std::marker::PhantomData; +use std::pin::Pin; +use std::task::{self, Poll}; +use std::time::{Duration, Instant}; + +/// A queue of delayed elements. +/// +/// Once an element is inserted into the `DelayQueue`, it is yielded once the +/// specified deadline has been reached. +/// +/// # Usage +/// +/// Elements are inserted into `DelayQueue` using the [`insert`] or +/// [`insert_at`] methods. A deadline is provided with the item and a [`Key`] is +/// returned. The key is used to remove the entry or to change the deadline at +/// which it should be yielded back. +/// +/// Once delays have been configured, the `DelayQueue` is used via its +/// [`Stream`] implementation. [`poll`] is called. If an entry has reached its +/// deadline, it is returned. If not, `Async::NotReady` indicating that the +/// current task will be notified once the deadline has been reached. +/// +/// # `Stream` implementation +/// +/// Items are retrieved from the queue via [`Stream::poll`]. If no delays have +/// expired, no items are returned. In this case, `NotReady` is returned and the +/// current task is registered to be notified once the next item's delay has +/// expired. +/// +/// If no items are in the queue, i.e. `is_empty()` returns `true`, then `poll` +/// returns `Ready(None)`. This indicates that the stream has reached an end. +/// However, if a new item is inserted *after*, `poll` will once again start +/// returning items or `NotReady. +/// +/// Items are returned ordered by their expirations. Items that are configured +/// to expire first will be returned first. There are no ordering guarantees +/// for items configured to expire the same instant. Also note that delays are +/// rounded to the closest millisecond. +/// +/// # Implementation +/// +/// The `DelayQueue` is backed by the same hashed timing wheel implementation as +/// [`Timer`] as such, it offers the same performance benefits. See [`Timer`] +/// for further implementation notes. +/// +/// State associated with each entry is stored in a [`slab`]. This allows +/// amortizing the cost of allocation. Space created for expired entries is +/// reused when inserting new entries. +/// +/// Capacity can be checked using [`capacity`] and allocated preemptively by using +/// the [`reserve`] method. +/// +/// # Usage +/// +/// Using `DelayQueue` to manage cache entries. +/// +/// ```rust,no_run +/// use tokio::time::{delay_queue, DelayQueue, Error}; +/// +/// use futures_core::ready; +/// use std::collections::HashMap; +/// use std::task::{Context, Poll}; +/// use std::time::Duration; +/// # type CacheKey = String; +/// # type Value = String; +/// +/// struct Cache { +/// entries: HashMap, +/// expirations: DelayQueue, +/// } +/// +/// const TTL_SECS: u64 = 30; +/// +/// impl Cache { +/// fn insert(&mut self, key: CacheKey, value: Value) { +/// let delay = self.expirations +/// .insert(key.clone(), Duration::from_secs(TTL_SECS)); +/// +/// self.entries.insert(key, (value, delay)); +/// } +/// +/// fn get(&self, key: &CacheKey) -> Option<&Value> { +/// self.entries.get(key) +/// .map(|&(ref v, _)| v) +/// } +/// +/// fn remove(&mut self, key: &CacheKey) { +/// if let Some((_, cache_key)) = self.entries.remove(key) { +/// self.expirations.remove(&cache_key); +/// } +/// } +/// +/// fn poll_purge(&mut self, cx: &mut Context<'_>) -> Poll> { +/// while let Some(res) = ready!(self.expirations.poll_next(cx)) { +/// let entry = res?; +/// self.entries.remove(entry.get_ref()); +/// } +/// +/// Poll::Ready(Ok(())) +/// } +/// } +/// ``` +/// +/// [`insert`]: #method.insert +/// [`insert_at`]: #method.insert_at +/// [`Key`]: struct.Key.html +/// [`Stream`]: https://docs.rs/futures/0.1/futures/stream/trait.Stream.html +/// [`poll`]: #method.poll +/// [`Stream::poll`]: #method.poll +/// [`Timer`]: ../struct.Timer.html +/// [`slab`]: https://docs.rs/slab +/// [`capacity`]: #method.capacity +/// [`reserve`]: #method.reserve +#[derive(Debug)] +pub struct DelayQueue { + /// Handle to the timer driving the `DelayQueue` + handle: Handle, + + /// Stores data associated with entries + slab: Slab>, + + /// Lookup structure tracking all delays in the queue + wheel: Wheel>, + + /// Delays that were inserted when already expired. These cannot be stored + /// in the wheel + expired: Stack, + + /// Delay expiring when the *first* item in the queue expires + delay: Option, + + /// Wheel polling state + poll: wheel::Poll, + + /// Instant at which the timer starts + start: Instant, +} + +/// An entry in `DelayQueue` that has expired and removed. +/// +/// Values are returned by [`DelayQueue::poll`]. +/// +/// [`DelayQueue::poll`]: struct.DelayQueue.html#method.poll +#[derive(Debug)] +pub struct Expired { + /// The data stored in the queue + data: T, + + /// The expiration time + deadline: Instant, + + /// The key associated with the entry + key: Key, +} + +/// Token to a value stored in a `DelayQueue`. +/// +/// Instances of `Key` are returned by [`DelayQueue::insert`]. See [`DelayQueue`] +/// documentation for more details. +/// +/// [`DelayQueue`]: struct.DelayQueue.html +/// [`DelayQueue::insert`]: struct.DelayQueue.html#method.insert +#[derive(Debug, Clone)] +pub struct Key { + index: usize, +} + +#[derive(Debug)] +struct Stack { + /// Head of the stack + head: Option, + _p: PhantomData T>, +} + +#[derive(Debug)] +struct Data { + /// The data being stored in the queue and will be returned at the requested + /// instant. + inner: T, + + /// The instant at which the item is returned. + when: u64, + + /// Set to true when stored in the `expired` queue + expired: bool, + + /// Next entry in the stack + next: Option, + + /// Previous entry in the stack + prev: Option, +} + +/// Maximum number of entries the queue can handle +const MAX_ENTRIES: usize = (1 << 30) - 1; + +impl DelayQueue { + /// Create a new, empty, `DelayQueue` + /// + /// The queue will not allocate storage until items are inserted into it. + /// + /// # Examples + /// + /// ```rust + /// # use tokio::time::DelayQueue; + /// let delay_queue: DelayQueue = DelayQueue::new(); + /// ``` + pub fn new() -> DelayQueue { + DelayQueue::with_capacity(0) + } + + /// Create a new, empty, `DelayQueue` backed by the specified timer. + /// + /// The queue will not allocate storage until items are inserted into it. + /// + /// # Examples + /// + /// ```rust,no_run + /// # use tokio::time::DelayQueue; + /// use tokio::time::timer::Handle; + /// + /// let handle = Handle::default(); + /// let delay_queue: DelayQueue = DelayQueue::with_capacity_and_handle(0, &handle); + /// ``` + pub fn with_capacity_and_handle(capacity: usize, handle: &Handle) -> DelayQueue { + DelayQueue { + handle: handle.clone(), + wheel: Wheel::new(), + slab: Slab::with_capacity(capacity), + expired: Stack::default(), + delay: None, + poll: wheel::Poll::new(0), + start: now(), + } + } + + /// Create a new, empty, `DelayQueue` with the specified capacity. + /// + /// The queue will be able to hold at least `capacity` elements without + /// reallocating. If `capacity` is 0, the queue will not allocate for + /// storage. + /// + /// # Examples + /// + /// ```rust + /// # use tokio::time::DelayQueue; + /// # use std::time::Duration; + /// let mut delay_queue = DelayQueue::with_capacity(10); + /// + /// // These insertions are done without further allocation + /// for i in 0..10 { + /// delay_queue.insert(i, Duration::from_secs(i)); + /// } + /// + /// // This will make the queue allocate additional storage + /// delay_queue.insert(11, Duration::from_secs(11)); + /// ``` + pub fn with_capacity(capacity: usize) -> DelayQueue { + DelayQueue::with_capacity_and_handle(capacity, &Handle::default()) + } + + /// Insert `value` into the queue set to expire at a specific instant in + /// time. + /// + /// This function is identical to `insert`, but takes an `Instant` instead + /// of a `Duration`. + /// + /// `value` is stored in the queue until `when` is reached. At which point, + /// `value` will be returned from [`poll`]. If `when` has already been + /// reached, then `value` is immediately made available to poll. + /// + /// The return value represents the insertion and is used at an argument to + /// [`remove`] and [`reset`]. Note that [`Key`] is token and is reused once + /// `value` is removed from the queue either by calling [`poll`] after + /// `when` is reached or by calling [`remove`]. At this point, the caller + /// must take care to not use the returned [`Key`] again as it may reference + /// a different item in the queue. + /// + /// See [type] level documentation for more details. + /// + /// # Panics + /// + /// This function panics if `when` is too far in the future. + /// + /// # Examples + /// + /// Basic usage + /// + /// ```rust + /// use tokio::time::DelayQueue; + /// use std::time::{Instant, Duration}; + /// + /// let mut delay_queue = DelayQueue::new(); + /// let key = delay_queue.insert_at( + /// "foo", Instant::now() + Duration::from_secs(5)); + /// + /// // Remove the entry + /// let item = delay_queue.remove(&key); + /// assert_eq!(*item.get_ref(), "foo"); + /// ``` + /// + /// [`poll`]: #method.poll + /// [`remove`]: #method.remove + /// [`reset`]: #method.reset + /// [`Key`]: struct.Key.html + /// [type]: # + pub fn insert_at(&mut self, value: T, when: Instant) -> Key { + assert!(self.slab.len() < MAX_ENTRIES, "max entries exceeded"); + + // Normalize the deadline. Values cannot be set to expire in the past. + let when = self.normalize_deadline(when); + + // Insert the value in the store + let key = self.slab.insert(Data { + inner: value, + when, + expired: false, + next: None, + prev: None, + }); + + self.insert_idx(when, key); + + // Set a new delay if the current's deadline is later than the one of the new item + let should_set_delay = if let Some(ref delay) = self.delay { + let current_exp = self.normalize_deadline(delay.deadline()); + current_exp > when + } else { + true + }; + + if should_set_delay { + self.delay = Some(self.handle.delay(self.start + Duration::from_millis(when))); + } + + Key::new(key) + } + + /// Attempt to pull out the next value of the delay queue, registering the + /// current task for wakeup if the value is not yet available, and returning + /// None if the queue is exhausted. + pub fn poll_next( + &mut self, + cx: &mut task::Context<'_>, + ) -> Poll, Error>>> { + let item = ready!(self.poll_idx(cx)); + Poll::Ready(item.map(|result| { + result.map(|idx| { + let data = self.slab.remove(idx); + debug_assert!(data.next.is_none()); + debug_assert!(data.prev.is_none()); + + Expired { + key: Key::new(idx), + data: data.inner, + deadline: self.start + Duration::from_millis(data.when), + } + }) + })) + } + + /// Insert `value` into the queue set to expire after the requested duration + /// elapses. + /// + /// This function is identical to `insert_at`, but takes a `Duration` + /// instead of an `Instant`. + /// + /// `value` is stored in the queue until `when` is reached. At which point, + /// `value` will be returned from [`poll`]. If `when` has already been + /// reached, then `value` is immediately made available to poll. + /// + /// The return value represents the insertion and is used at an argument to + /// [`remove`] and [`reset`]. Note that [`Key`] is token and is reused once + /// `value` is removed from the queue either by calling [`poll`] after + /// `when` is reached or by calling [`remove`]. At this point, the caller + /// must take care to not use the returned [`Key`] again as it may reference + /// a different item in the queue. + /// + /// See [type] level documentation for more details. + /// + /// # Panics + /// + /// This function panics if `timeout` is greater than the maximum supported + /// duration. + /// + /// # Examples + /// + /// Basic usage + /// + /// ```rust + /// use tokio::time::DelayQueue; + /// use std::time::Duration; + /// + /// let mut delay_queue = DelayQueue::new(); + /// let key = delay_queue.insert("foo", Duration::from_secs(5)); + /// + /// // Remove the entry + /// let item = delay_queue.remove(&key); + /// assert_eq!(*item.get_ref(), "foo"); + /// ``` + /// + /// [`poll`]: #method.poll + /// [`remove`]: #method.remove + /// [`reset`]: #method.reset + /// [`Key`]: struct.Key.html + /// [type]: # + pub fn insert(&mut self, value: T, timeout: Duration) -> Key { + self.insert_at(value, now() + timeout) + } + + fn insert_idx(&mut self, when: u64, key: usize) { + use self::wheel::{InsertError, Stack}; + + // Register the deadline with the timer wheel + match self.wheel.insert(when, key, &mut self.slab) { + Ok(_) => {} + Err((_, InsertError::Elapsed)) => { + self.slab[key].expired = true; + // The delay is already expired, store it in the expired queue + self.expired.push(key, &mut self.slab); + } + Err((_, err)) => panic!("invalid deadline; err={:?}", err), + } + } + + /// Remove the item associated with `key` from the queue. + /// + /// There must be an item associated with `key`. The function returns the + /// removed item as well as the `Instant` at which it will the delay will + /// have expired. + /// + /// # Panics + /// + /// The function panics if `key` is not contained by the queue. + /// + /// # Examples + /// + /// Basic usage + /// + /// ```rust + /// use tokio::time::DelayQueue; + /// use std::time::Duration; + /// + /// let mut delay_queue = DelayQueue::new(); + /// let key = delay_queue.insert("foo", Duration::from_secs(5)); + /// + /// // Remove the entry + /// let item = delay_queue.remove(&key); + /// assert_eq!(*item.get_ref(), "foo"); + /// ``` + pub fn remove(&mut self, key: &Key) -> Expired { + use crate::time::wheel::Stack; + + // Special case the `expired` queue + if self.slab[key.index].expired { + self.expired.remove(&key.index, &mut self.slab); + } else { + self.wheel.remove(&key.index, &mut self.slab); + } + + let data = self.slab.remove(key.index); + + Expired { + key: Key::new(key.index), + data: data.inner, + deadline: self.start + Duration::from_millis(data.when), + } + } + + /// Sets the delay of the item associated with `key` to expire at `when`. + /// + /// This function is identical to `reset` but takes an `Instant` instead of + /// a `Duration`. + /// + /// The item remains in the queue but the delay is set to expire at `when`. + /// If `when` is in the past, then the item is immediately made available to + /// the caller. + /// + /// # Panics + /// + /// This function panics if `when` is too far in the future or if `key` is + /// not contained by the queue. + /// + /// # Examples + /// + /// Basic usage + /// + /// ```rust + /// use tokio::time::DelayQueue; + /// use std::time::{Duration, Instant}; + /// + /// let mut delay_queue = DelayQueue::new(); + /// let key = delay_queue.insert("foo", Duration::from_secs(5)); + /// + /// // "foo" is scheduled to be returned in 5 seconds + /// + /// delay_queue.reset_at(&key, Instant::now() + Duration::from_secs(10)); + /// + /// // "foo"is now scheduled to be returned in 10 seconds + /// ``` + pub fn reset_at(&mut self, key: &Key, when: Instant) { + self.wheel.remove(&key.index, &mut self.slab); + + // Normalize the deadline. Values cannot be set to expire in the past. + let when = self.normalize_deadline(when); + + self.slab[key.index].when = when; + self.insert_idx(when, key.index); + + let next_deadline = self.next_deadline(); + if let (Some(ref mut delay), Some(deadline)) = (&mut self.delay, next_deadline) { + delay.reset(deadline); + } + } + + /// Returns the next time poll as determined by the wheel + fn next_deadline(&mut self) -> Option { + self.wheel + .poll_at() + .map(|poll_at| self.start + Duration::from_millis(poll_at)) + } + + /// Sets the delay of the item associated with `key` to expire after + /// `timeout`. + /// + /// This function is identical to `reset_at` but takes a `Duration` instead + /// of an `Instant`. + /// + /// The item remains in the queue but the delay is set to expire after + /// `timeout`. If `timeout` is zero, then the item is immediately made + /// available to the caller. + /// + /// # Panics + /// + /// This function panics if `timeout` is greater than the maximum supported + /// duration or if `key` is not contained by the queue. + /// + /// # Examples + /// + /// Basic usage + /// + /// ```rust + /// use tokio::time::DelayQueue; + /// use std::time::Duration; + /// + /// let mut delay_queue = DelayQueue::new(); + /// let key = delay_queue.insert("foo", Duration::from_secs(5)); + /// + /// // "foo" is scheduled to be returned in 5 seconds + /// + /// delay_queue.reset(&key, Duration::from_secs(10)); + /// + /// // "foo"is now scheduled to be returned in 10 seconds + /// ``` + pub fn reset(&mut self, key: &Key, timeout: Duration) { + self.reset_at(key, now() + timeout); + } + + /// Clears the queue, removing all items. + /// + /// After calling `clear`, [`poll`] will return `Ok(Ready(None))`. + /// + /// Note that this method has no effect on the allocated capacity. + /// + /// [`poll`]: #method.poll + /// + /// # Examples + /// + /// ```rust + /// use tokio::time::DelayQueue; + /// use std::time::Duration; + /// + /// let mut delay_queue = DelayQueue::new(); + /// + /// delay_queue.insert("foo", Duration::from_secs(5)); + /// + /// assert!(!delay_queue.is_empty()); + /// + /// delay_queue.clear(); + /// + /// assert!(delay_queue.is_empty()); + /// ``` + pub fn clear(&mut self) { + self.slab.clear(); + self.expired = Stack::default(); + self.wheel = Wheel::new(); + self.delay = None; + } + + /// Returns the number of elements the queue can hold without reallocating. + /// + /// # Examples + /// + /// ```rust + /// use tokio::time::DelayQueue; + /// + /// let delay_queue: DelayQueue = DelayQueue::with_capacity(10); + /// assert_eq!(delay_queue.capacity(), 10); + /// ``` + pub fn capacity(&self) -> usize { + self.slab.capacity() + } + + /// Reserve capacity for at least `additional` more items to be queued + /// without allocating. + /// + /// `reserve` does nothing if the queue already has sufficient capacity for + /// `additional` more values. If more capacity is required, a new segment of + /// memory will be allocated and all existing values will be copied into it. + /// As such, if the queue is already very large, a call to `reserve` can end + /// up being expensive. + /// + /// The queue may reserve more than `additional` extra space in order to + /// avoid frequent reallocations. + /// + /// # Panics + /// + /// Panics if the new capacity exceeds the maximum number of entries the + /// queue can contain. + /// + /// # Examples + /// + /// ``` + /// use tokio::time::DelayQueue; + /// use std::time::Duration; + /// + /// let mut delay_queue = DelayQueue::new(); + /// + /// delay_queue.insert("hello", Duration::from_secs(10)); + /// delay_queue.reserve(10); + /// + /// assert!(delay_queue.capacity() >= 11); + /// ``` + pub fn reserve(&mut self, additional: usize) { + self.slab.reserve(additional); + } + + /// Returns `true` if there are no items in the queue. + /// + /// Note that this function returns `false` even if all items have not yet + /// expired and a call to `poll` will return `NotReady`. + /// + /// # Examples + /// + /// ``` + /// use tokio::time::DelayQueue; + /// use std::time::Duration; + /// + /// let mut delay_queue = DelayQueue::new(); + /// assert!(delay_queue.is_empty()); + /// + /// delay_queue.insert("hello", Duration::from_secs(5)); + /// assert!(!delay_queue.is_empty()); + /// ``` + pub fn is_empty(&self) -> bool { + self.slab.is_empty() + } + + /// Polls the queue, returning the index of the next slot in the slab that + /// should be returned. + /// + /// A slot should be returned when the associated deadline has been reached. + fn poll_idx(&mut self, cx: &mut task::Context<'_>) -> Poll>> { + use self::wheel::Stack; + + let expired = self.expired.pop(&mut self.slab); + + if expired.is_some() { + return Poll::Ready(expired.map(Ok)); + } + + loop { + if let Some(ref mut delay) = self.delay { + if !delay.is_elapsed() { + ready!(Pin::new(&mut *delay).poll(cx)); + } + + let now = crate::time::ms(delay.deadline() - self.start, crate::time::Round::Down); + + self.poll = wheel::Poll::new(now); + } + + self.delay = None; + + if let Some(idx) = self.wheel.poll(&mut self.poll, &mut self.slab) { + return Poll::Ready(Some(Ok(idx))); + } + + if let Some(deadline) = self.next_deadline() { + self.delay = Some(self.handle.delay(deadline)); + } else { + return Poll::Ready(None); + } + } + } + + fn normalize_deadline(&self, when: Instant) -> u64 { + let when = if when < self.start { + 0 + } else { + crate::time::ms(when - self.start, crate::time::Round::Up) + }; + + cmp::max(when, self.wheel.elapsed()) + } +} + +// We never put `T` in a `Pin`... +impl Unpin for DelayQueue {} + +impl futures_core::Stream for DelayQueue { + // DelayQueue seems much more specific, where a user may care that it + // has reached capacity, so return those errors instead of panicking. + type Item = Result, Error>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + DelayQueue::poll_next(self.get_mut(), cx) + } +} + +impl Default for DelayQueue { + fn default() -> DelayQueue { + DelayQueue::new() + } +} + +impl wheel::Stack for Stack { + type Owned = usize; + type Borrowed = usize; + type Store = Slab>; + + fn is_empty(&self) -> bool { + self.head.is_none() + } + + fn push(&mut self, item: Self::Owned, store: &mut Self::Store) { + // Ensure the entry is not already in a stack. + debug_assert!(store[item].next.is_none()); + debug_assert!(store[item].prev.is_none()); + + // Remove the old head entry + let old = self.head.take(); + + if let Some(idx) = old { + store[idx].prev = Some(item); + } + + store[item].next = old; + self.head = Some(item) + } + + fn pop(&mut self, store: &mut Self::Store) -> Option { + if let Some(idx) = self.head { + self.head = store[idx].next; + + if let Some(idx) = self.head { + store[idx].prev = None; + } + + store[idx].next = None; + debug_assert!(store[idx].prev.is_none()); + + Some(idx) + } else { + None + } + } + + fn remove(&mut self, item: &Self::Borrowed, store: &mut Self::Store) { + assert!(store.contains(*item)); + + // Ensure that the entry is in fact contained by the stack + debug_assert!({ + // This walks the full linked list even if an entry is found. + let mut next = self.head; + let mut contains = false; + + while let Some(idx) = next { + if idx == *item { + debug_assert!(!contains); + contains = true; + } + + next = store[idx].next; + } + + contains + }); + + if let Some(next) = store[*item].next { + store[next].prev = store[*item].prev; + } + + if let Some(prev) = store[*item].prev { + store[prev].next = store[*item].next; + } else { + self.head = store[*item].next; + } + + store[*item].next = None; + store[*item].prev = None; + } + + fn when(item: &Self::Borrowed, store: &Self::Store) -> u64 { + store[*item].when + } +} + +impl Default for Stack { + fn default() -> Stack { + Stack { + head: None, + _p: PhantomData, + } + } +} + +impl Key { + pub(crate) fn new(index: usize) -> Key { + Key { index } + } +} + +impl Expired { + /// Returns a reference to the inner value. + pub fn get_ref(&self) -> &T { + &self.data + } + + /// Returns a mutable reference to the inner value. + pub fn get_mut(&mut self) -> &mut T { + &mut self.data + } + + /// Consumes `self` and returns the inner value. + pub fn into_inner(self) -> T { + self.data + } +} diff --git a/tokio/src/time/error.rs b/tokio/src/time/error.rs new file mode 100644 index 00000000..994eec1f --- /dev/null +++ b/tokio/src/time/error.rs @@ -0,0 +1,72 @@ +use self::Kind::*; +use std::error; +use std::fmt; + +/// Errors encountered by the timer implementation. +/// +/// Currently, there are two different errors that can occur: +/// +/// * `shutdown` occurs when a timer operation is attempted, but the timer +/// instance has been dropped. In this case, the operation will never be able +/// to complete and the `shutdown` error is returned. This is a permanent +/// error, i.e., once this error is observed, timer operations will never +/// succeed in the future. +/// +/// * `at_capacity` occurs when a timer operation is attempted, but the timer +/// instance is currently handling its maximum number of outstanding delays. +/// In this case, the operation is not able to be performed at the current +/// moment, and `at_capacity` is returned. This is a transient error, i.e., at +/// some point in the future, if the operation is attempted again, it might +/// succeed. Callers that observe this error should attempt to [shed load]. One +/// way to do this would be dropping the future that issued the timer operation. +/// +/// [shed load]: https://en.wikipedia.org/wiki/Load_Shedding +#[derive(Debug)] +pub struct Error(Kind); + +#[derive(Debug)] +enum Kind { + Shutdown, + AtCapacity, +} + +impl Error { + /// Create an error representing a shutdown timer. + pub fn shutdown() -> Error { + Error(Shutdown) + } + + /// Returns `true` if the error was caused by the timer being shutdown. + pub fn is_shutdown(&self) -> bool { + match self.0 { + Kind::Shutdown => true, + _ => false, + } + } + + /// Create an error representing a timer at capacity. + pub fn at_capacity() -> Error { + Error(AtCapacity) + } + + /// Returns `true` if the error was caused by the timer being at capacity. + pub fn is_at_capacity(&self) -> bool { + match self.0 { + Kind::AtCapacity => true, + _ => false, + } + } +} + +impl error::Error for Error {} + +impl fmt::Display for Error { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + use self::Kind::*; + let descr = match self.0 { + Shutdown => "timer is shutdown", + AtCapacity => "timer is at capacity and cannot create a new entry", + }; + write!(fmt, "{}", descr) + } +} diff --git a/tokio/src/time/interval.rs b/tokio/src/time/interval.rs new file mode 100644 index 00000000..f517b23f --- /dev/null +++ b/tokio/src/time/interval.rs @@ -0,0 +1,112 @@ +use crate::time::{clock, Delay}; + +use futures_core::ready; +use futures_util::future::poll_fn; +use std::future::Future; +use std::pin::Pin; +use std::task::{self, Poll}; +use std::time::{Duration, Instant}; + +/// A stream representing notifications at fixed interval +#[derive(Debug)] +pub struct Interval { + /// Future that completes the next time the `Interval` yields a value. + delay: Delay, + + /// The duration between values yielded by `Interval`. + duration: Duration, +} + +impl Interval { + /// Create a new `Interval` that starts at `at` and yields every `duration` + /// interval after that. + /// + /// Note that when it starts, it produces item too. + /// + /// The `duration` argument must be a non-zero duration. + /// + /// # Panics + /// + /// This function panics if `duration` is zero. + pub fn new(at: Instant, duration: Duration) -> Interval { + assert!( + duration > Duration::new(0, 0), + "`duration` must be non-zero." + ); + + Interval::new_with_delay(Delay::new(at), duration) + } + + /// Creates new `Interval` that yields with interval of `duration`. + /// + /// The function is shortcut for `Interval::new(tokio::time::clock::now() + duration, duration)`. + /// + /// The `duration` argument must be a non-zero duration. + /// + /// # Panics + /// + /// This function panics if `duration` is zero. + pub fn new_interval(duration: Duration) -> Interval { + Interval::new(clock::now() + duration, duration) + } + + pub(crate) fn new_with_delay(delay: Delay, duration: Duration) -> Interval { + Interval { delay, duration } + } + + #[doc(hidden)] // TODO: remove + pub fn poll_next(&mut self, cx: &mut task::Context<'_>) -> Poll> { + // Wait for the delay to be done + ready!(Pin::new(&mut self.delay).poll(cx)); + + // Get the `now` by looking at the `delay` deadline + let now = self.delay.deadline(); + + // The next interval value is `duration` after the one that just + // yielded. + let next = now + self.duration; + self.delay.reset(next); + + // Return the current instant + Poll::Ready(Some(now)) + } + + /// Completes when the next instant in the interval has been reached. + /// + /// # Examples + /// + /// ``` + /// use tokio::time::Interval; + /// + /// use std::time::Duration; + /// + /// #[tokio::main] + /// async fn main() { + /// let mut interval = Interval::new_interval(Duration::from_millis(10)); + /// + /// interval.next().await; + /// interval.next().await; + /// interval.next().await; + /// + /// // approximately 30ms have elapsed. + /// } + /// ``` + #[allow(clippy::should_implement_trait)] // TODO: rename (tokio-rs/tokio#1261) + pub async fn next(&mut self) -> Option { + poll_fn(|cx| self.poll_next(cx)).await + } +} + +impl futures_core::FusedStream for Interval { + fn is_terminated(&self) -> bool { + false + } +} + +impl futures_core::Stream for Interval { + type Item = Instant; + + fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + Interval::poll_next(self.get_mut(), cx) + } +} diff --git a/tokio/src/time/mod.rs b/tokio/src/time/mod.rs new file mode 100644 index 00000000..d6f1e0de --- /dev/null +++ b/tokio/src/time/mod.rs @@ -0,0 +1,143 @@ +//! Utilities for tracking time. +//! +//! This module provides a number of types for executing code after a set period +//! of time. +//! +//! * [`Delay`][Delay] is a future that does no work and completes at a specific `Instant` +//! in time. +//! +//! * [`Interval`][Interval] is a stream yielding a value at a fixed period. It +//! is initialized with a `Duration` and repeatedly yields each time the +//! duration elapses. +//! +//! * [`Timeout`][Timeout]: Wraps a future or stream, setting an upper bound to the +//! amount of time it is allowed to execute. If the future or stream does not +//! complete in time, then it is canceled and an error is returned. +//! +//! * [`DelayQueue`]: A queue where items are returned once the requested delay +//! has expired. +//! +//! These types are sufficient for handling a large number of scenarios +//! involving time. +//! +//! These types must be used from within the context of the +//! [`Runtime`][runtime]. +//! +//! # Examples +//! +//! Wait 100ms and print "Hello World!" +//! +//! ``` +//! use tokio::time::delay_for; +//! +//! use std::time::Duration; +//! +//! +//! #[tokio::main] +//! async fn main() { +//! delay_for(Duration::from_millis(100)).await; +//! println!("100 ms have elapsed"); +//! } +//! ``` +//! +//! Require that an operation takes no more than 300ms. Note that this uses the +//! [`timeout`][ext] function on the [`FutureExt`][ext] trait. This trait is +//! included in the prelude. +//! +//! ``` +//! use tokio::prelude::*; +//! use std::time::Duration; +//! +//! async fn long_future() { +//! // do work here +//! } +//! +//! # async fn dox() { +//! let res = long_future() +//! .timeout(Duration::from_secs(1)) +//! .await; +//! +//! if res.is_err() { +//! println!("operation timed out"); +//! } +//! # } +//! ``` +//! +//! [runtime]: ../runtime/struct.Runtime.html +//! [ext]: ../util/trait.FutureExt.html#method.timeout +//! [Timeout]: struct.Timeout.html +//! [Delay]: struct.Delay.html +//! [Interval]: struct.Interval.html +//! [`DelayQueue`]: struct.DelayQueue.html + +pub mod clock; + +pub mod delay_queue; +#[doc(inline)] +pub use self::delay_queue::DelayQueue; + +pub mod throttle; + +// TODO: clean this up +pub mod timer; +pub use timer::{set_default, Timer}; + +pub mod timeout; +#[doc(inline)] +pub use timeout::Timeout; + +mod atomic; + +mod delay; +pub use self::delay::Delay; + +mod error; +pub use error::Error; + +mod interval; +pub use interval::Interval; + +mod wheel; + +use std::