diff options
Diffstat (limited to 'tokio/src/time/delay_queue.rs')
-rw-r--r-- | tokio/src/time/delay_queue.rs | 23 |
1 files changed, 6 insertions, 17 deletions
diff --git a/tokio/src/time/delay_queue.rs b/tokio/src/time/delay_queue.rs index 6fa455a6..6a7cc6b3 100644 --- a/tokio/src/time/delay_queue.rs +++ b/tokio/src/time/delay_queue.rs @@ -5,9 +5,8 @@ //! [`DelayQueue`]: struct.DelayQueue.html use crate::time::wheel::{self, Wheel}; -use crate::time::{Delay, Duration, Error, Instant}; +use crate::time::{delay_until, Delay, Duration, Error, Instant}; -use futures_core::ready; use slab::Slab; use std::cmp; use std::future::Future; @@ -69,7 +68,7 @@ use std::task::{self, Poll}; /// ```rust,no_run /// use tokio::time::{delay_queue, DelayQueue, Error}; /// -/// use futures_core::ready; +/// use futures::ready; /// use std::collections::HashMap; /// use std::task::{Context, Poll}; /// use std::time::Duration; @@ -103,7 +102,7 @@ use std::task::{self, Poll}; /// } /// /// fn poll_purge(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> { -/// while let Some(res) = ready!(self.expirations.poll_next(cx)) { +/// while let Some(res) = ready!(self.expirations.poll_expired(cx)) { /// let entry = res?; /// self.entries.remove(entry.get_ref()); /// } @@ -320,7 +319,7 @@ impl<T> DelayQueue<T> { }; if should_set_delay { - self.delay = Some(Delay::new(self.start + Duration::from_millis(when))); + self.delay = Some(delay_until(self.start + Duration::from_millis(when))); } Key::new(key) @@ -329,7 +328,7 @@ impl<T> DelayQueue<T> { /// Attempt to pull out the next value of the delay queue, registering the /// current task for wakeup if the value is not yet available, and returning /// None if the queue is exhausted. - pub fn poll_next( + pub fn poll_expired( &mut self, cx: &mut task::Context<'_>, ) -> Poll<Option<Result<Expired<T>, Error>>> { @@ -676,7 +675,7 @@ impl<T> DelayQueue<T> { } if let Some(deadline) = self.next_deadline() { - self.delay = Some(Delay::new(deadline)); + self.delay = Some(delay_until(deadline)); } else { return Poll::Ready(None); } @@ -697,16 +696,6 @@ impl<T> DelayQueue<T> { // We never put `T` in a `Pin`... impl<T> Unpin for DelayQueue<T> {} -impl<T> futures_core::Stream for DelayQueue<T> { - // DelayQueue seems much more specific, where a user may care that it - // has reached capacity, so return those errors instead of panicking. - type Item = Result<Expired<T>, Error>; - - fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> { - DelayQueue::poll_next(self.get_mut(), cx) - } -} - impl<T> Default for DelayQueue<T> { fn default() -> DelayQueue<T> { DelayQueue::new() |