summaryrefslogtreecommitdiffstats
path: root/tokio/src/util
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-03-19 14:58:59 -0700
committerGitHub <noreply@github.com>2019-03-19 14:58:59 -0700
commitcdde2e7a273cbab2085b822efcf54c6bec822681 (patch)
tree68c09200286f2266027230ac4ffc51d459857ad7 /tokio/src/util
parent85487727d41574020793fbe0025a9dafc4890a70 (diff)
chore: repo maintenance + no path dependencies (#991)
- Move `tokio` into its own directory. - Remove `path` dependencies. - Run tests with once with crates.io dep and once with patched dep.
Diffstat (limited to 'tokio/src/util')
-rw-r--r--tokio/src/util/enumerate.rs84
-rw-r--r--tokio/src/util/future.rs93
-rw-r--r--tokio/src/util/mod.rs15
-rw-r--r--tokio/src/util/stream.rs95
4 files changed, 287 insertions, 0 deletions
diff --git a/tokio/src/util/enumerate.rs b/tokio/src/util/enumerate.rs
new file mode 100644
index 00000000..8f6926fa
--- /dev/null
+++ b/tokio/src/util/enumerate.rs
@@ -0,0 +1,84 @@
+use futures::{Async, Poll, Sink, StartSend, Stream};
+
+/// A stream combinator which combines the yields the current item
+/// plus its count starting from 0.
+///
+/// This structure is produced by the `Stream::enumerate` method.
+#[derive(Debug)]
+#[must_use = "Does nothing unless polled"]
+pub struct Enumerate<T> {
+ inner: T,
+ count: usize,
+}
+
+impl<T> Enumerate<T> {
+ pub(crate) fn new(stream: T) -> Self {
+ Self {
+ inner: stream,
+ count: 0,
+ }
+ }
+
+ /// Acquires a reference to the underlying stream that this combinator is
+ /// pulling from.
+ pub fn get_ref(&self) -> &T {
+ &self.inner
+ }
+
+ /// Acquires a mutable reference to the underlying stream that this
+ /// combinator is pulling from.
+ ///
+ /// Note that care must be taken to avoid tampering with the state of the
+ /// stream which may otherwise confuse this combinator.
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.inner
+ }
+
+ /// Consumes this combinator, returning the underlying stream.
+ ///
+ /// Note that this may discard intermediate state of this combinator, so
+ /// care should be taken to avoid losing resources when this is called.
+ pub fn into_inner(self) -> T {
+ self.inner
+ }
+}
+
+impl<T> Stream for Enumerate<T>
+where
+ T: Stream,
+{
+ type Item = (usize, T::Item);
+ type Error = T::Error;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, T::Error> {
+ match try_ready!(self.inner.poll()) {
+ Some(item) => {
+ let ret = Some((self.count, item));
+ self.count += 1;
+ Ok(Async::Ready(ret))
+ }
+ None => return Ok(Async::Ready(None)),
+ }
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+impl<T> Sink for Enumerate<T>
+where
+ T: Sink,
+{
+ type SinkItem = T::SinkItem;
+ type SinkError = T::SinkError;
+
+ fn start_send(&mut self, item: T::SinkItem) -> StartSend<T::SinkItem, T::SinkError> {
+ self.inner.start_send(item)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), T::SinkError> {
+ self.inner.poll_complete()
+ }
+
+ fn close(&mut self) -> Poll<(), T::SinkError> {
+ self.inner.close()
+ }
+}
diff --git a/tokio/src/util/future.rs b/tokio/src/util/future.rs
new file mode 100644
index 00000000..5a381810
--- /dev/null
+++ b/tokio/src/util/future.rs
@@ -0,0 +1,93 @@
+#[cfg(feature = "timer")]
+#[allow(deprecated)]
+use tokio_timer::Deadline;
+#[cfg(feature = "timer")]
+use tokio_timer::Timeout;
+
+use futures::Future;
+
+#[cfg(feature = "timer")]
+use std::time::{Duration, Instant};
+
+/// An extension trait for `Future` that provides a variety of convenient
+/// combinator functions.
+///
+/// Currently, there only is a [`timeout`] function, but this will increase
+/// over time.
+///
+/// Users are not expected to implement this trait. All types that implement
+/// `Future` already implement `FutureExt`.
+///
+/// This trait can be imported directly or via the Tokio prelude: `use
+/// tokio::prelude::*`.
+///
+/// [`timeout`]: #method.timeout
+pub trait FutureExt: Future {
+ /// Creates a new future which allows `self` until `timeout`.
+ ///
+ /// This combinator creates a new future which wraps the receiving future
+ /// with a timeout. The returned future is allowed to execute until it
+ /// completes or `timeout` has elapsed, whichever happens first.
+ ///
+ /// If the future completes before `timeout` then the future will resolve
+ /// with that item. Otherwise the future will resolve to an error.
+ ///
+ /// The future is guaranteed to be polled at least once, even if `timeout`
+ /// is set to zero.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # extern crate tokio;
+ /// # extern crate futures;
+ /// use tokio::prelude::*;
+ /// use std::time::Duration;
+ /// # use futures::future::{self, FutureResult};
+ ///
+ /// # fn long_future() -> FutureResult<(), ()> {
+ /// # future::ok(())
+ /// # }
+ /// #
+ /// # fn main() {
+ /// let future = long_future()
+ /// .timeout(Duration::from_secs(1))
+ /// .map_err(|e| println!("error = {:?}", e));
+ ///
+ /// tokio::run(future);
+ /// # }
+ /// ```
+ #[cfg(feature = "timer")]
+ fn timeout(self, timeout: Duration) -> Timeout<Self>
+ where
+ Self: Sized,
+ {
+ Timeout::new(self, timeout)
+ }
+
+ #[cfg(feature = "timer")]
+ #[deprecated(since = "0.1.8", note = "use `timeout` instead")]
+ #[allow(deprecated)]
+ #[doc(hidden)]
+ fn deadline(self, deadline: Instant) -> Deadline<Self>
+ where
+ Self: Sized,
+ {
+ Deadline::new(self, deadline)
+ }
+}
+
+impl<T: ?Sized> FutureExt for T where T: Future {}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+ use prelude::future;
+
+ #[cfg(feature = "timer")]
+ #[test]
+ fn timeout_polls_at_least_once() {
+ let base_future = future::result::<(), ()>(Ok(()));
+ let timeouted_future = base_future.timeout(Duration::new(0, 0));
+ assert!(timeouted_future.wait().is_ok());
+ }
+}
diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs
new file mode 100644
index 00000000..58fd3d0b
--- /dev/null
+++ b/tokio/src/util/mod.rs
@@ -0,0 +1,15 @@
+//! Utilities for working with Tokio.
+//!
+//! This module contains utilities that are useful for working with Tokio.
+//! Currently, this only includes [`FutureExt`] and [`StreamExt`], but this
+//! may grow over time.
+//!
+//! [`FutureExt`]: trait.FutureExt.html
+//! [`StreamExt`]: trait.StreamExt.html
+
+mod enumerate;
+mod future;
+mod stream;
+
+pub use self::future::FutureExt;
+pub use self::stream::StreamExt;
diff --git a/tokio/src/util/stream.rs b/tokio/src/util/stream.rs
new file mode 100644
index 00000000..3b7aa268
--- /dev/null
+++ b/tokio/src/util/stream.rs
@@ -0,0 +1,95 @@
+#[cfg(feature = "timer")]
+use tokio_timer::{throttle::Throttle, Timeout};
+
+use futures::Stream;
+
+#[cfg(feature = "timer")]
+use std::time::Duration;
+pub use util::enumerate::Enumerate;
+
+/// An extension trait for `Stream` that provides a variety of convenient
+/// combinator functions.
+///
+/// Currently, there are only [`timeout`] and [`throttle`] functions, but
+/// this will increase over time.
+///
+/// Users are not expected to implement this trait. All types that implement
+/// `Stream` already implement `StreamExt`.
+///
+/// This trait can be imported directly or via the Tokio prelude: `use
+/// tokio::prelude::*`.
+///
+/// [`timeout`]: #method.timeout
+pub trait StreamExt: Stream {
+ /// Throttle down the stream by enforcing a fixed delay between items.
+ ///
+ /// Errors are also delayed.
+ #[cfg(feature = "timer")]
+ fn throttle(self, duration: Duration) -> Throttle<Self>
+ where
+ Self: Sized,
+ {
+ Throttle::new(self, duration)
+ }
+
+ /// Creates a new stream which gives the current iteration count as well
+ /// as the next value.
+ ///
+ /// The stream returned yields pairs `(i, val)`, where `i` is the
+ /// current index of iteration and `val` is the value returned by the
+ /// iterator.
+ ///
+ /// # Overflow Behavior
+ ///
+ /// The method does no guarding against overflows, so counting elements of
+ /// an iterator with more than [`std::usize::MAX`] elements either produces the
+ /// wrong result or panics.
+ fn enumerate(self) -> Enumerate<Self>
+ where
+ Self: Sized,
+ {
+ Enumerate::new(self)
+ }
+
+ /// Creates a new stream which allows `self` until `timeout`.
+ ///
+ /// This combinator creates a new stream which wraps the receiving stream
+ /// with a timeout. For each item, the returned stream is allowed to execute
+ /// until it completes or `timeout` has elapsed, whichever happens first.
+ ///
+ /// If an item completes before `timeout` then the stream will yield
+ /// with that item. Otherwise the stream will yield to an error.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # extern crate tokio;
+ /// # extern crate futures;
+ /// use tokio::prelude::*;
+ /// use std::time::Duration;
+ /// # use futures::future::{self, FutureResult};
+ ///
+ /// # fn long_future() -> FutureResult<(), ()> {
+ /// # future::ok(())
+ /// # }
+ /// #
+ /// # fn main() {
+ /// let stream = long_future()
+ /// .into_stream()
+ /// .timeout(Duration::from_secs(1))
+ /// .for_each(|i| future::ok(println!("item = {:?}", i)))
+ /// .map_err(|e| println!("error = {:?}", e));
+ ///
+ /// tokio::run(stream);
+ /// # }
+ /// ```
+ #[cfg(feature = "timer")]
+ fn timeout(self, timeout: Duration) -> Timeout<Self>
+ where
+ Self: Sized,
+ {
+ Timeout::new(self, timeout)
+ }
+}
+
+impl<T: ?Sized> StreamExt for T where T: Stream {}