From ae67851f11b7cc1f577de8ce21767ce3e2c7aff9 Mon Sep 17 00:00:00 2001 From: bdonlan Date: Mon, 23 Nov 2020 10:42:50 -0800 Subject: time: use intrusive lists for timer tracking (#3080) More-or-less a half-rewrite of the current time driver, supporting the use of intrusive futures for timer registration. Fixes: #3028, #3069 --- tokio/tests/macros_select.rs | 8 ++- tokio/tests/stream_timeout.rs | 2 +- tokio/tests/sync_mutex.rs | 7 +- tokio/tests/sync_mutex_owned.rs | 7 +- tokio/tests/time_interval.rs | 3 +- tokio/tests/time_rt.rs | 2 +- tokio/tests/time_sleep.rs | 145 +++++++++++++++++++++++++++++++++++----- 7 files changed, 146 insertions(+), 28 deletions(-) (limited to 'tokio/tests') diff --git a/tokio/tests/macros_select.rs b/tokio/tests/macros_select.rs index cc214bbb..3359849d 100644 --- a/tokio/tests/macros_select.rs +++ b/tokio/tests/macros_select.rs @@ -359,12 +359,14 @@ async fn join_with_select() { async fn use_future_in_if_condition() { use tokio::time::{self, Duration}; - let mut sleep = time::sleep(Duration::from_millis(50)); + let sleep = time::sleep(Duration::from_millis(50)); + tokio::pin!(sleep); tokio::select! { - _ = &mut sleep, if !sleep.is_elapsed() => { + _ = time::sleep(Duration::from_millis(50)), if false => { + panic!("if condition ignored") } - _ = async { 1 } => { + _ = async { 1u32 } => { } } } diff --git a/tokio/tests/stream_timeout.rs b/tokio/tests/stream_timeout.rs index a787bba3..216b5f75 100644 --- a/tokio/tests/stream_timeout.rs +++ b/tokio/tests/stream_timeout.rs @@ -78,7 +78,7 @@ async fn return_elapsed_errors_only_once() { // error is returned. assert_pending!(stream.poll_next()); // - time::advance(ms(50)).await; + time::advance(ms(51)).await; let v = assert_ready!(stream.poll_next()); assert!(v.unwrap().is_err()); // timeout! diff --git a/tokio/tests/sync_mutex.rs b/tokio/tests/sync_mutex.rs index 96194b31..0ddb203d 100644 --- a/tokio/tests/sync_mutex.rs +++ b/tokio/tests/sync_mutex.rs @@ -91,10 +91,11 @@ async fn aborted_future_1() { let m2 = m1.clone(); // Try to lock mutex in a future that is aborted prematurely timeout(Duration::from_millis(1u64), async move { - let mut iv = interval(Duration::from_millis(1000)); + let iv = interval(Duration::from_millis(1000)); + tokio::pin!(iv); m2.lock().await; - iv.tick().await; - iv.tick().await; + iv.as_mut().tick().await; + iv.as_mut().tick().await; }) .await .unwrap_err(); diff --git a/tokio/tests/sync_mutex_owned.rs b/tokio/tests/sync_mutex_owned.rs index 394a6708..0f1399c4 100644 --- a/tokio/tests/sync_mutex_owned.rs +++ b/tokio/tests/sync_mutex_owned.rs @@ -58,10 +58,11 @@ async fn aborted_future_1() { let m2 = m1.clone(); // Try to lock mutex in a future that is aborted prematurely timeout(Duration::from_millis(1u64), async move { - let mut iv = interval(Duration::from_millis(1000)); + let iv = interval(Duration::from_millis(1000)); + tokio::pin!(iv); m2.lock_owned().await; - iv.tick().await; - iv.tick().await; + iv.as_mut().tick().await; + iv.as_mut().tick().await; }) .await .unwrap_err(); diff --git a/tokio/tests/time_interval.rs b/tokio/tests/time_interval.rs index 5ac6ae69..a0787157 100644 --- a/tokio/tests/time_interval.rs +++ b/tokio/tests/time_interval.rs @@ -49,7 +49,8 @@ async fn usage_stream() { use tokio::stream::StreamExt; let start = Instant::now(); - let mut interval = time::interval(ms(10)); + let interval = time::interval(ms(10)); + tokio::pin!(interval); for _ in 0..3 { interval.next().await.unwrap(); diff --git a/tokio/tests/time_rt.rs b/tokio/tests/time_rt.rs index 85db78db..07753435 100644 --- a/tokio/tests/time_rt.rs +++ b/tokio/tests/time_rt.rs @@ -68,7 +68,7 @@ async fn starving() { } let when = Instant::now() + Duration::from_millis(20); - let starve = Starve(sleep_until(when), 0); + let starve = Starve(Box::pin(sleep_until(when)), 0); starve.await; assert!(Instant::now() >= when); diff --git a/tokio/tests/time_sleep.rs b/tokio/tests/time_sleep.rs index 955d833b..d110ec27 100644 --- a/tokio/tests/time_sleep.rs +++ b/tokio/tests/time_sleep.rs @@ -1,6 +1,11 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] +use std::future::Future; +use std::task::Context; + +use futures::task::noop_waker_ref; + use tokio::time::{self, Duration, Instant}; use tokio_test::{assert_pending, assert_ready, task}; @@ -30,6 +35,25 @@ async fn immediate_sleep() { assert_elapsed!(now, 0); } +#[tokio::test] +async fn is_elapsed() { + time::pause(); + + let sleep = time::sleep(Duration::from_millis(50)); + + tokio::pin!(sleep); + + assert!(!sleep.is_elapsed()); + + assert!(futures::poll!(sleep.as_mut()).is_pending()); + + assert!(!sleep.is_elapsed()); + + sleep.as_mut().await; + + assert!(sleep.is_elapsed()); +} + #[tokio::test] async fn delayed_sleep_level_0() { time::pause(); @@ -75,12 +99,12 @@ async fn reset_future_sleep_before_fire() { let now = Instant::now(); - let mut sleep = task::spawn(time::sleep_until(now + ms(100))); + let mut sleep = task::spawn(Box::pin(time::sleep_until(now + ms(100)))); assert_pending!(sleep.poll()); let mut sleep = sleep.into_inner(); - sleep.reset(Instant::now() + ms(200)); + sleep.as_mut().reset(Instant::now() + ms(200)); sleep.await; assert_elapsed!(now, 200); @@ -92,12 +116,12 @@ async fn reset_past_sleep_before_turn() { let now = Instant::now(); - let mut sleep = task::spawn(time::sleep_until(now + ms(100))); + let mut sleep = task::spawn(Box::pin(time::sleep_until(now + ms(100)))); assert_pending!(sleep.poll()); let mut sleep = sleep.into_inner(); - sleep.reset(now + ms(80)); + sleep.as_mut().reset(now + ms(80)); sleep.await; assert_elapsed!(now, 80); @@ -109,14 +133,14 @@ async fn reset_past_sleep_before_fire() { let now = Instant::now(); - let mut sleep = task::spawn(time::sleep_until(now + ms(100))); + let mut sleep = task::spawn(Box::pin(time::sleep_until(now + ms(100)))); assert_pending!(sleep.poll()); let mut sleep = sleep.into_inner(); time::sleep(ms(10)).await; - sleep.reset(now + ms(80)); + sleep.as_mut().reset(now + ms(80)); sleep.await; assert_elapsed!(now, 80); @@ -127,12 +151,12 @@ async fn reset_future_sleep_after_fire() { time::pause(); let now = Instant::now(); - let mut sleep = time::sleep_until(now + ms(100)); + let mut sleep = Box::pin(time::sleep_until(now + ms(100))); - (&mut sleep).await; + sleep.as_mut().await; assert_elapsed!(now, 100); - sleep.reset(now + ms(110)); + sleep.as_mut().reset(now + ms(110)); sleep.await; assert_elapsed!(now, 110); } @@ -143,16 +167,17 @@ async fn reset_sleep_to_past() { let now = Instant::now(); - let mut sleep = task::spawn(time::sleep_until(now + ms(100))); + let mut sleep = task::spawn(Box::pin(time::sleep_until(now + ms(100)))); assert_pending!(sleep.poll()); time::sleep(ms(50)).await; assert!(!sleep.is_woken()); - sleep.reset(now + ms(40)); + sleep.as_mut().reset(now + ms(40)); - assert!(sleep.is_woken()); + // TODO: is this required? + //assert!(sleep.is_woken()); assert_ready!(sleep.poll()); } @@ -167,22 +192,110 @@ fn creating_sleep_outside_of_context() { let _fut = time::sleep_until(now + ms(500)); } -#[should_panic] #[tokio::test] async fn greater_than_max() { const YR_5: u64 = 5 * 365 * 24 * 60 * 60 * 1000; + time::pause(); time::sleep_until(Instant::now() + ms(YR_5)).await; } +#[tokio::test] +async fn short_sleeps() { + for i in 0..10000 { + if (i % 10) == 0 { + eprintln!("=== {}", i); + } + tokio::time::sleep(std::time::Duration::from_millis(0)).await; + } +} + +#[tokio::test] +async fn multi_long_sleeps() { + tokio::time::pause(); + + for _ in 0..5u32 { + tokio::time::sleep(Duration::from_secs( + // about a year + 365 * 24 * 3600, + )) + .await; + } + + let deadline = tokio::time::Instant::now() + + Duration::from_secs( + // about 10 years + 10 * 365 * 24 * 3600, + ); + + tokio::time::sleep_until(deadline).await; + + assert!(tokio::time::Instant::now() >= deadline); +} + +#[tokio::test] +async fn long_sleeps() { + tokio::time::pause(); + + let deadline = tokio::time::Instant::now() + + Duration::from_secs( + // about 10 years + 10 * 365 * 24 * 3600, + ); + + tokio::time::sleep_until(deadline).await; + + assert!(tokio::time::Instant::now() >= deadline); + assert!(tokio::time::Instant::now() <= deadline + Duration::from_millis(1)); +} + +#[tokio::test] +#[should_panic(expected = "Duration too far into the future")] +async fn very_long_sleeps() { + tokio::time::pause(); + + // Some platforms (eg macos) can't represent times this far in the future + if let Some(deadline) = tokio::time::Instant::now().checked_add(Duration::from_secs(1u64 << 62)) + { + tokio::time::sleep_until(deadline).await; + } else { + // make it pass anyway (we can't skip/ignore the test based on the + // result of checked_add) + panic!("Duration too far into the future (test ignored)") + } +} + +#[tokio::test] +async fn reset_after_firing() { + let timer = tokio::time::sleep(std::time::Duration::from_millis(1)); + tokio::pin!(timer); + + let deadline = timer.deadline(); + + timer.as_mut().await; + assert_ready!(timer + .as_mut() + .poll(&mut Context::from_waker(noop_waker_ref()))); + timer + .as_mut() + .reset(tokio::time::Instant::now() + std::time::Duration::from_secs(600)); + + assert_ne!(deadline, timer.deadline()); + + assert_pending!(timer + .as_mut() + .poll(&mut Context::from_waker(noop_waker_ref()))); + assert_pending!(timer + .as_mut() + .poll(&mut Context::from_waker(noop_waker_ref()))); +} + const NUM_LEVELS: usize = 6; const MAX_DURATION: u64 = (1 << (6 * NUM_LEVELS)) - 1; -#[should_panic] #[tokio::test] async fn exactly_max() { - // TODO: this should not panic but `time::ms()` is acting up - // If fixed, make sure to update documentation on `time::sleep` too. + time::pause(); time::sleep(ms(MAX_DURATION)).await; } -- cgit v1.2.3