summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-01-11 12:32:19 -0800
committerGitHub <noreply@github.com>2020-01-11 12:32:19 -0800
commit8471e0a0ee7f6c973fb517ccb7efcf6c7e2ddc6f (patch)
tree16e9793243fdccfe4276a19ad4f3d536e9830c7a
parent0ba6e9abdbe1b42997d183adf5a39488c9543200 (diff)
stream: add `empty()` and `pending()` (#2092)
`stream::empty()` is the asynchronous equivalent to `std::iter::empty()`. `pending()` provides a stream that never becomes ready.
-rw-r--r--tokio/src/stream/empty.rs48
-rw-r--r--tokio/src/stream/mod.rs6
-rw-r--r--tokio/src/stream/pending.rs52
-rw-r--r--tokio/tests/stream_empty.rs11
-rw-r--r--tokio/tests/stream_pending.rs14
5 files changed, 131 insertions, 0 deletions
diff --git a/tokio/src/stream/empty.rs b/tokio/src/stream/empty.rs
new file mode 100644
index 00000000..a320d44b
--- /dev/null
+++ b/tokio/src/stream/empty.rs
@@ -0,0 +1,48 @@
+use crate::stream::Stream;
+
+use core::marker::PhantomData;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+
+/// Stream for the [`empty`] function.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct Empty<T>(PhantomData<T>);
+
+impl<T> Unpin for Empty<T> {}
+
+/// Creates a stream that yields nothing.
+///
+/// The returned stream is immediately ready and returns `None`. Use
+/// [`stream::pending()`](super::pending()) to obtain a stream that is never
+/// ready.
+///
+/// # Examples
+///
+/// Basic usage:
+///
+/// ```
+/// use tokio::stream::{self, StreamExt};
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let mut none = stream::empty::<i32>();
+///
+/// assert_eq!(None, none.next().await);
+/// }
+/// ```
+pub const fn empty<T>() -> Empty<T> {
+ Empty(PhantomData)
+}
+
+impl<T> Stream for Empty<T> {
+ type Item = T;
+
+ fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
+ Poll::Ready(None)
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ (0, Some(0))
+ }
+}
diff --git a/tokio/src/stream/mod.rs b/tokio/src/stream/mod.rs
index 2bbb6802..b7b02d02 100644
--- a/tokio/src/stream/mod.rs
+++ b/tokio/src/stream/mod.rs
@@ -10,6 +10,9 @@ use all::AllFuture;
mod any;
use any::AnyFuture;
+mod empty;
+pub use empty::{empty, Empty};
+
mod filter;
use filter::Filter;
@@ -31,6 +34,9 @@ use merge::Merge;
mod next;
use next::Next;
+mod pending;
+pub use pending::{pending, Pending};
+
mod try_next;
use try_next::TryNext;
diff --git a/tokio/src/stream/pending.rs b/tokio/src/stream/pending.rs
new file mode 100644
index 00000000..8d954a98
--- /dev/null
+++ b/tokio/src/stream/pending.rs
@@ -0,0 +1,52 @@
+use crate::stream::Stream;
+
+use core::marker::PhantomData;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+
+/// Stream for the [`pending`] function.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct Pending<T>(PhantomData<T>);
+
+impl<T> Unpin for Pending<T> {}
+
+/// Creates a stream that is never ready
+///
+/// The returned stream is never ready. Attempting to call
+/// [`next()`](crate::stream::StreamExt::next) will never complete. Use
+/// [`stream::empty()`](super::empty()) to obtain a stream that is is
+/// immediately empty but returns no values.
+///
+/// # Examples
+///
+/// Basic usage:
+///
+/// ```no_run
+/// use tokio::stream::{self, StreamExt};
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let mut never = stream::empty::<i32>();
+///
+/// // This will never complete
+/// never.next().await;
+///
+/// unreachable!();
+/// }
+/// ```
+pub const fn pending<T>() -> Pending<T> {
+ Pending(PhantomData)
+}
+
+impl<T> Stream for Pending<T> {
+ type Item = T;
+
+ fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
+ Poll::Pending
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ (0, None)
+ }
+}
diff --git a/tokio/tests/stream_empty.rs b/tokio/tests/stream_empty.rs
new file mode 100644
index 00000000..f278076d
--- /dev/null
+++ b/tokio/tests/stream_empty.rs
@@ -0,0 +1,11 @@
+use tokio::stream::{self, Stream, StreamExt};
+
+#[tokio::test]
+async fn basic_usage() {
+ let mut stream = stream::empty::<i32>();
+
+ for _ in 0..2 {
+ assert_eq!(stream.size_hint(), (0, Some(0)));
+ assert_eq!(None, stream.next().await);
+ }
+}
diff --git a/tokio/tests/stream_pending.rs b/tokio/tests/stream_pending.rs
new file mode 100644
index 00000000..f4d3080d
--- /dev/null
+++ b/tokio/tests/stream_pending.rs
@@ -0,0 +1,14 @@
+use tokio::stream::{self, Stream, StreamExt};
+use tokio_test::{assert_pending, task};
+
+#[tokio::test]
+async fn basic_usage() {
+ let mut stream = stream::pending::<i32>();
+
+ for _ in 0..2 {
+ assert_eq!(stream.size_hint(), (0, None));
+
+ let mut next = task::spawn(async { stream.next().await });
+ assert_pending!(next.poll());
+ }
+}