From 60d81bbe10faf344ea18438a1c5ecb9173e6ec52 Mon Sep 17 00:00:00 2001 From: Juan Alvarez Date: Thu, 8 Oct 2020 22:35:12 -0500 Subject: time: rename `Delay` future to `Sleep` (#2932) --- tokio-test/src/io.rs | 4 +- tokio-test/tests/block_on.rs | 2 +- tokio-util/src/time/delay_queue.rs | 4 +- tokio/src/lib.rs | 6 +- tokio/src/macros/select.rs | 26 +-- tokio/src/runtime/handle.rs | 2 +- tokio/src/runtime/mod.rs | 4 +- tokio/src/stream/throttle.rs | 6 +- tokio/src/stream/timeout.rs | 6 +- tokio/src/sync/mod.rs | 10 +- tokio/src/time/clock.rs | 2 +- tokio/src/time/delay.rs | 139 ------------ tokio/src/time/driver/entry.rs | 10 +- tokio/src/time/driver/mod.rs | 18 +- tokio/src/time/error.rs | 2 +- tokio/src/time/interval.rs | 4 +- tokio/src/time/mod.rs | 6 +- tokio/src/time/sleep.rs | 139 ++++++++++++ tokio/src/time/tests/mod.rs | 10 +- tokio/src/time/tests/test_delay.rs | 449 ------------------------------------- tokio/src/time/tests/test_sleep.rs | 449 +++++++++++++++++++++++++++++++++++++ tokio/src/time/timeout.rs | 8 +- tokio/src/time/wheel/mod.rs | 2 +- tokio/tests/macros_select.rs | 4 +- tokio/tests/rt_common.rs | 6 +- tokio/tests/stream_timeout.rs | 8 +- tokio/tests/time_delay.rs | 196 ---------------- tokio/tests/time_sleep.rs | 196 ++++++++++++++++ 28 files changed, 859 insertions(+), 859 deletions(-) delete mode 100644 tokio/src/time/delay.rs create mode 100644 tokio/src/time/sleep.rs delete mode 100644 tokio/src/time/tests/test_delay.rs create mode 100644 tokio/src/time/tests/test_sleep.rs delete mode 100644 tokio/tests/time_delay.rs create mode 100644 tokio/tests/time_sleep.rs diff --git a/tokio-test/src/io.rs b/tokio-test/src/io.rs index 3d04a58a..7cfad2ec 100644 --- a/tokio-test/src/io.rs +++ b/tokio-test/src/io.rs @@ -20,7 +20,7 @@ use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::sync::mpsc; -use tokio::time::{self, Delay, Duration, Instant}; +use tokio::time::{self, Duration, Instant, Sleep}; use futures_core::ready; use std::collections::VecDeque; @@ -67,7 +67,7 @@ enum Action { struct Inner { actions: VecDeque, waiting: Option, - sleep: Option, + sleep: Option, read_wait: Option, rx: mpsc::UnboundedReceiver, } diff --git a/tokio-test/tests/block_on.rs b/tokio-test/tests/block_on.rs index 280471df..efaaf510 100644 --- a/tokio-test/tests/block_on.rs +++ b/tokio-test/tests/block_on.rs @@ -18,7 +18,7 @@ fn async_fn() { } #[test] -fn test_delay() { +fn test_sleep() { let deadline = Instant::now() + Duration::from_millis(100); block_on(async { diff --git a/tokio-util/src/time/delay_queue.rs b/tokio-util/src/time/delay_queue.rs index 92c922b8..7a1d1acf 100644 --- a/tokio-util/src/time/delay_queue.rs +++ b/tokio-util/src/time/delay_queue.rs @@ -7,7 +7,7 @@ use crate::time::wheel::{self, Wheel}; use futures_core::ready; -use tokio::time::{sleep_until, Delay, Duration, Error, Instant}; +use tokio::time::{sleep_until, Duration, Error, Instant, Sleep}; use slab::Slab; use std::cmp; @@ -138,7 +138,7 @@ pub struct DelayQueue { expired: Stack, /// Delay expiring when the *first* item in the queue expires - delay: Option, + delay: Option, /// Wheel polling state wheel_now: u64, diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index 948ac888..326b1395 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -25,7 +25,7 @@ //! provides a few major components: //! //! * Tools for [working with asynchronous tasks][tasks], including -//! [synchronization primitives and channels][sync] and [timeouts, delays, and +//! [synchronization primitives and channels][sync] and [timeouts, sleeps, and //! intervals][time]. //! * APIs for [performing asynchronous I/O][io], including [TCP and UDP][net] sockets, //! [filesystem][fs] operations, and [process] and [signal] management. @@ -183,13 +183,13 @@ //! //! The [`tokio::time`] module provides utilities for tracking time and //! scheduling work. This includes functions for setting [timeouts][timeout] for -//! tasks, [delaying][delay] work to run in the future, or [repeating an operation at an +//! tasks, [sleeping][sleep] work to run in the future, or [repeating an operation at an //! interval][interval]. //! //! In order to use `tokio::time`, the "time" feature flag must be enabled. //! //! [`tokio::time`]: crate::time -//! [delay]: crate::time::sleep() +//! [sleep]: crate::time::sleep() //! [interval]: crate::time::interval() //! [timeout]: crate::time::timeout() //! diff --git a/tokio/src/macros/select.rs b/tokio/src/macros/select.rs index 8f15f9aa..b63abdd2 100644 --- a/tokio/src/macros/select.rs +++ b/tokio/src/macros/select.rs @@ -63,9 +63,9 @@ /// Given that `if` preconditions are used to disable `select!` branches, some /// caution must be used to avoid missing values. /// -/// For example, here is **incorrect** usage of `delay` with `if`. The objective +/// For example, here is **incorrect** usage of `sleep` with `if`. The objective /// is to repeatedly run an asynchronous task for up to 50 milliseconds. -/// However, there is a potential for the `delay` completion to be missed. +/// However, there is a potential for the `sleep` completion to be missed. /// /// ```no_run /// use tokio::time::{self, Duration}; @@ -76,11 +76,11 @@ /// /// #[tokio::main] /// async fn main() { -/// let mut delay = time::sleep(Duration::from_millis(50)); +/// let mut sleep = time::sleep(Duration::from_millis(50)); /// -/// while !delay.is_elapsed() { +/// while !sleep.is_elapsed() { /// tokio::select! { -/// _ = &mut delay, if !delay.is_elapsed() => { +/// _ = &mut sleep, if !sleep.is_elapsed() => { /// println!("operation timed out"); /// } /// _ = some_async_work() => { @@ -91,11 +91,11 @@ /// } /// ``` /// -/// In the above example, `delay.is_elapsed()` may return `true` even if -/// `delay.poll()` never returned `Ready`. This opens up a potential race -/// condition where `delay` expires between the `while !delay.is_elapsed()` +/// In the above example, `sleep.is_elapsed()` may return `true` even if +/// `sleep.poll()` never returned `Ready`. This opens up a potential race +/// condition where `sleep` expires between the `while !sleep.is_elapsed()` /// check and the call to `select!` resulting in the `some_async_work()` call to -/// run uninterrupted despite the delay having elapsed. +/// run uninterrupted despite the sleep having elapsed. /// /// One way to write the above example without the race would be: /// @@ -109,11 +109,11 @@ /// /// #[tokio::main] /// async fn main() { -/// let mut delay = time::sleep(Duration::from_millis(50)); +/// let mut sleep = time::sleep(Duration::from_millis(50)); /// /// loop { /// tokio::select! { -/// _ = &mut delay => { +/// _ = &mut sleep => { /// println!("operation timed out"); /// break; /// } @@ -226,7 +226,7 @@ /// #[tokio::main] /// async fn main() { /// let mut stream = stream::iter(vec![1, 2, 3]); -/// let mut delay = time::sleep(Duration::from_secs(1)); +/// let mut sleep = time::sleep(Duration::from_secs(1)); /// /// loop { /// tokio::select! { @@ -237,7 +237,7 @@ /// break; /// } /// } -/// _ = &mut delay => { +/// _ = &mut sleep => { /// println!("timeout"); /// break; /// } diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index dfcc5e97..d48b6242 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -28,7 +28,7 @@ pub(crate) struct Handle { impl Handle { /// Enter the runtime context. This allows you to construct types that must - /// have an executor available on creation such as [`Delay`] or [`TcpStream`]. + /// have an executor available on creation such as [`Sleep`] or [`TcpStream`]. /// It will also allow you to call methods such as [`tokio::spawn`]. pub(crate) fn enter(&self, f: F) -> R where diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index 22109f7d..8e70db85 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -450,10 +450,10 @@ impl Runtime { } /// Enter the runtime context. This allows you to construct types that must - /// have an executor available on creation such as [`Delay`] or [`TcpStream`]. + /// have an executor available on creation such as [`Sleep`] or [`TcpStream`]. /// It will also allow you to call methods such as [`tokio::spawn`]. /// - /// [`Delay`]: struct@crate::time::Delay + /// [`Sleep`]: struct@crate::time::Sleep /// [`TcpStream`]: struct@crate::net::TcpStream /// [`tokio::spawn`]: fn@crate::spawn /// diff --git a/tokio/src/stream/throttle.rs b/tokio/src/stream/throttle.rs index 1200b384..8f4a256d 100644 --- a/tokio/src/stream/throttle.rs +++ b/tokio/src/stream/throttle.rs @@ -1,7 +1,7 @@ //! Slow down a stream by enforcing a delay between items. use crate::stream::Stream; -use crate::time::{Delay, Duration, Instant}; +use crate::time::{Duration, Instant, Sleep}; use std::future::Future; use std::marker::Unpin; @@ -17,7 +17,7 @@ where let delay = if duration == Duration::from_millis(0) { None } else { - Some(Delay::new_timeout(Instant::now() + duration, duration)) + Some(Sleep::new_timeout(Instant::now() + duration, duration)) }; Throttle { @@ -34,7 +34,7 @@ pin_project! { #[must_use = "streams do nothing unless polled"] pub struct Throttle { // `None` when duration is zero. - delay: Option, + delay: Option, duration: Duration, // Set to true when `delay` has returned ready, but `stream` hasn't. diff --git a/tokio/src/stream/timeout.rs b/tokio/src/stream/timeout.rs index b8a2024f..b16000c6 100644 --- a/tokio/src/stream/timeout.rs +++ b/tokio/src/stream/timeout.rs @@ -1,5 +1,5 @@ use crate::stream::{Fuse, Stream}; -use crate::time::{Delay, Elapsed, Instant}; +use crate::time::{Elapsed, Instant, Sleep}; use core::future::Future; use core::pin::Pin; @@ -14,7 +14,7 @@ pin_project! { pub struct Timeout { #[pin] stream: Fuse, - deadline: Delay, + deadline: Sleep, duration: Duration, poll_deadline: bool, } @@ -23,7 +23,7 @@ pin_project! { impl Timeout { pub(super) fn new(stream: S, duration: Duration) -> Self { let next = Instant::now() + duration; - let deadline = Delay::new_timeout(next, duration); + let deadline = Sleep::new_timeout(next, duration); Timeout { stream: Fuse::new(stream), diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs index 294b0b50..4919ad8e 100644 --- a/tokio/src/sync/mod.rs +++ b/tokio/src/sync/mod.rs @@ -359,11 +359,11 @@ //! let mut conf = rx.borrow().clone(); //! //! let mut op_start = Instant::now(); -//! let mut delay = time::sleep_until(op_start + conf.timeout); +//! let mut sleep = time::sleep_until(op_start + conf.timeout); //! //! loop { //! tokio::select! { -//! _ = &mut delay => { +//! _ = &mut sleep => { //! // The operation elapsed. Restart it //! op.set(my_async_operation()); //! @@ -371,14 +371,14 @@ //! op_start = Instant::now(); //! //! // Restart the timeout -//! delay = time::sleep_until(op_start + conf.timeout); +//! sleep = time::sleep_until(op_start + conf.timeout); //! } //! _ = rx.changed() => { //! conf = rx.borrow().clone(); //! //! // The configuration has been updated. Update the -//! // `delay` using the new `timeout` value. -//! delay.reset(op_start + conf.timeout); +//! // `sleep` using the new `timeout` value. +//! sleep.reset(op_start + conf.timeout); //! } //! _ = &mut op => { //! // The operation completed! diff --git a/tokio/src/time/clock.rs b/tokio/src/time/clock.rs index bd67d7a3..80682d59 100644 --- a/tokio/src/time/clock.rs +++ b/tokio/src/time/clock.rs @@ -58,7 +58,7 @@ cfg_test_util! { /// The current value of `Instant::now()` is saved and all subsequent calls /// to `Instant::now()` until the timer wheel is checked again will return the saved value. /// Once the timer wheel is checked, time will immediately advance to the next registered - /// `Delay`. This is useful for running tests that depend on time. + /// `Sleep`. This is useful for running tests that depend on time. /// /// # Panics /// diff --git a/tokio/src/time/delay.rs b/tokio/src/time/delay.rs deleted file mode 100644 index 9364860d..00000000 --- a/tokio/src/time/delay.rs +++ /dev/null @@ -1,139 +0,0 @@ -use crate::time::driver::{Entry, Handle}; -use crate::time::{Duration, Error, Instant}; - -use std::future::Future; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{self, Poll}; - -/// Waits until `deadline` is reached. -/// -/// No work is performed while awaiting on the delay to complete. The delay -/// operates at millisecond granularity and should not be used for tasks that -/// require high-resolution timers. -/// -/// # Cancellation -/// -/// Canceling a delay is done by dropping the returned future. No additional -/// cleanup work is required. -pub fn sleep_until(deadline: Instant) -> Delay { - Delay::new_timeout(deadline, Duration::from_millis(0)) -} - -/// Waits until `duration` has elapsed. -/// -/// Equivalent to `sleep_until(Instant::now() + duration)`. An asynchronous -/// analog to `std::thread::sleep`. -/// -/// No work is performed while awaiting on the delay to complete. The delay -/// operates at millisecond granularity and should not be used for tasks that -/// require high-resolution timers. -/// -/// To run something regularly on a schedule, see [`interval`]. -/// -/// # Cancellation -/// -/// Canceling a delay is done by dropping the returned future. No additional -/// cleanup work is required. -/// -/// # Examples -/// -/// Wait 100ms and print "100 ms have elapsed". -/// -/// ``` -/// use tokio::time::{sleep, Duration}; -/// -/// #[tokio::main] -/// async fn main() { -/// sleep(Duration::from_millis(100)).await; -/// println!("100 ms have elapsed"); -/// } -/// ``` -/// -/// [`interval`]: crate::time::interval() -pub fn sleep(duration: Duration) -> Delay { - sleep_until(Instant::now() + duration) -} - -/// Future returned by [`sleep`](sleep) and -/// [`sleep_until`](sleep_until). -#[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct Delay { - /// The link between the `Delay` instance and the timer that drives it. - /// - /// This also stores the `deadline` value. - entry: Arc, -} - -impl Delay { - pub(crate) fn new_timeout(deadline: Instant, duration: Duration) -> Delay { - let handle = Handle::current(); - let entry = Entry::new(&handle, deadline, duration); - - Delay { entry } - } - - /// Returns the instant at which the future will complete. - pub fn deadline(&self) -> Instant { - self.entry.time_ref().deadline - } - - /// Returns `true` if the `Delay` has elapsed - /// - /// A `Delay` is elapsed when the requested duration has elapsed. - pub fn is_elapsed(&self) -> bool { - self.entry.is_elapsed() - } - - /// Resets the `Delay` instance to a new deadline. - /// - /// Calling this function allows changing the instant at which the `Delay` - /// future completes without having to create new associated state. - /// - /// This function can be called both before and after the future has - /// completed. - pub fn reset(&mut self, deadline: Instant) { - unsafe { - self.entry.time_mut().deadline = deadline; - } - - Entry::reset(&mut self.entry); - } - - fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll> { - // Keep track of task budget - let coop = ready!(crate::coop::poll_proceed(cx)); - - self.entry.poll_elapsed(cx).map(move |r| { - coop.made_progress(); - r - }) - } -} - -impl Future for Delay { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - // `poll_elapsed` can return an error in two cases: - // - // - AtCapacity: this is a pathological case where far too many - // delays have been scheduled. - // - Shutdown: No timer has been setup, which is a mis-use error. - // - // Both cases are extremely rare, and pretty accurately fit into - // "logic errors", so we just panic in this case. A user couldn't - // really do much better if we passed the error onwards. - match ready!(self.poll_elapsed(cx)) { - Ok(()) => Poll::Ready(()), - Err(e) => panic!("timer error: {}", e), - } - } -} - -impl Drop for Delay { - fn drop(&mut self) { - Entry::cancel(&self.entry); - } -} diff --git a/tokio/src/time/driver/entry.rs b/tokio/src/time/driver/entry.rs index 20f8e1c6..20a3a8c5 100644 --- a/tokio/src/time/driver/entry.rs +++ b/tokio/src/time/driver/entry.rs @@ -11,7 +11,7 @@ use std::sync::{Arc, Weak}; use std::task::{self, Poll}; use std::u64; -/// Internal state shared between a `Delay` instance and the timer. +/// Internal state shared between a `Sleep` instance and the timer. /// /// This struct is used as a node in two intrusive data structures: /// @@ -28,7 +28,7 @@ pub(crate) struct Entry { time: CachePadded>, /// Timer internals. Using a weak pointer allows the timer to shutdown - /// without all `Delay` instances having completed. + /// without all `Sleep` instances having completed. /// /// When empty, it means that the entry has not yet been linked with a /// timer instance. @@ -69,8 +69,8 @@ pub(crate) struct Entry { /// When the entry expires, relative to the `start` of the timer /// (Inner::start). This is only used by the timer. /// - /// A `Delay` instance can be reset to a different deadline by the thread - /// that owns the `Delay` instance. In this case, the timer thread will not + /// A `Sleep` instance can be reset to a different deadline by the thread + /// that owns the `Sleep` instance. In this case, the timer thread will not /// immediately know that this has happened. The timer thread must know the /// last deadline that it saw as it uses this value to locate the entry in /// its wheel. @@ -94,7 +94,7 @@ pub(crate) struct Entry { pub(crate) prev_stack: UnsafeCell<*const Entry>, } -/// Stores the info for `Delay`. +/// Stores the info for `Sleep`. #[derive(Debug)] pub(crate) struct Time { pub(crate) deadline: Instant, diff --git a/tokio/src/time/driver/mod.rs b/tokio/src/time/driver/mod.rs index 5ece7c72..94e905b4 100644 --- a/tokio/src/time/driver/mod.rs +++ b/tokio/src/time/driver/mod.rs @@ -20,12 +20,12 @@ use std::sync::Arc; use std::usize; use std::{cmp, fmt}; -/// Time implementation that drives [`Delay`][delay], [`Interval`][interval], and [`Timeout`][timeout]. +/// Time implementation that drives [`Sleep`][sleep], [`Interval`][interval], and [`Timeout`][timeout]. /// /// A `Driver` instance tracks the state necessary for managing time and -/// notifying the [`Delay`][delay] instances once their deadlines are reached. +/// notifying the [`Sleep`][sleep] instances once their deadlines are reached. /// -/// It is expected that a single instance manages many individual [`Delay`][delay] +/// It is expected that a single instance manages many individual [`Sleep`][sleep] /// instances. The `Driver` implementation is thread-safe and, as such, is able /// to handle callers from across threads. /// @@ -36,9 +36,9 @@ use std::{cmp, fmt}; /// The driver has a resolution of one millisecond. Any unit of time that falls /// between milliseconds are rounded up to the next millisecond. /// -/// When an instance is dropped, any outstanding [`Delay`][delay] instance that has not +/// When an instance is dropped, any outstanding [`Sleep`][sleep] instance that has not /// elapsed will be notified with an error. At this point, calling `poll` on the -/// [`Delay`][delay] instance will result in panic. +/// [`Sleep`][sleep] instance will result in panic. /// /// # Implementation /// @@ -65,14 +65,14 @@ use std::{cmp, fmt}; /// * Level 5: 64 x ~12 day slots. /// /// When the timer processes entries at level zero, it will notify all the -/// `Delay` instances as their deadlines have been reached. For all higher +/// `Sleep` instances as their deadlines have been reached. For all higher /// levels, all entries will be redistributed across the wheel at the next level -/// down. Eventually, as time progresses, entries with [`Delay`][delay] instances will +/// down. Eventually, as time progresses, entries with [`Sleep`][sleep] instances will /// either be canceled (dropped) or their associated entries will reach level /// zero and be notified. /// /// [paper]: http://www.cs.columbia.edu/~nahum/w6998/papers/ton97-timing-wheels.pdf -/// [delay]: crate::time::Delay +/// [sleep]: crate::time::Sleep /// [timeout]: crate::time::Timeout /// [interval]: crate::time::Interval #[derive(Debug)] @@ -138,7 +138,7 @@ where /// Returns a handle to the timer. /// - /// The `Handle` is how `Delay` instances are created. The `Delay` instances + /// The `Handle` is how `Sleep` instances are created. The `Sleep` instances /// can either be created directly or the `Handle` instance can be passed to /// `with_default`, setting the timer as the default timer for the execution /// context. diff --git a/tokio/src/time/error.rs b/tokio/src/time/error.rs index a022f78f..7154a330 100644 --- a/tokio/src/time/error.rs +++ b/tokio/src/time/error.rs @@ -13,7 +13,7 @@ use std::fmt; /// succeed in the future. /// /// * `at_capacity` occurs when a timer operation is attempted, but the timer -/// instance is currently handling its maximum number of outstanding delays. +/// instance is currently handling its maximum number of outstanding sleep instances. /// In this case, the operation is not able to be performed at the current /// moment, and `at_capacity` is returned. This is a transient error, i.e., at /// some point in the future, if the operation is attempted again, it might diff --git a/tokio/src/time/interval.rs b/tokio/src/time/interval.rs index 1b443a36..c7c58e17 100644 --- a/tokio/src/time/interval.rs +++ b/tokio/src/time/interval.rs @@ -1,5 +1,5 @@ use crate::future::poll_fn; -use crate::time::{sleep_until, Delay, Duration, Instant}; +use crate::time::{sleep_until, Duration, Instant, Sleep}; use std::future::Future; use std::pin::Pin; @@ -115,7 +115,7 @@ pub fn interval_at(start: Instant, period: Duration) -> Interval { #[derive(Debug)] pub struct Interval { /// Future that completes the next time the `Interval` yields a value. - delay: Delay, + delay: Sleep, /// The duration between values yielded by `Interval`. period: Duration, diff --git a/tokio/src/time/mod.rs b/tokio/src/time/mod.rs index a68e11b5..41148641 100644 --- a/tokio/src/time/mod.rs +++ b/tokio/src/time/mod.rs @@ -3,7 +3,7 @@ //! This module provides a number of types for executing code after a set period //! of time. //! -//! * `Delay` is a future that does no work and completes at a specific `Instant` +//! * `Sleep` is a future that does no work and completes at a specific `Instant` //! in time. //! //! * `Interval` is a stream yielding a value at a fixed period. It is @@ -93,8 +93,8 @@ pub(crate) use self::clock::Clock; #[cfg(feature = "test-util")] pub use clock::{advance, pause, resume}; -mod delay; -pub use delay::{sleep, sleep_until, Delay}; +mod sleep; +pub use sleep::{sleep, sleep_until, Sleep}; pub(crate) mod driver; diff --git a/tokio/src/time/sleep.rs b/tokio/src/time/sleep.rs new file mode 100644 index 00000000..9f836b0f --- /dev/null +++ b/tokio/src/time/sleep.rs @@ -0,0 +1,139 @@ +use crate::time::driver::{Entry, Handle}; +use crate::time::{Duration, Error, Instant}; + +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{self, Poll}; + +/// Waits until `deadline` is reached. +/// +/// No work is performed while awaiting on the sleep future to complete. `Sleep` +/// operates at millisecond granularity and should not be used for tasks that +/// require high-resolution timers. +/// +/// # Cancellation +/// +/// Canceling a sleep instance is done by dropping the returned future. No additional +/// cleanup work is required. +pub fn sleep_until(deadline: Instant) -> Sleep { + Sleep::new_timeout(deadline, Duration::from_millis(0)) +} + +/// Waits until `duration` has elapsed. +/// +/// Equivalent to `sleep_until(Instant::now() + duration)`. An asynchronous +/// analog to `std::thread::sleep`. +/// +/// No work is performed while awaiting on the sleep future to complete. `Sleep` +/// operates at millisecond granularity and should not be used for tasks that +/// require high-resolution timers. +/// +/// To run something regularly on a schedule, see [`interval`]. +/// +/// # Cancellation +/// +/// Canceling a sleep instance is done by dropping the returned future. No additional +/// cleanup work is required. +/// +/// # Examples +/// +/// Wait 100ms and print "100 ms have elapsed". +/// +/// ``` +/// use tokio::time::{sleep, Duration}; +/// +/// #[tokio::main] +/// async fn main() { +/// sleep(Duration::from_millis(100)).await; +/// println!("100 ms have elapsed"); +/// } +/// ``` +/// +/// [`interval`]: crate::time::interval() +pub fn sleep(duration: Duration) -> Sleep { + sleep_until(Instant::now() + duration) +} + +/// Future returned by [`sleep`](sleep) and +/// [`sleep_until`](sleep_until). +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Sleep { + /// The link between the `Sleep` instance and the timer that drives it. + /// + /// This also stores the `deadline` value. + entry: Arc, +} + +impl Sleep { + pub(crate) fn new_timeout(deadline: Instant, duration: Duration) -> Sleep { + let handle = Handle::current(); + let entry = Entry::new(&handle, deadline, duration); + + Sleep { entry } + } + + /// Returns the instant at which the future will complete. + pub fn deadline(&self) -> Instant { + self.entry.time_ref().deadline + } + + /// Returns `true` if `Sleep` has elapsed. + /// + /// A `Sleep` instance is elapsed when the requested duration has elapsed. + pub fn is_elapsed(&self) -> bool { + self.entry.is_elapsed() + } + + /// Resets the `Sleep` instance to a new deadline. + /// + /// Calling this function allows changing the instant at which the `Sleep` + /// future completes without having to create new associated state. + /// + /// This function can be called both before and after the future has + /// completed. + pub fn reset(&mut self, deadline: Instant) { + unsafe { + self.entry.time_mut().deadline = deadline; + } + + Entry::reset(&mut self.entry); + } + + fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll> { + // Keep track of task budget + let coop = ready!(crate::coop::poll_proceed(cx)); + + self.entry.poll_elapsed(cx).map(move |r| { + coop.made_progress(); + r + }) + } +} + +impl Future for Sleep { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + // `poll_elapsed` can return an error in two cases: + // + // - AtCapacity: this is a pathological case where far too many + // sleep instances have been scheduled. + // - Shutdown: No timer has been setup, which is a mis-use error. + // + // Both cases are extremely rare, and pretty accurately fit into + // "logic errors", so we just panic in this case. A user couldn't + // really do much better if we passed the error onwards. + match ready!(self.poll_elapsed(cx)) { + Ok(()) => Poll::Ready(()), + Err(e) => panic!("timer error: {}", e), + } + } +} + +impl Drop for Sleep { + fn drop(&mut self) { + Entry::cancel(&self.entry); + } +} diff --git a/tokio/src/time/tests/mod.rs b/tokio/src/time/tests/mod.rs index a043d65e..fae67da9 100644 --- a/tokio/src/time/tests/mod.rs +++ b/tokio/src/time/tests/mod.rs @@ -1,4 +1,4 @@ -mod test_delay; +mod test_sleep; use crate::time::{self, Instant}; use std::time::Duration; @@ -8,15 +8,15 @@ fn assert_sync() {} #[test] fn registration_is_send_and_sync() { - use crate::time::delay::Delay; + use crate::time::sleep::Sleep; - assert_send::(); - assert_sync::(); + assert_send::(); + assert_sync::(); } #[test] #[should_panic] -fn delay_is_eager() { +fn sleep_is_eager() { let when = Instant::now() + Duration::from_millis(100); let _ = time::sleep_until(when); } diff --git a/tokio/src/time/tests/test_delay.rs b/tokio/src/time/tests/test_delay.rs deleted file mode 100644 index b732e458..00000000 --- a/tokio/src/time/tests/test_delay.rs +++ /dev/null @@ -1,449 +0,0 @@ -use crate::park::{Park, Unpark}; -use crate::time::driver::{Driver, Entry, Handle}; -use crate::time::Clock; -use crate::time::{Duration, Instant}; - -use tokio_test::task; -use tokio_test::{assert_ok, assert_pending, assert_ready_ok}; - -use std::sync::Arc; - -macro_rules! poll { - ($e:expr) => { - $e.enter(|cx, e| e.poll_elapsed(cx)) - }; -} - -#[test] -fn frozen_utility_returns_correct_advanced_duration() { - let clock = Clock::new(); - clock.pause(); - let start = clock.now(); - - clock.advance(ms(10)); - assert_eq!(clock.now() - start, ms(10)); -} - -#[test] -fn immediate_delay() { - let (mut driver, clock, handle) = setup(); - let start = clock.now(); - - let when = clock.now(); - let mut e = task::spawn(delay_until(&handle, when)); - - assert_ready_ok!(poll!(e)); - - assert_ok!(driver.park_timeout(Duration::from_millis(1000))); - - // The time has not advanced. The `turn` completed immediately. - assert_eq!(clock.now() - start, ms(1000)); -} - -#[test] -fn delayed_delay_level_0() { - let (mut driver, clock, handle) = setup(); - let start = clock.now(); - - for &i in &[1, 10, 60] { - // Create a `Delay` that elapses in the future - let mut e = task::spawn(delay_until(&handle, start + ms(i))); - - // The delay has not elapsed. - assert_pending!(poll!(e)); - - assert_ok!(driver.park()); - assert_eq!(clock.now() - start, ms(i)); - - assert_ready_ok!(poll!(e)); - } -} - -#[test] -fn sub_ms_delayed_delay() { - let (mut driver, clock, handle) = setup(); - - for _ in 0..5 { - let deadline = clock.now() + ms(1) + Duration::new(0, 1); - - let mut e = task::spawn(delay_until(&handle, deadline)); - - assert_pending!(poll!(e)); - - assert_ok!(driver.park()); - assert_ready_ok!(poll!(e)); - - assert!(clock.now() >= deadline); - - clock.advance(Duration::new(0, 1)); - } -} - -#[test] -fn delayed_delay_wrapping_level_0() { - let (mut driver, clock, handle) = setup(); - let start = clock.now(); - - assert_ok!(driver.park_timeout(ms(5))); - assert_eq!(clock.now() - start, ms(5)); - - let mut e = task::spawn(delay_until(&handle, clock.now() + ms(60))); - - assert_pending!(poll!(e)); - - assert_ok!(driver.park()); - assert_eq!(clock.now() - start, ms(64)); - assert_pending!(poll!(e)); - - assert_ok!(driver.park()); - assert_eq!(clock.now() - start, ms(65)); - - assert_ready_ok!(poll!(e)); -} - -#[test] -fn timer_wrapping_with_higher_levels() { - let (mut driver, clock, handle) = setup(); - let start = clock.now(); - - // Set delay to hit level 1 - let mut e1 = task::spawn(delay_until(&handle, clock.now() + ms(64))); - assert_pending!(poll!(e1)); - - // Turn a bit - assert_ok!(driver.park_timeout(ms(5))); - - // Set timeout such that it will hit level 0, but wrap - let mut e2 = task::spawn(delay_until(&handle, clock.now() + ms(60))); - assert_pending!(poll!(e2)); - - // This should result in s1 firing - assert_ok!(driver.park()); - assert_eq!(clock.now() - start, ms(64)); - - assert_ready_ok!(poll!(e1)); - assert_pending!(poll!(e2)); - - assert_ok!(driver.park()); - assert_eq!(clock.now() - start, ms(65)); - - assert_ready_ok!(poll!(e1)); -} - -#[test] -fn delay_with_deadline_in_past() { - let (mut driver, clock, handle) = setup(); - let start = clock.now(); - - // Create `Delay` that elapsed immediately. - let mut e = task::spawn(delay_until(&handle, clock.now() - ms(100))); - - // Even though the delay expires in the past, it is not ready yet - // because the timer must observe it. - assert_ready_ok!(poll!(e)); - - // Turn the timer, it runs for the elapsed time - assert_ok!(driver.park_timeout(ms(1000))); - - // The time has not advanced. The `turn` completed immediately. - assert_eq!(clock.now() - start, ms(1000)); -} - -#[test] -fn delayed_delay_level_1() { - let (mut driver, clock, handle) = setup(); - let start = clock.now(); - - // Create a `Delay` that elapses in the future - let mut e = task::spawn(delay_until(&handle, clock.now() + ms(234))); - - // The delay has not elapsed. - assert_pending!(poll!(e)); - - // Turn the timer, this will wake up to cascade the timer down. - assert_ok!(driver.park_timeout(ms(1000))); - assert_eq!(clock.now() - start, ms(192)); - - // The delay has not elapsed. - assert_pending!(poll!(e)); - - // Turn the timer again - assert_ok!(driver.park_timeout(ms(1000))); - assert_eq!(clock.now() - start, ms(234)); - - // The delay has elapsed. - assert_ready_ok!(poll!(e)); - - let (mut driver, clock, handle) = setup(); - let start = clock.now(); - - // Create a `Delay` that elapses in the future - let mut e = task::spawn(delay_until(&handle, clock.now() + ms(234))); - - // The delay has not elapsed. - assert_pending!(poll!(e)); - - // Turn the timer with a smaller timeout than the cascade. - assert_ok!(driver.park_timeout(ms(100))); - assert_eq!(clock.now() - start, ms(100)); - - assert_pending!(poll!(e)); - - // Turn the timer, this will wake up to cascade the timer down. - assert_ok!(driver.park_timeout(ms(1000))); - assert_eq!(clock.now() - start, ms(192)); - - // The delay has not elapsed. - assert_pending!(poll!(e)); - - // Turn the timer again - assert_ok!(driver.park_timeout(ms(1000))); - assert_eq!(clock.now() - start, ms(234)); - - // The delay has elapsed. - assert_ready_ok!(poll!(e)); -} - -#[test] -fn concurrently_set_two_timers_second_one_shorter() { - let (mut driver, clock, handle) = setup(); - let start = clock.now(); - - let mut e1 = task::spawn(delay_until(&handle, clock.now() + ms(500))); - let mut e2 = task::spawn(delay_until(&handle, clock.now() + ms(200))); - - // The delay has not elapsed - assert_pending!(poll!(e1)); - assert_pending!(poll!(e2)); - - // Delay until a cascade - assert_ok!(driver.park()); - assert_eq!(clock.now() - start, ms(192)); - - // Delay until the second timer. - assert_ok!(driver.park()); - assert_eq!(clock.now() - start, ms(200)); - - // The shorter delay fires - assert_ready_ok!(poll!(e2)); - assert_pending!(poll!(e1)); - - assert_ok!(driver.park()); - assert_eq!(clock.now() - start, ms(448)); - - assert_pending!(poll!(e1)); - - // Turn again, this time the time will advance to the second delay - assert_ok!(driver.park()); - assert_eq!(clock.now() - start, ms(500)); - - assert_ready_ok!(poll!(e1)); -} - -#[test] -fn short_delay() { - let (mut driver, clock, handle) = setup(); - let start = clock.now(); - - // Create a `Delay` that elapses in the future - let mut e = task::spawn(delay_until(&handle, clock.now() + ms(1))); - - // The delay has not elapsed. - assert_pending!(poll!(e)); - - // Turn the timer, but not enough time will go by. - assert_ok!(driver.park()); - - // The delay has elapsed. - assert_ready_ok!(poll!(e)); - - // The time has advanced to the point of the delay elapsing. - assert_eq!(clock.now() - start, ms(1)); -} - -#[test] -fn sorta_long_delay_until() { - const MIN_5: u64 = 5 * 60 * 1000; - - let (mut driver, clock, handle) = setup(); - let start = clock.now(); - - // Create a `Delay` that elapses in the future - let mut e = task::spawn(delay_until(&handle, clock.now() + ms(MIN_5))); - - // The delay has not elapsed. - assert_pending!(poll!(e)); - - let cascades = &[262_144, 262_144 + 9 * 4096, 262_144 + 9 * 4096 + 15 * 64]; - - for &elapsed in cascades { - assert_ok!(driver.park()); - assert_eq!(clock.now() - start, ms(elapsed)); - - assert_pending!(poll!(e)); - } - - assert_ok!(driver.park()); - assert_eq!(clock.now() - start, ms(MIN_5)); - - // The delay has elapsed. - assert_ready_ok!(poll!(e)); -} - -#[test] -fn very_long_delay() { - const MO_5: u64 = 5 * 30 * 24 * 60 * 60 * 1000; - - let (mut driver, clock, handle) = setup(); - let start = clock.now(); - - // Create a `Delay` that elapses in the future - let mut e = task::spawn(delay_until(&handle, clock.now() + ms(MO_5))); - - // The delay has not elapsed. - assert_pending!(poll!(e)); - - let cascades = &[ - 12_884_901_888, - 12_952_010_752, - 12_959_875_072, - 12_959_997_952, - ]; - - for &elapsed in cascades { - assert_ok!(driver.park()); - assert_eq!(clock.now() - start, ms(elapsed)); - - assert_pending!(poll!(e)); - } - - // Turn the timer, but not enough time will go by. - assert_ok!(driver.park()); - - // The time has advanced to the point of the delay elapsing. - assert_eq!(clock.now() - start, ms(MO_5)); - - // The delay has elapsed. - assert_ready_ok!(poll!(e)); -} - -#[test] -fn unpark_is_delayed() { - // A special park that will take much longer than the requested duration - struct MockPark(Clock); - - struct MockUnpark; - - impl Park for MockPark { - type Unpark = MockUnpark; - type Error = (); - - fn unpark(&self) -> Self::Unpark { - MockUnpark - } - - fn park(&mut self) -> Result<(), Self::Error> { - panic!("parking forever"); - } - - fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { - assert_eq!(duration, ms(0)); - self.0.advance(ms(436)); - Ok(()) - } - - fn shutdown(&mut self) {} - } - - impl Unpark for MockUnpark { - fn unpark(&self) {} - } - - let clock = Clock::new(); - clock.pause(); - let start = clock.now(); - let mut driver = Driver::new(MockPark(clock.clone()), clock.clone()); - let handle = driver.handle(); - - let mut e1 = task::spawn(delay_until(&handle, clock.now() + ms(100))); - let mut e2 = task::spawn(delay_until(&handle, clock.now() + ms(101))); - let mut e3 = task::spawn(delay_until(&handle, clock.now() + ms(200))); - - assert_pending!(poll!(e1)); - assert_pending!(poll!(e2)); - assert_pending!(poll!(e3)); - - assert_ok!(driver.park()); - - assert_eq!(clock.now() - start, ms(500)); - - assert_ready_ok!(poll!(e1)); - assert_ready_ok!(poll!(e2)); - assert_ready_ok!(poll!(e3)); -} - -#[test] -fn set_timeout_at_deadline_greater_than_max_timer() { - const YR_1: u64 = 365 * 24 * 60 * 60 * 1000; - const YR_5: u64 = 5 * YR_1; - - let (mut driver, clock, handle) = setup(); - let start = clock.now(); - - for _ in 0..5 { - assert_ok!(driver.park_timeout(ms(YR_1))); - } - - let mut e = task::spawn(delay_until(&handle, clock.now() + ms(1))); - assert_pending!(poll!(e)); - - assert_ok!(driver.park_timeout(ms(1000))); - assert_eq!(clock.now() - start, ms(YR_5) + ms(1)); - - assert_ready_ok!(poll!(e)); -} - -fn setup() -> (Driver, Clock, Handle) { - let clock = Clock::new(); - clock.pause(); - let driver = Driver::new(MockPark(clock.clone()), clock.clone()); - let handle = driver.handle(); - - (driver, clock, handle) -} - -fn delay_until(handle: &Handle, when: Instant) -> Arc { - Entry::new(&handle, when, ms(0)) -} - -struct MockPark(Clock); - -struct MockUnpark; - -impl Park for MockPark { - type Unpark = MockUnpark; - type Error = (); - - fn unpark(&self) -> Self::Unpark { - MockUnpark - } - - fn park(&mut self) -> Result<(), Self::Error> { - panic!("parking forever"); - } - - fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { - self.0.advance(duration); - Ok(()) - } - - fn shutdown(&mut self) {} -} - -impl Unpark for MockUnpark { - fn unpark(&self) {} -} - -fn ms(n: u64) -> Duration { - Duration::from_millis(n) -} diff --git a/tokio/src/time/tests/test_sleep.rs b/tokio/src/time/tests/test_sleep.rs new file mode 100644 index 00000000..c8d931a8 --- /dev/null +++ b/tokio/src/time/tests/test_sleep.rs @@ -0,0 +1,449 @@ +use crate::park::{Park, Unpark}; +use crate::time::driver::{Driver, Entry, Handle}; +use crate::time::Clock; +use crate::time::{Duration, Instant}; + +use tokio_test::task; +use tokio_test::{assert_ok, assert_pending, assert_ready_ok}; + +use std::sync::Arc; + +macro_rules! poll { + ($e:expr) => { + $e.enter(|cx, e| e.poll_elapsed(cx)) + }; +} + +#[test] +fn frozen_utility_returns_correct_advanced_duration() { + let clock = Clock::new(); + clock.pause(); + let start = clock.now(); + + clock.advance(ms(10)); + assert_eq!(clock.now() - start, ms(10)); +} + +#[test] +fn immediate_sleep() { + let (mut driver, clock, handle) = setup(); + let start = clock.now(); + + let when = clock.now(); + let mut e = task::spawn(sleep_until(&handle, when)); + + assert_ready_ok!(poll!(e)); + + assert_ok!(driver.park_timeout(Duration::from_millis(1000))); + + // The time has not advanced. The `turn` completed immediately. + assert_eq!(clock.now() - start, ms(1000)); +} + +#[test] +fn delayed_sleep_level_0() { + let (mut driver, clock, handle) = setup(); + let start = clock.now(); + + for &i in &[1, 10, 60] { + // Create a `Sleep` that elapses in the future + let mut e = task::spawn(sleep_until(&handle, start + ms(i))); + + // The sleep instance has not elapsed. + assert_pending!(poll!(e)); + + assert_ok!(driver.park()); + assert_eq!(clock.now() - start, ms(i)); + + assert_ready_ok!(poll!(e)); + } +} + +#[test] +fn sub_ms_delayed_sleep() { + let (mut driver, clock, handle) = setup(); + + for _ in 0..5 { + let deadline = clock.now() + ms(1) + Duration::new(0, 1); + + let mut e = task::spawn(sleep_until(&handle, deadline)); + + assert_pending!(poll!(e)); + + assert_ok!(driver.park()); + assert_ready_ok!(poll!(e)); + + assert!(clock.now() >= deadline); + + clock.advance(Duration::new(0, 1)); + } +} + +#[test] +fn delayed_sleep_wrapping_level_0() { + let (mut driver, clock, handle) = setup(); + let start = clock.now(); + + assert_ok!(driver.park_timeout(ms(5))); + assert_eq!(clock.now() - start, ms(5)); + + let mut e = task::spawn(sleep_until(&handle, clock.now() + ms(60))); + + assert_pending!(poll!(e)); + + assert_ok!(driver.park()); + assert_eq!(clock.now() - start, ms(64)); + assert_pending!(poll!(e)); + + assert_ok!(driver.park()); + assert_eq!(clock.now() - start, ms(65)); + + assert_ready_ok!(poll!(e)); +} + +#[test] +fn timer_wrapping_with_higher_levels() { + let (mut driver, clock, handle) = setup(); + let start = clock.now(); + + // Set sleep to hit level 1 + let mut e1 = task::spawn(sleep_until(&handle, clock.now() + ms(64))); + assert_pending!(poll!(e1)); + + // Turn a bit + assert_ok!(driver.park_timeout(ms(5))); + + // Set timeout such that it will hit level 0, but wrap + let mut e2 = task::spawn(sleep_until(&handle, clock.now() + ms(60))); + assert_pending!(poll!(e2)); + + // This should result in s1 firing + assert_ok!(driver.park()); + assert_eq!(clock.now() - start, ms(64)); + + assert_ready_ok!(poll!(e1)); + assert_pending!(poll!(e2)); + + assert_ok!(driver.park()); + assert_eq!(clock.now() - start, ms(65)); + + assert_ready_ok!(poll!(e1)); +} + +#[test] +fn sleep_with_deadline_in_past() { + let (mut driver, clock, handle) = setup(); + let start = clock.now(); + + // Create `Sleep` that elapsed immediately. + let mut e = task::spawn(sleep_until(&handle, clock.now() - ms(100))); + + // Even though the `Sleep` expires in the past, it is not ready yet + // because the timer must observe it. + assert_ready_ok!(poll!(e)); + + // Turn the timer, it runs for the elapsed time + assert_ok!(driver.park_timeout(ms(1000))); + + // The time has not advanced. The `turn` completed immediately. + assert_eq!(clock.now() - start, ms(1000)); +} + +#[test] +fn delayed_sleep_level_1() { + let (mut driver, clock, handle) = setup(); + let start = clock.now(); + + // Create a `Sleep` that elapses in the future + let mut e = task::spawn(sleep_until(&handle, clock.now() + ms(234))); + + // The sleep has not elapsed. + assert_pending!(poll!(e)); + + // Turn the timer, this will wake up to cascade the timer down. + assert_ok!(driver.park_timeout(ms(1000))); + assert_eq!(clock.now() - start, ms(192)); + + // The sleep has not elapsed. + assert_pending!(poll!(e)); + + // Turn the timer again + assert_ok!(driver.park_timeout(ms(1000))); + assert_eq!(clock.now() - start, ms(234)); + + // The sleep has elapsed. + assert_ready_ok!(poll!(e)); + + let (mut driver, clock, handle) = setup(); + let start = clock.now(); + + // Create a `Sleep` that elapses in the future + let mut e = task::spawn(sleep_until(&handle, clock.now() + ms(234))); + + // The sleep has not elapsed. + assert_pending!(poll!(e)); + + // Turn the timer with a smaller timeout than the cascade. + assert_ok!(driver.park_timeout(ms(100))); + assert_eq!(clock.now() - start, ms(100)); + + assert_pending!(poll!(e)); + + // Turn the timer, this will wake up to cascade the timer down. + assert_ok!(driver.park_timeout(ms(1000))); + assert_eq!(clock.now() - start, ms(192)); + + // The sleep has not elapsed. + assert_pending!(poll!(e)); + + // Turn the timer again + assert_ok!(driver.park_timeout(ms(1000))); + assert_eq!(clock.now() - start, ms(234)); + + // The sleep has elapsed. + assert_ready_ok!(poll!(e)); +} + +#[test] +fn concurrently_set_two_timers_second_one_shorter() { + let (mut driver, clock, handle) = setup(); + let start = clock.now(); + + let mut e1 = task::spawn(sleep_until(&handle, clock.now() + ms(500))); + let mut e2 = task::spawn(sleep_until(&handle, clock.now() + ms(200))); + + // The sleep has not elapsed + assert_pending!(poll!(e1)); + assert_pending!(poll!(e2)); + + // Sleep until a cascade + assert_ok!(driver.park()); + assert_eq!(clock.now() - start, ms(192)); + + // Sleep until the second timer. + assert_ok!(driver.park()); + assert_eq!(clock.now() - start, ms(200)); + + // The shorter sleep fires + assert_ready_ok!(poll!(e2)); + assert_pending!(poll!(e1)); + + assert_ok!(driver.park()); + assert_eq!(clock.now() - start, ms(448)); + + assert_pending!(poll!(e1)); + + // Turn again, this time the time will advance to the second sleep + assert_ok!(driver.park()); + assert_eq!(clock.now() - start, ms(500)); + + assert_ready_ok!(poll!(e1)); +} + +#[test] +fn short_sleep() { + let (mut driver, clock, handle) = setup(); + let start = clock.now(); + + // Create a `Sleep` that elapses in the future + let mut e = task::spawn(sleep_until(&handle, clock.now() + ms(1))); + + // The sleep has not elapsed. + assert_pending!(poll!(e)); + + // Turn the timer, but not enough time will go by. + assert_ok!(driver.park()); + + // The sleep has elapsed. + assert_ready_ok!(poll!(e)); + + // The time has advanced to the point of the sleep elapsing. + assert_eq!(clock.now() - start, ms(1)); +} + +#[test] +fn sorta_long_sleep_until() { + const MIN_5: u64 = 5 * 60 * 1000; + + let (mut driver, clock, handle) = setup(); + let start = clock.now(); + + // Create a `Sleep` that elapses in the future + let mut e = task::spawn(sleep_until(&handle, clock.now() + ms(MIN_5))); + + // The sleep has not elapsed. + assert_pending!(poll!(e)); + + let cascades = &[262_144, 262_144 + 9 * 4096, 262_144 + 9 * 4096 + 15 * 64]; + + for &elapsed in cascades { + assert_ok!(driver.park()); + assert_eq!(clock.now() - start, ms(elapsed)); + + assert_pending!(poll!(e)); + } + + assert_ok!(driver.park()); + assert_eq!(clock.now() - start, ms(MIN_5)); + + // The sleep has elapsed. + assert_ready_ok!(poll!(e)); +} + +#[test] +fn very_long_sleep() { + const MO_5: u64 = 5 * 30 * 24 * 60 * 60 * 1000; + + let (mut driver, clock, handle) = setup(); + let start = clock.now(); + + // Create a `Sleep` that elapses in the future + let mut e = task::spawn(sleep_until(&handle, clock.now() + ms(MO_5))); + + // The sleep has not elapsed. + assert_pending!(poll!(e)); + + let cascades = &[ + 12_884_901_888, + 12_952_010_752, + 12_959_875_072, + 12_959_997_952, + ]; + + for &elapsed in cascades { + assert_ok!(driver.park()); + assert_eq!(clock.now() - start, ms(elapsed)); + + assert_pending!(poll!(e)); + } + + // Turn the timer, but not enough time will go by. + assert_ok!(driver.park()); + + // The time has advanced to the point of the sleep elapsing. + assert_eq!(clock.now() - start, ms(MO_5)); + + // The sleep has elapsed. + assert_ready_ok!(poll!(e)); +} + +#[test] +fn unpark_is_delayed() { + // A special park that will take much longer than the requested duration + struct MockPark(Clock); + + struct MockUnpark; + + impl Park for MockPark { + type Unpark = MockUnpark; + type Error = (); + + fn unpark(&self) -> Self::Unpark { + MockUnpark + } + + fn park(&mut self) -> Result<(), Self::Error> { + panic!("parking forever"); + } + + fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + assert_eq!(duration, ms(0)); + self.0.advance(ms(436)); + Ok(()) + } + + fn shutdown(&mut self) {} + } + + impl Unpark for MockUnpark { + fn unpark(&self) {} + } + + let clock = Clock::new(); + clock.pause(); + let start = clock.now(); + let mut driver = Driver::new(MockPark(clock.clone()), clock.clone()); + let handle = driver.handle(); + + let mut e1 = task::spawn(sleep_until(&handle, clock.now() + ms(100))); + let mut e2 = task::spawn(sleep_until(&handle, clock.now() + ms(101))); + let mut e3 = task::spawn(sleep_until(&handle, clock.now() + ms(200))); + + assert_pending!(poll!(e1)); + assert_pending!(poll!(e2)); + assert_pending!(poll!(e3)); + + assert_ok!(driver.park()); + + assert_eq!(clock.now() - start, ms(500)); + + assert_ready_ok!(poll!(e1)); + assert_ready_ok!(poll!(e2)); + assert_ready_ok!(poll!(e3)); +} + +#[test] +fn set_timeout_at_deadline_greater_than_max_timer() { + const YR_1: u64 = 365 * 24 * 60 * 60 * 1000; + const YR_5: u64 = 5 * YR_1; + + let (mut driver, clock, handle) = setup(); + let start = clock.now(); + + for _ in 0..5 { + assert_ok!(driver.park_timeout(ms(YR_1))); + } + + let mut e = task::spawn(sleep_until(&handle, clock.now() + ms(1))); + assert_pending!(poll!(e)); + + assert_ok!(driver.park_timeout(ms(1000))); + assert_eq!(clock.now() - start, ms(YR_5) + ms(1)); + + assert_ready_ok!(poll!(e)); +} + +fn setup() -> (Driver, Clock, Handle) { + let clock = Clock::new(); + clock.pause(); + let driver = Driver::new(MockPark(clock.clone()), clock.clone()); + let handle = driver.handle(); + + (driver, clock, handle) +} + +fn sleep_until(handle: &Handle, when: Instant) -> Arc { + Entry::new(&handle, when, ms(0)) +} + +struct MockPark(Clock); + +struct MockUnpark; + +impl Park for MockPark { + type Unpark = MockUnpark; + type Error = (); + + fn unpark(&self) -> Self::Unpark { + MockUnpark + } + + fn park(&mut self) -> Result<(), Self::Error> { + panic!("parking forever"); + } + + fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + self.0.advance(duration); + Ok(()) + } + + fn shutdown(&mut self) {} +} + +impl Unpark for MockUnpark { + fn unpark(&self) {} +} + +fn ms(n: u64) -> Duration { + Duration::from_millis(n) +} diff --git a/tokio/src/time/timeout.rs b/tokio/src/time/timeout.rs index 0804f265..d35121ac 100644 --- a/tokio/src/time/timeout.rs +++ b/tokio/src/time/timeout.rs @@ -4,7 +4,7 @@ //! //! [`Timeout`]: struct@Timeout -use crate::time::{sleep_until, Delay, Duration, Instant}; +use crate::time::{sleep_until, Duration, Instant, Sleep}; use pin_project_lite::pin_project; use std::fmt; @@ -50,7 +50,7 @@ pub fn timeout(duration: Duration, future: T) -> Timeout where T: Future, { - let delay = Delay::new_timeout(Instant::now() + duration, duration); + let delay = Sleep::new_timeout(Instant::now() + duration, duration); Timeout::new_with_delay(future, delay) } @@ -108,7 +108,7 @@ pin_project! { #[pin] value: T, #[pin] - delay: Delay, + delay: Sleep, } } @@ -125,7 +125,7 @@ impl Elapsed { } impl Timeout { - pub(crate) fn new_with_delay(value: T, delay: Delay) -> Timeout { + pub(crate) fn new_with_delay(value: T, delay: Sleep) -> Timeout { Timeout { value, delay } } diff --git a/tokio/src/time/wheel/mod.rs b/tokio/src/time/wheel/mod.rs index 03861240..18559dfd 100644 --- a/tokio/src/time/wheel/mod.rs +++ b/tokio/src/time/wheel/mod.rs @@ -47,7 +47,7 @@ pub(crate) struct Wheel { /// precision of 1 millisecond. const NUM_LEVELS: usize = 6; -/// The maximum duration of a delay +/// The maximum duration of a `Sleep` const MAX_DURATION: u64 = (1 << (6 * NUM_LEVELS)) - 1; #[derive(Debug)] diff --git a/tokio/tests/macros_select.rs b/tokio/tests/macros_select.rs index f77f3f01..cc214bbb 100644 --- a/tokio/tests/macros_select.rs +++ b/tokio/tests/macros_select.rs @@ -359,10 +359,10 @@ async fn join_with_select() { async fn use_future_in_if_condition() { use tokio::time::{self, Duration}; - let mut delay = time::sleep(Duration::from_millis(50)); + let mut sleep = time::sleep(Duration::from_millis(50)); tokio::select! { - _ = &mut delay, if !delay.is_elapsed() => { + _ = &mut sleep, if !sleep.is_elapsed() => { } _ = async { 1 } => { } diff --git a/tokio/tests/rt_common.rs b/tokio/tests/rt_common.rs index 56ab840d..1273593f 100644 --- a/tokio/tests/rt_common.rs +++ b/tokio/tests/rt_common.rs @@ -430,7 +430,7 @@ rt_test! { } #[test] - fn delay_at_root() { + fn sleep_at_root() { let rt = rt(); let now = Instant::now(); @@ -444,7 +444,7 @@ rt_test! { } #[test] - fn delay_in_spawn() { + fn sleep_in_spawn() { let rt = rt(); let now = Instant::now(); @@ -515,7 +515,7 @@ rt_test! { } #[test] - fn delay_from_blocking() { + fn sleep_from_blocking() { let rt = rt(); rt.block_on(async move { diff --git a/tokio/tests/stream_timeout.rs b/tokio/tests/stream_timeout.rs index 698c1d39..a787bba3 100644 --- a/tokio/tests/stream_timeout.rs +++ b/tokio/tests/stream_timeout.rs @@ -6,7 +6,7 @@ use tokio_test::*; use futures::StreamExt as _; -async fn maybe_delay(idx: i32) -> i32 { +async fn maybe_sleep(idx: i32) -> i32 { if idx % 2 == 0 { sleep(ms(200)).await; } @@ -26,7 +26,7 @@ async fn basic_usage() { // // [Ok(1), Err(Elapsed), Ok(2), Ok(3), Err(Elapsed), Ok(4)] - let stream = stream::iter(1..=4).then(maybe_delay).timeout(ms(100)); + let stream = stream::iter(1..=4).then(maybe_sleep).timeout(ms(100)); let mut stream = task::spawn(stream); // First item completes immediately @@ -68,7 +68,7 @@ async fn basic_usage() { async fn return_elapsed_errors_only_once() { time::pause(); - let stream = stream::iter(1..=3).then(maybe_delay).timeout(ms(50)); + let stream = stream::iter(1..=3).then(maybe_sleep).timeout(ms(50)); let mut stream = task::spawn(stream); // First item completes immediately @@ -97,7 +97,7 @@ async fn return_elapsed_errors_only_once() { #[tokio::test] async fn no_timeouts() { let stream = stream::iter(vec![1, 3, 5]) - .then(maybe_delay) + .then(maybe_sleep) .timeout(ms(100)); let mut stream = task::spawn(stream); diff --git a/tokio/tests/time_delay.rs b/tokio/tests/time_delay.rs deleted file mode 100644 index 559c18c8..00000000 --- a/tokio/tests/time_delay.rs +++ /dev/null @@ -1,196 +0,0 @@ -#![warn(rust_2018_idioms)] -#![cfg(feature = "full")] - -use tokio::time::{self, Duration, Instant}; -use tokio_test::{assert_pending, assert_ready, task}; - -macro_rules! assert_elapsed { - ($now:expr, $ms:expr) => {{ - let elapsed = $now.elapsed(); - let lower = ms($ms); - - // Handles ms rounding - assert!( - elapsed >= lower && elapsed <= lower + ms(1), - "actual = {:?}, expected = {:?}", - elapsed, - lower - ); - }}; -} - -#[tokio::test] -async fn immediate_delay() { - time::pause(); - - let now = Instant::now(); - - // Ready! - time::sleep_until(now).await; - assert_elapsed!(now, 0); -} - -#[tokio::test] -async fn delayed_delay_level_0() { - time::pause(); - - for &i in &[1, 10, 60] { - let now = Instant::now(); - - time::sleep_until(now + ms(i)).await; - - assert_elapsed!(now, i); - } -} - -#[tokio::test] -async fn sub_ms_delayed_delay() { - time::pause(); - - for _ in 0..5 { - let now = Instant::now(); - let deadline = now + ms(1) + Duration::new(0, 1); - - time::sleep_until(deadline).await; - - assert_elapsed!(now, 1); - } -} - -#[tokio::test] -async fn delayed_delay_wrapping_level_0() { - time::pause(); - - time::sleep(ms(5)).await; - - let now = Instant::now(); - time::sleep_until(now + ms(60)).await; - - assert_elapsed!(now, 60); -} - -#[tokio::test] -async fn reset_future_delay_before_fire() { - time::pause(); - - let now = Instant::now(); - - let mut delay = task::spawn(time::sleep_until(now + ms(100))); - assert_pending!(delay.poll()); - - let mut delay = delay.into_inner(); - - delay.reset(Instant::now() + ms(200)); - delay.await; - - assert_elapsed!(now, 200); -} - -#[tokio::test] -async fn reset_past_delay_before_turn() { - time::pause(); - - let now = Instant::now(); - - let mut delay = task::spawn(time::sleep_until(now + ms(100))); - assert_pending!(delay.poll()); - - let mut delay = delay.into_inner(); - - delay.reset(now + ms(80)); - delay.await; - - assert_elapsed!(now, 80); -} - -#[tokio::test] -async fn reset_past_delay_before_fire() { - time::pause(); - - let now = Instant::now(); - - let mut delay = task::spawn(time::sleep_until(now + ms(100))); - assert_pending!(delay.poll()); - - let mut delay = delay.into_inner(); - - time::sleep(ms(10)).await; - - delay.reset(now + ms(80)); - delay.await; - - assert_elapsed!(now, 80); -} - -#[tokio::test] -async fn reset_future_delay_after_fire() { - time::pause(); - - let now = Instant::now(); - let mut delay = time::sleep_until(now + ms(100)); - - (&mut delay).await; - assert_elapsed!(now, 100); - - delay.reset(now + ms(110)); - delay.await; - assert_elapsed!(now, 110); -} - -#[tokio::test] -async fn reset_delay_to_past() { - time::pause(); - - let now = Instant::now(); - - let mut delay = task::spawn(time::sleep_until(now + ms(100))); - assert_pending!(delay.poll()); - - time::sleep(ms(50)).await; - - assert!(!delay.is_woken()); - - delay.reset(now + ms(40)); - - assert!(delay.is_woken()); - - assert_ready!(delay.poll()); -} - -#[test] -#[should_panic] -fn creating_delay_outside_of_context() { - let now = Instant::now(); - - // This creates a delay outside of the context of a mock timer. This tests - // that it will panic. - let _fut = time::sleep_until(now + ms(500)); -} - -#[should_panic] -#[tokio::test] -async fn greater_than_max() { - const YR_5: u64 = 5 * 365 * 24 * 60 * 60 * 1000; - - time::sleep_until(Instant::now() + ms(YR_5)).await; -} - -const NUM_LEVELS: usize = 6; -const MAX_DURATION: u64 = (1 << (6 * NUM_LEVELS)) - 1; - -#[should_panic] -#[tokio::test] -async fn exactly_max() { - // TODO: this should not panic but `time::ms()` is acting up - time::sleep(ms(MAX_DURATION)).await; -} - -#[tokio::test] -async fn no_out_of_bounds_close_to_max() { - time::pause(); - time::sleep(ms(MAX_DURATION - 1)).await; -} - -fn ms(n: u64) -> Duration { - Duration::from_millis(n) -} diff --git a/tokio/tests/time_sleep.rs b/tokio/tests/time_sleep.rs new file mode 100644 index 00000000..87d69dee --- /dev/null +++ b/tokio/tests/time_sleep.rs @@ -0,0 +1,196 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::time::{self, Duration, Instant}; +use tokio_test::{assert_pending, assert_ready, task}; + +macro_rules! assert_elapsed { + ($now:expr, $ms:expr) => {{ + let elapsed = $now.elapsed(); + let lower = ms($ms); + + // Handles ms rounding + assert!( + elapsed >= lower && elapsed <= lower + ms(1), + "actual = {:?}, expected = {:?}", + elapsed, + lower + ); + }}; +} + +#[tokio::test] +async fn immediate_sleep() { + time::pause(); + + let now = Instant::now(); + + // Ready! + time::sleep_until(now).await; + assert_elapsed!(now, 0); +} + +#[tokio::test] +async fn delayed_sleep_level_0() { + time::pause(); + + for &i in &[1, 10, 60] { + let now = Instant::now(); + + time::sleep_until(now + ms(i)).await; + + assert_elapsed!(now, i); + } +} + +#[tokio::test] +async fn sub_ms_delayed_sleep() { + time::pause(); + + for _ in 0..5 { + let now = Instant::now(); + let deadline = now + ms(1) + Duration::new(0, 1); + + time::sleep_until(deadline).await; + + assert_elapsed!(now, 1); + } +} + +#[tokio::test] +async fn delayed_sleep_wrapping_level_0() { + time::pause(); + + time::sleep(ms(5)).await; + + let now = Instant::now(); + time::sleep_until(now + ms(60)).await; + + assert_elapsed!(now, 60); +} + +#[tokio::test] +async fn reset_future_sleep_before_fire() { + time::pause(); + + let now = Instant::now(); + + let mut sleep = task::spawn(time::sleep_until(now + ms(100))); + assert_pending!(sleep.poll()); + + let mut sleep = sleep.into_inner(); + + sleep.reset(Instant::now() + ms(200)); + sleep.await; + + assert_elapsed!(now, 200); +} + +#[tokio::test] +async fn reset_past_sleep_before_turn() { + time::pause(); + + let now = Instant::now();