summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBlas Rodriguez Irizar <rodrigblas@gmail.com>2020-09-02 11:52:31 +0200
committerGitHub <noreply@github.com>2020-09-02 11:52:31 +0200
commit5cdb6f8fd6be35f971d8ef7ea8984f4d01965620 (patch)
tree924787133275b3fd92d3b6e53382e2e32024c79f
parent5a1a6dc90c6d5a7eb5f31ae215f9ec383d6767aa (diff)
time: move throttle to StreamExt (#2752)
Ref: #2727
-rw-r--r--tokio/src/stream/mod.rs30
-rw-r--r--tokio/src/stream/throttle.rs (renamed from tokio/src/time/throttle.rs)22
-rw-r--r--tokio/src/time/mod.rs5
-rw-r--r--tokio/tests/time_throttle.rs8
4 files changed, 33 insertions, 32 deletions
diff --git a/tokio/src/stream/mod.rs b/tokio/src/stream/mod.rs
index 7b061efe..bc6eea23 100644
--- a/tokio/src/stream/mod.rs
+++ b/tokio/src/stream/mod.rs
@@ -71,7 +71,9 @@ use take_while::TakeWhile;
cfg_time! {
mod timeout;
use timeout::Timeout;
- use std::time::Duration;
+ use crate::time::Duration;
+ mod throttle;
+ use crate::stream::throttle::{throttle, Throttle};
}
pub use futures_core::Stream;
@@ -819,6 +821,32 @@ pub trait StreamExt: Stream {
{
Timeout::new(self, duration)
}
+ /// Slows down a stream by enforcing a delay between items.
+ ///
+ /// # Example
+ ///
+ /// Create a throttled stream.
+ /// ```rust,no_run
+ /// use std::time::Duration;
+ /// use tokio::stream::StreamExt;
+ ///
+ /// # async fn dox() {
+ /// let mut item_stream = futures::stream::repeat("one").throttle(Duration::from_secs(2));
+ ///
+ /// loop {
+ /// // The string will be produced at most every 2 seconds
+ /// println!("{:?}", item_stream.next().await);
+ /// }
+ /// # }
+ /// ```
+ #[cfg(all(feature = "time"))]
+ #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
+ fn throttle(self, duration: Duration) -> Throttle<Self>
+ where
+ Self: Sized,
+ {
+ throttle(duration, self)
+ }
}
impl<St: ?Sized> StreamExt for St where St: Stream {}
diff --git a/tokio/src/time/throttle.rs b/tokio/src/stream/throttle.rs
index d53a6f76..1200b384 100644
--- a/tokio/src/time/throttle.rs
+++ b/tokio/src/stream/throttle.rs
@@ -10,27 +10,7 @@ use std::task::{self, Poll};
use pin_project_lite::pin_project;
-/// Slows down a stream by enforcing a delay between items.
-/// They will be produced not more often than the specified interval.
-///
-/// # Example
-///
-/// Create a throttled stream.
-/// ```rust,no_run
-/// use std::time::Duration;
-/// use tokio::stream::StreamExt;
-/// use tokio::time::throttle;
-///
-/// # async fn dox() {
-/// let mut item_stream = throttle(Duration::from_secs(2), futures::stream::repeat("one"));
-///
-/// loop {
-/// // The string will be produced at most every 2 seconds
-/// println!("{:?}", item_stream.next().await);
-/// }
-/// # }
-/// ```
-pub fn throttle<T>(duration: Duration, stream: T) -> Throttle<T>
+pub(super) fn throttle<T>(duration: Duration, stream: T) -> Throttle<T>
where
T: Stream,
{
diff --git a/tokio/src/time/mod.rs b/tokio/src/time/mod.rs
index c532b2c1..2c6df43c 100644
--- a/tokio/src/time/mod.rs
+++ b/tokio/src/time/mod.rs
@@ -118,11 +118,6 @@ mod timeout;
#[doc(inline)]
pub use timeout::{timeout, timeout_at, Elapsed, Timeout};
-cfg_stream! {
- mod throttle;
- pub use throttle::{throttle, Throttle};
-}
-
mod wheel;
#[cfg(test)]
diff --git a/tokio/tests/time_throttle.rs b/tokio/tests/time_throttle.rs
index 7102d173..c886319f 100644
--- a/tokio/tests/time_throttle.rs
+++ b/tokio/tests/time_throttle.rs
@@ -1,7 +1,8 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
-use tokio::time::{self, throttle};
+use tokio::stream::StreamExt;
+use tokio::time;
use tokio_test::*;
use std::time::Duration;
@@ -10,10 +11,7 @@ use std::time::Duration;
async fn usage() {
time::pause();
- let mut stream = task::spawn(throttle(
- Duration::from_millis(100),
- futures::stream::repeat(()),
- ));
+ let mut stream = task::spawn(futures::stream::repeat(()).throttle(Duration::from_millis(100)));
assert_ready!(stream.poll_next());
assert_pending!(stream.poll_next());