diff options
author | Blas Rodriguez Irizar <rodrigblas@gmail.com> | 2020-09-02 11:52:31 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-02 11:52:31 +0200 |
commit | 5cdb6f8fd6be35f971d8ef7ea8984f4d01965620 (patch) | |
tree | 924787133275b3fd92d3b6e53382e2e32024c79f | |
parent | 5a1a6dc90c6d5a7eb5f31ae215f9ec383d6767aa (diff) |
time: move throttle to StreamExt (#2752)
Ref: #2727
-rw-r--r-- | tokio/src/stream/mod.rs | 30 | ||||
-rw-r--r-- | tokio/src/stream/throttle.rs (renamed from tokio/src/time/throttle.rs) | 22 | ||||
-rw-r--r-- | tokio/src/time/mod.rs | 5 | ||||
-rw-r--r-- | tokio/tests/time_throttle.rs | 8 |
4 files changed, 33 insertions, 32 deletions
diff --git a/tokio/src/stream/mod.rs b/tokio/src/stream/mod.rs index 7b061efe..bc6eea23 100644 --- a/tokio/src/stream/mod.rs +++ b/tokio/src/stream/mod.rs @@ -71,7 +71,9 @@ use take_while::TakeWhile; cfg_time! { mod timeout; use timeout::Timeout; - use std::time::Duration; + use crate::time::Duration; + mod throttle; + use crate::stream::throttle::{throttle, Throttle}; } pub use futures_core::Stream; @@ -819,6 +821,32 @@ pub trait StreamExt: Stream { { Timeout::new(self, duration) } + /// Slows down a stream by enforcing a delay between items. + /// + /// # Example + /// + /// Create a throttled stream. + /// ```rust,no_run + /// use std::time::Duration; + /// use tokio::stream::StreamExt; + /// + /// # async fn dox() { + /// let mut item_stream = futures::stream::repeat("one").throttle(Duration::from_secs(2)); + /// + /// loop { + /// // The string will be produced at most every 2 seconds + /// println!("{:?}", item_stream.next().await); + /// } + /// # } + /// ``` + #[cfg(all(feature = "time"))] + #[cfg_attr(docsrs, doc(cfg(feature = "time")))] + fn throttle(self, duration: Duration) -> Throttle<Self> + where + Self: Sized, + { + throttle(duration, self) + } } impl<St: ?Sized> StreamExt for St where St: Stream {} diff --git a/tokio/src/time/throttle.rs b/tokio/src/stream/throttle.rs index d53a6f76..1200b384 100644 --- a/tokio/src/time/throttle.rs +++ b/tokio/src/stream/throttle.rs @@ -10,27 +10,7 @@ use std::task::{self, Poll}; use pin_project_lite::pin_project; -/// Slows down a stream by enforcing a delay between items. -/// They will be produced not more often than the specified interval. -/// -/// # Example -/// -/// Create a throttled stream. -/// ```rust,no_run -/// use std::time::Duration; -/// use tokio::stream::StreamExt; -/// use tokio::time::throttle; -/// -/// # async fn dox() { -/// let mut item_stream = throttle(Duration::from_secs(2), futures::stream::repeat("one")); -/// -/// loop { -/// // The string will be produced at most every 2 seconds -/// println!("{:?}", item_stream.next().await); -/// } -/// # } -/// ``` -pub fn throttle<T>(duration: Duration, stream: T) -> Throttle<T> +pub(super) fn throttle<T>(duration: Duration, stream: T) -> Throttle<T> where T: Stream, { diff --git a/tokio/src/time/mod.rs b/tokio/src/time/mod.rs index c532b2c1..2c6df43c 100644 --- a/tokio/src/time/mod.rs +++ b/tokio/src/time/mod.rs @@ -118,11 +118,6 @@ mod timeout; #[doc(inline)] pub use timeout::{timeout, timeout_at, Elapsed, Timeout}; -cfg_stream! { - mod throttle; - pub use throttle::{throttle, Throttle}; -} - mod wheel; #[cfg(test)] diff --git a/tokio/tests/time_throttle.rs b/tokio/tests/time_throttle.rs index 7102d173..c886319f 100644 --- a/tokio/tests/time_throttle.rs +++ b/tokio/tests/time_throttle.rs @@ -1,7 +1,8 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] -use tokio::time::{self, throttle}; +use tokio::stream::StreamExt; +use tokio::time; use tokio_test::*; use std::time::Duration; @@ -10,10 +11,7 @@ use std::time::Duration; async fn usage() { time::pause(); - let mut stream = task::spawn(throttle( - Duration::from_millis(100), - futures::stream::repeat(()), - )); + let mut stream = task::spawn(futures::stream::repeat(()).throttle(Duration::from_millis(100))); assert_ready!(stream.poll_next()); assert_pending!(stream.poll_next()); |