summaryrefslogtreecommitdiffstats
path: root/tokio/src/stream/mod.rs
diff options
context:
space:
mode:
authorArtem Vorotnikov <artem@vorotnikov.me>2019-12-18 22:57:22 +0300
committerCarl Lerche <me@carllerche.com>2019-12-18 11:57:22 -0800
commit4c645866ef4ea5b0ef8c7852281a09b2f96d969b (patch)
treefe10e6fffea1033c595b920935dc723be3cc3ac4 /tokio/src/stream/mod.rs
parentb0836ece7aa5219e9e40355d0eb784baffc7b6c6 (diff)
stream: add `next` and `map` utility fn (#1962)
Introduces `StreamExt` trait. This trait will be used to add utility functions to make working with streams easier. This patch includes two functions: * `next`: a future returning the item in the stream. * `map`: transform each item in the stream.
Diffstat (limited to 'tokio/src/stream/mod.rs')
-rw-r--r--tokio/src/stream/mod.rs93
1 files changed, 93 insertions, 0 deletions
diff --git a/tokio/src/stream/mod.rs b/tokio/src/stream/mod.rs
new file mode 100644
index 00000000..04e6dc06
--- /dev/null
+++ b/tokio/src/stream/mod.rs
@@ -0,0 +1,93 @@
+//! Stream utilities for Tokio.
+//!
+//! `Stream`s are an asynchoronous version of standard library's Iterator.
+//!
+//! This module provides helpers to work with them.
+
+mod iter;
+pub use iter::{iter, Iter};
+
+mod map;
+use map::Map;
+
+mod next;
+use next::Next;
+
+pub use futures_core::Stream;
+
+/// An extension trait for `Stream`s that provides a variety of convenient
+/// combinator functions.
+pub trait StreamExt: Stream {
+ /// Creates a future that resolves to the next item in the stream.
+ ///
+ /// Equivalent to:
+ ///
+ /// ```ignore
+ /// async fn next(&mut self) -> Option<Self::Item>;
+ /// ```
+ ///
+ /// Note that because `next` doesn't take ownership over the stream,
+ /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a
+ /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
+ /// be done by boxing the stream using [`Box::pin`] or
+ /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
+ /// crate.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// use tokio::stream::{self, StreamExt};
+ ///
+ /// let mut stream = stream::iter(1..=3);
+ ///
+ /// assert_eq!(stream.next().await, Some(1));
+ /// assert_eq!(stream.next().await, Some(2));
+ /// assert_eq!(stream.next().await, Some(3));
+ /// assert_eq!(stream.next().await, None);
+ /// # }
+ /// ```
+ fn next(&mut self) -> Next<'_, Self>
+ where
+ Self: Unpin,
+ {
+ Next::new(self)
+ }
+
+ /// Maps this stream's items to a different type, returning a new stream of
+ /// the resulting type.
+ ///
+ /// The provided closure is executed over all elements of this stream as
+ /// they are made available. It is executed inline with calls to
+ /// [`poll_next`](Stream::poll_next).
+ ///
+ /// Note that this function consumes the stream passed into it and returns a
+ /// wrapped version of it, similar to the existing `map` methods in the
+ /// standard library.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// use tokio::stream::{self, StreamExt};
+ ///
+ /// let stream = stream::iter(1..=3);
+ /// let mut stream = stream.map(|x| x + 3);
+ ///
+ /// assert_eq!(stream.next().await, Some(4));
+ /// assert_eq!(stream.next().await, Some(5));
+ /// assert_eq!(stream.next().await, Some(6));
+ /// # }
+ /// ```
+ fn map<T, F>(self, f: F) -> Map<Self, F>
+ where
+ F: FnMut(Self::Item) -> T,
+ Self: Sized,
+ {
+ Map::new(self, f)
+ }
+}
+
+impl<T: ?Sized> StreamExt for T where T: Stream {}