diff options
Diffstat (limited to 'tokio/src/time/timeout.rs')
-rw-r--r-- | tokio/src/time/timeout.rs | 195 |
1 files changed, 70 insertions, 125 deletions
diff --git a/tokio/src/time/timeout.rs b/tokio/src/time/timeout.rs index 2cc35082..3a66a826 100644 --- a/tokio/src/time/timeout.rs +++ b/tokio/src/time/timeout.rs @@ -1,73 +1,104 @@ -//! Allows a future or stream to execute for a maximum amount of time. +//! Allows a future to execute for a maximum amount of time. //! //! See [`Timeout`] documentation for more details. //! //! [`Timeout`]: struct.Timeout.html -use crate::time::clock::now; -use crate::time::{Delay, Duration, Instant}; +use crate::time::{delay_until, Delay, Duration, Instant}; -use futures_core::ready; use std::fmt; use std::future::Future; use std::pin::Pin; use std::task::{self, Poll}; -/// Allows a `Future` or `Stream` to execute for a limited amount of time. +/// Require a `Future` to complete before the specified duration has elapsed. /// -/// If the future or stream completes before the timeout has expired, then -/// `Timeout` returns the completed value. Otherwise, `Timeout` returns an -/// [`Error`]. +/// If the future completes before the duration has elapsed, then the completed +/// value is returned. Otherwise, an error is returned. /// -/// # Futures and Streams +/// # Cancelation /// -/// The exact behavor depends on if the inner value is a `Future` or a `Stream`. -/// In the case of a `Future`, `Timeout` will require the future to complete by -/// a fixed deadline. In the case of a `Stream`, `Timeout` will allow each item -/// to take the entire timeout before returning an error. +/// Cancelling a timeout is done by dropping the future. No additional cleanup +/// or other work is required. /// -/// In order to set an upper bound on the processing of the *entire* stream, -/// then a timeout should be set on the future that processes the stream. For -/// example: +/// The original future may be obtained by calling [`Timeout::into_inner`]. This +/// consumes the `Timeout`. /// -/// ```rust,no_run -/// use tokio::prelude::*; -/// use tokio::sync::mpsc; +/// # Examples /// -/// use std::thread; -/// use std::time::Duration; +/// Create a new `Timeout` set to expire in 10 milliseconds. /// -/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { -/// let (mut tx, rx) = mpsc::unbounded_channel(); +/// ```rust +/// use tokio::time::timeout; +/// use tokio::sync::oneshot; /// -/// thread::spawn(move || { -/// tx.try_send(()).unwrap(); -/// thread::sleep(Duration::from_millis(10)); -/// tx.try_send(()).unwrap(); -/// }); +/// use std::time::Duration; /// -/// let process = rx.for_each(|item| { -/// // do something with `item` -/// # drop(item); -/// # tokio::future::ready(()) -/// }); +/// # async fn dox() { +/// let (tx, rx) = oneshot::channel(); +/// # tx.send(()).unwrap(); /// /// // Wrap the future with a `Timeout` set to expire in 10 milliseconds. -/// process.timeout(Duration::from_millis(10)).await?; -/// # Ok(()) +/// if let Err(_) = timeout(Duration::from_millis(10), rx).await { +/// println!("did not receive value within 10 ms"); +/// } /// # } /// ``` +pub fn timeout<T>(duration: Duration, future: T) -> Timeout<T> +where + T: Future, +{ + let delay = Delay::new_timeout(Instant::now() + duration, duration); + Timeout::new_with_delay(future, delay) +} + +/// Require a `Future` to complete before the specified instant in time. +/// +/// If the future completes before the instant is reached, then the completed +/// value is returned. Otherwise, an error is returned. /// /// # Cancelation /// -/// Cancelling a `Timeout` is done by dropping the value. No additional cleanup +/// Cancelling a timeout is done by dropping the future. No additional cleanup /// or other work is required. /// -/// The original future or stream may be obtained by calling [`Timeout::into_inner`]. This +/// The original future may be obtained by calling [`Timeout::into_inner`]. This /// consumes the `Timeout`. /// -/// [`Error`]: struct.Error.html -/// [`Timeout::into_inner`]: struct.Timeout.html#method.into_iter +/// # Examples +/// +/// Create a new `Timeout` set to expire in 10 milliseconds. +/// +/// ```rust +/// use tokio::time::{Instant, timeout_at}; +/// use tokio::sync::oneshot; +/// +/// use std::time::Duration; +/// +/// # async fn dox() { +/// let (tx, rx) = oneshot::channel(); +/// # tx.send(()).unwrap(); +/// +/// // Wrap the future with a `Timeout` set to expire 10 milliseconds into the +/// // future. +/// if let Err(_) = timeout_at(Instant::now() + Duration::from_millis(10), rx).await { +/// println!("did not receive value within 10 ms"); +/// } +/// # } +/// ``` +pub fn timeout_at<T>(deadline: Instant, future: T) -> Timeout<T> +where + T: Future, +{ + let delay = delay_until(deadline); + + Timeout { + value: future, + delay, + } +} + +/// Future returned by [`timeout`](timeout) and [`timeout_at`](timeout_at). #[must_use = "futures do nothing unless you `.await` or poll them"] #[derive(Debug)] pub struct Timeout<T> { @@ -80,39 +111,6 @@ pub struct Timeout<T> { pub struct Elapsed(()); impl<T> Timeout<T> { - /// Create a new `Timeout` that allows `value` to execute for a duration of - /// at most `timeout`. - /// - /// The exact behavior depends on if `value` is a `Future` or a `Stream`. - /// - /// See [type] level documentation for more details. - /// - /// [type]: # - /// - /// # Examples - /// - /// Create a new `Timeout` set to expire in 10 milliseconds. - /// - /// ```rust - /// use tokio::time::Timeout; - /// use tokio::sync::oneshot; - /// - /// use std::time::Duration; - /// - /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { - /// let (tx, rx) = oneshot::channel(); - /// # tx.send(()).unwrap(); - /// - /// // Wrap the future with a `Timeout` set to expire in 10 milliseconds. - /// Timeout::new(rx, Duration::from_millis(10)).await??; - /// # Ok(()) - /// # } - /// ``` - pub fn new(value: T, timeout: Duration) -> Timeout<T> { - let delay = Delay::new_timeout(now() + timeout, timeout); - Timeout::new_with_delay(value, delay) - } - pub(crate) fn new_with_delay(value: T, delay: Delay) -> Timeout<T> { Timeout { value, delay } } @@ -133,24 +131,6 @@ impl<T> Timeout<T> { } } -impl<T: Future> Timeout<T> { - /// Create a new `Timeout` that completes when `future` completes or when - /// `deadline` is reached. - /// - /// This function differs from `new` in that: - /// - /// * It only accepts `Future` arguments. - /// * It sets an explicit `Instant` at which the timeout expires. - pub fn new_at(future: T, deadline: Instant) -> Timeout<T> { - let delay = Delay::new(deadline); - - Timeout { - value: future, - delay, - } - } -} - impl<T> Future for Timeout<T> where T: Future, @@ -179,41 +159,6 @@ where } } -impl<T> futures_core::Stream for Timeout<T> -where - T: futures_core::Stream, -{ - type Item = Result<T::Item, Elapsed>; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> { - // Safety: T might be !Unpin, but we never move neither `value` - // nor `delay`. - // - // ... X_X - unsafe { - // First, try polling the future - let v = self - .as_mut() - .map_unchecked_mut(|me| &mut me.value) - .poll_next(cx); - - if let Poll::Ready(v) = v { - if v.is_some() { - self.as_mut().get_unchecked_mut().delay.reset_timeout(); - } - return Poll::Ready(v.map(Ok)); - } - - // Now check the timer - ready!(self.as_mut().map_unchecked_mut(|me| &mut me.delay).poll(cx)); - - // if delay was ready, timeout elapsed! - self.as_mut().get_unchecked_mut().delay.reset_timeout(); - Poll::Ready(Some(Err(Elapsed(())))) - } - } -} - // ===== impl Elapsed ===== impl fmt::Display for Elapsed { |