From 5cdb6f8fd6be35f971d8ef7ea8984f4d01965620 Mon Sep 17 00:00:00 2001 From: Blas Rodriguez Irizar Date: Wed, 2 Sep 2020 11:52:31 +0200 Subject: time: move throttle to StreamExt (#2752) Ref: #2727 --- tokio/src/stream/mod.rs | 30 ++++++++++- tokio/src/stream/throttle.rs | 97 +++++++++++++++++++++++++++++++++++ tokio/src/time/mod.rs | 5 -- tokio/src/time/throttle.rs | 117 ------------------------------------------- tokio/tests/time_throttle.rs | 8 ++- 5 files changed, 129 insertions(+), 128 deletions(-) create mode 100644 tokio/src/stream/throttle.rs delete mode 100644 tokio/src/time/throttle.rs diff --git a/tokio/src/stream/mod.rs b/tokio/src/stream/mod.rs index 7b061efe..bc6eea23 100644 --- a/tokio/src/stream/mod.rs +++ b/tokio/src/stream/mod.rs @@ -71,7 +71,9 @@ use take_while::TakeWhile; cfg_time! { mod timeout; use timeout::Timeout; - use std::time::Duration; + use crate::time::Duration; + mod throttle; + use crate::stream::throttle::{throttle, Throttle}; } pub use futures_core::Stream; @@ -819,6 +821,32 @@ pub trait StreamExt: Stream { { Timeout::new(self, duration) } + /// Slows down a stream by enforcing a delay between items. + /// + /// # Example + /// + /// Create a throttled stream. + /// ```rust,no_run + /// use std::time::Duration; + /// use tokio::stream::StreamExt; + /// + /// # async fn dox() { + /// let mut item_stream = futures::stream::repeat("one").throttle(Duration::from_secs(2)); + /// + /// loop { + /// // The string will be produced at most every 2 seconds + /// println!("{:?}", item_stream.next().await); + /// } + /// # } + /// ``` + #[cfg(all(feature = "time"))] + #[cfg_attr(docsrs, doc(cfg(feature = "time")))] + fn throttle(self, duration: Duration) -> Throttle + where + Self: Sized, + { + throttle(duration, self) + } } impl StreamExt for St where St: Stream {} diff --git a/tokio/src/stream/throttle.rs b/tokio/src/stream/throttle.rs new file mode 100644 index 00000000..1200b384 --- /dev/null +++ b/tokio/src/stream/throttle.rs @@ -0,0 +1,97 @@ +//! Slow down a stream by enforcing a delay between items. + +use crate::stream::Stream; +use crate::time::{Delay, Duration, Instant}; + +use std::future::Future; +use std::marker::Unpin; +use std::pin::Pin; +use std::task::{self, Poll}; + +use pin_project_lite::pin_project; + +pub(super) fn throttle(duration: Duration, stream: T) -> Throttle +where + T: Stream, +{ + let delay = if duration == Duration::from_millis(0) { + None + } else { + Some(Delay::new_timeout(Instant::now() + duration, duration)) + }; + + Throttle { + delay, + duration, + has_delayed: true, + stream, + } +} + +pin_project! { + /// Stream for the [`throttle`](throttle) function. + #[derive(Debug)] + #[must_use = "streams do nothing unless polled"] + pub struct Throttle { + // `None` when duration is zero. + delay: Option, + duration: Duration, + + // Set to true when `delay` has returned ready, but `stream` hasn't. + has_delayed: bool, + + // The stream to throttle + #[pin] + stream: T, + } +} + +// XXX: are these safe if `T: !Unpin`? +impl Throttle { + /// Acquires a reference to the underlying stream that this combinator is + /// pulling from. + pub fn get_ref(&self) -> &T { + &self.stream + } + + /// Acquires a mutable reference to the underlying stream that this combinator + /// is pulling from. + /// + /// Note that care must be taken to avoid tampering with the state of the stream + /// which may otherwise confuse this combinator. + pub fn get_mut(&mut self) -> &mut T { + &mut self.stream + } + + /// Consumes this combinator, returning the underlying stream. + /// + /// Note that this may discard intermediate state of this combinator, so care + /// should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> T { + self.stream + } +} + +impl Stream for Throttle { + type Item = T::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + if !self.has_delayed && self.delay.is_some() { + ready!(Pin::new(self.as_mut().project().delay.as_mut().unwrap()).poll(cx)); + *self.as_mut().project().has_delayed = true; + } + + let value = ready!(self.as_mut().project().stream.poll_next(cx)); + + if value.is_some() { + let dur = self.duration; + if let Some(ref mut delay) = self.as_mut().project().delay { + delay.reset(Instant::now() + dur); + } + + *self.as_mut().project().has_delayed = false; + } + + Poll::Ready(value) + } +} diff --git a/tokio/src/time/mod.rs b/tokio/src/time/mod.rs index c532b2c1..2c6df43c 100644 --- a/tokio/src/time/mod.rs +++ b/tokio/src/time/mod.rs @@ -118,11 +118,6 @@ mod timeout; #[doc(inline)] pub use timeout::{timeout, timeout_at, Elapsed, Timeout}; -cfg_stream! { - mod throttle; - pub use throttle::{throttle, Throttle}; -} - mod wheel; #[cfg(test)] diff --git a/tokio/src/time/throttle.rs b/tokio/src/time/throttle.rs deleted file mode 100644 index d53a6f76..00000000 --- a/tokio/src/time/throttle.rs +++ /dev/null @@ -1,117 +0,0 @@ -//! Slow down a stream by enforcing a delay between items. - -use crate::stream::Stream; -use crate::time::{Delay, Duration, Instant}; - -use std::future::Future; -use std::marker::Unpin; -use std::pin::Pin; -use std::task::{self, Poll}; - -use pin_project_lite::pin_project; - -/// Slows down a stream by enforcing a delay between items. -/// They will be produced not more often than the specified interval. -/// -/// # Example -/// -/// Create a throttled stream. -/// ```rust,no_run -/// use std::time::Duration; -/// use tokio::stream::StreamExt; -/// use tokio::time::throttle; -/// -/// # async fn dox() { -/// let mut item_stream = throttle(Duration::from_secs(2), futures::stream::repeat("one")); -/// -/// loop { -/// // The string will be produced at most every 2 seconds -/// println!("{:?}", item_stream.next().await); -/// } -/// # } -/// ``` -pub fn throttle(duration: Duration, stream: T) -> Throttle -where - T: Stream, -{ - let delay = if duration == Duration::from_millis(0) { - None - } else { - Some(Delay::new_timeout(Instant::now() + duration, duration)) - }; - - Throttle { - delay, - duration, - has_delayed: true, - stream, - } -} - -pin_project! { - /// Stream for the [`throttle`](throttle) function. - #[derive(Debug)] - #[must_use = "streams do nothing unless polled"] - pub struct Throttle { - // `None` when duration is zero. - delay: Option, - duration: Duration, - - // Set to true when `delay` has returned ready, but `stream` hasn't. - has_delayed: bool, - - // The stream to throttle - #[pin] - stream: T, - } -} - -// XXX: are these safe if `T: !Unpin`? -impl Throttle { - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &T { - &self.stream - } - - /// Acquires a mutable reference to the underlying stream that this combinator - /// is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the stream - /// which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut T { - &mut self.stream - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so care - /// should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> T { - self.stream - } -} - -impl Stream for Throttle { - type Item = T::Item; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { - if !self.has_delayed && self.delay.is_some() { - ready!(Pin::new(self.as_mut().project().delay.as_mut().unwrap()).poll(cx)); - *self.as_mut().project().has_delayed = true; - } - - let value = ready!(self.as_mut().project().stream.poll_next(cx)); - - if value.is_some() { - let dur = self.duration; - if let Some(ref mut delay) = self.as_mut().project().delay { - delay.reset(Instant::now() + dur); - } - - *self.as_mut().project().has_delayed = false; - } - - Poll::Ready(value) - } -} diff --git a/tokio/tests/time_throttle.rs b/tokio/tests/time_throttle.rs index 7102d173..c886319f 100644 --- a/tokio/tests/time_throttle.rs +++ b/tokio/tests/time_throttle.rs @@ -1,7 +1,8 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] -use tokio::time::{self, throttle}; +use tokio::stream::StreamExt; +use tokio::time; use tokio_test::*; use std::time::Duration; @@ -10,10 +11,7 @@ use std::time::Duration; async fn usage() { time::pause(); - let mut stream = task::spawn(throttle( - Duration::from_millis(100), - futures::stream::repeat(()), - )); + let mut stream = task::spawn(futures::stream::repeat(()).throttle(Duration::from_millis(100))); assert_ready!(stream.poll_next()); assert_pending!(stream.poll_next()); -- cgit v1.2.3