summaryrefslogtreecommitdiffstats
path: root/tokio/src/stream
diff options
context:
space:
mode:
authorJuan Alvarez <j@yabit.io>2020-01-24 17:22:56 -0600
committerCarl Lerche <me@carllerche.com>2020-01-24 15:22:56 -0800
commit12be90e3fff4041ea6398fc8cd834c3ec173bce5 (patch)
tree3de6160d9c15889ac2346139c5d214c1c7f1d67a /tokio/src/stream
parent0d49e112b2a7fc3cc268c1c140d0130d865af760 (diff)
stream: add StreamExt::timeout() (#2149)
Diffstat (limited to 'tokio/src/stream')
-rw-r--r--tokio/src/stream/mod.rs69
-rw-r--r--tokio/src/stream/timeout.rs65
2 files changed, 134 insertions, 0 deletions
diff --git a/tokio/src/stream/mod.rs b/tokio/src/stream/mod.rs
index 3c09b658..82771eee 100644
--- a/tokio/src/stream/mod.rs
+++ b/tokio/src/stream/mod.rs
@@ -59,6 +59,12 @@ use take::Take;
mod take_while;
use take_while::TakeWhile;
+cfg_time! {
+ mod timeout;
+ use timeout::Timeout;
+ use std::time::Duration;
+}
+
pub use futures_core::Stream;
/// An extension trait for `Stream`s that provides a variety of convenient
@@ -680,6 +686,69 @@ pub trait StreamExt: Stream {
{
Collect::new(self)
}
+
+ /// Applies a per-item timeout to the passed stream.
+ ///
+ /// `timeout()` takes a `Duration` that represents the maximum amount of
+ /// time each element of the stream has to complete before timing out.
+ ///
+ /// If the wrapped stream yields a value before the deadline is reached, the
+ /// value is returned. Otherwise, an error is returned. The caller may decide
+ /// to continue consuming the stream and will eventually get the next source
+ /// stream value once it becomes available.
+ ///
+ /// # Notes
+ ///
+ /// This function consumes the stream passed into it and returns a
+ /// wrapped version of it.
+ ///
+ /// Polling the returned stream will continue to poll the inner stream even
+ /// if one or more items time out.
+ ///
+ /// # Examples
+ ///
+ /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3):
+ ///
+ /// ```
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// use tokio::stream::{self, StreamExt};
+ /// use std::time::Duration;
+ /// # let int_stream = stream::iter(1..=3);
+ ///
+ /// let mut int_stream = int_stream.timeout(Duration::from_secs(1));
+ ///
+ /// // When no items time out, we get the 3 elements in succession:
+ /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
+ /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
+ /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
+ /// assert_eq!(int_stream.try_next().await, Ok(None));
+ ///
+ /// // If the second item times out, we get an error and continue polling the stream:
+ /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
+ /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
+ /// assert!(int_stream.try_next().await.is_err());
+ /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
+ /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
+ /// assert_eq!(int_stream.try_next().await, Ok(None));
+ ///
+ /// // If we want to stop consuming the source stream the first time an
+ /// // element times out, we can use the `take_while` operator:
+ /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
+ /// let mut int_stream = int_stream.take_while(Result::is_ok);
+ ///
+ /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
+ /// assert_eq!(int_stream.try_next().await, Ok(None));
+ /// # }
+ /// ```
+ #[cfg(all(feature = "time"))]
+ #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
+ fn timeout(self, duration: Duration) -> Timeout<Self>
+ where
+ Self: Sized,
+ {
+ Timeout::new(self, duration)
+ }
}
impl<St: ?Sized> StreamExt for St where St: Stream {}
diff --git a/tokio/src/stream/timeout.rs b/tokio/src/stream/timeout.rs
new file mode 100644
index 00000000..b8a2024f
--- /dev/null
+++ b/tokio/src/stream/timeout.rs
@@ -0,0 +1,65 @@
+use crate::stream::{Fuse, Stream};
+use crate::time::{Delay, Elapsed, Instant};
+
+use core::future::Future;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+use std::time::Duration;
+
+pin_project! {
+ /// Stream returned by the [`timeout`](super::StreamExt::timeout) method.
+ #[must_use = "streams do nothing unless polled"]
+ #[derive(Debug)]
+ pub struct Timeout<S> {
+ #[pin]
+ stream: Fuse<S>,
+ deadline: Delay,
+ duration: Duration,
+ poll_deadline: bool,
+ }
+}
+
+impl<S: Stream> Timeout<S> {
+ pub(super) fn new(stream: S, duration: Duration) -> Self {
+ let next = Instant::now() + duration;
+ let deadline = Delay::new_timeout(next, duration);
+
+ Timeout {
+ stream: Fuse::new(stream),
+ deadline,
+ duration,
+ poll_deadline: true,
+ }
+ }
+}
+
+impl<S: Stream> Stream for Timeout<S> {
+ type Item = Result<S::Item, Elapsed>;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ match self.as_mut().project().stream.poll_next(cx) {
+ Poll::Ready(v) => {
+ if v.is_some() {
+ let next = Instant::now() + self.duration;
+ self.as_mut().project().deadline.reset(next);
+ *self.as_mut().project().poll_deadline = true;
+ }
+ return Poll::Ready(v.map(Ok));
+ }
+ Poll::Pending => {}
+ };
+
+ if self.poll_deadline {
+ ready!(Pin::new(self.as_mut().project().deadline).poll(cx));
+ *self.as_mut().project().poll_deadline = false;
+ return Poll::Ready(Some(Err(Elapsed::new())));
+ }
+
+ Poll::Pending
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ self.stream.size_hint()
+ }
+}