summaryrefslogtreecommitdiffstats
path: root/tokio-timer/src
diff options
context:
space:
mode:
Diffstat (limited to 'tokio-timer/src')
-rw-r--r--tokio-timer/src/delay.rs26
-rw-r--r--tokio-timer/src/delay_queue.rs56
-rw-r--r--tokio-timer/src/interval.rs15
-rw-r--r--tokio-timer/src/lib.rs19
-rw-r--r--tokio-timer/src/throttle.rs137
-rw-r--r--tokio-timer/src/timeout.rs178
-rw-r--r--tokio-timer/src/timer/entry.rs35
-rw-r--r--tokio-timer/src/timer/handle.rs12
-rw-r--r--tokio-timer/src/timer/registration.rs11
9 files changed, 177 insertions, 312 deletions
diff --git a/tokio-timer/src/delay.rs b/tokio-timer/src/delay.rs
index 941dde7a..47c4fab3 100644
--- a/tokio-timer/src/delay.rs
+++ b/tokio-timer/src/delay.rs
@@ -1,7 +1,8 @@
use crate::timer::{HandlePriv, Registration};
-use crate::Error;
-use futures::{Future, Poll};
+use std::future::Future;
+use std::pin::Pin;
use std::time::{Duration, Instant};
+use std::task::{self, Poll};
/// A future that completes at a specified instant in time.
///
@@ -72,6 +73,8 @@ impl Delay {
self.registration.reset(deadline);
}
+ // Used by `Timeout<Stream>`
+ #[cfg(feature = "timeout-stream")]
pub(crate) fn reset_timeout(&mut self) {
self.registration.reset_timeout();
}
@@ -84,13 +87,24 @@ impl Delay {
}
impl Future for Delay {
- type Item = ();
- type Error = Error;
+ type Output = ();
- fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
// Ensure the `Delay` instance is associated with a timer.
self.register();
- self.registration.poll_elapsed()
+ // `poll_elapsed` can return an error in two cases:
+ //
+ // - AtCapacity: this is a pathlogical case where far too many
+ // delays have been scheduled.
+ // - Shutdown: No timer has been setup, which is a mis-use error.
+ //
+ // Both cases are extremely rare, and pretty accurately fit into
+ // "logic errors", so we just panic in this case. A user couldn't
+ // really do much better if we passed the error onwards.
+ match ready!(self.registration.poll_elapsed(cx)) {
+ Ok(()) => Poll::Ready(()),
+ Err(e) => panic!("timer error: {}", e),
+ }
}
}
diff --git a/tokio-timer/src/delay_queue.rs b/tokio-timer/src/delay_queue.rs
index 56ff9e5b..77b7b409 100644
--- a/tokio-timer/src/delay_queue.rs
+++ b/tokio-timer/src/delay_queue.rs
@@ -8,10 +8,13 @@ use crate::clock::now;
use crate::timer::Handle;
use crate::wheel::{self, Wheel};
use crate::{Delay, Error};
-use futures::{try_ready, Future, Poll, Stream};
+use futures_core::Stream;
use slab::Slab;
use std::cmp;
+use std::future::Future;
use std::marker::PhantomData;
+use std::pin::Pin;
+use std::task::{self, Poll};
use std::time::{Duration, Instant};
/// A queue of delayed elements.
@@ -177,7 +180,7 @@ pub struct Key {
struct Stack<T> {
/// Head of the stack
head: Option<usize>,
- _p: PhantomData<T>,
+ _p: PhantomData<fn() -> T>,
}
#[derive(Debug)]
@@ -645,19 +648,19 @@ impl<T> DelayQueue<T> {
/// should be returned.
///
/// A slot should be returned when the associated deadline has been reached.
- fn poll_idx(&mut self) -> Poll<Option<usize>, Error> {
+ fn poll_idx(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Result<usize, Error>>> {
use self::wheel::Stack;
let expired = self.expired.pop(&mut self.slab);
if expired.is_some() {
- return Ok(expired.into());
+ return Poll::Ready(expired.map(Ok));
}
loop {
if let Some(ref mut delay) = self.delay {
if !delay.is_elapsed() {
- try_ready!(delay.poll());
+ ready!(Pin::new(&mut *delay).poll(cx));
}
let now = crate::ms(delay.deadline() - self.start, crate::Round::Down);
@@ -668,13 +671,13 @@ impl<T> DelayQueue<T> {
self.delay = None;
if let Some(idx) = self.wheel.poll(&mut self.poll, &mut self.slab) {
- return Ok(Some(idx).into());
+ return Poll::Ready(Some(Ok(idx)));
}
if let Some(deadline) = self.next_deadline() {
self.delay = Some(self.handle.delay(deadline));
} else {
- return Ok(None.into());
+ return Poll::Ready(None);
}
}
}
@@ -690,24 +693,29 @@ impl<T> DelayQueue<T> {
}
}
-impl<T> Stream for DelayQueue<T> {
- type Item = Expired<T>;
- type Error = Error;
-
- fn poll(&mut self) -> Poll<Option<Self::Item>, Error> {
- let item = try_ready!(self.poll_idx()).map(|idx| {
- let data = self.slab.remove(idx);
- debug_assert!(data.next.is_none());
- debug_assert!(data.prev.is_none());
-
- Expired {
- key: Key::new(idx),
- data: data.inner,
- deadline: self.start + Duration::from_millis(data.when),
- }
- });
+// We never put `T` in a `Pin`...
+impl<T> Unpin for DelayQueue<T> {}
- Ok(item.into())
+impl<T> Stream for DelayQueue<T> {
+ // DelayQueue seems much more specific, where a user may care that it
+ // has reached capacity, so return those errors instead of panicking.
+ type Item = Result<Expired<T>, Error>;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
+ let item = ready!(self.poll_idx(cx));
+ Poll::Ready(item.map(|result| {
+ result.map(|idx| {
+ let data = self.slab.remove(idx);
+ debug_assert!(data.next.is_none());
+ debug_assert!(data.prev.is_none());
+
+ Expired {
+ key: Key::new(idx),
+ data: data.inner,
+ deadline: self.start + Duration::from_millis(data.when),
+ }
+ })
+ }))
}
}
diff --git a/tokio-timer/src/interval.rs b/tokio-timer/src/interval.rs
index e065bf12..8b18e33d 100644
--- a/tokio-timer/src/interval.rs
+++ b/tokio-timer/src/interval.rs
@@ -1,6 +1,9 @@
use crate::clock;
use crate::Delay;
-use futures::{try_ready, Future, Poll, Stream};
+use futures_core::Stream;
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{self, Poll};
use std::time::{Duration, Instant};
/// A stream representing notifications at fixed interval
@@ -53,20 +56,20 @@ impl Interval {
impl Stream for Interval {
type Item = Instant;
- type Error = crate::Error;
- fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
// Wait for the delay to be done
- let _ = try_ready!(self.delay.poll());
+ ready!(Pin::new(&mut self.delay).poll(cx));
// Get the `now` by looking at the `delay` deadline
let now = self.delay.deadline();
// The next interval value is `duration` after the one that just
// yielded.
- self.delay.reset(now + self.duration);
+ let next = now + self.duration;
+ self.delay.reset(next);
// Return the current instant
- Ok(Some(now).into())
+ Poll::Ready(Some(now))
}
}
diff --git a/tokio-timer/src/lib.rs b/tokio-timer/src/lib.rs
index 3a960240..083d8d87 100644
--- a/tokio-timer/src/lib.rs
+++ b/tokio-timer/src/lib.rs
@@ -31,27 +31,36 @@
//! [`Interval`]: struct.Interval.html
//! [`Timer`]: timer/struct.Timer.html
+macro_rules! ready {
+ ($e:expr) => (
+ match $e {
+ ::std::task::Poll::Ready(v) => v,
+ ::std::task::Poll::Pending => return ::std::task::Poll::Pending,
+ }
+ )
+}
+
pub mod clock;
+#[cfg(feature = "delay-queue")]
pub mod delay_queue;
+#[cfg(feature = "throttle")]
pub mod throttle;
pub mod timeout;
pub mod timer;
mod atomic;
-mod deadline;
mod delay;
mod error;
+#[cfg(feature = "interval")]
mod interval;
mod wheel;
-#[deprecated(since = "0.2.6", note = "use Timeout instead")]
-#[doc(hidden)]
-#[allow(deprecated)]
-pub use deadline::{Deadline, DeadlineError};
pub use delay::Delay;
+#[cfg(feature = "delay-queue")]
#[doc(inline)]
pub use delay_queue::DelayQueue;
pub use error::Error;
+#[cfg(feature = "interval")]
pub use interval::Interval;
#[doc(inline)]
pub use timeout::Timeout;
diff --git a/tokio-timer/src/throttle.rs b/tokio-timer/src/throttle.rs
index de71bf7f..ab8733fd 100644
--- a/tokio-timer/src/throttle.rs
+++ b/tokio-timer/src/throttle.rs
@@ -1,11 +1,12 @@
//! Slow down a stream by enforcing a delay between items.
-use crate::{clock, Delay, Error};
-use futures::future::Either;
-use futures::{try_ready, Async, Future, Poll, Stream};
+use crate::{clock, Delay};
+use futures_core::Stream;
use std::{
- error::Error as StdError,
- fmt::{Display, Formatter, Result as FmtResult},
+ future::Future,
+ marker::Unpin,
+ pin::Pin,
+ task::{self, Poll},
time::Duration,
};
@@ -13,26 +14,25 @@ use std::{
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Throttle<T> {
- delay: Option<Delay>,
- duration: Duration,
+ delay: Delay,
+ /// Set to true when `delay` has returned ready, but `stream` hasn't.
+ has_delayed: bool,
stream: T,
}
-/// Either the error of the underlying stream, or an error within
-/// tokio's timing machinery.
-#[derive(Debug)]
-pub struct ThrottleError<T>(Either<T, Error>);
-
impl<T> Throttle<T> {
/// Slow down a stream by enforcing a delay between items.
pub fn new(stream: T, duration: Duration) -> Self {
Self {
- delay: None,
- duration: duration,
+ delay: Delay::new_timeout(clock::now() + duration, duration),
+ has_delayed: false,
stream: stream,
}
}
+}
+// XXX: are these safe if `T: !Unpin`?
+impl<T: Unpin> Throttle<T> {
/// Acquires a reference to the underlying stream that this combinator is
/// pulling from.
pub fn get_ref(&self) -> &T {
@@ -59,107 +59,22 @@ impl<T> Throttle<T> {
impl<T: Stream> Stream for Throttle<T> {
type Item = T::Item;
- type Error = ThrottleError<T::Error>;
-
- fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
- if let Some(ref mut delay) = self.delay {
- try_ready!({ delay.poll().map_err(ThrottleError::from_timer_err) });
- }
- self.delay = None;
- let value = try_ready!({ self.stream.poll().map_err(ThrottleError::from_stream_err) });
-
- if value.is_some() {
- self.delay = Some(Delay::new(clock::now() + self.duration));
- }
-
- Ok(Async::Ready(value))
- }
-}
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
+ unsafe {
+ if !self.has_delayed {
+ ready!(self.as_mut().map_unchecked_mut(|me| &mut me.delay).poll(cx));
+ self.as_mut().get_unchecked_mut().has_delayed = true;
+ }
-impl<T> ThrottleError<T> {
- /// Creates a new `ThrottleError` from the given stream error.
- pub fn from_stream_err(err: T) -> Self {
- ThrottleError(Either::A(err))
- }
-
- /// Creates a new `ThrottleError` from the given tokio timer error.
- pub fn from_timer_err(err: Error) -> Self {
- ThrottleError(Either::B(err))
- }
-
- /// Attempts to get the underlying stream error, if it is present.
- pub fn get_stream_error(&self) -> Option<&T> {
- match self.0 {
- Either::A(ref x) => Some(x),
- _ => None,
- }
- }
-
- /// Attempts to get the underlying timer error, if it is present.
- pub fn get_timer_error(&self) -> Option<&Error> {
- match self.0 {
- Either::B(ref x) => Some(x),
- _ => None,
- }
- }
+ let value = ready!(self.as_mut().map_unchecked_mut(|me| &mut me.stream).poll_next(cx));
- /// Attempts to extract the underlying stream error, if it is present.
- pub fn into_stream_error(self) -> Option<T> {
- match self.0 {
- Either::A(x) => Some(x),
- _ => None,
- }
- }
-
- /// Attempts to extract the underlying timer error, if it is present.
- pub fn into_timer_error(self) -> Option<Error> {
- match self.0 {
- Either::B(x) => Some(x),
- _ => None,
- }
- }
-
- /// Returns whether the throttle error has occured because of an error
- /// in the underlying stream.
- pub fn is_stream_error(&self) -> bool {
- !self.is_timer_error()
- }
-
- /// Returns whether the throttle error has occured because of an error
- /// in tokio's timer system.
- pub fn is_timer_error(&self) -> bool {
- match self.0 {
- Either::A(_) => false,
- Either::B(_) => true,
- }
- }
-}
-
-impl<T: StdError> Display for ThrottleError<T> {
- fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
- match self.0 {
- Either::A(ref err) => write!(f, "stream error: {}", err),
- Either::B(ref err) => write!(f, "timer error: {}", err),
- }
- }
-}
-
-impl<T: StdError + 'static> StdError for ThrottleError<T> {
- fn description(&self) -> &str {
- match self.0 {
- Either::A(_) => "stream error",
- Either::B(_) => "timer error",
- }
- }
+ if value.is_some() {
+ self.as_mut().get_unchecked_mut().delay.reset_timeout();
+ self.as_mut().get_unchecked_mut().has_delayed = false;
+ }
- // FIXME(taiki-e): When the minimum support version of tokio reaches Rust 1.30,
- // replace this with Error::source.
- #[allow(deprecated)]
- fn cause(&self) -> Option<&dyn StdError> {
- match self.0 {
- Either::A(ref err) => Some(err),
- Either::B(ref err) => Some(err),
+ Poll::Ready(value)
}
}
}
diff --git a/tokio-timer/src/timeout.rs b/tokio-timer/src/timeout.rs
index e860d3f0..42c0d9b1 100644
--- a/tokio-timer/src/timeout.rs
+++ b/tokio-timer/src/timeout.rs
@@ -6,9 +6,12 @@
use crate::clock::now;
use crate::Delay;
-use futures::{Async, Future, Poll, Stream};
-use std::error;
+#[cfg(feature = "timeout-stream")]
+use futures_core::Stream;
use std::fmt;
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{self, Poll};
use std::time::{Duration, Instant};
/// Allows a `Future` or `Stream` to execute for a limited amount of time.
@@ -69,22 +72,10 @@ pub struct Timeout<T> {
delay: Delay,
}
-/// Error returned by `Timeout`.
-#[derive(Debug)]
-pub struct Error<T>(Kind<T>);
-/// Timeout error variants
+/// Error returned by `Timeout`.
#[derive(Debug)]
-enum Kind<T> {
- /// Inner value returned an error
- Inner(T),
-
- /// The timeout elapsed.
- Elapsed,
-
- /// Timer returned an error.
- Timer(crate::Error),
-}
+pub struct Elapsed(());
impl<T> Timeout<T> {
/// Create a new `Timeout` that allows `value` to execute for a duration of
@@ -161,141 +152,70 @@ impl<T> Future for Timeout<T>
where
T: Future,
{
- type Item = T::Item;
- type Error = Error<T::Error>;
+ type Output = Result<T::Output, Elapsed>;
- fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
// First, try polling the future
- match self.value.poll() {
- Ok(Async::Ready(v)) => return Ok(Async::Ready(v)),
- Ok(Async::NotReady) => {}
- Err(e) => return Err(Error::inner(e)),
+
+ // Safety: we never move `self.value`
+ unsafe {
+ let p = self.as_mut().map_unchecked_mut(|me| &mut me.value);
+ if let Poll::Ready(v) = p.poll(cx) {
+ return Poll::Ready(Ok(v));
+ }
}
// Now check the timer
- match self.delay.poll() {
- Ok(Async::NotReady) => Ok(Async::NotReady),
- Ok(Async::Ready(_)) => Err(Error::elapsed()),
- Err(e) => Err(Error::timer(e)),
+ // Safety: X_X!
+ unsafe {
+ match self.map_unchecked_mut(|me| &mut me.delay).poll(cx) {
+ Poll::Ready(()) => Poll::Ready(Err(Elapsed(()))),
+ Poll::Pending => Poll::Pending
+ }
}
}
}
+#[cfg(feature = "timeout-stream")]
impl<T> Stream for Timeout<T>
where
T: Stream,
{
- type Item = T::Item;
- type Error = Error<T::Error>;
-
- fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
- // First, try polling the future
- match self.value.poll() {
- Ok(Async::Ready(v)) => {
+ type Item = Result<T::Item, Elapsed>;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
+ // Safety: T might be !Unpin, but we never move neither `value`
+ // nor `delay`.
+ //
+ // ... X_X
+ unsafe {
+ // First, try polling the future
+ let v = self
+ .as_mut()
+ .map_unchecked_mut(|me| &mut me.value)
+ .poll_next(cx);
+
+ if let Poll::Ready(v) = v {
if v.is_some() {
- self.delay.reset_timeout();
+ self.as_mut().get_unchecked_mut().delay.reset_timeout();
}
- return Ok(Async::Ready(v));
+ return Poll::Ready(v.map(Ok));
}
- Ok(Async::NotReady) => {}
- Err(e) => return Err(Error::inner(e)),
- }
-
- // Now check the timer
- match self.delay.poll() {
- Ok(Async::NotReady) => Ok(Async::NotReady),
- Ok(Async::Ready(_)) => {
- self.delay.reset_timeout();
- Err(Error::elapsed())
- }
- Err(e) => Err(Error::timer(e)),
- }
- }
-}
-
-// ===== impl Error =====
-
-impl<T> Error<T> {
- /// Create a new `Error` representing the inner value completing with `Err`.
- pub fn inner(err: T) -> Error<T> {
- Error(Kind::Inner(err))
- }
-
- /// Returns `true` if the error was caused by the inner value completing
- /// with `Err`.
- pub fn is_inner(&self) -> bool {
- match self.0 {
- Kind::Inner(_) => true,
- _ => false,
- }
- }
-
- /// Consumes `self`, returning the inner future error.
- pub fn into_inner(self) -> Option<T> {
- match self.0 {
- Kind::Inner(err) => Some(err),
- _ => None,
- }
- }
-
- /// Create a new `Error` representing the inner value not completing before
- /// the deadline is reached.
- pub fn elapsed() -> Error<T> {
- Error(Kind::Elapsed)
- }
-
- /// Returns `true` if the error was caused by the inner value not completing
- /// before the deadline is reached.
- pub fn is_elapsed(&self) -> bool {
- match self.0 {
- Kind::Elapsed => true,
- _ => false,
- }
- }
-
- /// Creates a new `Error` representing an error encountered by the timer
- /// implementation
- pub fn timer(err: crate::Error) -> Error<T> {
- Error(Kind::Timer(err))
- }
-
- /// Returns `true` if the error was caused by the timer.
- pub fn is_timer(&self) -> bool {
- match self.0 {
- Kind::Timer(_) => true,
- _ => false,
- }
- }
- /// Consumes `self`, returning the error raised by the timer implementation.
- pub fn into_timer(self) -> Option<crate::Error> {
- match self.0 {
- Kind::Timer(err) => Some(err),
- _ => None,
+ // Now check the timer
+ ready!(self.map_unchecked_mut(|me| &mut me.delay).poll(cx));
+ // if delay was ready, timeout elapsed!
+ Poll::Ready(Some(Err(Elapsed(()))))
}
}
}
-impl<T: error::Error> error::Error for Error<T> {
- fn description(&self) -> &str {
- use self::Kind::*;
+// ===== impl Elapsed =====
- match self.0 {
- Inner(ref e) => e.description(),
- Elapsed => "deadline has elapsed",
- Timer(ref e) => e.description(),
- }
- }
-}
-
-impl<T: fmt::Display> fmt::Display for Error<T> {
+impl fmt::Display for Elapsed {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- use self::Kind::*;
-
- match self.0 {
- Inner(ref e) => e.fmt(fmt),
- Elapsed => "deadline has elapsed".fmt(fmt),
- Timer(ref e) => e.fmt(fmt),
- }
+ "deadline has elapsed".fmt(fmt)
}
}
+
+impl std::error::Error for Elapsed {}
diff --git a/tokio-timer/src/timer/entry.rs b/tokio-timer/src/timer/entry.rs
index 3ce969a1..c1676e97 100644
--- a/tokio-timer/src/timer/entry.rs
+++ b/tokio-timer/src/timer/entry.rs
@@ -2,15 +2,15 @@ use crate::atomic::AtomicU64;
use crate::timer::{HandlePriv, Inner};
use crate::Error;
use crossbeam_utils::CachePadded;
-use futures::task::AtomicTask;
-use futures::Poll;
use std::cell::UnsafeCell;
use std::ptr;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::{Relaxed, SeqCst};
use std::sync::{Arc, Weak};
+use std::task::{self, Poll};
use std::time::{Duration, Instant};
use std::u64;
+use tokio_sync::task::AtomicWaker;
/// Internal state shared between a `Delay` instance and the timer.
///
@@ -46,7 +46,7 @@ pub(crate) struct Entry {
state: AtomicU64,
/// Task to notify once the deadline is reached.
- task: AtomicTask,
+ waker: AtomicWaker,
/// True when the entry is queued in the "process" stack. This value
/// is set before pushing the value and unset after popping the value.
@@ -109,7 +109,7 @@ impl Entry {
Entry {
time: CachePadded::new(UnsafeCell::new(Time { deadline, duration })),
inner: None,
- task: AtomicTask::new(),
+ waker: AtomicWaker::new(),
state: AtomicU64::new(0),
queued: AtomicBool::new(false),
next_atomic: UnsafeCell::new(ptr::null_mut()),
@@ -246,7 +246,7 @@ impl Entry {
curr = actual;
}
- self.task.notify();
+ self.waker.wake();
}
pub fn error(&self) {
@@ -269,7 +269,7 @@ impl Entry {
curr = actual;
}
- self.task.notify();
+ self.waker.wake();
}
pub fn cancel(entry: &Arc<Entry>) {
@@ -289,32 +289,31 @@ impl Entry {
let _ = inner.queue(entry);
}
- pub fn poll_elapsed(&self) -> Poll<(), Error> {
- use futures::Async::NotReady;
+ pub fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> {
let mut curr = self.state.load(SeqCst);
if is_elapsed(curr) {
- if curr == ERROR {
- return Err(Error::shutdown());
+ return Poll::Ready(if curr == ERROR {
+ Err(Error::shutdown())
} else {
- return Ok(().into());
- }
+ Ok(())
+ });
}
- self.task.register();
+ self.waker.register_by_ref(cx.waker());
curr = self.state.load(SeqCst).into();
if is_elapsed(curr) {
- if curr == ERROR {
- return Err(Error::shutdown());
+ return Poll::Ready(if curr == ERROR {
+ Err(Error::shutdown())
} else {
- return Ok(().into());
- }
+ Ok(())
+ });
}
- Ok(NotReady)
+ Poll::Pending
}
/// Only called by `Registration`
diff --git a/tokio-timer/src/timer/handle.rs b/tokio-timer/src/timer/handle.rs
index 7e8b95c7..128723f3 100644
--- a/tokio-timer/src/timer/handle.rs
+++ b/tokio-timer/src/timer/handle.rs
@@ -1,9 +1,9 @@
use crate::timer::Inner;
-use crate::{Deadline, Delay, Error, Interval, Timeout};
+use crate::{Delay, Error, /*Interval,*/ Timeout};
use std::cell::RefCell;
use std::fmt;
use std::sync::{Arc, Weak};
-use std::time::{Duration, Instant};
+use std::time::{/*Duration,*/ Instant};
use tokio_executor::Enter;
/// Handle to timer instance.
@@ -137,22 +137,18 @@ impl Handle {
}
}
- #[doc(hidden)]
- #[deprecated(since = "0.2.11", note = "use timeout instead")]
- pub fn deadline<T>(&self, future: T, deadline: Instant) -> Deadline<T> {
- Deadline::new_with_delay(future, self.delay(deadline))
- }
-
/// Create a `Timeout` driven by this handle's associated `Timer`.
pub fn timeout<T>(&self, value: T, deadline: Instant) -> Timeout<T> {
Timeout::new_with_delay(value, self.delay(deadline))
}
+ /*
/// Create a new `Interval` that starts at `at` and yields every `duration`
/// interval after that.
pub fn interval(&self, at: Instant, duration: Duration) -> Interval {
Interval::new_with_delay(self.delay(at), duration)
}
+ */
fn as_priv(&self) -> Option<&HandlePriv> {
self.inner.as_ref()
diff --git a/tokio-timer/src/timer/registration.rs b/tokio-timer/src/timer/registration.rs
index ee7e3feb..74a32d90 100644
--- a/tokio-timer/src/timer/registration.rs
+++ b/tokio-timer/src/timer/registration.rs
@@ -1,8 +1,7 @@
-use crate::clock::now;
use crate::timer::{Entry, HandlePriv};
use crate::Error;
-use futures::Poll;
use std::sync::Arc;
+use std::task::{self, Poll};
use std::time::{Duration, Instant};
/// Registration with a timer.
@@ -43,8 +42,10 @@ impl Registration {
Entry::reset(&mut self.entry);
}
+ // Used by `Timeout<Stream>`
+ #[cfg(feature = "timeout-stream")]
pub fn reset_timeout(&mut self) {
- let deadline = now() + self.entry.time_ref().duration;
+ let deadline = crate::clock::now() + self.entry.time_ref().duration;
self.entry.time_mut().deadline = deadline;
Entry::reset(&mut self.entry);
}
@@ -53,8 +54,8 @@ impl Registration {
self.entry.is_elapsed()
}
- pub fn poll_elapsed(&self) -> Poll<(), Error> {
- self.entry.poll_elapsed()
+ pub fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> {
+ self.entry.poll_elapsed(cx)
}
}