summaryrefslogtreecommitdiffstats
path: root/tokio/src/stream
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-01-11 12:32:19 -0800
committerGitHub <noreply@github.com>2020-01-11 12:32:19 -0800
commit8471e0a0ee7f6c973fb517ccb7efcf6c7e2ddc6f (patch)
tree16e9793243fdccfe4276a19ad4f3d536e9830c7a /tokio/src/stream
parent0ba6e9abdbe1b42997d183adf5a39488c9543200 (diff)
stream: add `empty()` and `pending()` (#2092)
`stream::empty()` is the asynchronous equivalent to `std::iter::empty()`. `pending()` provides a stream that never becomes ready.
Diffstat (limited to 'tokio/src/stream')
-rw-r--r--tokio/src/stream/empty.rs48
-rw-r--r--tokio/src/stream/mod.rs6
-rw-r--r--tokio/src/stream/pending.rs52
3 files changed, 106 insertions, 0 deletions
diff --git a/tokio/src/stream/empty.rs b/tokio/src/stream/empty.rs
new file mode 100644
index 00000000..a320d44b
--- /dev/null
+++ b/tokio/src/stream/empty.rs
@@ -0,0 +1,48 @@
+use crate::stream::Stream;
+
+use core::marker::PhantomData;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+
+/// Stream for the [`empty`] function.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct Empty<T>(PhantomData<T>);
+
+impl<T> Unpin for Empty<T> {}
+
+/// Creates a stream that yields nothing.
+///
+/// The returned stream is immediately ready and returns `None`. Use
+/// [`stream::pending()`](super::pending()) to obtain a stream that is never
+/// ready.
+///
+/// # Examples
+///
+/// Basic usage:
+///
+/// ```
+/// use tokio::stream::{self, StreamExt};
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let mut none = stream::empty::<i32>();
+///
+/// assert_eq!(None, none.next().await);
+/// }
+/// ```
+pub const fn empty<T>() -> Empty<T> {
+ Empty(PhantomData)
+}
+
+impl<T> Stream for Empty<T> {
+ type Item = T;
+
+ fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
+ Poll::Ready(None)
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ (0, Some(0))
+ }
+}
diff --git a/tokio/src/stream/mod.rs b/tokio/src/stream/mod.rs
index 2bbb6802..b7b02d02 100644
--- a/tokio/src/stream/mod.rs
+++ b/tokio/src/stream/mod.rs
@@ -10,6 +10,9 @@ use all::AllFuture;
mod any;
use any::AnyFuture;
+mod empty;
+pub use empty::{empty, Empty};
+
mod filter;
use filter::Filter;
@@ -31,6 +34,9 @@ use merge::Merge;
mod next;
use next::Next;
+mod pending;
+pub use pending::{pending, Pending};
+
mod try_next;
use try_next::TryNext;
diff --git a/tokio/src/stream/pending.rs b/tokio/src/stream/pending.rs
new file mode 100644
index 00000000..8d954a98
--- /dev/null
+++ b/tokio/src/stream/pending.rs
@@ -0,0 +1,52 @@
+use crate::stream::Stream;
+
+use core::marker::PhantomData;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+
+/// Stream for the [`pending`] function.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct Pending<T>(PhantomData<T>);
+
+impl<T> Unpin for Pending<T> {}
+
+/// Creates a stream that is never ready
+///
+/// The returned stream is never ready. Attempting to call
+/// [`next()`](crate::stream::StreamExt::next) will never complete. Use
+/// [`stream::empty()`](super::empty()) to obtain a stream that is is
+/// immediately empty but returns no values.
+///
+/// # Examples
+///
+/// Basic usage:
+///
+/// ```no_run
+/// use tokio::stream::{self, StreamExt};
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let mut never = stream::empty::<i32>();
+///
+/// // This will never complete
+/// never.next().await;
+///
+/// unreachable!();
+/// }
+/// ```
+pub const fn pending<T>() -> Pending<T> {
+ Pending(PhantomData)
+}
+
+impl<T> Stream for Pending<T> {
+ type Item = T;
+
+ fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
+ Poll::Pending
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ (0, None)
+ }
+}