summaryrefslogtreecommitdiffstats
path: root/tokio/src/time/timeout.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/time/timeout.rs')
-rw-r--r--tokio/src/time/timeout.rs232
1 files changed, 232 insertions, 0 deletions
diff --git a/tokio/src/time/timeout.rs b/tokio/src/time/timeout.rs
new file mode 100644
index 00000000..e1fdb252
--- /dev/null
+++ b/tokio/src/time/timeout.rs
@@ -0,0 +1,232 @@
+//! Allows a future or stream to execute for a maximum amount of time.
+//!
+//! See [`Timeout`] documentation for more details.
+//!
+//! [`Timeout`]: struct.Timeout.html
+
+use crate::time::clock::now;
+use crate::time::Delay;
+
+use futures_core::ready;
+use std::fmt;
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{self, Poll};
+use std::time::{Duration, Instant};
+
+/// Allows a `Future` or `Stream` to execute for a limited amount of time.
+///
+/// If the future or stream completes before the timeout has expired, then
+/// `Timeout` returns the completed value. Otherwise, `Timeout` returns an
+/// [`Error`].
+///
+/// # Futures and Streams
+///
+/// The exact behavor depends on if the inner value is a `Future` or a `Stream`.
+/// In the case of a `Future`, `Timeout` will require the future to complete by
+/// a fixed deadline. In the case of a `Stream`, `Timeout` will allow each item
+/// to take the entire timeout before returning an error.
+///
+/// In order to set an upper bound on the processing of the *entire* stream,
+/// then a timeout should be set on the future that processes the stream. For
+/// example:
+///
+/// ```rust,no_run
+/// use tokio::prelude::*;
+/// use tokio::sync::mpsc;
+///
+/// use std::thread;
+/// use std::time::Duration;
+///
+/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
+/// let (mut tx, rx) = mpsc::unbounded_channel();
+///
+/// thread::spawn(move || {
+/// tx.try_send(()).unwrap();
+/// thread::sleep(Duration::from_millis(10));
+/// tx.try_send(()).unwrap();
+/// });
+///
+/// let process = rx.for_each(|item| {
+/// // do something with `item`
+/// # drop(item);
+/// # tokio::future::ready(())
+/// });
+///
+/// // Wrap the future with a `Timeout` set to expire in 10 milliseconds.
+/// process.timeout(Duration::from_millis(10)).await?;
+/// # Ok(())
+/// # }
+/// ```
+///
+/// # Cancelation
+///
+/// Cancelling a `Timeout` is done by dropping the value. No additional cleanup
+/// or other work is required.
+///
+/// The original future or stream may be obtained by calling [`Timeout::into_inner`]. This
+/// consumes the `Timeout`.
+///
+/// [`Error`]: struct.Error.html
+/// [`Timeout::into_inner`]: struct.Timeout.html#method.into_iter
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+#[derive(Debug)]
+pub struct Timeout<T> {
+ value: T,
+ delay: Delay,
+}
+
+/// Error returned by `Timeout`.
+#[derive(Debug)]
+pub struct Elapsed(());
+
+impl<T> Timeout<T> {
+ /// Create a new `Timeout` that allows `value` to execute for a duration of
+ /// at most `timeout`.
+ ///
+ /// The exact behavior depends on if `value` is a `Future` or a `Stream`.
+ ///
+ /// See [type] level documentation for more details.
+ ///
+ /// [type]: #
+ ///
+ /// # Examples
+ ///
+ /// Create a new `Timeout` set to expire in 10 milliseconds.
+ ///
+ /// ```rust
+ /// use tokio::time::Timeout;
+ /// use tokio::sync::oneshot;
+ ///
+ /// use std::time::Duration;
+ ///
+ /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
+ /// let (tx, rx) = oneshot::channel();
+ /// # tx.send(()).unwrap();
+ ///
+ /// // Wrap the future with a `Timeout` set to expire in 10 milliseconds.
+ /// Timeout::new(rx, Duration::from_millis(10)).await??;
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn new(value: T, timeout: Duration) -> Timeout<T> {
+ let delay = Delay::new_timeout(now() + timeout, timeout);
+ Timeout::new_with_delay(value, delay)
+ }
+
+ pub(crate) fn new_with_delay(value: T, delay: Delay) -> Timeout<T> {
+ Timeout { value, delay }
+ }
+
+ /// Gets a reference to the underlying value in this timeout.
+ pub fn get_ref(&self) -> &T {
+ &self.value
+ }
+
+ /// Gets a mutable reference to the underlying value in this timeout.
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.value
+ }
+
+ /// Consumes this timeout, returning the underlying value.
+ pub fn into_inner(self) -> T {
+ self.value
+ }
+}
+
+impl<T: Future> Timeout<T> {
+ /// Create a new `Timeout` that completes when `future` completes or when
+ /// `deadline` is reached.
+ ///
+ /// This function differs from `new` in that:
+ ///
+ /// * It only accepts `Future` arguments.
+ /// * It sets an explicit `Instant` at which the timeout expires.
+ pub fn new_at(future: T, deadline: Instant) -> Timeout<T> {
+ let delay = Delay::new(deadline);
+
+ Timeout {
+ value: future,
+ delay,
+ }
+ }
+}
+
+impl<T> Future for Timeout<T>
+where
+ T: Future,
+{
+ type Output = Result<T::Output, Elapsed>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ // First, try polling the future
+
+ // Safety: we never move `self.value`
+ unsafe {
+ let p = self.as_mut().map_unchecked_mut(|me| &mut me.value);
+ if let Poll::Ready(v) = p.poll(cx) {
+ return Poll::Ready(Ok(v));
+ }
+ }
+
+ // Now check the timer
+ // Safety: X_X!
+ unsafe {
+ match self.map_unchecked_mut(|me| &mut me.delay).poll(cx) {
+ Poll::Ready(()) => Poll::Ready(Err(Elapsed(()))),
+ Poll::Pending => Poll::Pending,
+ }
+ }
+ }
+}
+
+impl<T> futures_core::Stream for Timeout<T>
+where
+ T: futures_core::Stream,
+{
+ type Item = Result<T::Item, Elapsed>;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
+ // Safety: T might be !Unpin, but we never move neither `value`
+ // nor `delay`.
+ //
+ // ... X_X
+ unsafe {
+ // First, try polling the future
+ let v = self
+ .as_mut()
+ .map_unchecked_mut(|me| &mut me.value)
+ .poll_next(cx);
+
+ if let Poll::Ready(v) = v {
+ if v.is_some() {
+ self.as_mut().get_unchecked_mut().delay.reset_timeout();
+ }
+ return Poll::Ready(v.map(Ok));
+ }
+
+ // Now check the timer
+ ready!(self.as_mut().map_unchecked_mut(|me| &mut me.delay).poll(cx));
+
+ // if delay was ready, timeout elapsed!
+ self.as_mut().get_unchecked_mut().delay.reset_timeout();
+ Poll::Ready(Some(Err(Elapsed(()))))
+ }
+ }
+}
+
+// ===== impl Elapsed =====
+
+impl fmt::Display for Elapsed {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ "deadline has elapsed".fmt(fmt)
+ }
+}
+
+impl std::error::Error for Elapsed {}
+
+impl From<Elapsed> for std::io::Error {
+ fn from(_err: Elapsed) -> std::io::Error {
+ std::io::ErrorKind::TimedOut.into()
+ }
+}