summaryrefslogtreecommitdiffstats
path: root/tokio/src/time/timeout.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/time/timeout.rs')
-rw-r--r--tokio/src/time/timeout.rs195
1 files changed, 70 insertions, 125 deletions
diff --git a/tokio/src/time/timeout.rs b/tokio/src/time/timeout.rs
index 2cc35082..3a66a826 100644
--- a/tokio/src/time/timeout.rs
+++ b/tokio/src/time/timeout.rs
@@ -1,73 +1,104 @@
-//! Allows a future or stream to execute for a maximum amount of time.
+//! Allows a future to execute for a maximum amount of time.
//!
//! See [`Timeout`] documentation for more details.
//!
//! [`Timeout`]: struct.Timeout.html
-use crate::time::clock::now;
-use crate::time::{Delay, Duration, Instant};
+use crate::time::{delay_until, Delay, Duration, Instant};
-use futures_core::ready;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::task::{self, Poll};
-/// Allows a `Future` or `Stream` to execute for a limited amount of time.
+/// Require a `Future` to complete before the specified duration has elapsed.
///
-/// If the future or stream completes before the timeout has expired, then
-/// `Timeout` returns the completed value. Otherwise, `Timeout` returns an
-/// [`Error`].
+/// If the future completes before the duration has elapsed, then the completed
+/// value is returned. Otherwise, an error is returned.
///
-/// # Futures and Streams
+/// # Cancelation
///
-/// The exact behavor depends on if the inner value is a `Future` or a `Stream`.
-/// In the case of a `Future`, `Timeout` will require the future to complete by
-/// a fixed deadline. In the case of a `Stream`, `Timeout` will allow each item
-/// to take the entire timeout before returning an error.
+/// Cancelling a timeout is done by dropping the future. No additional cleanup
+/// or other work is required.
///
-/// In order to set an upper bound on the processing of the *entire* stream,
-/// then a timeout should be set on the future that processes the stream. For
-/// example:
+/// The original future may be obtained by calling [`Timeout::into_inner`]. This
+/// consumes the `Timeout`.
///
-/// ```rust,no_run
-/// use tokio::prelude::*;
-/// use tokio::sync::mpsc;
+/// # Examples
///
-/// use std::thread;
-/// use std::time::Duration;
+/// Create a new `Timeout` set to expire in 10 milliseconds.
///
-/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
-/// let (mut tx, rx) = mpsc::unbounded_channel();
+/// ```rust
+/// use tokio::time::timeout;
+/// use tokio::sync::oneshot;
///
-/// thread::spawn(move || {
-/// tx.try_send(()).unwrap();
-/// thread::sleep(Duration::from_millis(10));
-/// tx.try_send(()).unwrap();
-/// });
+/// use std::time::Duration;
///
-/// let process = rx.for_each(|item| {
-/// // do something with `item`
-/// # drop(item);
-/// # tokio::future::ready(())
-/// });
+/// # async fn dox() {
+/// let (tx, rx) = oneshot::channel();
+/// # tx.send(()).unwrap();
///
/// // Wrap the future with a `Timeout` set to expire in 10 milliseconds.
-/// process.timeout(Duration::from_millis(10)).await?;
-/// # Ok(())
+/// if let Err(_) = timeout(Duration::from_millis(10), rx).await {
+/// println!("did not receive value within 10 ms");
+/// }
/// # }
/// ```
+pub fn timeout<T>(duration: Duration, future: T) -> Timeout<T>
+where
+ T: Future,
+{
+ let delay = Delay::new_timeout(Instant::now() + duration, duration);
+ Timeout::new_with_delay(future, delay)
+}
+
+/// Require a `Future` to complete before the specified instant in time.
+///
+/// If the future completes before the instant is reached, then the completed
+/// value is returned. Otherwise, an error is returned.
///
/// # Cancelation
///
-/// Cancelling a `Timeout` is done by dropping the value. No additional cleanup
+/// Cancelling a timeout is done by dropping the future. No additional cleanup
/// or other work is required.
///
-/// The original future or stream may be obtained by calling [`Timeout::into_inner`]. This
+/// The original future may be obtained by calling [`Timeout::into_inner`]. This
/// consumes the `Timeout`.
///
-/// [`Error`]: struct.Error.html
-/// [`Timeout::into_inner`]: struct.Timeout.html#method.into_iter
+/// # Examples
+///
+/// Create a new `Timeout` set to expire in 10 milliseconds.
+///
+/// ```rust
+/// use tokio::time::{Instant, timeout_at};
+/// use tokio::sync::oneshot;
+///
+/// use std::time::Duration;
+///
+/// # async fn dox() {
+/// let (tx, rx) = oneshot::channel();
+/// # tx.send(()).unwrap();
+///
+/// // Wrap the future with a `Timeout` set to expire 10 milliseconds into the
+/// // future.
+/// if let Err(_) = timeout_at(Instant::now() + Duration::from_millis(10), rx).await {
+/// println!("did not receive value within 10 ms");
+/// }
+/// # }
+/// ```
+pub fn timeout_at<T>(deadline: Instant, future: T) -> Timeout<T>
+where
+ T: Future,
+{
+ let delay = delay_until(deadline);
+
+ Timeout {
+ value: future,
+ delay,
+ }
+}
+
+/// Future returned by [`timeout`](timeout) and [`timeout_at`](timeout_at).
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
pub struct Timeout<T> {
@@ -80,39 +111,6 @@ pub struct Timeout<T> {
pub struct Elapsed(());
impl<T> Timeout<T> {
- /// Create a new `Timeout` that allows `value` to execute for a duration of
- /// at most `timeout`.
- ///
- /// The exact behavior depends on if `value` is a `Future` or a `Stream`.
- ///
- /// See [type] level documentation for more details.
- ///
- /// [type]: #
- ///
- /// # Examples
- ///
- /// Create a new `Timeout` set to expire in 10 milliseconds.
- ///
- /// ```rust
- /// use tokio::time::Timeout;
- /// use tokio::sync::oneshot;
- ///
- /// use std::time::Duration;
- ///
- /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
- /// let (tx, rx) = oneshot::channel();
- /// # tx.send(()).unwrap();
- ///
- /// // Wrap the future with a `Timeout` set to expire in 10 milliseconds.
- /// Timeout::new(rx, Duration::from_millis(10)).await??;
- /// # Ok(())
- /// # }
- /// ```
- pub fn new(value: T, timeout: Duration) -> Timeout<T> {
- let delay = Delay::new_timeout(now() + timeout, timeout);
- Timeout::new_with_delay(value, delay)
- }
-
pub(crate) fn new_with_delay(value: T, delay: Delay) -> Timeout<T> {
Timeout { value, delay }
}
@@ -133,24 +131,6 @@ impl<T> Timeout<T> {
}
}
-impl<T: Future> Timeout<T> {
- /// Create a new `Timeout` that completes when `future` completes or when
- /// `deadline` is reached.
- ///
- /// This function differs from `new` in that:
- ///
- /// * It only accepts `Future` arguments.
- /// * It sets an explicit `Instant` at which the timeout expires.
- pub fn new_at(future: T, deadline: Instant) -> Timeout<T> {
- let delay = Delay::new(deadline);
-
- Timeout {
- value: future,
- delay,
- }
- }
-}
-
impl<T> Future for Timeout<T>
where
T: Future,
@@ -179,41 +159,6 @@ where
}
}
-impl<T> futures_core::Stream for Timeout<T>
-where
- T: futures_core::Stream,
-{
- type Item = Result<T::Item, Elapsed>;
-
- fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
- // Safety: T might be !Unpin, but we never move neither `value`
- // nor `delay`.
- //
- // ... X_X
- unsafe {
- // First, try polling the future
- let v = self
- .as_mut()
- .map_unchecked_mut(|me| &mut me.value)
- .poll_next(cx);
-
- if let Poll::Ready(v) = v {
- if v.is_some() {
- self.as_mut().get_unchecked_mut().delay.reset_timeout();
- }
- return Poll::Ready(v.map(Ok));
- }
-
- // Now check the timer
- ready!(self.as_mut().map_unchecked_mut(|me| &mut me.delay).poll(cx));
-
- // if delay was ready, timeout elapsed!
- self.as_mut().get_unchecked_mut().delay.reset_timeout();
- Poll::Ready(Some(Err(Elapsed(()))))
- }
- }
-}
-
// ===== impl Elapsed =====
impl fmt::Display for Elapsed {