diff options
author | Carl Lerche <me@carllerche.com> | 2020-01-11 12:32:19 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-01-11 12:32:19 -0800 |
commit | 8471e0a0ee7f6c973fb517ccb7efcf6c7e2ddc6f (patch) | |
tree | 16e9793243fdccfe4276a19ad4f3d536e9830c7a /tokio/src/stream | |
parent | 0ba6e9abdbe1b42997d183adf5a39488c9543200 (diff) |
stream: add `empty()` and `pending()` (#2092)
`stream::empty()` is the asynchronous equivalent to
`std::iter::empty()`. `pending()` provides a stream that never becomes
ready.
Diffstat (limited to 'tokio/src/stream')
-rw-r--r-- | tokio/src/stream/empty.rs | 48 | ||||
-rw-r--r-- | tokio/src/stream/mod.rs | 6 | ||||
-rw-r--r-- | tokio/src/stream/pending.rs | 52 |
3 files changed, 106 insertions, 0 deletions
diff --git a/tokio/src/stream/empty.rs b/tokio/src/stream/empty.rs new file mode 100644 index 00000000..a320d44b --- /dev/null +++ b/tokio/src/stream/empty.rs @@ -0,0 +1,48 @@ +use crate::stream::Stream; + +use core::marker::PhantomData; +use core::pin::Pin; +use core::task::{Context, Poll}; + +/// Stream for the [`empty`] function. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Empty<T>(PhantomData<T>); + +impl<T> Unpin for Empty<T> {} + +/// Creates a stream that yields nothing. +/// +/// The returned stream is immediately ready and returns `None`. Use +/// [`stream::pending()`](super::pending()) to obtain a stream that is never +/// ready. +/// +/// # Examples +/// +/// Basic usage: +/// +/// ``` +/// use tokio::stream::{self, StreamExt}; +/// +/// #[tokio::main] +/// async fn main() { +/// let mut none = stream::empty::<i32>(); +/// +/// assert_eq!(None, none.next().await); +/// } +/// ``` +pub const fn empty<T>() -> Empty<T> { + Empty(PhantomData) +} + +impl<T> Stream for Empty<T> { + type Item = T; + + fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> { + Poll::Ready(None) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + (0, Some(0)) + } +} diff --git a/tokio/src/stream/mod.rs b/tokio/src/stream/mod.rs index 2bbb6802..b7b02d02 100644 --- a/tokio/src/stream/mod.rs +++ b/tokio/src/stream/mod.rs @@ -10,6 +10,9 @@ use all::AllFuture; mod any; use any::AnyFuture; +mod empty; +pub use empty::{empty, Empty}; + mod filter; use filter::Filter; @@ -31,6 +34,9 @@ use merge::Merge; mod next; use next::Next; +mod pending; +pub use pending::{pending, Pending}; + mod try_next; use try_next::TryNext; diff --git a/tokio/src/stream/pending.rs b/tokio/src/stream/pending.rs new file mode 100644 index 00000000..8d954a98 --- /dev/null +++ b/tokio/src/stream/pending.rs @@ -0,0 +1,52 @@ +use crate::stream::Stream; + +use core::marker::PhantomData; +use core::pin::Pin; +use core::task::{Context, Poll}; + +/// Stream for the [`pending`] function. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Pending<T>(PhantomData<T>); + +impl<T> Unpin for Pending<T> {} + +/// Creates a stream that is never ready +/// +/// The returned stream is never ready. Attempting to call +/// [`next()`](crate::stream::StreamExt::next) will never complete. Use +/// [`stream::empty()`](super::empty()) to obtain a stream that is is +/// immediately empty but returns no values. +/// +/// # Examples +/// +/// Basic usage: +/// +/// ```no_run +/// use tokio::stream::{self, StreamExt}; +/// +/// #[tokio::main] +/// async fn main() { +/// let mut never = stream::empty::<i32>(); +/// +/// // This will never complete +/// never.next().await; +/// +/// unreachable!(); +/// } +/// ``` +pub const fn pending<T>() -> Pending<T> { + Pending(PhantomData) +} + +impl<T> Stream for Pending<T> { + type Item = T; + + fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> { + Poll::Pending + } + + fn size_hint(&self) -> (usize, Option<usize>) { + (0, None) + } +} |