diff options
author | Artem Vorotnikov <artem@vorotnikov.me> | 2019-12-18 22:57:22 +0300 |
---|---|---|
committer | Carl Lerche <me@carllerche.com> | 2019-12-18 11:57:22 -0800 |
commit | 4c645866ef4ea5b0ef8c7852281a09b2f96d969b (patch) | |
tree | fe10e6fffea1033c595b920935dc723be3cc3ac4 /tokio | |
parent | b0836ece7aa5219e9e40355d0eb784baffc7b6c6 (diff) |
stream: add `next` and `map` utility fn (#1962)
Introduces `StreamExt` trait. This trait will be used to add utility functions
to make working with streams easier. This patch includes two functions:
* `next`: a future returning the item in the stream.
* `map`: transform each item in the stream.
Diffstat (limited to 'tokio')
-rw-r--r-- | tokio/src/fs/read_dir.rs | 4 | ||||
-rw-r--r-- | tokio/src/io/util/async_buf_read_ext.rs | 2 | ||||
-rw-r--r-- | tokio/src/io/util/lines.rs | 2 | ||||
-rw-r--r-- | tokio/src/io/util/split.rs | 2 | ||||
-rw-r--r-- | tokio/src/lib.rs | 4 | ||||
-rw-r--r-- | tokio/src/net/tcp/incoming.rs | 2 | ||||
-rw-r--r-- | tokio/src/net/tcp/listener.rs | 4 | ||||
-rw-r--r-- | tokio/src/net/unix/incoming.rs | 2 | ||||
-rw-r--r-- | tokio/src/net/unix/listener.rs | 3 | ||||
-rw-r--r-- | tokio/src/signal/unix.rs | 2 | ||||
-rw-r--r-- | tokio/src/signal/windows.rs | 6 | ||||
-rw-r--r-- | tokio/src/stream/iter.rs | 52 | ||||
-rw-r--r-- | tokio/src/stream/map.rs | 57 | ||||
-rw-r--r-- | tokio/src/stream/mod.rs | 93 | ||||
-rw-r--r-- | tokio/src/stream/next.rs | 31 | ||||
-rw-r--r-- | tokio/src/sync/mpsc/bounded.rs | 2 | ||||
-rw-r--r-- | tokio/src/sync/mpsc/unbounded.rs | 2 | ||||
-rw-r--r-- | tokio/src/sync/watch.rs | 2 | ||||
-rw-r--r-- | tokio/src/time/interval.rs | 2 | ||||
-rw-r--r-- | tokio/src/time/throttle.rs | 4 | ||||
-rw-r--r-- | tokio/tests/fs_dir.rs | 2 | ||||
-rw-r--r-- | tokio/tests/io_lines.rs | 2 | ||||
-rw-r--r-- | tokio/tests/sync_mpsc.rs | 4 | ||||
-rw-r--r-- | tokio/tests/sync_watch.rs | 2 | ||||
-rw-r--r-- | tokio/tests/time_interval.rs | 2 |
25 files changed, 262 insertions, 28 deletions
diff --git a/tokio/src/fs/read_dir.rs b/tokio/src/fs/read_dir.rs index d252eabb..06eed384 100644 --- a/tokio/src/fs/read_dir.rs +++ b/tokio/src/fs/read_dir.rs @@ -36,7 +36,7 @@ pub async fn read_dir(path: impl AsRef<Path>) -> io::Result<ReadDir> { /// /// [`read_dir`]: read_dir /// [`DirEntry`]: DirEntry -/// [`Stream`]: futures_core::Stream +/// [`Stream`]: crate::stream::Stream /// [`Err`]: std::result::Result::Err #[derive(Debug)] #[must_use = "streams do nothing unless polled"] @@ -85,7 +85,7 @@ impl ReadDir { } #[cfg(feature = "stream")] -impl futures_core::Stream for ReadDir { +impl crate::stream::Stream for ReadDir { type Item = io::Result<DirEntry>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { diff --git a/tokio/src/io/util/async_buf_read_ext.rs b/tokio/src/io/util/async_buf_read_ext.rs index 26ce28ca..b2930652 100644 --- a/tokio/src/io/util/async_buf_read_ext.rs +++ b/tokio/src/io/util/async_buf_read_ext.rs @@ -226,8 +226,8 @@ cfg_io_util! { /// /// ``` /// use tokio::io::AsyncBufReadExt; + /// use tokio::stream::StreamExt; /// - /// use futures::{StreamExt}; /// use std::io::Cursor; /// /// #[tokio::main] diff --git a/tokio/src/io/util/lines.rs b/tokio/src/io/util/lines.rs index 0f1d946a..f0e75de4 100644 --- a/tokio/src/io/util/lines.rs +++ b/tokio/src/io/util/lines.rs @@ -91,7 +91,7 @@ where } #[cfg(feature = "stream")] -impl<R: AsyncBufRead> futures_core::Stream for Lines<R> { +impl<R: AsyncBufRead> crate::stream::Stream for Lines<R> { type Item = io::Result<String>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { diff --git a/tokio/src/io/util/split.rs b/tokio/src/io/util/split.rs index a2e168de..f1ed2fd8 100644 --- a/tokio/src/io/util/split.rs +++ b/tokio/src/io/util/split.rs @@ -89,7 +89,7 @@ where } #[cfg(feature = "stream")] -impl<R: AsyncBufRead> futures_core::Stream for Split<R> { +impl<R: AsyncBufRead> crate::stream::Stream for Split<R> { type Item = io::Result<Vec<u8>>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index 66fd7975..831a5241 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -241,6 +241,10 @@ cfg_signal! { pub mod signal; } +cfg_stream! { + pub mod stream; +} + cfg_sync! { pub mod sync; } diff --git a/tokio/src/net/tcp/incoming.rs b/tokio/src/net/tcp/incoming.rs index 3033aefa..0abe047d 100644 --- a/tokio/src/net/tcp/incoming.rs +++ b/tokio/src/net/tcp/incoming.rs @@ -28,7 +28,7 @@ impl Incoming<'_> { } #[cfg(feature = "stream")] -impl futures_core::Stream for Incoming<'_> { +impl crate::stream::Stream for Incoming<'_> { type Item = io::Result<TcpStream>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { diff --git a/tokio/src/net/tcp/listener.rs b/tokio/src/net/tcp/listener.rs index 2cddf24b..3e0e0f7d 100644 --- a/tokio/src/net/tcp/listener.rs +++ b/tokio/src/net/tcp/listener.rs @@ -250,9 +250,7 @@ impl TcpListener { /// # Examples /// /// ```no_run - /// use tokio::net::TcpListener; - /// - /// use futures::StreamExt; + /// use tokio::{net::TcpListener, stream::StreamExt}; /// /// #[tokio::main] /// async fn main() { diff --git a/tokio/src/net/unix/incoming.rs b/tokio/src/net/unix/incoming.rs index dbe964a8..bede96dd 100644 --- a/tokio/src/net/unix/incoming.rs +++ b/tokio/src/net/unix/incoming.rs @@ -27,7 +27,7 @@ impl Incoming<'_> { } #[cfg(feature = "stream")] -impl futures_core::Stream for Incoming<'_> { +impl crate::stream::Stream for Incoming<'_> { type Item = io::Result<UnixStream>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { diff --git a/tokio/src/net/unix/listener.rs b/tokio/src/net/unix/listener.rs index 311bae2c..4b93bf82 100644 --- a/tokio/src/net/unix/listener.rs +++ b/tokio/src/net/unix/listener.rs @@ -104,8 +104,7 @@ impl UnixListener { /// /// ```no_run /// use tokio::net::UnixListener; - /// - /// use futures::StreamExt; + /// use tokio::stream::StreamExt; /// /// #[tokio::main] /// async fn main() { diff --git a/tokio/src/signal/unix.rs b/tokio/src/signal/unix.rs index cd326424..bfa2e309 100644 --- a/tokio/src/signal/unix.rs +++ b/tokio/src/signal/unix.rs @@ -482,7 +482,7 @@ impl Signal { } cfg_stream! { - impl futures_core::Stream for Signal { + impl crate::stream::Stream for Signal { type Item = (); fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> { diff --git a/tokio/src/signal/windows.rs b/tokio/src/signal/windows.rs index de35643e..def1a1d7 100644 --- a/tokio/src/signal/windows.rs +++ b/tokio/src/signal/windows.rs @@ -209,7 +209,7 @@ impl CtrlBreak { } cfg_stream! { - impl futures_core::Stream for CtrlBreak { + impl crate::stream::Stream for CtrlBreak { type Item = (); fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> { @@ -246,9 +246,9 @@ pub fn ctrl_break() -> io::Result<CtrlBreak> { mod tests { use super::*; use crate::runtime::Runtime; - use tokio_test::{assert_ok, assert_pending, assert_ready_ok, task}; + use crate::stream::StreamExt; - use futures::stream::StreamExt; + use tokio_test::{assert_ok, assert_pending, assert_ready_ok, task}; #[test] fn ctrl_c() { diff --git a/tokio/src/stream/iter.rs b/tokio/src/stream/iter.rs new file mode 100644 index 00000000..dc06495e --- /dev/null +++ b/tokio/src/stream/iter.rs @@ -0,0 +1,52 @@ +use crate::stream::Stream; + +use core::pin::Pin; +use core::task::{Context, Poll}; + +/// Stream for the [`iter`] function. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Iter<I> { + iter: I, +} + +impl<I> Unpin for Iter<I> {} + +/// Converts an `Iterator` into a `Stream` which is always ready +/// to yield the next value. +/// +/// Iterators in Rust don't express the ability to block, so this adapter +/// simply always calls `iter.next()` and returns that. +/// +/// ``` +/// # async fn dox() { +/// use tokio::stream::{self, StreamExt}; +/// +/// let mut stream = stream::iter(vec![17, 19]); +/// +/// assert_eq!(stream.next().await, Some(17)); +/// assert_eq!(stream.next().await, Some(19)); +/// assert_eq!(stream.next().await, None); +/// # } +/// ``` +pub fn iter<I>(i: I) -> Iter<I::IntoIter> + where I: IntoIterator, +{ + Iter { + iter: i.into_iter(), + } +} + +impl<I> Stream for Iter<I> + where I: Iterator, +{ + type Item = I::Item; + + fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<I::Item>> { + Poll::Ready(self.iter.next()) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + self.iter.size_hint() + } +} diff --git a/tokio/src/stream/map.rs b/tokio/src/stream/map.rs new file mode 100644 index 00000000..a89769de --- /dev/null +++ b/tokio/src/stream/map.rs @@ -0,0 +1,57 @@ +use crate::stream::Stream; + +use core::fmt; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`map`](super::StreamExt::map) method. + #[must_use = "streams do nothing unless polled"] + pub struct Map<St, F> { + #[pin] + stream: St, + f: F, + } +} + +impl<St, F> fmt::Debug for Map<St, F> +where + St: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Map") + .field("stream", &self.stream) + .finish() + } +} + +impl<St, T, F> Map<St, F> + where St: Stream, + F: FnMut(St::Item) -> T, +{ + pub(super) fn new(stream: St, f: F) -> Map<St, F> { + Map { stream, f } + } +} + +impl<St, F, T> Stream for Map<St, F> + where St: Stream, + F: FnMut(St::Item) -> T, +{ + type Item = T; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Option<T>> { + self.as_mut() + .project().stream + .poll_next(cx) + .map(|opt| opt.map(|x| (self.as_mut().project().f)(x))) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + self.stream.size_hint() + } +} diff --git a/tokio/src/stream/mod.rs b/tokio/src/stream/mod.rs new file mode 100644 index 00000000..04e6dc06 --- /dev/null +++ b/tokio/src/stream/mod.rs @@ -0,0 +1,93 @@ +//! Stream utilities for Tokio. +//! +//! `Stream`s are an asynchoronous version of standard library's Iterator. +//! +//! This module provides helpers to work with them. + +mod iter; +pub use iter::{iter, Iter}; + +mod map; +use map::Map; + +mod next; +use next::Next; + +pub use futures_core::Stream; + +/// An extension trait for `Stream`s that provides a variety of convenient +/// combinator functions. +pub trait StreamExt: Stream { + /// Creates a future that resolves to the next item in the stream. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn next(&mut self) -> Option<Self::Item>; + /// ``` + /// + /// Note that because `next` doesn't take ownership over the stream, + /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a + /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can + /// be done by boxing the stream using [`Box::pin`] or + /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils` + /// crate. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio::stream::{self, StreamExt}; + /// + /// let mut stream = stream::iter(1..=3); + /// + /// assert_eq!(stream.next().await, Some(1)); + /// assert_eq!(stream.next().await, Some(2)); + /// assert_eq!(stream.next().await, Some(3)); + /// assert_eq!(stream.next().await, None); + /// # } + /// ``` + fn next(&mut self) -> Next<'_, Self> + where + Self: Unpin, + { + Next::new(self) + } + + /// Maps this stream's items to a different type, returning a new stream of + /// the resulting type. + /// + /// The provided closure is executed over all elements of this stream as + /// they are made available. It is executed inline with calls to + /// [`poll_next`](Stream::poll_next). + /// + /// Note that this function consumes the stream passed into it and returns a + /// wrapped version of it, similar to the existing `map` methods in the + /// standard library. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio::stream::{self, StreamExt}; + /// + /// let stream = stream::iter(1..=3); + /// let mut stream = stream.map(|x| x + 3); + /// + /// assert_eq!(stream.next().await, Some(4)); + /// assert_eq!(stream.next().await, Some(5)); + /// assert_eq!(stream.next().await, Some(6)); + /// # } + /// ``` + fn map<T, F>(self, f: F) -> Map<Self, F> + where + F: FnMut(Self::Item) -> T, + Self: Sized, + { + Map::new(self, f) + } +} + +impl<T: ?Sized> StreamExt for T where T: Stream {} diff --git a/tokio/src/stream/next.rs b/tokio/src/stream/next.rs new file mode 100644 index 00000000..3b0a1dd8 --- /dev/null +++ b/tokio/src/stream/next.rs @@ -0,0 +1,31 @@ +use crate::stream::Stream; + +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; + +/// Future for the [`next`](super::StreamExt::next) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Next<'a, St: ?Sized> { + stream: &'a mut St, +} + +impl<St: ?Sized + Unpin> Unpin for Next<'_, St> {} + +impl<'a, St: ?Sized + Stream + Unpin> Next<'a, St> { + pub(super) fn new(stream: &'a mut St) -> Self { + Next { stream } + } +} + +impl<St: ?Sized + Stream + Unpin> Future for Next<'_, St> { + type Output = Option<St::Item>; + + fn poll( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Self::Output> { + Pin::new(&mut self.stream).poll_next(cx) + } +} diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 7294e4d5..7c7a5abb 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -177,7 +177,7 @@ impl<T> Receiver<T> { impl<T> Unpin for Receiver<T> {} cfg_stream! { - impl<T> futures_core::Stream for Receiver<T> { + impl<T> crate::stream::Stream for Receiver<T> { type Item = T; fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs index 4a6ba8ee..63d04370 100644 --- a/tokio/src/sync/mpsc/unbounded.rs +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -148,7 +148,7 @@ impl<T> UnboundedReceiver<T> { } #[cfg(feature = "stream")] -impl<T> futures_core::Stream for UnboundedReceiver<T> { +impl<T> crate::stream::Stream for UnboundedReceiver<T> { type Item = T; fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 124027f9..d6b82982 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -268,7 +268,7 @@ impl<T: Clone> Receiver<T> { } #[cfg(feature = "stream")] -impl<T: Clone> futures_core::Stream for Receiver<T> { +impl<T: Clone> crate::stream::Stream for Receiver<T> { type Item = T; fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { diff --git a/tokio/src/time/interval.rs b/tokio/src/time/interval.rs index 70daf47d..b1c3ee55 100644 --- a/tokio/src/time/interval.rs +++ b/tokio/src/time/interval.rs @@ -130,7 +130,7 @@ impl Interval { } #[cfg(feature = "stream")] -impl futures_core::Stream for Interval { +impl crate::stream::Stream for Interval { type Item = Instant; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Instant>> { diff --git a/tokio/src/time/throttle.rs b/tokio/src/time/throttle.rs index 2daa30fc..ccb28ad8 100644 --- a/tokio/src/time/throttle.rs +++ b/tokio/src/time/throttle.rs @@ -1,5 +1,6 @@ //! Slow down a stream by enforcing a delay between items. +use crate::stream::Stream; use crate::time::{Delay, Duration, Instant}; use std::future::Future; @@ -7,7 +8,6 @@ use std::marker::Unpin; use std::pin::Pin; use std::task::{self, Poll}; -use futures_core::Stream; use pin_project_lite::pin_project; /// Slow down a stream by enforcing a delay between items. @@ -17,8 +17,8 @@ use pin_project_lite::pin_project; /// /// Create a throttled stream. /// ```rust,norun -/// use futures::stream::StreamExt; /// use std::time::Duration; +/// use tokio::stream::StreamExt; /// use tokio::time::throttle; /// /// # async fn dox() { diff --git a/tokio/tests/fs_dir.rs b/tokio/tests/fs_dir.rs index c8b32fc6..eaff59da 100644 --- a/tokio/tests/fs_dir.rs +++ b/tokio/tests/fs_dir.rs @@ -71,7 +71,7 @@ async fn read_inherent() { #[tokio::test] async fn read_stream() { - use futures::StreamExt; + use tokio::stream::StreamExt; let base_dir = tempdir().unwrap(); diff --git a/tokio/tests/io_lines.rs b/tokio/tests/io_lines.rs index 3775cae0..2f6b3393 100644 --- a/tokio/tests/io_lines.rs +++ b/tokio/tests/io_lines.rs @@ -20,7 +20,7 @@ async fn lines_inherent() { #[tokio::test] async fn lines_stream() { - use futures::StreamExt; + use tokio::stream::StreamExt; let rd: &[u8] = b"hello\r\nworld\n\n"; let mut st = rd.lines(); diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index e1f99595..7e5c60e2 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -42,7 +42,7 @@ fn send_recv_with_buffer() { #[tokio::test] async fn send_recv_stream_with_buffer() { - use futures::StreamExt; + use tokio::stream::StreamExt; let (mut tx, mut rx) = mpsc::channel::<i32>(16); @@ -147,7 +147,7 @@ async fn async_send_recv_unbounded() { #[tokio::test] async fn send_recv_stream_unbounded() { - use futures::StreamExt; + use tokio::stream::StreamExt; let (tx, mut rx) = mpsc::unbounded_channel::<i32>(); diff --git a/tokio/tests/sync_watch.rs b/tokio/tests/sync_watch.rs index 409615d4..2bc5bb2a 100644 --- a/tokio/tests/sync_watch.rs +++ b/tokio/tests/sync_watch.rs @@ -195,7 +195,7 @@ fn poll_close() { #[test] fn stream_impl() { - use futures::StreamExt; + use tokio::stream::StreamExt; let (tx, mut rx) = watch::channel("one"); diff --git a/tokio/tests/time_interval.rs b/tokio/tests/time_interval.rs index 6a1c756f..1123681f 100644 --- a/tokio/tests/time_interval.rs +++ b/tokio/tests/time_interval.rs @@ -45,7 +45,7 @@ async fn usage() { #[tokio::test] async fn usage_stream() { - use futures::StreamExt; + use tokio::stream::StreamExt; let start = Instant::now(); let mut interval = time::interval(ms(10)); |