//! Allows a future or stream to execute for a maximum amount of time. //! //! See [`Timeout`] documentation for more details. //! //! [`Timeout`]: struct.Timeout.html use crate::clock::now; use crate::Delay; #[cfg(feature = "async-traits")] use futures_core::ready; use std::fmt; use std::future::Future; use std::pin::Pin; use std::task::{self, Poll}; use std::time::{Duration, Instant}; /// Allows a `Future` or `Stream` to execute for a limited amount of time. /// /// If the future or stream completes before the timeout has expired, then /// `Timeout` returns the completed value. Otherwise, `Timeout` returns an /// [`Error`]. /// /// # Futures and Streams /// /// The exact behavor depends on if the inner value is a `Future` or a `Stream`. /// In the case of a `Future`, `Timeout` will require the future to complete by /// a fixed deadline. In the case of a `Stream`, `Timeout` will allow each item /// to take the entire timeout before returning an error. /// /// In order to set an upper bound on the processing of the *entire* stream, /// then a timeout should be set on the future that processes the stream. For /// example: /// /// ```rust,no_run /// use tokio::prelude::*; /// use tokio::sync::mpsc; /// /// use std::thread; /// use std::time::Duration; /// /// # async fn dox() { /// let (mut tx, rx) = mpsc::unbounded_channel(); /// /// thread::spawn(move || { /// tx.try_send(()).unwrap(); /// thread::sleep(Duration::from_millis(10)); /// tx.try_send(()).unwrap(); /// }); /// /// let process = rx.for_each(|item| { /// // do something with `item` /// # drop(item); /// # tokio::future::ready(()) /// }); /// /// // Wrap the future with a `Timeout` set to expire in 10 milliseconds. /// process.timeout(Duration::from_millis(10)).await; /// # } /// ``` /// /// # Cancelation /// /// Cancelling a `Timeout` is done by dropping the value. No additional cleanup /// or other work is required. /// /// The original future or stream may be obtained by calling [`Timeout::into_inner`]. This /// consumes the `Timeout`. /// /// [`Error`]: struct.Error.html /// [`Timeout::into_inner`]: struct.Timeout.html#method.into_iter #[must_use = "futures do nothing unless you `.await` or poll them"] #[derive(Debug)] pub struct Timeout { value: T, delay: Delay, } /// Error returned by `Timeout`. #[derive(Debug)] pub struct Elapsed(()); impl Timeout { /// Create a new `Timeout` that allows `value` to execute for a duration of /// at most `timeout`. /// /// The exact behavior depends on if `value` is a `Future` or a `Stream`. /// /// See [type] level documentation for more details. /// /// [type]: # /// /// # Examples /// /// Create a new `Timeout` set to expire in 10 milliseconds. /// /// ```rust /// use tokio::timer::Timeout; /// use tokio::sync::oneshot; /// /// use std::time::Duration; /// /// # async fn dox() { /// let (tx, rx) = oneshot::channel(); /// # tx.send(()).unwrap(); /// /// // Wrap the future with a `Timeout` set to expire in 10 milliseconds. /// Timeout::new(rx, Duration::from_millis(10)).await; /// } /// ``` pub fn new(value: T, timeout: Duration) -> Timeout { let delay = Delay::new_timeout(now() + timeout, timeout); Timeout::new_with_delay(value, delay) } pub(crate) fn new_with_delay(value: T, delay: Delay) -> Timeout { Timeout { value, delay } } /// Gets a reference to the underlying value in this timeout. pub fn get_ref(&self) -> &T { &self.value } /// Gets a mutable reference to the underlying value in this timeout. pub fn get_mut(&mut self) -> &mut T { &mut self.value } /// Consumes this timeout, returning the underlying value. pub fn into_inner(self) -> T { self.value } } impl Timeout { /// Create a new `Timeout` that completes when `future` completes or when /// `deadline` is reached. /// /// This function differs from `new` in that: /// /// * It only accepts `Future` arguments. /// * It sets an explicit `Instant` at which the timeout expires. pub fn new_at(future: T, deadline: Instant) -> Timeout { let delay = Delay::new(deadline); Timeout { value: future, delay, } } } impl Future for Timeout where T: Future, { type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { // First, try polling the future // Safety: we never move `self.value` unsafe { let p = self.as_mut().map_unchecked_mut(|me| &mut me.value); if let Poll::Ready(v) = p.poll(cx) { return Poll::Ready(Ok(v)); } } // Now check the timer // Safety: X_X! unsafe { match self.map_unchecked_mut(|me| &mut me.delay).poll(cx) { Poll::Ready(()) => Poll::Ready(Err(Elapsed(()))), Poll::Pending => Poll::Pending, } } } } #[cfg(feature = "async-traits")] impl futures_core::Stream for Timeout where T: futures_core::Stream, { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { // Safety: T might be !Unpin, but we never move neither `value` // nor `delay`. // // ... X_X unsafe { // First, try polling the future let v = self .as_mut() .map_unchecked_mut(|me| &mut me.value) .poll_next(cx); if let Poll::Ready(v) = v { if v.is_some() { self.as_mut().get_unchecked_mut().delay.reset_timeout(); } return Poll::Ready(v.map(Ok)); } // Now check the timer ready!(self.as_mut().map_unchecked_mut(|me| &mut me.delay).poll(cx)); // if delay was ready, timeout elapsed! self.as_mut().get_unchecked_mut().delay.reset_timeout(); Poll::Ready(Some(Err(Elapsed(())))) } } } // ===== impl Elapsed ===== impl fmt::Display for Elapsed { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { "deadline has elapsed".fmt(fmt) } } impl std::error::Error for Elapsed {} impl From for std::io::Error { fn from(_err: Elapsed) -> std::io::Error { std::io::ErrorKind::TimedOut.into() } }