summaryrefslogtreecommitdiffstats
path: root/tokio/src/stream
diff options
context:
space:
mode:
authorArtem Vorotnikov <artem@vorotnikov.me>2019-12-18 22:57:22 +0300
committerCarl Lerche <me@carllerche.com>2019-12-18 11:57:22 -0800
commit4c645866ef4ea5b0ef8c7852281a09b2f96d969b (patch)
treefe10e6fffea1033c595b920935dc723be3cc3ac4 /tokio/src/stream
parentb0836ece7aa5219e9e40355d0eb784baffc7b6c6 (diff)
stream: add `next` and `map` utility fn (#1962)
Introduces `StreamExt` trait. This trait will be used to add utility functions to make working with streams easier. This patch includes two functions: * `next`: a future returning the item in the stream. * `map`: transform each item in the stream.
Diffstat (limited to 'tokio/src/stream')
-rw-r--r--tokio/src/stream/iter.rs52
-rw-r--r--tokio/src/stream/map.rs57
-rw-r--r--tokio/src/stream/mod.rs93
-rw-r--r--tokio/src/stream/next.rs31
4 files changed, 233 insertions, 0 deletions
diff --git a/tokio/src/stream/iter.rs b/tokio/src/stream/iter.rs
new file mode 100644
index 00000000..dc06495e
--- /dev/null
+++ b/tokio/src/stream/iter.rs
@@ -0,0 +1,52 @@
+use crate::stream::Stream;
+
+use core::pin::Pin;
+use core::task::{Context, Poll};
+
+/// Stream for the [`iter`] function.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct Iter<I> {
+ iter: I,
+}
+
+impl<I> Unpin for Iter<I> {}
+
+/// Converts an `Iterator` into a `Stream` which is always ready
+/// to yield the next value.
+///
+/// Iterators in Rust don't express the ability to block, so this adapter
+/// simply always calls `iter.next()` and returns that.
+///
+/// ```
+/// # async fn dox() {
+/// use tokio::stream::{self, StreamExt};
+///
+/// let mut stream = stream::iter(vec![17, 19]);
+///
+/// assert_eq!(stream.next().await, Some(17));
+/// assert_eq!(stream.next().await, Some(19));
+/// assert_eq!(stream.next().await, None);
+/// # }
+/// ```
+pub fn iter<I>(i: I) -> Iter<I::IntoIter>
+ where I: IntoIterator,
+{
+ Iter {
+ iter: i.into_iter(),
+ }
+}
+
+impl<I> Stream for Iter<I>
+ where I: Iterator,
+{
+ type Item = I::Item;
+
+ fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<I::Item>> {
+ Poll::Ready(self.iter.next())
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ self.iter.size_hint()
+ }
+}
diff --git a/tokio/src/stream/map.rs b/tokio/src/stream/map.rs
new file mode 100644
index 00000000..a89769de
--- /dev/null
+++ b/tokio/src/stream/map.rs
@@ -0,0 +1,57 @@
+use crate::stream::Stream;
+
+use core::fmt;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream for the [`map`](super::StreamExt::map) method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct Map<St, F> {
+ #[pin]
+ stream: St,
+ f: F,
+ }
+}
+
+impl<St, F> fmt::Debug for Map<St, F>
+where
+ St: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Map")
+ .field("stream", &self.stream)
+ .finish()
+ }
+}
+
+impl<St, T, F> Map<St, F>
+ where St: Stream,
+ F: FnMut(St::Item) -> T,
+{
+ pub(super) fn new(stream: St, f: F) -> Map<St, F> {
+ Map { stream, f }
+ }
+}
+
+impl<St, F, T> Stream for Map<St, F>
+ where St: Stream,
+ F: FnMut(St::Item) -> T,
+{
+ type Item = T;
+
+ fn poll_next(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<T>> {
+ self.as_mut()
+ .project().stream
+ .poll_next(cx)
+ .map(|opt| opt.map(|x| (self.as_mut().project().f)(x)))
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ self.stream.size_hint()
+ }
+}
diff --git a/tokio/src/stream/mod.rs b/tokio/src/stream/mod.rs
new file mode 100644
index 00000000..04e6dc06
--- /dev/null
+++ b/tokio/src/stream/mod.rs
@@ -0,0 +1,93 @@
+//! Stream utilities for Tokio.
+//!
+//! `Stream`s are an asynchoronous version of standard library's Iterator.
+//!
+//! This module provides helpers to work with them.
+
+mod iter;
+pub use iter::{iter, Iter};
+
+mod map;
+use map::Map;
+
+mod next;
+use next::Next;
+
+pub use futures_core::Stream;
+
+/// An extension trait for `Stream`s that provides a variety of convenient
+/// combinator functions.
+pub trait StreamExt: Stream {
+ /// Creates a future that resolves to the next item in the stream.
+ ///
+ /// Equivalent to:
+ ///
+ /// ```ignore
+ /// async fn next(&mut self) -> Option<Self::Item>;
+ /// ```
+ ///
+ /// Note that because `next` doesn't take ownership over the stream,
+ /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a
+ /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
+ /// be done by boxing the stream using [`Box::pin`] or
+ /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
+ /// crate.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// use tokio::stream::{self, StreamExt};
+ ///
+ /// let mut stream = stream::iter(1..=3);
+ ///
+ /// assert_eq!(stream.next().await, Some(1));
+ /// assert_eq!(stream.next().await, Some(2));
+ /// assert_eq!(stream.next().await, Some(3));
+ /// assert_eq!(stream.next().await, None);
+ /// # }
+ /// ```
+ fn next(&mut self) -> Next<'_, Self>
+ where
+ Self: Unpin,
+ {
+ Next::new(self)
+ }
+
+ /// Maps this stream's items to a different type, returning a new stream of
+ /// the resulting type.
+ ///
+ /// The provided closure is executed over all elements of this stream as
+ /// they are made available. It is executed inline with calls to
+ /// [`poll_next`](Stream::poll_next).
+ ///
+ /// Note that this function consumes the stream passed into it and returns a
+ /// wrapped version of it, similar to the existing `map` methods in the
+ /// standard library.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// use tokio::stream::{self, StreamExt};
+ ///
+ /// let stream = stream::iter(1..=3);
+ /// let mut stream = stream.map(|x| x + 3);
+ ///
+ /// assert_eq!(stream.next().await, Some(4));
+ /// assert_eq!(stream.next().await, Some(5));
+ /// assert_eq!(stream.next().await, Some(6));
+ /// # }
+ /// ```
+ fn map<T, F>(self, f: F) -> Map<Self, F>
+ where
+ F: FnMut(Self::Item) -> T,
+ Self: Sized,
+ {
+ Map::new(self, f)
+ }
+}
+
+impl<T: ?Sized> StreamExt for T where T: Stream {}
diff --git a/tokio/src/stream/next.rs b/tokio/src/stream/next.rs
new file mode 100644
index 00000000..3b0a1dd8
--- /dev/null
+++ b/tokio/src/stream/next.rs
@@ -0,0 +1,31 @@
+use crate::stream::Stream;
+
+use core::future::Future;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+
+/// Future for the [`next`](super::StreamExt::next) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct Next<'a, St: ?Sized> {
+ stream: &'a mut St,
+}
+
+impl<St: ?Sized + Unpin> Unpin for Next<'_, St> {}
+
+impl<'a, St: ?Sized + Stream + Unpin> Next<'a, St> {
+ pub(super) fn new(stream: &'a mut St) -> Self {
+ Next { stream }
+ }
+}
+
+impl<St: ?Sized + Stream + Unpin> Future for Next<'_, St> {
+ type Output = Option<St::Item>;
+
+ fn poll(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Self::Output> {
+ Pin::new(&mut self.stream).poll_next(cx)
+ }
+}