summaryrefslogtreecommitdiffstats
path: root/tokio-util/src/time/delay_queue.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio-util/src/time/delay_queue.rs')
-rw-r--r--tokio-util/src/time/delay_queue.rs22
1 files changed, 21 insertions, 1 deletions
diff --git a/tokio-util/src/time/delay_queue.rs b/tokio-util/src/time/delay_queue.rs
index 000b4423..4edd5cd6 100644
--- a/tokio-util/src/time/delay_queue.rs
+++ b/tokio-util/src/time/delay_queue.rs
@@ -14,7 +14,7 @@ use std::cmp;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
-use std::task::{self, Poll};
+use std::task::{self, Poll, Waker};
/// A queue of delayed elements.
///
@@ -145,6 +145,11 @@ pub struct DelayQueue<T> {
/// Instant at which the timer starts
start: Instant,
+
+ /// Waker that is invoked when we potentially need to reset the timer.
+ /// Because we lazily create the timer when the first entry is created, we
+ /// need to awaken any poller that polled us before that point.
+ waker: Option<Waker>,
}
/// An entry in `DelayQueue` that has expired and removed.
@@ -253,6 +258,7 @@ impl<T> DelayQueue<T> {
delay: None,
wheel_now: 0,
start: Instant::now(),
+ waker: None,
}
}
@@ -330,6 +336,10 @@ impl<T> DelayQueue<T> {
};
if should_set_delay {
+ if let Some(waker) = self.waker.take() {
+ waker.wake();
+ }
+
let delay_time = self.start + Duration::from_millis(when);
if let Some(ref mut delay) = &mut self.delay {
delay.reset(delay_time);
@@ -348,6 +358,15 @@ impl<T> DelayQueue<T> {
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Option<Result<Expired<T>, Error>>> {
+ if !self
+ .waker
+ .as_ref()
+ .map(|w| w.will_wake(cx.waker()))
+ .unwrap_or(false)
+ {
+ self.waker = Some(cx.waker().clone());
+ }
+
let item = ready!(self.poll_idx(cx));
Poll::Ready(item.map(|result| {
result.map(|idx| {
@@ -533,6 +552,7 @@ impl<T> DelayQueue<T> {
let next_deadline = self.next_deadline();
if let (Some(ref mut delay), Some(deadline)) = (&mut self.delay, next_deadline) {
+ // This should awaken us if necessary (ie, if already expired)
delay.reset(deadline);
}
}