summaryrefslogtreecommitdiffstats
path: root/tokio-timer/src/timeout.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio-timer/src/timeout.rs')
-rw-r--r--tokio-timer/src/timeout.rs178
1 files changed, 49 insertions, 129 deletions
diff --git a/tokio-timer/src/timeout.rs b/tokio-timer/src/timeout.rs
index e860d3f0..42c0d9b1 100644
--- a/tokio-timer/src/timeout.rs
+++ b/tokio-timer/src/timeout.rs
@@ -6,9 +6,12 @@
use crate::clock::now;
use crate::Delay;
-use futures::{Async, Future, Poll, Stream};
-use std::error;
+#[cfg(feature = "timeout-stream")]
+use futures_core::Stream;
use std::fmt;
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{self, Poll};
use std::time::{Duration, Instant};
/// Allows a `Future` or `Stream` to execute for a limited amount of time.
@@ -69,22 +72,10 @@ pub struct Timeout<T> {
delay: Delay,
}
-/// Error returned by `Timeout`.
-#[derive(Debug)]
-pub struct Error<T>(Kind<T>);
-/// Timeout error variants
+/// Error returned by `Timeout`.
#[derive(Debug)]
-enum Kind<T> {
- /// Inner value returned an error
- Inner(T),
-
- /// The timeout elapsed.
- Elapsed,
-
- /// Timer returned an error.
- Timer(crate::Error),
-}
+pub struct Elapsed(());
impl<T> Timeout<T> {
/// Create a new `Timeout` that allows `value` to execute for a duration of
@@ -161,141 +152,70 @@ impl<T> Future for Timeout<T>
where
T: Future,
{
- type Item = T::Item;
- type Error = Error<T::Error>;
+ type Output = Result<T::Output, Elapsed>;
- fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
// First, try polling the future
- match self.value.poll() {
- Ok(Async::Ready(v)) => return Ok(Async::Ready(v)),
- Ok(Async::NotReady) => {}
- Err(e) => return Err(Error::inner(e)),
+
+ // Safety: we never move `self.value`
+ unsafe {
+ let p = self.as_mut().map_unchecked_mut(|me| &mut me.value);
+ if let Poll::Ready(v) = p.poll(cx) {
+ return Poll::Ready(Ok(v));
+ }
}
// Now check the timer
- match self.delay.poll() {
- Ok(Async::NotReady) => Ok(Async::NotReady),
- Ok(Async::Ready(_)) => Err(Error::elapsed()),
- Err(e) => Err(Error::timer(e)),
+ // Safety: X_X!
+ unsafe {
+ match self.map_unchecked_mut(|me| &mut me.delay).poll(cx) {
+ Poll::Ready(()) => Poll::Ready(Err(Elapsed(()))),
+ Poll::Pending => Poll::Pending
+ }
}
}
}
+#[cfg(feature = "timeout-stream")]
impl<T> Stream for Timeout<T>
where
T: Stream,
{
- type Item = T::Item;
- type Error = Error<T::Error>;
-
- fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
- // First, try polling the future
- match self.value.poll() {
- Ok(Async::Ready(v)) => {
+ type Item = Result<T::Item, Elapsed>;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
+ // Safety: T might be !Unpin, but we never move neither `value`
+ // nor `delay`.
+ //
+ // ... X_X
+ unsafe {
+ // First, try polling the future
+ let v = self
+ .as_mut()
+ .map_unchecked_mut(|me| &mut me.value)
+ .poll_next(cx);
+
+ if let Poll::Ready(v) = v {
if v.is_some() {
- self.delay.reset_timeout();
+ self.as_mut().get_unchecked_mut().delay.reset_timeout();
}
- return Ok(Async::Ready(v));
+ return Poll::Ready(v.map(Ok));
}
- Ok(Async::NotReady) => {}
- Err(e) => return Err(Error::inner(e)),
- }
-
- // Now check the timer
- match self.delay.poll() {
- Ok(Async::NotReady) => Ok(Async::NotReady),
- Ok(Async::Ready(_)) => {
- self.delay.reset_timeout();
- Err(Error::elapsed())
- }
- Err(e) => Err(Error::timer(e)),
- }
- }
-}
-
-// ===== impl Error =====
-
-impl<T> Error<T> {
- /// Create a new `Error` representing the inner value completing with `Err`.
- pub fn inner(err: T) -> Error<T> {
- Error(Kind::Inner(err))
- }
-
- /// Returns `true` if the error was caused by the inner value completing
- /// with `Err`.
- pub fn is_inner(&self) -> bool {
- match self.0 {
- Kind::Inner(_) => true,
- _ => false,
- }
- }
-
- /// Consumes `self`, returning the inner future error.
- pub fn into_inner(self) -> Option<T> {
- match self.0 {
- Kind::Inner(err) => Some(err),
- _ => None,
- }
- }
-
- /// Create a new `Error` representing the inner value not completing before
- /// the deadline is reached.
- pub fn elapsed() -> Error<T> {
- Error(Kind::Elapsed)
- }
-
- /// Returns `true` if the error was caused by the inner value not completing
- /// before the deadline is reached.
- pub fn is_elapsed(&self) -> bool {
- match self.0 {
- Kind::Elapsed => true,
- _ => false,
- }
- }
-
- /// Creates a new `Error` representing an error encountered by the timer
- /// implementation
- pub fn timer(err: crate::Error) -> Error<T> {
- Error(Kind::Timer(err))
- }
-
- /// Returns `true` if the error was caused by the timer.
- pub fn is_timer(&self) -> bool {
- match self.0 {
- Kind::Timer(_) => true,
- _ => false,
- }
- }
- /// Consumes `self`, returning the error raised by the timer implementation.
- pub fn into_timer(self) -> Option<crate::Error> {
- match self.0 {
- Kind::Timer(err) => Some(err),
- _ => None,
+ // Now check the timer
+ ready!(self.map_unchecked_mut(|me| &mut me.delay).poll(cx));
+ // if delay was ready, timeout elapsed!
+ Poll::Ready(Some(Err(Elapsed(()))))
}
}
}
-impl<T: error::Error> error::Error for Error<T> {
- fn description(&self) -> &str {
- use self::Kind::*;
+// ===== impl Elapsed =====
- match self.0 {
- Inner(ref e) => e.description(),
- Elapsed => "deadline has elapsed",
- Timer(ref e) => e.description(),
- }
- }
-}
-
-impl<T: fmt::Display> fmt::Display for Error<T> {
+impl fmt::Display for Elapsed {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- use self::Kind::*;
-
- match self.0 {
- Inner(ref e) => e.fmt(fmt),
- Elapsed => "deadline has elapsed".fmt(fmt),
- Timer(ref e) => e.fmt(fmt),
- }
+ "deadline has elapsed".fmt(fmt)
}
}
+
+impl std::error::Error for Elapsed {}