diff options
author | Juan Alvarez <j@yabit.io> | 2020-01-24 17:22:56 -0600 |
---|---|---|
committer | Carl Lerche <me@carllerche.com> | 2020-01-24 15:22:56 -0800 |
commit | 12be90e3fff4041ea6398fc8cd834c3ec173bce5 (patch) | |
tree | 3de6160d9c15889ac2346139c5d214c1c7f1d67a /tokio/src/stream | |
parent | 0d49e112b2a7fc3cc268c1c140d0130d865af760 (diff) |
stream: add StreamExt::timeout() (#2149)
Diffstat (limited to 'tokio/src/stream')
-rw-r--r-- | tokio/src/stream/mod.rs | 69 | ||||
-rw-r--r-- | tokio/src/stream/timeout.rs | 65 |
2 files changed, 134 insertions, 0 deletions
diff --git a/tokio/src/stream/mod.rs b/tokio/src/stream/mod.rs index 3c09b658..82771eee 100644 --- a/tokio/src/stream/mod.rs +++ b/tokio/src/stream/mod.rs @@ -59,6 +59,12 @@ use take::Take; mod take_while; use take_while::TakeWhile; +cfg_time! { + mod timeout; + use timeout::Timeout; + use std::time::Duration; +} + pub use futures_core::Stream; /// An extension trait for `Stream`s that provides a variety of convenient @@ -680,6 +686,69 @@ pub trait StreamExt: Stream { { Collect::new(self) } + + /// Applies a per-item timeout to the passed stream. + /// + /// `timeout()` takes a `Duration` that represents the maximum amount of + /// time each element of the stream has to complete before timing out. + /// + /// If the wrapped stream yields a value before the deadline is reached, the + /// value is returned. Otherwise, an error is returned. The caller may decide + /// to continue consuming the stream and will eventually get the next source + /// stream value once it becomes available. + /// + /// # Notes + /// + /// This function consumes the stream passed into it and returns a + /// wrapped version of it. + /// + /// Polling the returned stream will continue to poll the inner stream even + /// if one or more items time out. + /// + /// # Examples + /// + /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3): + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio::stream::{self, StreamExt}; + /// use std::time::Duration; + /// # let int_stream = stream::iter(1..=3); + /// + /// let mut int_stream = int_stream.timeout(Duration::from_secs(1)); + /// + /// // When no items time out, we get the 3 elements in succession: + /// assert_eq!(int_stream.try_next().await, Ok(Some(1))); + /// assert_eq!(int_stream.try_next().await, Ok(Some(2))); + /// assert_eq!(int_stream.try_next().await, Ok(Some(3))); + /// assert_eq!(int_stream.try_next().await, Ok(None)); + /// + /// // If the second item times out, we get an error and continue polling the stream: + /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]); + /// assert_eq!(int_stream.try_next().await, Ok(Some(1))); + /// assert!(int_stream.try_next().await.is_err()); + /// assert_eq!(int_stream.try_next().await, Ok(Some(2))); + /// assert_eq!(int_stream.try_next().await, Ok(Some(3))); + /// assert_eq!(int_stream.try_next().await, Ok(None)); + /// + /// // If we want to stop consuming the source stream the first time an + /// // element times out, we can use the `take_while` operator: + /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]); + /// let mut int_stream = int_stream.take_while(Result::is_ok); + /// + /// assert_eq!(int_stream.try_next().await, Ok(Some(1))); + /// assert_eq!(int_stream.try_next().await, Ok(None)); + /// # } + /// ``` + #[cfg(all(feature = "time"))] + #[cfg_attr(docsrs, doc(cfg(feature = "time")))] + fn timeout(self, duration: Duration) -> Timeout<Self> + where + Self: Sized, + { + Timeout::new(self, duration) + } } impl<St: ?Sized> StreamExt for St where St: Stream {} diff --git a/tokio/src/stream/timeout.rs b/tokio/src/stream/timeout.rs new file mode 100644 index 00000000..b8a2024f --- /dev/null +++ b/tokio/src/stream/timeout.rs @@ -0,0 +1,65 @@ +use crate::stream::{Fuse, Stream}; +use crate::time::{Delay, Elapsed, Instant}; + +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; +use std::time::Duration; + +pin_project! { + /// Stream returned by the [`timeout`](super::StreamExt::timeout) method. + #[must_use = "streams do nothing unless polled"] + #[derive(Debug)] + pub struct Timeout<S> { + #[pin] + stream: Fuse<S>, + deadline: Delay, + duration: Duration, + poll_deadline: bool, + } +} + +impl<S: Stream> Timeout<S> { + pub(super) fn new(stream: S, duration: Duration) -> Self { + let next = Instant::now() + duration; + let deadline = Delay::new_timeout(next, duration); + + Timeout { + stream: Fuse::new(stream), + deadline, + duration, + poll_deadline: true, + } + } +} + +impl<S: Stream> Stream for Timeout<S> { + type Item = Result<S::Item, Elapsed>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + match self.as_mut().project().stream.poll_next(cx) { + Poll::Ready(v) => { + if v.is_some() { + let next = Instant::now() + self.duration; + self.as_mut().project().deadline.reset(next); + *self.as_mut().project().poll_deadline = true; + } + return Poll::Ready(v.map(Ok)); + } + Poll::Pending => {} + }; + + if self.poll_deadline { + ready!(Pin::new(self.as_mut().project().deadline).poll(cx)); + *self.as_mut().project().poll_deadline = false; + return Poll::Ready(Some(Err(Elapsed::new()))); + } + + Poll::Pending + } + + fn size_hint(&self) -> (usize, Option<usize>) { + self.stream.size_hint() + } +} |