diff options
author | Carl Lerche <me@carllerche.com> | 2019-11-15 22:11:13 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-11-15 22:11:13 -0800 |
commit | 8a7e57786a5dca139f5b4261685e22991ded0859 (patch) | |
tree | b69d1c48f8a760a58fc7ccfe0376d9812a88d303 /tokio/src/time/interval.rs | |
parent | 930679587ae42e4df3113159ccf33fb5923dd73a (diff) |
Limit `futures` dependency to `Stream` via feature flag (#1774)
In an effort to reach API stability, the `tokio` crate is shedding its
_public_ dependencies on crates that are either a) do not provide a
stable (1.0+) release with longevity guarantees or b) match the `tokio`
release cadence. Of course, implementing `std` traits fits the
requirements.
The on exception, for now, is the `Stream` trait found in `futures_core`.
It is expected that this trait will not change much and be moved into `std.
Since Tokio is not yet going reaching 1.0, I feel that it is acceptable to maintain
a dependency on this trait given how foundational it is.
Since the `Stream` implementation is optional, types that are logically
streams provide `async fn next_*` functions to obtain the next value.
Avoiding the `next()` name prevents fn conflicts with `StreamExt::next()`.
Additionally, some misc cleanup is also done:
- `tokio::io::io` -> `tokio::io::util`.
- `delay` -> `delay_until`.
- `Timeout::new` -> `timeout(...)`.
- `signal::ctrl_c()` returns a future instead of a stream.
- `{tcp,unix}::Incoming` is removed (due to lack of `Stream` trait).
- `time::Throttle` is removed (due to lack of `Stream` trait).
- Fix: `mpsc::UnboundedSender::send(&self)` (no more conflict with `Sink` fns).
Diffstat (limited to 'tokio/src/time/interval.rs')
-rw-r--r-- | tokio/src/time/interval.rs | 150 |
1 files changed, 89 insertions, 61 deletions
diff --git a/tokio/src/time/interval.rs b/tokio/src/time/interval.rs index e60a8b8c..f9fa1127 100644 --- a/tokio/src/time/interval.rs +++ b/tokio/src/time/interval.rs @@ -1,60 +1,93 @@ -use crate::time::{Delay, Duration, Instant}; +use crate::future::poll_fn; +use crate::time::{delay_until, Delay, Duration, Instant}; -use futures_core::ready; -use futures_util::future::poll_fn; use std::future::Future; use std::pin::Pin; -use std::task::{self, Poll}; +use std::task::{Context, Poll}; -/// A stream representing notifications at fixed interval +/// Creates new `Interval` that yields with interval of `duration`. The first +/// tick completes immediately. +/// +/// An interval will tick indefinitely. At any time, the `Interval` value can be +/// dropped. This cancels the interval. +/// +/// This function is equivalent to `interval_at(Instant::now(), period)`. +/// +/// # Panics +/// +/// This function panics if `period` is zero. +/// +/// # Examples +/// +/// ``` +/// use tokio::time::{self, Duration}; +/// +/// #[tokio::main] +/// async fn main() { +/// let mut interval = time::interval(Duration::from_millis(10)); +/// +/// interval.tick().await; +/// interval.tick().await; +/// interval.tick().await; +/// +/// // approximately 30ms have elapsed. +/// } +/// ``` +pub fn interval(period: Duration) -> Interval { + assert!(period > Duration::new(0, 0), "`period` must be non-zero."); + + interval_at(Instant::now(), period) +} + +/// Creates new `Interval` that yields with interval of `period` with the +/// first tick completing at `at`. +/// +/// An interval will tick indefinitely. At any time, the `Interval` value can be +/// dropped. This cancels the interval. +/// +/// # Panics +/// +/// This function panics if `period` is zero. +/// +/// # Examples +/// +/// ``` +/// use tokio::time::{interval_at, Duration, Instant}; +/// +/// #[tokio::main] +/// async fn main() { +/// let start = Instant::now() + Duration::from_millis(50); +/// let mut interval = interval_at(start, Duration::from_millis(10)); +/// +/// interval.tick().await; +/// interval.tick().await; +/// interval.tick().await; +/// +/// // approximately 70ms have elapsed. +/// } +/// ``` +pub fn interval_at(start: Instant, period: Duration) -> Interval { + assert!(period > Duration::new(0, 0), "`period` must be non-zero."); + + Interval { + delay: delay_until(start), + period, + } +} + +/// Stream returned by [`instant`](instant) and [`instant_at`](instant_at). #[derive(Debug)] pub struct Interval { /// Future that completes the next time the `Interval` yields a value. delay: Delay, /// The duration between values yielded by `Interval`. - duration: Duration, + period: Duration, } impl Interval { - /// Create a new `Interval` that starts at `at` and yields every `duration` - /// interval after that. - /// - /// Note that when it starts, it produces item too. - /// - /// The `duration` argument must be a non-zero duration. - /// - /// # Panics - /// - /// This function panics if `duration` is zero. - pub fn new(at: Instant, duration: Duration) -> Interval { - assert!( - duration > Duration::new(0, 0), - "`duration` must be non-zero." - ); - - Interval::new_with_delay(Delay::new(at), duration) - } - - /// Creates new `Interval` that yields with interval of `duration`. - /// - /// The function is shortcut for `Interval::new(tokio::time::clock::now() + duration, duration)`. - /// - /// The `duration` argument must be a non-zero duration. - /// - /// # Panics - /// - /// This function panics if `duration` is zero. - pub fn new_interval(duration: Duration) -> Interval { - Interval::new(Instant::now() + duration, duration) - } - - pub(crate) fn new_with_delay(delay: Delay, duration: Duration) -> Interval { - Interval { delay, duration } - } - - #[doc(hidden)] // TODO: remove - pub fn poll_next(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Instant>> { + #[doc(hidden)] // TODO: document + pub fn poll_tick(&mut self, cx: &mut Context<'_>) -> Poll<Instant> { // Wait for the delay to be done ready!(Pin::new(&mut self.delay).poll(cx)); @@ -63,11 +96,11 @@ impl Interval { // The next interval value is `duration` after the one that just // yielded. - let next = now + self.duration; + let next = now + self.period; self.delay.reset(next); // Return the current instant - Poll::Ready(Some(now)) + Poll::Ready(now) } /// Completes when the next instant in the interval has been reached. @@ -75,37 +108,32 @@ impl Interval { /// # Examples /// /// ``` - /// use tokio::time::Interval; + /// use tokio::time; /// /// use std::time::Duration; /// /// #[tokio::main] /// async fn main() { - /// let mut interval = Interval::new_interval(Duration::from_millis(10)); + /// let mut interval = time::interval(Duration::from_millis(10)); /// - /// interval.next().await; - /// interval.next().await; - /// interval.next().await; + /// interval.tick().await; + /// interval.tick().await; + /// interval.tick().await; /// /// // approximately 30ms have elapsed. /// } /// ``` #[allow(clippy::should_implement_trait)] // TODO: rename (tokio-rs/tokio#1261) - pub async fn next(&mut self) -> Option<Instant> { - poll_fn(|cx| self.poll_next(cx)).await - } -} - -impl futures_core::FusedStream for Interval { - fn is_terminated(&self) -> bool { - false + pub async fn tick(&mut self) -> Instant { + poll_fn(|cx| self.poll_tick(cx)).await } } +#[cfg(feature = "stream")] impl futures_core::Stream for Interval { type Item = Instant; - fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> { - Interval::poll_next(self.get_mut(), cx) + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Instant>> { + Poll::Ready(Some(ready!(self.poll_tick(cx)))) } } |