diff options
author | Artem Vorotnikov <artem@vorotnikov.me> | 2019-12-18 22:57:22 +0300 |
---|---|---|
committer | Carl Lerche <me@carllerche.com> | 2019-12-18 11:57:22 -0800 |
commit | 4c645866ef4ea5b0ef8c7852281a09b2f96d969b (patch) | |
tree | fe10e6fffea1033c595b920935dc723be3cc3ac4 /tokio/src/stream | |
parent | b0836ece7aa5219e9e40355d0eb784baffc7b6c6 (diff) |
stream: add `next` and `map` utility fn (#1962)
Introduces `StreamExt` trait. This trait will be used to add utility functions
to make working with streams easier. This patch includes two functions:
* `next`: a future returning the item in the stream.
* `map`: transform each item in the stream.
Diffstat (limited to 'tokio/src/stream')
-rw-r--r-- | tokio/src/stream/iter.rs | 52 | ||||
-rw-r--r-- | tokio/src/stream/map.rs | 57 | ||||
-rw-r--r-- | tokio/src/stream/mod.rs | 93 | ||||
-rw-r--r-- | tokio/src/stream/next.rs | 31 |
4 files changed, 233 insertions, 0 deletions
diff --git a/tokio/src/stream/iter.rs b/tokio/src/stream/iter.rs new file mode 100644 index 00000000..dc06495e --- /dev/null +++ b/tokio/src/stream/iter.rs @@ -0,0 +1,52 @@ +use crate::stream::Stream; + +use core::pin::Pin; +use core::task::{Context, Poll}; + +/// Stream for the [`iter`] function. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Iter<I> { + iter: I, +} + +impl<I> Unpin for Iter<I> {} + +/// Converts an `Iterator` into a `Stream` which is always ready +/// to yield the next value. +/// +/// Iterators in Rust don't express the ability to block, so this adapter +/// simply always calls `iter.next()` and returns that. +/// +/// ``` +/// # async fn dox() { +/// use tokio::stream::{self, StreamExt}; +/// +/// let mut stream = stream::iter(vec![17, 19]); +/// +/// assert_eq!(stream.next().await, Some(17)); +/// assert_eq!(stream.next().await, Some(19)); +/// assert_eq!(stream.next().await, None); +/// # } +/// ``` +pub fn iter<I>(i: I) -> Iter<I::IntoIter> + where I: IntoIterator, +{ + Iter { + iter: i.into_iter(), + } +} + +impl<I> Stream for Iter<I> + where I: Iterator, +{ + type Item = I::Item; + + fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<I::Item>> { + Poll::Ready(self.iter.next()) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + self.iter.size_hint() + } +} diff --git a/tokio/src/stream/map.rs b/tokio/src/stream/map.rs new file mode 100644 index 00000000..a89769de --- /dev/null +++ b/tokio/src/stream/map.rs @@ -0,0 +1,57 @@ +use crate::stream::Stream; + +use core::fmt; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`map`](super::StreamExt::map) method. + #[must_use = "streams do nothing unless polled"] + pub struct Map<St, F> { + #[pin] + stream: St, + f: F, + } +} + +impl<St, F> fmt::Debug for Map<St, F> +where + St: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Map") + .field("stream", &self.stream) + .finish() + } +} + +impl<St, T, F> Map<St, F> + where St: Stream, + F: FnMut(St::Item) -> T, +{ + pub(super) fn new(stream: St, f: F) -> Map<St, F> { + Map { stream, f } + } +} + +impl<St, F, T> Stream for Map<St, F> + where St: Stream, + F: FnMut(St::Item) -> T, +{ + type Item = T; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Option<T>> { + self.as_mut() + .project().stream + .poll_next(cx) + .map(|opt| opt.map(|x| (self.as_mut().project().f)(x))) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + self.stream.size_hint() + } +} diff --git a/tokio/src/stream/mod.rs b/tokio/src/stream/mod.rs new file mode 100644 index 00000000..04e6dc06 --- /dev/null +++ b/tokio/src/stream/mod.rs @@ -0,0 +1,93 @@ +//! Stream utilities for Tokio. +//! +//! `Stream`s are an asynchoronous version of standard library's Iterator. +//! +//! This module provides helpers to work with them. + +mod iter; +pub use iter::{iter, Iter}; + +mod map; +use map::Map; + +mod next; +use next::Next; + +pub use futures_core::Stream; + +/// An extension trait for `Stream`s that provides a variety of convenient +/// combinator functions. +pub trait StreamExt: Stream { + /// Creates a future that resolves to the next item in the stream. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn next(&mut self) -> Option<Self::Item>; + /// ``` + /// + /// Note that because `next` doesn't take ownership over the stream, + /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a + /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can + /// be done by boxing the stream using [`Box::pin`] or + /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils` + /// crate. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio::stream::{self, StreamExt}; + /// + /// let mut stream = stream::iter(1..=3); + /// + /// assert_eq!(stream.next().await, Some(1)); + /// assert_eq!(stream.next().await, Some(2)); + /// assert_eq!(stream.next().await, Some(3)); + /// assert_eq!(stream.next().await, None); + /// # } + /// ``` + fn next(&mut self) -> Next<'_, Self> + where + Self: Unpin, + { + Next::new(self) + } + + /// Maps this stream's items to a different type, returning a new stream of + /// the resulting type. + /// + /// The provided closure is executed over all elements of this stream as + /// they are made available. It is executed inline with calls to + /// [`poll_next`](Stream::poll_next). + /// + /// Note that this function consumes the stream passed into it and returns a + /// wrapped version of it, similar to the existing `map` methods in the + /// standard library. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio::stream::{self, StreamExt}; + /// + /// let stream = stream::iter(1..=3); + /// let mut stream = stream.map(|x| x + 3); + /// + /// assert_eq!(stream.next().await, Some(4)); + /// assert_eq!(stream.next().await, Some(5)); + /// assert_eq!(stream.next().await, Some(6)); + /// # } + /// ``` + fn map<T, F>(self, f: F) -> Map<Self, F> + where + F: FnMut(Self::Item) -> T, + Self: Sized, + { + Map::new(self, f) + } +} + +impl<T: ?Sized> StreamExt for T where T: Stream {} diff --git a/tokio/src/stream/next.rs b/tokio/src/stream/next.rs new file mode 100644 index 00000000..3b0a1dd8 --- /dev/null +++ b/tokio/src/stream/next.rs @@ -0,0 +1,31 @@ +use crate::stream::Stream; + +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; + +/// Future for the [`next`](super::StreamExt::next) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Next<'a, St: ?Sized> { + stream: &'a mut St, +} + +impl<St: ?Sized + Unpin> Unpin for Next<'_, St> {} + +impl<'a, St: ?Sized + Stream + Unpin> Next<'a, St> { + pub(super) fn new(stream: &'a mut St) -> Self { + Next { stream } + } +} + +impl<St: ?Sized + Stream + Unpin> Future for Next<'_, St> { + type Output = Option<St::Item>; + + fn poll( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Self::Output> { + Pin::new(&mut self.stream).poll_next(cx) + } +} |