summaryrefslogtreecommitdiffstats
path: root/tokio/src
diff options
context:
space:
mode:
authorArtem Vorotnikov <artem@vorotnikov.me>2019-12-18 22:57:22 +0300
committerCarl Lerche <me@carllerche.com>2019-12-18 11:57:22 -0800
commit4c645866ef4ea5b0ef8c7852281a09b2f96d969b (patch)
treefe10e6fffea1033c595b920935dc723be3cc3ac4 /tokio/src
parentb0836ece7aa5219e9e40355d0eb784baffc7b6c6 (diff)
stream: add `next` and `map` utility fn (#1962)
Introduces `StreamExt` trait. This trait will be used to add utility functions to make working with streams easier. This patch includes two functions: * `next`: a future returning the item in the stream. * `map`: transform each item in the stream.
Diffstat (limited to 'tokio/src')
-rw-r--r--tokio/src/fs/read_dir.rs4
-rw-r--r--tokio/src/io/util/async_buf_read_ext.rs2
-rw-r--r--tokio/src/io/util/lines.rs2
-rw-r--r--tokio/src/io/util/split.rs2
-rw-r--r--tokio/src/lib.rs4
-rw-r--r--tokio/src/net/tcp/incoming.rs2
-rw-r--r--tokio/src/net/tcp/listener.rs4
-rw-r--r--tokio/src/net/unix/incoming.rs2
-rw-r--r--tokio/src/net/unix/listener.rs3
-rw-r--r--tokio/src/signal/unix.rs2
-rw-r--r--tokio/src/signal/windows.rs6
-rw-r--r--tokio/src/stream/iter.rs52
-rw-r--r--tokio/src/stream/map.rs57
-rw-r--r--tokio/src/stream/mod.rs93
-rw-r--r--tokio/src/stream/next.rs31
-rw-r--r--tokio/src/sync/mpsc/bounded.rs2
-rw-r--r--tokio/src/sync/mpsc/unbounded.rs2
-rw-r--r--tokio/src/sync/watch.rs2
-rw-r--r--tokio/src/time/interval.rs2
-rw-r--r--tokio/src/time/throttle.rs4
20 files changed, 256 insertions, 22 deletions
diff --git a/tokio/src/fs/read_dir.rs b/tokio/src/fs/read_dir.rs
index d252eabb..06eed384 100644
--- a/tokio/src/fs/read_dir.rs
+++ b/tokio/src/fs/read_dir.rs
@@ -36,7 +36,7 @@ pub async fn read_dir(path: impl AsRef<Path>) -> io::Result<ReadDir> {
///
/// [`read_dir`]: read_dir
/// [`DirEntry`]: DirEntry
-/// [`Stream`]: futures_core::Stream
+/// [`Stream`]: crate::stream::Stream
/// [`Err`]: std::result::Result::Err
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
@@ -85,7 +85,7 @@ impl ReadDir {
}
#[cfg(feature = "stream")]
-impl futures_core::Stream for ReadDir {
+impl crate::stream::Stream for ReadDir {
type Item = io::Result<DirEntry>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
diff --git a/tokio/src/io/util/async_buf_read_ext.rs b/tokio/src/io/util/async_buf_read_ext.rs
index 26ce28ca..b2930652 100644
--- a/tokio/src/io/util/async_buf_read_ext.rs
+++ b/tokio/src/io/util/async_buf_read_ext.rs
@@ -226,8 +226,8 @@ cfg_io_util! {
///
/// ```
/// use tokio::io::AsyncBufReadExt;
+ /// use tokio::stream::StreamExt;
///
- /// use futures::{StreamExt};
/// use std::io::Cursor;
///
/// #[tokio::main]
diff --git a/tokio/src/io/util/lines.rs b/tokio/src/io/util/lines.rs
index 0f1d946a..f0e75de4 100644
--- a/tokio/src/io/util/lines.rs
+++ b/tokio/src/io/util/lines.rs
@@ -91,7 +91,7 @@ where
}
#[cfg(feature = "stream")]
-impl<R: AsyncBufRead> futures_core::Stream for Lines<R> {
+impl<R: AsyncBufRead> crate::stream::Stream for Lines<R> {
type Item = io::Result<String>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
diff --git a/tokio/src/io/util/split.rs b/tokio/src/io/util/split.rs
index a2e168de..f1ed2fd8 100644
--- a/tokio/src/io/util/split.rs
+++ b/tokio/src/io/util/split.rs
@@ -89,7 +89,7 @@ where
}
#[cfg(feature = "stream")]
-impl<R: AsyncBufRead> futures_core::Stream for Split<R> {
+impl<R: AsyncBufRead> crate::stream::Stream for Split<R> {
type Item = io::Result<Vec<u8>>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs
index 66fd7975..831a5241 100644
--- a/tokio/src/lib.rs
+++ b/tokio/src/lib.rs
@@ -241,6 +241,10 @@ cfg_signal! {
pub mod signal;
}
+cfg_stream! {
+ pub mod stream;
+}
+
cfg_sync! {
pub mod sync;
}
diff --git a/tokio/src/net/tcp/incoming.rs b/tokio/src/net/tcp/incoming.rs
index 3033aefa..0abe047d 100644
--- a/tokio/src/net/tcp/incoming.rs
+++ b/tokio/src/net/tcp/incoming.rs
@@ -28,7 +28,7 @@ impl Incoming<'_> {
}
#[cfg(feature = "stream")]
-impl futures_core::Stream for Incoming<'_> {
+impl crate::stream::Stream for Incoming<'_> {
type Item = io::Result<TcpStream>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
diff --git a/tokio/src/net/tcp/listener.rs b/tokio/src/net/tcp/listener.rs
index 2cddf24b..3e0e0f7d 100644
--- a/tokio/src/net/tcp/listener.rs
+++ b/tokio/src/net/tcp/listener.rs
@@ -250,9 +250,7 @@ impl TcpListener {
/// # Examples
///
/// ```no_run
- /// use tokio::net::TcpListener;
- ///
- /// use futures::StreamExt;
+ /// use tokio::{net::TcpListener, stream::StreamExt};
///
/// #[tokio::main]
/// async fn main() {
diff --git a/tokio/src/net/unix/incoming.rs b/tokio/src/net/unix/incoming.rs
index dbe964a8..bede96dd 100644
--- a/tokio/src/net/unix/incoming.rs
+++ b/tokio/src/net/unix/incoming.rs
@@ -27,7 +27,7 @@ impl Incoming<'_> {
}
#[cfg(feature = "stream")]
-impl futures_core::Stream for Incoming<'_> {
+impl crate::stream::Stream for Incoming<'_> {
type Item = io::Result<UnixStream>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
diff --git a/tokio/src/net/unix/listener.rs b/tokio/src/net/unix/listener.rs
index 311bae2c..4b93bf82 100644
--- a/tokio/src/net/unix/listener.rs
+++ b/tokio/src/net/unix/listener.rs
@@ -104,8 +104,7 @@ impl UnixListener {
///
/// ```no_run
/// use tokio::net::UnixListener;
- ///
- /// use futures::StreamExt;
+ /// use tokio::stream::StreamExt;
///
/// #[tokio::main]
/// async fn main() {
diff --git a/tokio/src/signal/unix.rs b/tokio/src/signal/unix.rs
index cd326424..bfa2e309 100644
--- a/tokio/src/signal/unix.rs
+++ b/tokio/src/signal/unix.rs
@@ -482,7 +482,7 @@ impl Signal {
}
cfg_stream! {
- impl futures_core::Stream for Signal {
+ impl crate::stream::Stream for Signal {
type Item = ();
fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> {
diff --git a/tokio/src/signal/windows.rs b/tokio/src/signal/windows.rs
index de35643e..def1a1d7 100644
--- a/tokio/src/signal/windows.rs
+++ b/tokio/src/signal/windows.rs
@@ -209,7 +209,7 @@ impl CtrlBreak {
}
cfg_stream! {
- impl futures_core::Stream for CtrlBreak {
+ impl crate::stream::Stream for CtrlBreak {
type Item = ();
fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> {
@@ -246,9 +246,9 @@ pub fn ctrl_break() -> io::Result<CtrlBreak> {
mod tests {
use super::*;
use crate::runtime::Runtime;
- use tokio_test::{assert_ok, assert_pending, assert_ready_ok, task};
+ use crate::stream::StreamExt;
- use futures::stream::StreamExt;
+ use tokio_test::{assert_ok, assert_pending, assert_ready_ok, task};
#[test]
fn ctrl_c() {
diff --git a/tokio/src/stream/iter.rs b/tokio/src/stream/iter.rs
new file mode 100644
index 00000000..dc06495e
--- /dev/null
+++ b/tokio/src/stream/iter.rs
@@ -0,0 +1,52 @@
+use crate::stream::Stream;
+
+use core::pin::Pin;
+use core::task::{Context, Poll};
+
+/// Stream for the [`iter`] function.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct Iter<I> {
+ iter: I,
+}
+
+impl<I> Unpin for Iter<I> {}
+
+/// Converts an `Iterator` into a `Stream` which is always ready
+/// to yield the next value.
+///
+/// Iterators in Rust don't express the ability to block, so this adapter
+/// simply always calls `iter.next()` and returns that.
+///
+/// ```
+/// # async fn dox() {
+/// use tokio::stream::{self, StreamExt};
+///
+/// let mut stream = stream::iter(vec![17, 19]);
+///
+/// assert_eq!(stream.next().await, Some(17));
+/// assert_eq!(stream.next().await, Some(19));
+/// assert_eq!(stream.next().await, None);
+/// # }
+/// ```
+pub fn iter<I>(i: I) -> Iter<I::IntoIter>
+ where I: IntoIterator,
+{
+ Iter {
+ iter: i.into_iter(),
+ }
+}
+
+impl<I> Stream for Iter<I>
+ where I: Iterator,
+{
+ type Item = I::Item;
+
+ fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<I::Item>> {
+ Poll::Ready(self.iter.next())
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ self.iter.size_hint()
+ }
+}
diff --git a/tokio/src/stream/map.rs b/tokio/src/stream/map.rs
new file mode 100644
index 00000000..a89769de
--- /dev/null
+++ b/tokio/src/stream/map.rs
@@ -0,0 +1,57 @@
+use crate::stream::Stream;
+
+use core::fmt;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream for the [`map`](super::StreamExt::map) method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct Map<St, F> {
+ #[pin]
+ stream: St,
+ f: F,
+ }
+}
+
+impl<St, F> fmt::Debug for Map<St, F>
+where
+ St: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Map")
+ .field("stream", &self.stream)
+ .finish()
+ }
+}
+
+impl<St, T, F> Map<St, F>
+ where St: Stream,
+ F: FnMut(St::Item) -> T,
+{
+ pub(super) fn new(stream: St, f: F) -> Map<St, F> {
+ Map { stream, f }
+ }
+}
+
+impl<St, F, T> Stream for Map<St, F>
+ where St: Stream,
+ F: FnMut(St::Item) -> T,
+{
+ type Item = T;
+
+ fn poll_next(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<T>> {
+ self.as_mut()
+ .project().stream
+ .poll_next(cx)
+ .map(|opt| opt.map(|x| (self.as_mut().project().f)(x)))
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ self.stream.size_hint()
+ }
+}
diff --git a/tokio/src/stream/mod.rs b/tokio/src/stream/mod.rs
new file mode 100644
index 00000000..04e6dc06
--- /dev/null
+++ b/tokio/src/stream/mod.rs
@@ -0,0 +1,93 @@
+//! Stream utilities for Tokio.
+//!
+//! `Stream`s are an asynchoronous version of standard library's Iterator.
+//!
+//! This module provides helpers to work with them.
+
+mod iter;
+pub use iter::{iter, Iter};
+
+mod map;
+use map::Map;
+
+mod next;
+use next::Next;
+
+pub use futures_core::Stream;
+
+/// An extension trait for `Stream`s that provides a variety of convenient
+/// combinator functions.
+pub trait StreamExt: Stream {
+ /// Creates a future that resolves to the next item in the stream.
+ ///
+ /// Equivalent to:
+ ///
+ /// ```ignore
+ /// async fn next(&mut self) -> Option<Self::Item>;
+ /// ```
+ ///
+ /// Note that because `next` doesn't take ownership over the stream,
+ /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a
+ /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
+ /// be done by boxing the stream using [`Box::pin`] or
+ /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
+ /// crate.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// use tokio::stream::{self, StreamExt};
+ ///
+ /// let mut stream = stream::iter(1..=3);
+ ///
+ /// assert_eq!(stream.next().await, Some(1));
+ /// assert_eq!(stream.next().await, Some(2));
+ /// assert_eq!(stream.next().await, Some(3));
+ /// assert_eq!(stream.next().await, None);
+ /// # }
+ /// ```
+ fn next(&mut self) -> Next<'_, Self>
+ where
+ Self: Unpin,
+ {
+ Next::new(self)
+ }
+
+ /// Maps this stream's items to a different type, returning a new stream of
+ /// the resulting type.
+ ///
+ /// The provided closure is executed over all elements of this stream as
+ /// they are made available. It is executed inline with calls to
+ /// [`poll_next`](Stream::poll_next).
+ ///
+ /// Note that this function consumes the stream passed into it and returns a
+ /// wrapped version of it, similar to the existing `map` methods in the
+ /// standard library.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// use tokio::stream::{self, StreamExt};
+ ///
+ /// let stream = stream::iter(1..=3);
+ /// let mut stream = stream.map(|x| x + 3);
+ ///
+ /// assert_eq!(stream.next().await, Some(4));
+ /// assert_eq!(stream.next().await, Some(5));
+ /// assert_eq!(stream.next().await, Some(6));
+ /// # }
+ /// ```
+ fn map<T, F>(self, f: F) -> Map<Self, F>
+ where
+ F: FnMut(Self::Item) -> T,
+ Self: Sized,
+ {
+ Map::new(self, f)
+ }
+}
+
+impl<T: ?Sized> StreamExt for T where T: Stream {}
diff --git a/tokio/src/stream/next.rs b/tokio/src/stream/next.rs
new file mode 100644
index 00000000..3b0a1dd8
--- /dev/null
+++ b/tokio/src/stream/next.rs
@@ -0,0 +1,31 @@
+use crate::stream::Stream;
+
+use core::future::Future;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+
+/// Future for the [`next`](super::StreamExt::next) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct Next<'a, St: ?Sized> {
+ stream: &'a mut St,
+}
+
+impl<St: ?Sized + Unpin> Unpin for Next<'_, St> {}
+
+impl<'a, St: ?Sized + Stream + Unpin> Next<'a, St> {
+ pub(super) fn new(stream: &'a mut St) -> Self {
+ Next { stream }
+ }
+}
+
+impl<St: ?Sized + Stream + Unpin> Future for Next<'_, St> {
+ type Output = Option<St::Item>;
+
+ fn poll(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Self::Output> {
+ Pin::new(&mut self.stream).poll_next(cx)
+ }
+}
diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs
index 7294e4d5..7c7a5abb 100644
--- a/tokio/src/sync/mpsc/bounded.rs
+++ b/tokio/src/sync/mpsc/bounded.rs
@@ -177,7 +177,7 @@ impl<T> Receiver<T> {
impl<T> Unpin for Receiver<T> {}
cfg_stream! {
- impl<T> futures_core::Stream for Receiver<T> {
+ impl<T> crate::stream::Stream for Receiver<T> {
type Item = T;
fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs
index 4a6ba8ee..63d04370 100644
--- a/tokio/src/sync/mpsc/unbounded.rs
+++ b/tokio/src/sync/mpsc/unbounded.rs
@@ -148,7 +148,7 @@ impl<T> UnboundedReceiver<T> {
}
#[cfg(feature = "stream")]
-impl<T> futures_core::Stream for UnboundedReceiver<T> {
+impl<T> crate::stream::Stream for UnboundedReceiver<T> {
type Item = T;
fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs
index 124027f9..d6b82982 100644
--- a/tokio/src/sync/watch.rs
+++ b/tokio/src/sync/watch.rs
@@ -268,7 +268,7 @@ impl<T: Clone> Receiver<T> {
}
#[cfg(feature = "stream")]
-impl<T: Clone> futures_core::Stream for Receiver<T> {
+impl<T: Clone> crate::stream::Stream for Receiver<T> {
type Item = T;
fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
diff --git a/tokio/src/time/interval.rs b/tokio/src/time/interval.rs
index 70daf47d..b1c3ee55 100644
--- a/tokio/src/time/interval.rs
+++ b/tokio/src/time/interval.rs
@@ -130,7 +130,7 @@ impl Interval {
}
#[cfg(feature = "stream")]
-impl futures_core::Stream for Interval {
+impl crate::stream::Stream for Interval {
type Item = Instant;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Instant>> {
diff --git a/tokio/src/time/throttle.rs b/tokio/src/time/throttle.rs
index 2daa30fc..ccb28ad8 100644
--- a/tokio/src/time/throttle.rs
+++ b/tokio/src/time/throttle.rs
@@ -1,5 +1,6 @@
//! Slow down a stream by enforcing a delay between items.
+use crate::stream::Stream;
use crate::time::{Delay, Duration, Instant};
use std::future::Future;
@@ -7,7 +8,6 @@ use std::marker::Unpin;
use std::pin::Pin;
use std::task::{self, Poll};
-use futures_core::Stream;
use pin_project_lite::pin_project;
/// Slow down a stream by enforcing a delay between items.
@@ -17,8 +17,8 @@ use pin_project_lite::pin_project;
///
/// Create a throttled stream.
/// ```rust,norun
-/// use futures::stream::StreamExt;
/// use std::time::Duration;
+/// use tokio::stream::StreamExt;
/// use tokio::time::throttle;
///
/// # async fn dox() {