diff options
Diffstat (limited to 'tokio-timer/src')
-rw-r--r-- | tokio-timer/src/delay.rs | 26 | ||||
-rw-r--r-- | tokio-timer/src/delay_queue.rs | 56 | ||||
-rw-r--r-- | tokio-timer/src/interval.rs | 15 | ||||
-rw-r--r-- | tokio-timer/src/lib.rs | 19 | ||||
-rw-r--r-- | tokio-timer/src/throttle.rs | 137 | ||||
-rw-r--r-- | tokio-timer/src/timeout.rs | 178 | ||||
-rw-r--r-- | tokio-timer/src/timer/entry.rs | 35 | ||||
-rw-r--r-- | tokio-timer/src/timer/handle.rs | 12 | ||||
-rw-r--r-- | tokio-timer/src/timer/registration.rs | 11 |
9 files changed, 177 insertions, 312 deletions
diff --git a/tokio-timer/src/delay.rs b/tokio-timer/src/delay.rs index 941dde7a..47c4fab3 100644 --- a/tokio-timer/src/delay.rs +++ b/tokio-timer/src/delay.rs @@ -1,7 +1,8 @@ use crate::timer::{HandlePriv, Registration}; -use crate::Error; -use futures::{Future, Poll}; +use std::future::Future; +use std::pin::Pin; use std::time::{Duration, Instant}; +use std::task::{self, Poll}; /// A future that completes at a specified instant in time. /// @@ -72,6 +73,8 @@ impl Delay { self.registration.reset(deadline); } + // Used by `Timeout<Stream>` + #[cfg(feature = "timeout-stream")] pub(crate) fn reset_timeout(&mut self) { self.registration.reset_timeout(); } @@ -84,13 +87,24 @@ impl Delay { } impl Future for Delay { - type Item = (); - type Error = Error; + type Output = (); - fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { // Ensure the `Delay` instance is associated with a timer. self.register(); - self.registration.poll_elapsed() + // `poll_elapsed` can return an error in two cases: + // + // - AtCapacity: this is a pathlogical case where far too many + // delays have been scheduled. + // - Shutdown: No timer has been setup, which is a mis-use error. + // + // Both cases are extremely rare, and pretty accurately fit into + // "logic errors", so we just panic in this case. A user couldn't + // really do much better if we passed the error onwards. + match ready!(self.registration.poll_elapsed(cx)) { + Ok(()) => Poll::Ready(()), + Err(e) => panic!("timer error: {}", e), + } } } diff --git a/tokio-timer/src/delay_queue.rs b/tokio-timer/src/delay_queue.rs index 56ff9e5b..77b7b409 100644 --- a/tokio-timer/src/delay_queue.rs +++ b/tokio-timer/src/delay_queue.rs @@ -8,10 +8,13 @@ use crate::clock::now; use crate::timer::Handle; use crate::wheel::{self, Wheel}; use crate::{Delay, Error}; -use futures::{try_ready, Future, Poll, Stream}; +use futures_core::Stream; use slab::Slab; use std::cmp; +use std::future::Future; use std::marker::PhantomData; +use std::pin::Pin; +use std::task::{self, Poll}; use std::time::{Duration, Instant}; /// A queue of delayed elements. @@ -177,7 +180,7 @@ pub struct Key { struct Stack<T> { /// Head of the stack head: Option<usize>, - _p: PhantomData<T>, + _p: PhantomData<fn() -> T>, } #[derive(Debug)] @@ -645,19 +648,19 @@ impl<T> DelayQueue<T> { /// should be returned. /// /// A slot should be returned when the associated deadline has been reached. - fn poll_idx(&mut self) -> Poll<Option<usize>, Error> { + fn poll_idx(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Result<usize, Error>>> { use self::wheel::Stack; let expired = self.expired.pop(&mut self.slab); if expired.is_some() { - return Ok(expired.into()); + return Poll::Ready(expired.map(Ok)); } loop { if let Some(ref mut delay) = self.delay { if !delay.is_elapsed() { - try_ready!(delay.poll()); + ready!(Pin::new(&mut *delay).poll(cx)); } let now = crate::ms(delay.deadline() - self.start, crate::Round::Down); @@ -668,13 +671,13 @@ impl<T> DelayQueue<T> { self.delay = None; if let Some(idx) = self.wheel.poll(&mut self.poll, &mut self.slab) { - return Ok(Some(idx).into()); + return Poll::Ready(Some(Ok(idx))); } if let Some(deadline) = self.next_deadline() { self.delay = Some(self.handle.delay(deadline)); } else { - return Ok(None.into()); + return Poll::Ready(None); } } } @@ -690,24 +693,29 @@ impl<T> DelayQueue<T> { } } -impl<T> Stream for DelayQueue<T> { - type Item = Expired<T>; - type Error = Error; - - fn poll(&mut self) -> Poll<Option<Self::Item>, Error> { - let item = try_ready!(self.poll_idx()).map(|idx| { - let data = self.slab.remove(idx); - debug_assert!(data.next.is_none()); - debug_assert!(data.prev.is_none()); - - Expired { - key: Key::new(idx), - data: data.inner, - deadline: self.start + Duration::from_millis(data.when), - } - }); +// We never put `T` in a `Pin`... +impl<T> Unpin for DelayQueue<T> {} - Ok(item.into()) +impl<T> Stream for DelayQueue<T> { + // DelayQueue seems much more specific, where a user may care that it + // has reached capacity, so return those errors instead of panicking. + type Item = Result<Expired<T>, Error>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> { + let item = ready!(self.poll_idx(cx)); + Poll::Ready(item.map(|result| { + result.map(|idx| { + let data = self.slab.remove(idx); + debug_assert!(data.next.is_none()); + debug_assert!(data.prev.is_none()); + + Expired { + key: Key::new(idx), + data: data.inner, + deadline: self.start + Duration::from_millis(data.when), + } + }) + })) } } diff --git a/tokio-timer/src/interval.rs b/tokio-timer/src/interval.rs index e065bf12..8b18e33d 100644 --- a/tokio-timer/src/interval.rs +++ b/tokio-timer/src/interval.rs @@ -1,6 +1,9 @@ use crate::clock; use crate::Delay; -use futures::{try_ready, Future, Poll, Stream}; +use futures_core::Stream; +use std::future::Future; +use std::pin::Pin; +use std::task::{self, Poll}; use std::time::{Duration, Instant}; /// A stream representing notifications at fixed interval @@ -53,20 +56,20 @@ impl Interval { impl Stream for Interval { type Item = Instant; - type Error = crate::Error; - fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> { // Wait for the delay to be done - let _ = try_ready!(self.delay.poll()); + ready!(Pin::new(&mut self.delay).poll(cx)); // Get the `now` by looking at the `delay` deadline let now = self.delay.deadline(); // The next interval value is `duration` after the one that just // yielded. - self.delay.reset(now + self.duration); + let next = now + self.duration; + self.delay.reset(next); // Return the current instant - Ok(Some(now).into()) + Poll::Ready(Some(now)) } } diff --git a/tokio-timer/src/lib.rs b/tokio-timer/src/lib.rs index 3a960240..083d8d87 100644 --- a/tokio-timer/src/lib.rs +++ b/tokio-timer/src/lib.rs @@ -31,27 +31,36 @@ //! [`Interval`]: struct.Interval.html //! [`Timer`]: timer/struct.Timer.html +macro_rules! ready { + ($e:expr) => ( + match $e { + ::std::task::Poll::Ready(v) => v, + ::std::task::Poll::Pending => return ::std::task::Poll::Pending, + } + ) +} + pub mod clock; +#[cfg(feature = "delay-queue")] pub mod delay_queue; +#[cfg(feature = "throttle")] pub mod throttle; pub mod timeout; pub mod timer; mod atomic; -mod deadline; mod delay; mod error; +#[cfg(feature = "interval")] mod interval; mod wheel; -#[deprecated(since = "0.2.6", note = "use Timeout instead")] -#[doc(hidden)] -#[allow(deprecated)] -pub use deadline::{Deadline, DeadlineError}; pub use delay::Delay; +#[cfg(feature = "delay-queue")] #[doc(inline)] pub use delay_queue::DelayQueue; pub use error::Error; +#[cfg(feature = "interval")] pub use interval::Interval; #[doc(inline)] pub use timeout::Timeout; diff --git a/tokio-timer/src/throttle.rs b/tokio-timer/src/throttle.rs index de71bf7f..ab8733fd 100644 --- a/tokio-timer/src/throttle.rs +++ b/tokio-timer/src/throttle.rs @@ -1,11 +1,12 @@ //! Slow down a stream by enforcing a delay between items. -use crate::{clock, Delay, Error}; -use futures::future::Either; -use futures::{try_ready, Async, Future, Poll, Stream}; +use crate::{clock, Delay}; +use futures_core::Stream; use std::{ - error::Error as StdError, - fmt::{Display, Formatter, Result as FmtResult}, + future::Future, + marker::Unpin, + pin::Pin, + task::{self, Poll}, time::Duration, }; @@ -13,26 +14,25 @@ use std::{ #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct Throttle<T> { - delay: Option<Delay>, - duration: Duration, + delay: Delay, + /// Set to true when `delay` has returned ready, but `stream` hasn't. + has_delayed: bool, stream: T, } -/// Either the error of the underlying stream, or an error within -/// tokio's timing machinery. -#[derive(Debug)] -pub struct ThrottleError<T>(Either<T, Error>); - impl<T> Throttle<T> { /// Slow down a stream by enforcing a delay between items. pub fn new(stream: T, duration: Duration) -> Self { Self { - delay: None, - duration: duration, + delay: Delay::new_timeout(clock::now() + duration, duration), + has_delayed: false, stream: stream, } } +} +// XXX: are these safe if `T: !Unpin`? +impl<T: Unpin> Throttle<T> { /// Acquires a reference to the underlying stream that this combinator is /// pulling from. pub fn get_ref(&self) -> &T { @@ -59,107 +59,22 @@ impl<T> Throttle<T> { impl<T: Stream> Stream for Throttle<T> { type Item = T::Item; - type Error = ThrottleError<T::Error>; - - fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { - if let Some(ref mut delay) = self.delay { - try_ready!({ delay.poll().map_err(ThrottleError::from_timer_err) }); - } - self.delay = None; - let value = try_ready!({ self.stream.poll().map_err(ThrottleError::from_stream_err) }); - - if value.is_some() { - self.delay = Some(Delay::new(clock::now() + self.duration)); - } - - Ok(Async::Ready(value)) - } -} + fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> { + unsafe { + if !self.has_delayed { + ready!(self.as_mut().map_unchecked_mut(|me| &mut me.delay).poll(cx)); + self.as_mut().get_unchecked_mut().has_delayed = true; + } -impl<T> ThrottleError<T> { - /// Creates a new `ThrottleError` from the given stream error. - pub fn from_stream_err(err: T) -> Self { - ThrottleError(Either::A(err)) - } - - /// Creates a new `ThrottleError` from the given tokio timer error. - pub fn from_timer_err(err: Error) -> Self { - ThrottleError(Either::B(err)) - } - - /// Attempts to get the underlying stream error, if it is present. - pub fn get_stream_error(&self) -> Option<&T> { - match self.0 { - Either::A(ref x) => Some(x), - _ => None, - } - } - - /// Attempts to get the underlying timer error, if it is present. - pub fn get_timer_error(&self) -> Option<&Error> { - match self.0 { - Either::B(ref x) => Some(x), - _ => None, - } - } + let value = ready!(self.as_mut().map_unchecked_mut(|me| &mut me.stream).poll_next(cx)); - /// Attempts to extract the underlying stream error, if it is present. - pub fn into_stream_error(self) -> Option<T> { - match self.0 { - Either::A(x) => Some(x), - _ => None, - } - } - - /// Attempts to extract the underlying timer error, if it is present. - pub fn into_timer_error(self) -> Option<Error> { - match self.0 { - Either::B(x) => Some(x), - _ => None, - } - } - - /// Returns whether the throttle error has occured because of an error - /// in the underlying stream. - pub fn is_stream_error(&self) -> bool { - !self.is_timer_error() - } - - /// Returns whether the throttle error has occured because of an error - /// in tokio's timer system. - pub fn is_timer_error(&self) -> bool { - match self.0 { - Either::A(_) => false, - Either::B(_) => true, - } - } -} - -impl<T: StdError> Display for ThrottleError<T> { - fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { - match self.0 { - Either::A(ref err) => write!(f, "stream error: {}", err), - Either::B(ref err) => write!(f, "timer error: {}", err), - } - } -} - -impl<T: StdError + 'static> StdError for ThrottleError<T> { - fn description(&self) -> &str { - match self.0 { - Either::A(_) => "stream error", - Either::B(_) => "timer error", - } - } + if value.is_some() { + self.as_mut().get_unchecked_mut().delay.reset_timeout(); + self.as_mut().get_unchecked_mut().has_delayed = false; + } - // FIXME(taiki-e): When the minimum support version of tokio reaches Rust 1.30, - // replace this with Error::source. - #[allow(deprecated)] - fn cause(&self) -> Option<&dyn StdError> { - match self.0 { - Either::A(ref err) => Some(err), - Either::B(ref err) => Some(err), + Poll::Ready(value) } } } diff --git a/tokio-timer/src/timeout.rs b/tokio-timer/src/timeout.rs index e860d3f0..42c0d9b1 100644 --- a/tokio-timer/src/timeout.rs +++ b/tokio-timer/src/timeout.rs @@ -6,9 +6,12 @@ use crate::clock::now; use crate::Delay; -use futures::{Async, Future, Poll, Stream}; -use std::error; +#[cfg(feature = "timeout-stream")] +use futures_core::Stream; use std::fmt; +use std::future::Future; +use std::pin::Pin; +use std::task::{self, Poll}; use std::time::{Duration, Instant}; /// Allows a `Future` or `Stream` to execute for a limited amount of time. @@ -69,22 +72,10 @@ pub struct Timeout<T> { delay: Delay, } -/// Error returned by `Timeout`. -#[derive(Debug)] -pub struct Error<T>(Kind<T>); -/// Timeout error variants +/// Error returned by `Timeout`. #[derive(Debug)] -enum Kind<T> { - /// Inner value returned an error - Inner(T), - - /// The timeout elapsed. - Elapsed, - - /// Timer returned an error. - Timer(crate::Error), -} +pub struct Elapsed(()); impl<T> Timeout<T> { /// Create a new `Timeout` that allows `value` to execute for a duration of @@ -161,141 +152,70 @@ impl<T> Future for Timeout<T> where T: Future, { - type Item = T::Item; - type Error = Error<T::Error>; + type Output = Result<T::Output, Elapsed>; - fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { // First, try polling the future - match self.value.poll() { - Ok(Async::Ready(v)) => return Ok(Async::Ready(v)), - Ok(Async::NotReady) => {} - Err(e) => return Err(Error::inner(e)), + + // Safety: we never move `self.value` + unsafe { + let p = self.as_mut().map_unchecked_mut(|me| &mut me.value); + if let Poll::Ready(v) = p.poll(cx) { + return Poll::Ready(Ok(v)); + } } // Now check the timer - match self.delay.poll() { - Ok(Async::NotReady) => Ok(Async::NotReady), - Ok(Async::Ready(_)) => Err(Error::elapsed()), - Err(e) => Err(Error::timer(e)), + // Safety: X_X! + unsafe { + match self.map_unchecked_mut(|me| &mut me.delay).poll(cx) { + Poll::Ready(()) => Poll::Ready(Err(Elapsed(()))), + Poll::Pending => Poll::Pending + } } } } +#[cfg(feature = "timeout-stream")] impl<T> Stream for Timeout<T> where T: Stream, { - type Item = T::Item; - type Error = Error<T::Error>; - - fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { - // First, try polling the future - match self.value.poll() { - Ok(Async::Ready(v)) => { + type Item = Result<T::Item, Elapsed>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> { + // Safety: T might be !Unpin, but we never move neither `value` + // nor `delay`. + // + // ... X_X + unsafe { + // First, try polling the future + let v = self + .as_mut() + .map_unchecked_mut(|me| &mut me.value) + .poll_next(cx); + + if let Poll::Ready(v) = v { if v.is_some() { - self.delay.reset_timeout(); + self.as_mut().get_unchecked_mut().delay.reset_timeout(); } - return Ok(Async::Ready(v)); + return Poll::Ready(v.map(Ok)); } - Ok(Async::NotReady) => {} - Err(e) => return Err(Error::inner(e)), - } - - // Now check the timer - match self.delay.poll() { - Ok(Async::NotReady) => Ok(Async::NotReady), - Ok(Async::Ready(_)) => { - self.delay.reset_timeout(); - Err(Error::elapsed()) - } - Err(e) => Err(Error::timer(e)), - } - } -} - -// ===== impl Error ===== - -impl<T> Error<T> { - /// Create a new `Error` representing the inner value completing with `Err`. - pub fn inner(err: T) -> Error<T> { - Error(Kind::Inner(err)) - } - - /// Returns `true` if the error was caused by the inner value completing - /// with `Err`. - pub fn is_inner(&self) -> bool { - match self.0 { - Kind::Inner(_) => true, - _ => false, - } - } - - /// Consumes `self`, returning the inner future error. - pub fn into_inner(self) -> Option<T> { - match self.0 { - Kind::Inner(err) => Some(err), - _ => None, - } - } - - /// Create a new `Error` representing the inner value not completing before - /// the deadline is reached. - pub fn elapsed() -> Error<T> { - Error(Kind::Elapsed) - } - - /// Returns `true` if the error was caused by the inner value not completing - /// before the deadline is reached. - pub fn is_elapsed(&self) -> bool { - match self.0 { - Kind::Elapsed => true, - _ => false, - } - } - - /// Creates a new `Error` representing an error encountered by the timer - /// implementation - pub fn timer(err: crate::Error) -> Error<T> { - Error(Kind::Timer(err)) - } - - /// Returns `true` if the error was caused by the timer. - pub fn is_timer(&self) -> bool { - match self.0 { - Kind::Timer(_) => true, - _ => false, - } - } - /// Consumes `self`, returning the error raised by the timer implementation. - pub fn into_timer(self) -> Option<crate::Error> { - match self.0 { - Kind::Timer(err) => Some(err), - _ => None, + // Now check the timer + ready!(self.map_unchecked_mut(|me| &mut me.delay).poll(cx)); + // if delay was ready, timeout elapsed! + Poll::Ready(Some(Err(Elapsed(())))) } } } -impl<T: error::Error> error::Error for Error<T> { - fn description(&self) -> &str { - use self::Kind::*; +// ===== impl Elapsed ===== - match self.0 { - Inner(ref e) => e.description(), - Elapsed => "deadline has elapsed", - Timer(ref e) => e.description(), - } - } -} - -impl<T: fmt::Display> fmt::Display for Error<T> { +impl fmt::Display for Elapsed { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - use self::Kind::*; - - match self.0 { - Inner(ref e) => e.fmt(fmt), - Elapsed => "deadline has elapsed".fmt(fmt), - Timer(ref e) => e.fmt(fmt), - } + "deadline has elapsed".fmt(fmt) } } + +impl std::error::Error for Elapsed {} diff --git a/tokio-timer/src/timer/entry.rs b/tokio-timer/src/timer/entry.rs index 3ce969a1..c1676e97 100644 --- a/tokio-timer/src/timer/entry.rs +++ b/tokio-timer/src/timer/entry.rs @@ -2,15 +2,15 @@ use crate::atomic::AtomicU64; use crate::timer::{HandlePriv, Inner}; use crate::Error; use crossbeam_utils::CachePadded; -use futures::task::AtomicTask; -use futures::Poll; use std::cell::UnsafeCell; use std::ptr; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering::{Relaxed, SeqCst}; use std::sync::{Arc, Weak}; +use std::task::{self, Poll}; use std::time::{Duration, Instant}; use std::u64; +use tokio_sync::task::AtomicWaker; /// Internal state shared between a `Delay` instance and the timer. /// @@ -46,7 +46,7 @@ pub(crate) struct Entry { state: AtomicU64, /// Task to notify once the deadline is reached. - task: AtomicTask, + waker: AtomicWaker, /// True when the entry is queued in the "process" stack. This value /// is set before pushing the value and unset after popping the value. @@ -109,7 +109,7 @@ impl Entry { Entry { time: CachePadded::new(UnsafeCell::new(Time { deadline, duration })), inner: None, - task: AtomicTask::new(), + waker: AtomicWaker::new(), state: AtomicU64::new(0), queued: AtomicBool::new(false), next_atomic: UnsafeCell::new(ptr::null_mut()), @@ -246,7 +246,7 @@ impl Entry { curr = actual; } - self.task.notify(); + self.waker.wake(); } pub fn error(&self) { @@ -269,7 +269,7 @@ impl Entry { curr = actual; } - self.task.notify(); + self.waker.wake(); } pub fn cancel(entry: &Arc<Entry>) { @@ -289,32 +289,31 @@ impl Entry { let _ = inner.queue(entry); } - pub fn poll_elapsed(&self) -> Poll<(), Error> { - use futures::Async::NotReady; + pub fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> { let mut curr = self.state.load(SeqCst); if is_elapsed(curr) { - if curr == ERROR { - return Err(Error::shutdown()); + return Poll::Ready(if curr == ERROR { + Err(Error::shutdown()) } else { - return Ok(().into()); - } + Ok(()) + }); } - self.task.register(); + self.waker.register_by_ref(cx.waker()); curr = self.state.load(SeqCst).into(); if is_elapsed(curr) { - if curr == ERROR { - return Err(Error::shutdown()); + return Poll::Ready(if curr == ERROR { + Err(Error::shutdown()) } else { - return Ok(().into()); - } + Ok(()) + }); } - Ok(NotReady) + Poll::Pending } /// Only called by `Registration` diff --git a/tokio-timer/src/timer/handle.rs b/tokio-timer/src/timer/handle.rs index 7e8b95c7..128723f3 100644 --- a/tokio-timer/src/timer/handle.rs +++ b/tokio-timer/src/timer/handle.rs @@ -1,9 +1,9 @@ use crate::timer::Inner; -use crate::{Deadline, Delay, Error, Interval, Timeout}; +use crate::{Delay, Error, /*Interval,*/ Timeout}; use std::cell::RefCell; use std::fmt; use std::sync::{Arc, Weak}; -use std::time::{Duration, Instant}; +use std::time::{/*Duration,*/ Instant}; use tokio_executor::Enter; /// Handle to timer instance. @@ -137,22 +137,18 @@ impl Handle { } } - #[doc(hidden)] - #[deprecated(since = "0.2.11", note = "use timeout instead")] - pub fn deadline<T>(&self, future: T, deadline: Instant) -> Deadline<T> { - Deadline::new_with_delay(future, self.delay(deadline)) - } - /// Create a `Timeout` driven by this handle's associated `Timer`. pub fn timeout<T>(&self, value: T, deadline: Instant) -> Timeout<T> { Timeout::new_with_delay(value, self.delay(deadline)) } + /* /// Create a new `Interval` that starts at `at` and yields every `duration` /// interval after that. pub fn interval(&self, at: Instant, duration: Duration) -> Interval { Interval::new_with_delay(self.delay(at), duration) } + */ fn as_priv(&self) -> Option<&HandlePriv> { self.inner.as_ref() diff --git a/tokio-timer/src/timer/registration.rs b/tokio-timer/src/timer/registration.rs index ee7e3feb..74a32d90 100644 --- a/tokio-timer/src/timer/registration.rs +++ b/tokio-timer/src/timer/registration.rs @@ -1,8 +1,7 @@ -use crate::clock::now; use crate::timer::{Entry, HandlePriv}; use crate::Error; -use futures::Poll; use std::sync::Arc; +use std::task::{self, Poll}; use std::time::{Duration, Instant}; /// Registration with a timer. @@ -43,8 +42,10 @@ impl Registration { Entry::reset(&mut self.entry); } + // Used by `Timeout<Stream>` + #[cfg(feature = "timeout-stream")] pub fn reset_timeout(&mut self) { - let deadline = now() + self.entry.time_ref().duration; + let deadline = crate::clock::now() + self.entry.time_ref().duration; self.entry.time_mut().deadline = deadline; Entry::reset(&mut self.entry); } @@ -53,8 +54,8 @@ impl Registration { self.entry.is_elapsed() } - pub fn poll_elapsed(&self) -> Poll<(), Error> { - self.entry.poll_elapsed() + pub fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> { + self.entry.poll_elapsed(cx) } } |