diff options
author | bdonlan <bdonlan@gmail.com> | 2020-11-23 10:42:50 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-11-23 10:42:50 -0800 |
commit | ae67851f11b7cc1f577de8ce21767ce3e2c7aff9 (patch) | |
tree | be43cb76333b0e9e42a101d659f9b2e41555d779 /tokio-util | |
parent | f927f01a34d7cedf0cdc820f729a7a6cd56e83dd (diff) |
time: use intrusive lists for timer tracking (#3080)
More-or-less a half-rewrite of the current time driver, supporting the
use of intrusive futures for timer registration.
Fixes: #3028, #3069
Diffstat (limited to 'tokio-util')
-rw-r--r-- | tokio-util/src/time/delay_queue.rs | 22 | ||||
-rw-r--r-- | tokio-util/tests/time_delay_queue.rs | 24 |
2 files changed, 36 insertions, 10 deletions
diff --git a/tokio-util/src/time/delay_queue.rs b/tokio-util/src/time/delay_queue.rs index 000b4423..4edd5cd6 100644 --- a/tokio-util/src/time/delay_queue.rs +++ b/tokio-util/src/time/delay_queue.rs @@ -14,7 +14,7 @@ use std::cmp; use std::future::Future; use std::marker::PhantomData; use std::pin::Pin; -use std::task::{self, Poll}; +use std::task::{self, Poll, Waker}; /// A queue of delayed elements. /// @@ -145,6 +145,11 @@ pub struct DelayQueue<T> { /// Instant at which the timer starts start: Instant, + + /// Waker that is invoked when we potentially need to reset the timer. + /// Because we lazily create the timer when the first entry is created, we + /// need to awaken any poller that polled us before that point. + waker: Option<Waker>, } /// An entry in `DelayQueue` that has expired and removed. @@ -253,6 +258,7 @@ impl<T> DelayQueue<T> { delay: None, wheel_now: 0, start: Instant::now(), + waker: None, } } @@ -330,6 +336,10 @@ impl<T> DelayQueue<T> { }; if should_set_delay { + if let Some(waker) = self.waker.take() { + waker.wake(); + } + let delay_time = self.start + Duration::from_millis(when); if let Some(ref mut delay) = &mut self.delay { delay.reset(delay_time); @@ -348,6 +358,15 @@ impl<T> DelayQueue<T> { &mut self, cx: &mut task::Context<'_>, ) -> Poll<Option<Result<Expired<T>, Error>>> { + if !self + .waker + .as_ref() + .map(|w| w.will_wake(cx.waker())) + .unwrap_or(false) + { + self.waker = Some(cx.waker().clone()); + } + let item = ready!(self.poll_idx(cx)); Poll::Ready(item.map(|result| { result.map(|idx| { @@ -533,6 +552,7 @@ impl<T> DelayQueue<T> { let next_deadline = self.next_deadline(); if let (Some(ref mut delay), Some(deadline)) = (&mut self.delay, next_deadline) { + // This should awaken us if necessary (ie, if already expired) delay.reset(deadline); } } diff --git a/tokio-util/tests/time_delay_queue.rs b/tokio-util/tests/time_delay_queue.rs index 42a56b8b..d42dca87 100644 --- a/tokio-util/tests/time_delay_queue.rs +++ b/tokio-util/tests/time_delay_queue.rs @@ -2,7 +2,7 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] -use tokio::time::{self, sleep, Duration, Instant}; +use tokio::time::{self, sleep, sleep_until, Duration, Instant}; use tokio_test::{assert_ok, assert_pending, assert_ready, task}; use tokio_util::time::DelayQueue; @@ -107,9 +107,10 @@ async fn multi_delay_at_start() { assert_pending!(poll!(queue)); assert!(!queue.is_woken()); + let start = Instant::now(); for elapsed in 0..1200 { - sleep(ms(1)).await; let elapsed = elapsed + 1; + tokio::time::sleep_until(start + ms(elapsed)).await; if delays.contains(&elapsed) { assert!(queue.is_woken()); @@ -117,7 +118,12 @@ async fn multi_delay_at_start() { assert_pending!(poll!(queue)); } else if queue.is_woken() { let cascade = &[192, 960]; - assert!(cascade.contains(&elapsed), "elapsed={}", elapsed); + assert!( + cascade.contains(&elapsed), + "elapsed={} dt={:?}", + elapsed, + Instant::now() - start + ); assert_pending!(poll!(queue)); } @@ -205,7 +211,7 @@ async fn reset_much_later() { sleep(ms(3)).await; - queue.reset_at(&key, now + ms(5)); + queue.reset_at(&key, now + ms(10)); sleep(ms(20)).await; @@ -402,7 +408,7 @@ async fn insert_before_first_after_poll() { sleep(ms(99)).await; - assert!(!queue.is_woken()); + assert_pending!(poll!(queue)); sleep(ms(1)).await; @@ -457,7 +463,7 @@ async fn reset_later_after_slot_starts() { assert_pending!(poll!(queue)); - sleep(ms(80)).await; + sleep_until(now + Duration::from_millis(80)).await; assert!(!queue.is_woken()); @@ -472,7 +478,7 @@ async fn reset_later_after_slot_starts() { assert_pending!(poll!(queue)); - sleep(ms(39)).await; + sleep_until(now + Duration::from_millis(119)).await; assert!(!queue.is_woken()); sleep(ms(1)).await; @@ -515,7 +521,7 @@ async fn reset_earlier_after_slot_starts() { assert_pending!(poll!(queue)); - sleep(ms(80)).await; + sleep_until(now + Duration::from_millis(80)).await; assert!(!queue.is_woken()); @@ -530,7 +536,7 @@ async fn reset_earlier_after_slot_starts() { assert_pending!(poll!(queue)); - sleep(ms(39)).await; + sleep_until(now + Duration::from_millis(119)).await; assert!(!queue.is_woken()); sleep(ms(1)).await; |