diff options
Diffstat (limited to 'tokio-util/src/time/delay_queue.rs')
-rw-r--r-- | tokio-util/src/time/delay_queue.rs | 22 |
1 files changed, 21 insertions, 1 deletions
diff --git a/tokio-util/src/time/delay_queue.rs b/tokio-util/src/time/delay_queue.rs index 000b4423..4edd5cd6 100644 --- a/tokio-util/src/time/delay_queue.rs +++ b/tokio-util/src/time/delay_queue.rs @@ -14,7 +14,7 @@ use std::cmp; use std::future::Future; use std::marker::PhantomData; use std::pin::Pin; -use std::task::{self, Poll}; +use std::task::{self, Poll, Waker}; /// A queue of delayed elements. /// @@ -145,6 +145,11 @@ pub struct DelayQueue<T> { /// Instant at which the timer starts start: Instant, + + /// Waker that is invoked when we potentially need to reset the timer. + /// Because we lazily create the timer when the first entry is created, we + /// need to awaken any poller that polled us before that point. + waker: Option<Waker>, } /// An entry in `DelayQueue` that has expired and removed. @@ -253,6 +258,7 @@ impl<T> DelayQueue<T> { delay: None, wheel_now: 0, start: Instant::now(), + waker: None, } } @@ -330,6 +336,10 @@ impl<T> DelayQueue<T> { }; if should_set_delay { + if let Some(waker) = self.waker.take() { + waker.wake(); + } + let delay_time = self.start + Duration::from_millis(when); if let Some(ref mut delay) = &mut self.delay { delay.reset(delay_time); @@ -348,6 +358,15 @@ impl<T> DelayQueue<T> { &mut self, cx: &mut task::Context<'_>, ) -> Poll<Option<Result<Expired<T>, Error>>> { + if !self + .waker + .as_ref() + .map(|w| w.will_wake(cx.waker())) + .unwrap_or(false) + { + self.waker = Some(cx.waker().clone()); + } + let item = ready!(self.poll_idx(cx)); Poll::Ready(item.map(|result| { result.map(|idx| { @@ -533,6 +552,7 @@ impl<T> DelayQueue<T> { let next_deadline = self.next_deadline(); if let (Some(ref mut delay), Some(deadline)) = (&mut self.delay, next_deadline) { + // This should awaken us if necessary (ie, if already expired) delay.reset(deadline); } } |