summaryrefslogtreecommitdiffstats
path: root/tokio-util
diff options
context:
space:
mode:
authorbdonlan <bdonlan@gmail.com>2020-11-23 10:42:50 -0800
committerGitHub <noreply@github.com>2020-11-23 10:42:50 -0800
commitae67851f11b7cc1f577de8ce21767ce3e2c7aff9 (patch)
treebe43cb76333b0e9e42a101d659f9b2e41555d779 /tokio-util
parentf927f01a34d7cedf0cdc820f729a7a6cd56e83dd (diff)
time: use intrusive lists for timer tracking (#3080)
More-or-less a half-rewrite of the current time driver, supporting the use of intrusive futures for timer registration. Fixes: #3028, #3069
Diffstat (limited to 'tokio-util')
-rw-r--r--tokio-util/src/time/delay_queue.rs22
-rw-r--r--tokio-util/tests/time_delay_queue.rs24
2 files changed, 36 insertions, 10 deletions
diff --git a/tokio-util/src/time/delay_queue.rs b/tokio-util/src/time/delay_queue.rs
index 000b4423..4edd5cd6 100644
--- a/tokio-util/src/time/delay_queue.rs
+++ b/tokio-util/src/time/delay_queue.rs
@@ -14,7 +14,7 @@ use std::cmp;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
-use std::task::{self, Poll};
+use std::task::{self, Poll, Waker};
/// A queue of delayed elements.
///
@@ -145,6 +145,11 @@ pub struct DelayQueue<T> {
/// Instant at which the timer starts
start: Instant,
+
+ /// Waker that is invoked when we potentially need to reset the timer.
+ /// Because we lazily create the timer when the first entry is created, we
+ /// need to awaken any poller that polled us before that point.
+ waker: Option<Waker>,
}
/// An entry in `DelayQueue` that has expired and removed.
@@ -253,6 +258,7 @@ impl<T> DelayQueue<T> {
delay: None,
wheel_now: 0,
start: Instant::now(),
+ waker: None,
}
}
@@ -330,6 +336,10 @@ impl<T> DelayQueue<T> {
};
if should_set_delay {
+ if let Some(waker) = self.waker.take() {
+ waker.wake();
+ }
+
let delay_time = self.start + Duration::from_millis(when);
if let Some(ref mut delay) = &mut self.delay {
delay.reset(delay_time);
@@ -348,6 +358,15 @@ impl<T> DelayQueue<T> {
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Option<Result<Expired<T>, Error>>> {
+ if !self
+ .waker
+ .as_ref()
+ .map(|w| w.will_wake(cx.waker()))
+ .unwrap_or(false)
+ {
+ self.waker = Some(cx.waker().clone());
+ }
+
let item = ready!(self.poll_idx(cx));
Poll::Ready(item.map(|result| {
result.map(|idx| {
@@ -533,6 +552,7 @@ impl<T> DelayQueue<T> {
let next_deadline = self.next_deadline();
if let (Some(ref mut delay), Some(deadline)) = (&mut self.delay, next_deadline) {
+ // This should awaken us if necessary (ie, if already expired)
delay.reset(deadline);
}
}
diff --git a/tokio-util/tests/time_delay_queue.rs b/tokio-util/tests/time_delay_queue.rs
index 42a56b8b..d42dca87 100644
--- a/tokio-util/tests/time_delay_queue.rs
+++ b/tokio-util/tests/time_delay_queue.rs
@@ -2,7 +2,7 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
-use tokio::time::{self, sleep, Duration, Instant};
+use tokio::time::{self, sleep, sleep_until, Duration, Instant};
use tokio_test::{assert_ok, assert_pending, assert_ready, task};
use tokio_util::time::DelayQueue;
@@ -107,9 +107,10 @@ async fn multi_delay_at_start() {
assert_pending!(poll!(queue));
assert!(!queue.is_woken());
+ let start = Instant::now();
for elapsed in 0..1200 {
- sleep(ms(1)).await;
let elapsed = elapsed + 1;
+ tokio::time::sleep_until(start + ms(elapsed)).await;
if delays.contains(&elapsed) {
assert!(queue.is_woken());
@@ -117,7 +118,12 @@ async fn multi_delay_at_start() {
assert_pending!(poll!(queue));
} else if queue.is_woken() {
let cascade = &[192, 960];
- assert!(cascade.contains(&elapsed), "elapsed={}", elapsed);
+ assert!(
+ cascade.contains(&elapsed),
+ "elapsed={} dt={:?}",
+ elapsed,
+ Instant::now() - start
+ );
assert_pending!(poll!(queue));
}
@@ -205,7 +211,7 @@ async fn reset_much_later() {
sleep(ms(3)).await;
- queue.reset_at(&key, now + ms(5));
+ queue.reset_at(&key, now + ms(10));
sleep(ms(20)).await;
@@ -402,7 +408,7 @@ async fn insert_before_first_after_poll() {
sleep(ms(99)).await;
- assert!(!queue.is_woken());
+ assert_pending!(poll!(queue));
sleep(ms(1)).await;
@@ -457,7 +463,7 @@ async fn reset_later_after_slot_starts() {
assert_pending!(poll!(queue));
- sleep(ms(80)).await;
+ sleep_until(now + Duration::from_millis(80)).await;
assert!(!queue.is_woken());
@@ -472,7 +478,7 @@ async fn reset_later_after_slot_starts() {
assert_pending!(poll!(queue));
- sleep(ms(39)).await;
+ sleep_until(now + Duration::from_millis(119)).await;
assert!(!queue.is_woken());
sleep(ms(1)).await;
@@ -515,7 +521,7 @@ async fn reset_earlier_after_slot_starts() {
assert_pending!(poll!(queue));
- sleep(ms(80)).await;
+ sleep_until(now + Duration::from_millis(80)).await;
assert!(!queue.is_woken());
@@ -530,7 +536,7 @@ async fn reset_earlier_after_slot_starts() {
assert_pending!(poll!(queue));
- sleep(ms(39)).await;
+ sleep_until(now + Duration::from_millis(119)).await;
assert!(!queue.is_woken());
sleep(ms(1)).await;