diff options
Diffstat (limited to 'tokio-timer/src/timeout.rs')
-rw-r--r-- | tokio-timer/src/timeout.rs | 178 |
1 files changed, 49 insertions, 129 deletions
diff --git a/tokio-timer/src/timeout.rs b/tokio-timer/src/timeout.rs index e860d3f0..42c0d9b1 100644 --- a/tokio-timer/src/timeout.rs +++ b/tokio-timer/src/timeout.rs @@ -6,9 +6,12 @@ use crate::clock::now; use crate::Delay; -use futures::{Async, Future, Poll, Stream}; -use std::error; +#[cfg(feature = "timeout-stream")] +use futures_core::Stream; use std::fmt; +use std::future::Future; +use std::pin::Pin; +use std::task::{self, Poll}; use std::time::{Duration, Instant}; /// Allows a `Future` or `Stream` to execute for a limited amount of time. @@ -69,22 +72,10 @@ pub struct Timeout<T> { delay: Delay, } -/// Error returned by `Timeout`. -#[derive(Debug)] -pub struct Error<T>(Kind<T>); -/// Timeout error variants +/// Error returned by `Timeout`. #[derive(Debug)] -enum Kind<T> { - /// Inner value returned an error - Inner(T), - - /// The timeout elapsed. - Elapsed, - - /// Timer returned an error. - Timer(crate::Error), -} +pub struct Elapsed(()); impl<T> Timeout<T> { /// Create a new `Timeout` that allows `value` to execute for a duration of @@ -161,141 +152,70 @@ impl<T> Future for Timeout<T> where T: Future, { - type Item = T::Item; - type Error = Error<T::Error>; + type Output = Result<T::Output, Elapsed>; - fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { // First, try polling the future - match self.value.poll() { - Ok(Async::Ready(v)) => return Ok(Async::Ready(v)), - Ok(Async::NotReady) => {} - Err(e) => return Err(Error::inner(e)), + + // Safety: we never move `self.value` + unsafe { + let p = self.as_mut().map_unchecked_mut(|me| &mut me.value); + if let Poll::Ready(v) = p.poll(cx) { + return Poll::Ready(Ok(v)); + } } // Now check the timer - match self.delay.poll() { - Ok(Async::NotReady) => Ok(Async::NotReady), - Ok(Async::Ready(_)) => Err(Error::elapsed()), - Err(e) => Err(Error::timer(e)), + // Safety: X_X! + unsafe { + match self.map_unchecked_mut(|me| &mut me.delay).poll(cx) { + Poll::Ready(()) => Poll::Ready(Err(Elapsed(()))), + Poll::Pending => Poll::Pending + } } } } +#[cfg(feature = "timeout-stream")] impl<T> Stream for Timeout<T> where T: Stream, { - type Item = T::Item; - type Error = Error<T::Error>; - - fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { - // First, try polling the future - match self.value.poll() { - Ok(Async::Ready(v)) => { + type Item = Result<T::Item, Elapsed>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> { + // Safety: T might be !Unpin, but we never move neither `value` + // nor `delay`. + // + // ... X_X + unsafe { + // First, try polling the future + let v = self + .as_mut() + .map_unchecked_mut(|me| &mut me.value) + .poll_next(cx); + + if let Poll::Ready(v) = v { if v.is_some() { - self.delay.reset_timeout(); + self.as_mut().get_unchecked_mut().delay.reset_timeout(); } - return Ok(Async::Ready(v)); + return Poll::Ready(v.map(Ok)); } - Ok(Async::NotReady) => {} - Err(e) => return Err(Error::inner(e)), - } - - // Now check the timer - match self.delay.poll() { - Ok(Async::NotReady) => Ok(Async::NotReady), - Ok(Async::Ready(_)) => { - self.delay.reset_timeout(); - Err(Error::elapsed()) - } - Err(e) => Err(Error::timer(e)), - } - } -} - -// ===== impl Error ===== - -impl<T> Error<T> { - /// Create a new `Error` representing the inner value completing with `Err`. - pub fn inner(err: T) -> Error<T> { - Error(Kind::Inner(err)) - } - - /// Returns `true` if the error was caused by the inner value completing - /// with `Err`. - pub fn is_inner(&self) -> bool { - match self.0 { - Kind::Inner(_) => true, - _ => false, - } - } - - /// Consumes `self`, returning the inner future error. - pub fn into_inner(self) -> Option<T> { - match self.0 { - Kind::Inner(err) => Some(err), - _ => None, - } - } - - /// Create a new `Error` representing the inner value not completing before - /// the deadline is reached. - pub fn elapsed() -> Error<T> { - Error(Kind::Elapsed) - } - - /// Returns `true` if the error was caused by the inner value not completing - /// before the deadline is reached. - pub fn is_elapsed(&self) -> bool { - match self.0 { - Kind::Elapsed => true, - _ => false, - } - } - - /// Creates a new `Error` representing an error encountered by the timer - /// implementation - pub fn timer(err: crate::Error) -> Error<T> { - Error(Kind::Timer(err)) - } - - /// Returns `true` if the error was caused by the timer. - pub fn is_timer(&self) -> bool { - match self.0 { - Kind::Timer(_) => true, - _ => false, - } - } - /// Consumes `self`, returning the error raised by the timer implementation. - pub fn into_timer(self) -> Option<crate::Error> { - match self.0 { - Kind::Timer(err) => Some(err), - _ => None, + // Now check the timer + ready!(self.map_unchecked_mut(|me| &mut me.delay).poll(cx)); + // if delay was ready, timeout elapsed! + Poll::Ready(Some(Err(Elapsed(())))) } } } -impl<T: error::Error> error::Error for Error<T> { - fn description(&self) -> &str { - use self::Kind::*; +// ===== impl Elapsed ===== - match self.0 { - Inner(ref e) => e.description(), - Elapsed => "deadline has elapsed", - Timer(ref e) => e.description(), - } - } -} - -impl<T: fmt::Display> fmt::Display for Error<T> { +impl fmt::Display for Elapsed { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - use self::Kind::*; - - match self.0 { - Inner(ref e) => e.fmt(fmt), - Elapsed => "deadline has elapsed".fmt(fmt), - Timer(ref e) => e.fmt(fmt), - } + "deadline has elapsed".fmt(fmt) } } + +impl std::error::Error for Elapsed {} |