diff options
author | Blas Rodriguez Irizar <rodrigblas@gmail.com> | 2020-09-02 11:52:31 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-02 11:52:31 +0200 |
commit | 5cdb6f8fd6be35f971d8ef7ea8984f4d01965620 (patch) | |
tree | 924787133275b3fd92d3b6e53382e2e32024c79f /tokio/src/stream | |
parent | 5a1a6dc90c6d5a7eb5f31ae215f9ec383d6767aa (diff) |
time: move throttle to StreamExt (#2752)
Ref: #2727
Diffstat (limited to 'tokio/src/stream')
-rw-r--r-- | tokio/src/stream/mod.rs | 30 | ||||
-rw-r--r-- | tokio/src/stream/throttle.rs | 97 |
2 files changed, 126 insertions, 1 deletions
diff --git a/tokio/src/stream/mod.rs b/tokio/src/stream/mod.rs index 7b061efe..bc6eea23 100644 --- a/tokio/src/stream/mod.rs +++ b/tokio/src/stream/mod.rs @@ -71,7 +71,9 @@ use take_while::TakeWhile; cfg_time! { mod timeout; use timeout::Timeout; - use std::time::Duration; + use crate::time::Duration; + mod throttle; + use crate::stream::throttle::{throttle, Throttle}; } pub use futures_core::Stream; @@ -819,6 +821,32 @@ pub trait StreamExt: Stream { { Timeout::new(self, duration) } + /// Slows down a stream by enforcing a delay between items. + /// + /// # Example + /// + /// Create a throttled stream. + /// ```rust,no_run + /// use std::time::Duration; + /// use tokio::stream::StreamExt; + /// + /// # async fn dox() { + /// let mut item_stream = futures::stream::repeat("one").throttle(Duration::from_secs(2)); + /// + /// loop { + /// // The string will be produced at most every 2 seconds + /// println!("{:?}", item_stream.next().await); + /// } + /// # } + /// ``` + #[cfg(all(feature = "time"))] + #[cfg_attr(docsrs, doc(cfg(feature = "time")))] + fn throttle(self, duration: Duration) -> Throttle<Self> + where + Self: Sized, + { + throttle(duration, self) + } } impl<St: ?Sized> StreamExt for St where St: Stream {} diff --git a/tokio/src/stream/throttle.rs b/tokio/src/stream/throttle.rs new file mode 100644 index 00000000..1200b384 --- /dev/null +++ b/tokio/src/stream/throttle.rs @@ -0,0 +1,97 @@ +//! Slow down a stream by enforcing a delay between items. + +use crate::stream::Stream; +use crate::time::{Delay, Duration, Instant}; + +use std::future::Future; +use std::marker::Unpin; +use std::pin::Pin; +use std::task::{self, Poll}; + +use pin_project_lite::pin_project; + +pub(super) fn throttle<T>(duration: Duration, stream: T) -> Throttle<T> +where + T: Stream, +{ + let delay = if duration == Duration::from_millis(0) { + None + } else { + Some(Delay::new_timeout(Instant::now() + duration, duration)) + }; + + Throttle { + delay, + duration, + has_delayed: true, + stream, + } +} + +pin_project! { + /// Stream for the [`throttle`](throttle) function. + #[derive(Debug)] + #[must_use = "streams do nothing unless polled"] + pub struct Throttle<T> { + // `None` when duration is zero. + delay: Option<Delay>, + duration: Duration, + + // Set to true when `delay` has returned ready, but `stream` hasn't. + has_delayed: bool, + + // The stream to throttle + #[pin] + stream: T, + } +} + +// XXX: are these safe if `T: !Unpin`? +impl<T: Unpin> Throttle<T> { + /// Acquires a reference to the underlying stream that this combinator is + /// pulling from. + pub fn get_ref(&self) -> &T { + &self.stream + } + + /// Acquires a mutable reference to the underlying stream that this combinator + /// is pulling from. + /// + /// Note that care must be taken to avoid tampering with the state of the stream + /// which may otherwise confuse this combinator. + pub fn get_mut(&mut self) -> &mut T { + &mut self.stream + } + + /// Consumes this combinator, returning the underlying stream. + /// + /// Note that this may discard intermediate state of this combinator, so care + /// should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> T { + self.stream + } +} + +impl<T: Stream> Stream for Throttle<T> { + type Item = T::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> { + if !self.has_delayed && self.delay.is_some() { + ready!(Pin::new(self.as_mut().project().delay.as_mut().unwrap()).poll(cx)); + *self.as_mut().project().has_delayed = true; + } + + let value = ready!(self.as_mut().project().stream.poll_next(cx)); + + if value.is_some() { + let dur = self.duration; + if let Some(ref mut delay) = self.as_mut().project().delay { + delay.reset(Instant::now() + dur); + } + + *self.as_mut().project().has_delayed = false; + } + + Poll::Ready(value) + } +} |