summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorArtem Vorotnikov <artem@vorotnikov.me>2019-12-18 22:57:22 +0300
committerCarl Lerche <me@carllerche.com>2019-12-18 11:57:22 -0800
commit4c645866ef4ea5b0ef8c7852281a09b2f96d969b (patch)
treefe10e6fffea1033c595b920935dc723be3cc3ac4
parentb0836ece7aa5219e9e40355d0eb784baffc7b6c6 (diff)
stream: add `next` and `map` utility fn (#1962)
Introduces `StreamExt` trait. This trait will be used to add utility functions to make working with streams easier. This patch includes two functions: * `next`: a future returning the item in the stream. * `map`: transform each item in the stream.
-rw-r--r--examples/chat.rs7
-rw-r--r--examples/connect.rs13
-rw-r--r--examples/print_each_packet.rs2
-rw-r--r--examples/tinydb.rs3
-rw-r--r--examples/tinyhttp.rs3
-rw-r--r--examples/udp-codec.rs3
-rw-r--r--tokio-test/Cargo.toml2
-rw-r--r--tokio-test/src/task.rs3
-rw-r--r--tokio-tls/tests/smoke.rs2
-rw-r--r--tokio-util/Cargo.toml2
-rw-r--r--tokio-util/src/codec/framed.rs3
-rw-r--r--tokio-util/src/codec/framed_read.rs3
-rw-r--r--tokio-util/src/codec/framed_write.rs4
-rw-r--r--tokio-util/src/codec/mod.rs2
-rw-r--r--tokio-util/src/udp/frame.rs4
-rw-r--r--tokio-util/tests/framed.rs3
-rw-r--r--tokio-util/tests/udp.rs3
-rw-r--r--tokio/src/fs/read_dir.rs4
-rw-r--r--tokio/src/io/util/async_buf_read_ext.rs2
-rw-r--r--tokio/src/io/util/lines.rs2
-rw-r--r--tokio/src/io/util/split.rs2
-rw-r--r--tokio/src/lib.rs4
-rw-r--r--tokio/src/net/tcp/incoming.rs2
-rw-r--r--tokio/src/net/tcp/listener.rs4
-rw-r--r--tokio/src/net/unix/incoming.rs2
-rw-r--r--tokio/src/net/unix/listener.rs3
-rw-r--r--tokio/src/signal/unix.rs2
-rw-r--r--tokio/src/signal/windows.rs6
-rw-r--r--tokio/src/stream/iter.rs52
-rw-r--r--tokio/src/stream/map.rs57
-rw-r--r--tokio/src/stream/mod.rs93
-rw-r--r--tokio/src/stream/next.rs31
-rw-r--r--tokio/src/sync/mpsc/bounded.rs2
-rw-r--r--tokio/src/sync/mpsc/unbounded.rs2
-rw-r--r--tokio/src/sync/watch.rs2
-rw-r--r--tokio/src/time/interval.rs2
-rw-r--r--tokio/src/time/throttle.rs4
-rw-r--r--tokio/tests/fs_dir.rs2
-rw-r--r--tokio/tests/io_lines.rs2
-rw-r--r--tokio/tests/sync_mpsc.rs4
-rw-r--r--tokio/tests/sync_watch.rs2
-rw-r--r--tokio/tests/time_interval.rs2
42 files changed, 295 insertions, 57 deletions
diff --git a/examples/chat.rs b/examples/chat.rs
index 91589072..e1da5f32 100644
--- a/examples/chat.rs
+++ b/examples/chat.rs
@@ -27,10 +27,11 @@
#![warn(rust_2018_idioms)]
use tokio::net::{TcpListener, TcpStream};
+use tokio::stream::{Stream, StreamExt};
use tokio::sync::{mpsc, Mutex};
use tokio_util::codec::{Framed, LinesCodec, LinesCodecError};
-use futures::{SinkExt, Stream, StreamExt};
+use futures::SinkExt;
use std::collections::HashMap;
use std::env;
use std::error::Error;
@@ -163,12 +164,12 @@ impl Stream for Peer {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// First poll the `UnboundedReceiver`.
- if let Poll::Ready(Some(v)) = self.rx.poll_next_unpin(cx) {
+ if let Poll::Ready(Some(v)) = Pin::new(&mut self.rx).poll_next(cx) {
return Poll::Ready(Some(Ok(Message::Received(v))));
}
// Secondly poll the `Framed` stream.
- let result: Option<_> = futures::ready!(self.lines.poll_next_unpin(cx));
+ let result: Option<_> = futures::ready!(Pin::new(&mut self.lines).poll_next(cx));
Poll::Ready(match result {
// We've received a message we should broadcast to others.
diff --git a/examples/connect.rs b/examples/connect.rs
index d51af88c..75640c62 100644
--- a/examples/connect.rs
+++ b/examples/connect.rs
@@ -55,19 +55,21 @@ async fn main() -> Result<(), Box<dyn Error>> {
mod tcp {
use super::codec;
- use futures::{future, Sink, SinkExt, Stream, StreamExt};
+ use futures::StreamExt;
+ use futures::{future, Sink, SinkExt};
use std::{error::Error, io, net::SocketAddr};
use tokio::net::TcpStream;
+ use tokio::stream::Stream;
use tokio_util::codec::{FramedRead, FramedWrite};
pub async fn connect(
addr: &SocketAddr,
- stdin: impl Stream<Item = Result<Vec<u8>, io::Error>> + Unpin,
+ mut stdin: impl Stream<Item = Result<Vec<u8>, io::Error>> + Unpin,
mut stdout: impl Sink<Vec<u8>, Error = io::Error> + Unpin,
) -> Result<(), Box<dyn Error>> {
let mut stream = TcpStream::connect(addr).await?;
let (r, w) = stream.split();
- let sink = FramedWrite::new(w, codec::Bytes);
+ let mut sink = FramedWrite::new(w, codec::Bytes);
let mut stream = FramedRead::new(r, codec::Bytes)
.filter_map(|i| match i {
Ok(i) => future::ready(Some(i)),
@@ -78,7 +80,7 @@ mod tcp {
})
.map(Ok);
- match future::join(stdin.forward(sink), stdout.send_all(&mut stream)).await {
+ match future::join(sink.send_all(&mut stdin), stdout.send_all(&mut stream)).await {
(Err(e), _) | (_, Err(e)) => Err(e.into()),
_ => Ok(()),
}
@@ -88,8 +90,9 @@ mod tcp {
mod udp {
use tokio::net::udp::{RecvHalf, SendHalf};
use tokio::net::UdpSocket;
+ use tokio::stream::{Stream, StreamExt};
- use futures::{future, Sink, SinkExt, Stream, StreamExt};
+ use futures::{future, Sink, SinkExt};
use std::error::Error;
use std::io;
use std::net::SocketAddr;
diff --git a/examples/print_each_packet.rs b/examples/print_each_packet.rs
index 4604139b..d650b5bd 100644
--- a/examples/print_each_packet.rs
+++ b/examples/print_each_packet.rs
@@ -55,9 +55,9 @@
#![warn(rust_2018_idioms)]
use tokio::net::TcpListener;
+use tokio::stream::StreamExt;
use tokio_util::codec::{BytesCodec, Decoder};
-use futures::StreamExt;
use std::env;
#[tokio::main]
diff --git a/examples/tinydb.rs b/examples/tinydb.rs
index cf867a0a..7c71dedf 100644
--- a/examples/tinydb.rs
+++ b/examples/tinydb.rs
@@ -42,9 +42,10 @@
#![warn(rust_2018_idioms)]
use tokio::net::TcpListener;
+use tokio::stream::StreamExt;
use tokio_util::codec::{Framed, LinesCodec};
-use futures::{SinkExt, StreamExt};
+use futures::SinkExt;
use std::collections::HashMap;
use std::env;
use std::error::Error;
diff --git a/examples/tinyhttp.rs b/examples/tinyhttp.rs
index 5ddf0d48..9ac2806e 100644
--- a/examples/tinyhttp.rs
+++ b/examples/tinyhttp.rs
@@ -14,13 +14,14 @@
#![warn(rust_2018_idioms)]
use bytes::BytesMut;
-use futures::{SinkExt, StreamExt};
+use futures::SinkExt;
use http::{header::HeaderValue, Request, Response, StatusCode};
#[macro_use]
extern crate serde_derive;
use serde_json;
use std::{env, error::Error, fmt, io};
use tokio::net::{TcpListener, TcpStream};
+use tokio::stream::StreamExt;
use tokio_util::codec::{Decoder, Encoder, Framed};
#[tokio::main]
diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs
index 6b3f84a0..dc30394f 100644
--- a/examples/udp-codec.rs
+++ b/examples/udp-codec.rs
@@ -9,12 +9,13 @@
#![warn(rust_2018_idioms)]
use tokio::net::UdpSocket;
+use tokio::stream::StreamExt;
use tokio::{io, time};
use tokio_util::codec::BytesCodec;
use tokio_util::udp::UdpFramed;
use bytes::Bytes;
-use futures::{FutureExt, SinkExt, StreamExt};
+use futures::{FutureExt, SinkExt};
use std::env;
use std::error::Error;
use std::net::SocketAddr;
diff --git a/tokio-test/Cargo.toml b/tokio-test/Cargo.toml
index a2902dc5..a1e60500 100644
--- a/tokio-test/Cargo.toml
+++ b/tokio-test/Cargo.toml
@@ -20,7 +20,7 @@ Testing utilities for Tokio- and futures-based code
categories = ["asynchronous", "testing"]
[dependencies]
-tokio = { version = "0.2.0", path = "../tokio", features = ["rt-core", "sync", "time", "test-util"] }
+tokio = { version = "0.2.0", path = "../tokio", features = ["rt-core", "stream", "sync", "time", "test-util"] }
bytes = "0.5.0"
futures-core = "0.3.0"
diff --git a/tokio-test/src/task.rs b/tokio-test/src/task.rs
index 4790de54..71ebe7b4 100644
--- a/tokio-test/src/task.rs
+++ b/tokio-test/src/task.rs
@@ -2,7 +2,6 @@
#![allow(clippy::mutex_atomic)]
-use futures_core::Stream;
use std::future::Future;
use std::mem;
use std::ops;
@@ -10,6 +9,8 @@ use std::pin::Pin;
use std::sync::{Arc, Condvar, Mutex};
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
+use tokio::stream::Stream;
+
/// TOOD: dox
pub fn spawn<T>(task: T) -> Spawn<T> {
Spawn {
diff --git a/tokio-tls/tests/smoke.rs b/tokio-tls/tests/smoke.rs
index 64dda6ab..a575ba85 100644
--- a/tokio-tls/tests/smoke.rs
+++ b/tokio-tls/tests/smoke.rs
@@ -3,7 +3,6 @@
use cfg_if::cfg_if;
use env_logger;
use futures::join;
-use futures::stream::StreamExt;
use native_tls;
use native_tls::{Identity, TlsAcceptor, TlsConnector};
use std::io::Write;
@@ -12,6 +11,7 @@ use std::process::Command;
use std::ptr;
use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt, Error, ErrorKind};
use tokio::net::{TcpListener, TcpStream};
+use tokio::stream::StreamExt;
use tokio_tls;
macro_rules! t {
diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml
index 7f40660a..c2b83565 100644
--- a/tokio-util/Cargo.toml
+++ b/tokio-util/Cargo.toml
@@ -26,7 +26,7 @@ default = []
# Shorthand for enabling everything
full = ["codec", "udp"]
-codec = []
+codec = ["tokio/stream"]
udp = ["tokio/udp"]
[dependencies]
diff --git a/tokio-util/src/codec/framed.rs b/tokio-util/src/codec/framed.rs
index 0c5ef9f6..a3715c24 100644
--- a/tokio-util/src/codec/framed.rs
+++ b/tokio-util/src/codec/framed.rs
@@ -3,10 +3,9 @@ use crate::codec::encoder::Encoder;
use crate::codec::framed_read::{framed_read2, framed_read2_with_buffer, FramedRead2};
use crate::codec::framed_write::{framed_write2, framed_write2_with_buffer, FramedWrite2};
-use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite};
+use tokio::{io::{AsyncBufRead, AsyncRead, AsyncWrite}, stream::Stream};
use bytes::BytesMut;
-use futures_core::Stream;
use futures_sink::Sink;
use pin_project_lite::pin_project;
use std::fmt;
diff --git a/tokio-util/src/codec/framed_read.rs b/tokio-util/src/codec/framed_read.rs
index bd1f625b..ca9464b5 100644
--- a/tokio-util/src/codec/framed_read.rs
+++ b/tokio-util/src/codec/framed_read.rs
@@ -1,10 +1,9 @@
use crate::codec::framed::{Fuse, ProjectFuse};
use crate::codec::Decoder;
-use tokio::io::AsyncRead;
+use tokio::{io::AsyncRead, stream::Stream};
use bytes::BytesMut;
-use futures_core::Stream;
use futures_sink::Sink;
use log::trace;
use pin_project_lite::pin_project;
diff --git a/tokio-util/src/codec/framed_write.rs b/tokio-util/src/codec/framed_write.rs
index 9aed7ea3..2ef91c7c 100644
--- a/tokio-util/src/codec/framed_write.rs
+++ b/tokio-util/src/codec/framed_write.rs
@@ -2,10 +2,10 @@ use crate::codec::decoder::Decoder;
use crate::codec::encoder::Encoder;
use crate::codec::framed::{Fuse, ProjectFuse};
-use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite};
+use tokio::{io::{AsyncBufRead, AsyncRead, AsyncWrite}, stream::Stream};
use bytes::BytesMut;
-use futures_core::{ready, Stream};
+use futures_core::ready;
use futures_sink::Sink;
use log::trace;
use pin_project_lite::pin_project;
diff --git a/tokio-util/src/codec/mod.rs b/tokio-util/src/codec/mod.rs
index b162dd3a..4b1b86fb 100644
--- a/tokio-util/src/codec/mod.rs
+++ b/tokio-util/src/codec/mod.rs
@@ -6,8 +6,8 @@
//!
//! [`AsyncRead`]: https://docs.rs/tokio/*/tokio/io/trait.AsyncRead.html
//! [`AsyncWrite`]: https://docs.rs/tokio/*/tokio/io/trait.AsyncWrite.html
+//! [`Stream`]: https://docs.rs/tokio/*/tokio/stream/trait.Stream.html
//! [`Sink`]: https://docs.rs/futures-sink/*/futures_sink/trait.Sink.html
-//! [`Stream`]: https://docs.rs/futures-core/*/futures_core/stream/trait.Stream.html
mod bytes_codec;
pub use self::bytes_codec::BytesCodec;
diff --git a/tokio-util/src/udp/frame.rs b/tokio-util/src/udp/frame.rs
index a6c6f220..18ee106e 100644
--- a/tokio-util/src/udp/frame.rs
+++ b/tokio-util/src/udp/frame.rs
@@ -1,9 +1,9 @@
use crate::codec::{Decoder, Encoder};
-use tokio::net::UdpSocket;
+use tokio::{net::UdpSocket, stream::Stream};
use bytes::{BufMut, BytesMut};
-use futures_core::{ready, Stream};
+use futures_core::ready;
use futures_sink::Sink;
use std::io;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
diff --git a/tokio-util/tests/framed.rs b/tokio-util/tests/framed.rs
index b98df736..cb82f8df 100644
--- a/tokio-util/tests/framed.rs
+++ b/tokio-util/tests/framed.rs
@@ -1,11 +1,10 @@
#![warn(rust_2018_idioms)]
-use tokio::prelude::*;
+use tokio::{prelude::*, stream::StreamExt};
use tokio_test::assert_ok;
use tokio_util::codec::{Decoder, Encoder, Framed, FramedParts};
use bytes::{Buf, BufMut, BytesMut};
-use futures::StreamExt;
use std::io::{self, Read};
use std::pin::Pin;
use std::task::{Context, Poll};
diff --git a/tokio-util/tests/udp.rs b/tokio-util/tests/udp.rs
index af8002bd..51c65c63 100644
--- a/tokio-util/tests/udp.rs
+++ b/tokio-util/tests/udp.rs
@@ -1,4 +1,4 @@
-use tokio::net::UdpSocket;
+use tokio::{net::UdpSocket, stream::StreamExt};
use tokio_util::codec::{Decoder, Encoder};
use tokio_util::udp::UdpFramed;
@@ -6,7 +6,6 @@ use bytes::{BufMut, BytesMut};
use futures::future::try_join;
use futures::future::FutureExt;
use futures::sink::SinkExt;
-use futures::stream::StreamExt;
use std::io;
#[tokio::test]
diff --git a/tokio/src/fs/read_dir.rs b/tokio/src/fs/read_dir.rs
index d252eabb..06eed384 100644
--- a/tokio/src/fs/read_dir.rs
+++ b/tokio/src/fs/read_dir.rs
@@ -36,7 +36,7 @@ pub async fn read_dir(path: impl AsRef<Path>) -> io::Result<ReadDir> {
///
/// [`read_dir`]: read_dir
/// [`DirEntry`]: DirEntry
-/// [`Stream`]: futures_core::Stream
+/// [`Stream`]: crate::stream::Stream
/// [`Err`]: std::result::Result::Err
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
@@ -85,7 +85,7 @@ impl ReadDir {
}
#[cfg(feature = "stream")]
-impl futures_core::Stream for ReadDir {
+impl crate::stream::Stream for ReadDir {
type Item = io::Result<DirEntry>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
diff --git a/tokio/src/io/util/async_buf_read_ext.rs b/tokio/src/io/util/async_buf_read_ext.rs
index 26ce28ca..b2930652 100644
--- a/tokio/src/io/util/async_buf_read_ext.rs
+++ b/tokio/src/io/util/async_buf_read_ext.rs
@@ -226,8 +226,8 @@ cfg_io_util! {
///
/// ```
/// use tokio::io::AsyncBufReadExt;
+ /// use tokio::stream::StreamExt;
///
- /// use futures::{StreamExt};
/// use std::io::Cursor;
///
/// #[tokio::main]
diff --git a/tokio/src/io/util/lines.rs b/tokio/src/io/util/lines.rs
index 0f1d946a..f0e75de4 100644
--- a/tokio/src/io/util/lines.rs
+++ b/tokio/src/io/util/lines.rs
@@ -91,7 +91,7 @@ where
}
#[cfg(feature = "stream")]
-impl<R: AsyncBufRead> futures_core::Stream for Lines<R> {
+impl<R: AsyncBufRead> crate::stream::Stream for Lines<R> {
type Item = io::Result<String>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
diff --git a/tokio/src/io/util/split.rs b/tokio/src/io/util/split.rs
index a2e168de..f1ed2fd8 100644
--- a/tokio/src/io/util/split.rs
+++ b/tokio/src/io/util/split.rs
@@ -89,7 +89,7 @@ where
}
#[cfg(feature = "stream")]
-impl<R: AsyncBufRead> futures_core::Stream for Split<R> {
+impl<R: AsyncBufRead> crate::stream::Stream for Split<R> {
type Item = io::Result<Vec<u8>>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs
index 66fd7975..831a5241 100644
--- a/tokio/src/lib.rs
+++ b/tokio/src/lib.rs
@@ -241,6 +241,10 @@ cfg_signal! {
pub mod signal;
}
+cfg_stream! {
+ pub mod stream;
+}
+
cfg_sync! {
pub mod sync;
}
diff --git a/tokio/src/net/tcp/incoming.rs b/tokio/src/net/tcp/incoming.rs
index 3033aefa..0abe047d 100644
--- a/tokio/src/net/tcp/incoming.rs
+++ b/tokio/src/net/tcp/incoming.rs
@@ -28,7 +28,7 @@ impl Incoming<'_> {
}
#[cfg(feature = "stream")]
-impl futures_core::Stream for Incoming<'_> {
+impl crate::stream::Stream for Incoming<'_> {
type Item = io::Result<TcpStream>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
diff --git a/tokio/src/net/tcp/listener.rs b/tokio/src/net/tcp/listener.rs
index 2cddf24b..3e0e0f7d 100644
--- a/tokio/src/net/tcp/listener.rs
+++ b/tokio/src/net/tcp/listener.rs
@@ -250,9 +250,7 @@ impl TcpListener {
/// # Examples
///
/// ```no_run
- /// use tokio::net::TcpListener;
- ///
- /// use futures::StreamExt;
+ /// use tokio::{net::TcpListener, stream::StreamExt};
///
/// #[tokio::main]
/// async fn main() {
diff --git a/tokio/src/net/unix/incoming.rs b/tokio/src/net/unix/incoming.rs
index dbe964a8..bede96dd 100644
--- a/tokio/src/net/unix/incoming.rs
+++ b/tokio/src/net/unix/incoming.rs
@@ -27,7 +27,7 @@ impl Incoming<'_> {
}
#[cfg(feature = "stream")]
-impl futures_core::Stream for Incoming<'_> {
+impl crate::stream::Stream for Incoming<'_> {
type Item = io::Result<UnixStream>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
diff --git a/tokio/src/net/unix/listener.rs b/tokio/src/net/unix/listener.rs
index 311bae2c..4b93bf82 100644
--- a/tokio/src/net/unix/listener.rs
+++ b/tokio/src/net/unix/listener.rs
@@ -104,8 +104,7 @@ impl UnixListener {
///
/// ```no_run
/// use tokio::net::UnixListener;
- ///
- /// use futures::StreamExt;
+ /// use tokio::stream::StreamExt;
///
/// #[tokio::main]
/// async fn main() {
diff --git a/tokio/src/signal/unix.rs b/tokio/src/signal/unix.rs
index cd326424..bfa2e309 100644
--- a/tokio/src/signal/unix.rs
+++ b/tokio/src/signal/unix.rs
@@ -482,7 +482,7 @@ impl Signal {
}
cfg_stream! {
- impl futures_core::Stream for Signal {
+ impl crate::stream::Stream for Signal {
type Item = ();
fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> {
diff --git a/tokio/src/signal/windows.rs b/tokio/src/signal/windows.rs
index de35643e..def1a1d7 100644
--- a/tokio/src/signal/windows.rs
+++ b/tokio/src/signal/windows.rs
@@ -209,7 +209,7 @@ impl CtrlBreak {
}
cfg_stream! {
- impl futures_core::Stream for CtrlBreak {
+ impl crate::stream::Stream for CtrlBreak {
type Item = ();
fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> {
@@ -246,9 +246,9 @@ pub fn ctrl_break() -> io::Result<CtrlBreak> {
mod tests {
use super::*;
use crate::runtime::Runtime;
- use tokio_test::{assert_ok, assert_pending, assert_ready_ok, task};
+ use crate::stream::StreamExt;
- use futures::stream::StreamExt;
+ use tokio_test::{assert_ok, assert_pending, assert_ready_ok, task};
#[test]
fn ctrl_c() {
diff --git a/tokio/src/stream/iter.rs b/tokio/src/stream/iter.rs
new file mode 100644
index 00000000..dc06495e
--- /dev/null
+++ b/tokio/src/stream/iter.rs
@@ -0,0 +1,52 @@
+use crate::stream::Stream;
+
+use core::pin::Pin;
+use core::task::{Context, Poll};
+
+/// Stream for the [`iter`] function.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct Iter<I> {
+ iter: I,
+}
+
+impl<I> Unpin for Iter<I> {}
+
+/// Converts an `Iterator` into a `Stream` which is always ready
+/// to yield the next value.
+///
+/// Iterators in Rust don't express the ability to block, so this adapter
+/// simply always calls `iter.next()` and returns that.
+///
+/// ```
+/// # async fn dox() {
+/// use tokio::stream::{self, StreamExt};
+///
+/// let mut stream = stream::iter(vec![17, 19]);
+///
+/// assert_eq!(stream.next().await, Some(17));
+/// assert_eq!(stream.next().await, Some(19));
+/// assert_eq!(stream.next().await, None);
+/// # }
+/// ```
+pub fn iter<I>(i: I) -> Iter<I::IntoIter>
+ where I: IntoIterator,
+{
+ Iter {
+ iter: i.into_iter(),
+ }
+}
+
+impl<I> Stream for Iter<I>
+ where I: Iterator,
+{
+ type Item = I::Item;
+
+ fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<I::Item>> {
+ Poll::Ready(self.iter.next())
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ self.iter.size_hint()
+ }
+}
diff --git a/tokio/src/stream/map.rs b/tokio/src/stream/map.rs
new file mode 100644
index 00000000..a89769de
--- /dev/null
+++ b/tokio/src/stream/map.rs
@@ -0,0 +1,57 @@
+use crate::stream::Stream;
+
+use core::fmt;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream for the [`map`](super::StreamExt::map) method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct Map<St, F> {
+ #[pin]
+ stream: St,
+ f: F,
+ }
+}
+
+impl<St, F> fmt::Debug for Map<St, F>
+where
+ St: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Map")
+ .field("stream", &self.stream)
+ .finish()
+ }
+}
+
+impl<St, T, F> Map<St, F>
+ where St: Stream,
+ F: FnMut(St::Item) -> T,
+{
+ pub(super) fn new(stream: St, f: F) -> Map<St, F> {
+ Map { stream, f }
+ }
+}
+
+impl<St, F, T> Stream for Map<St, F>
+ where St: Stream,
+ F: FnMut(St::Item) -> T,
+{
+ type Item = T;
+
+ fn poll_next(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<T>> {
+ self.as_mut()
+ .project().stream
+ .poll_next(cx)
+ .map(|opt| opt.map(|x| (self.as_mut().project().f)(x)))
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ self.stream.size_hint()
+ }
+}
diff --git a/tokio/src/stream/mod.rs b/tokio/src/stream/mod.rs
new file mode 100644
index 00000000..04e6dc06
--- /dev/null
+++ b/tokio/src/stream/mod.rs
@@ -0,0 +1,93 @@
+//! Stream utilities for Tokio.
+//!
+//! `Stream`s are an asynchoronous version of standard library's Iterator.
+//!
+//! This module provides helpers to work with them.
+
+mod iter;
+pub use iter::{iter, Iter};
+
+mod map;
+use map::Map;
+
+mod next;
+use next::Next;
+
+pub use futures_core::Stream;
+
+/// An extension trait for `Stream`s that provides a variety of convenient
+/// combinator functions.
+pub trait StreamExt: Stream {
+ /// Creates a future that resolves to the next item in the stream.
+ ///
+ /// Equivalent to:
+ ///
+ /// ```ignore
+ /// async fn next(&mut self) -> Option<Self::Item>;
+ /// ```
+ ///
+ /// Note that because `next` doesn't take ownership over the stream,
+ /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a
+ /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
+ /// be done by boxing the stream using [`Box::pin`] or
+ /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
+ /// crate.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// use tokio::stream::{self, StreamExt};
+ ///
+ /// let mut stream = stream::iter(1..=3);
+ ///
+ /// assert_eq!(stream.next().await, Some(1));
+ /// assert_eq!(stream.next().await, Some(2));
+ /// assert_eq!(stream.next().await, Some(3));
+ /// assert_eq!(stream.next().await, None);
+ /// # }
+ /// ```
+ fn next(&mut self) -> Next<'_, Self>
+ where
+ Self: Unpin,
+ {
+ Next::new(self)
+ }
+
+ /// Maps this stream's items to a different type, returning a new stream of
+ /// the resulting type.
+ ///
+ /// The provided closure is executed over all elements of this stream as
+ /// they are made available. It is executed inline with calls to
+ /// [`poll_next`](Stream::poll_next).
+ ///
+ /// Note that this function consumes the stream passed into it and returns a
+ /// wrapped version of it, similar to the existing `map` methods in the
+ /// standard library.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// use tokio::stream::{self, StreamExt};
+ ///
+ /// let stream = stream::iter(1..=3);
+ /// let mut stream = stream.map(|x| x + 3);
+ ///
+ /// assert_eq!(stream.next().await, Some(4));
+ /// assert_eq!(stream.next().await, Some(5));
+ /// assert_eq!(stream.next().await, Some(6));
+ /// # }
+ /// ```
+ fn map<T, F>(