diff options
author | Carl Lerche <me@carllerche.com> | 2020-01-11 13:52:51 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-01-11 13:52:51 -0800 |
commit | 64d23899118dfc8f1d4d7a9b60c015e43260df80 (patch) | |
tree | 8379048fc077bdcab3df7890f32310e7d3ee4d09 /tokio | |
parent | 8471e0a0ee7f6c973fb517ccb7efcf6c7e2ddc6f (diff) |
stream: add stream::once (#2094)
An async equivalent to `iter::once`
Diffstat (limited to 'tokio')
-rw-r--r-- | tokio/src/stream/mod.rs | 3 | ||||
-rw-r--r-- | tokio/src/stream/once.rs | 52 | ||||
-rw-r--r-- | tokio/tests/stream_once.rs | 12 |
3 files changed, 67 insertions, 0 deletions
diff --git a/tokio/src/stream/mod.rs b/tokio/src/stream/mod.rs index b7b02d02..fada4442 100644 --- a/tokio/src/stream/mod.rs +++ b/tokio/src/stream/mod.rs @@ -34,6 +34,9 @@ use merge::Merge; mod next; use next::Next; +mod once; +pub use once::{once, Once}; + mod pending; pub use pending::{pending, Pending}; diff --git a/tokio/src/stream/once.rs b/tokio/src/stream/once.rs new file mode 100644 index 00000000..04a642f3 --- /dev/null +++ b/tokio/src/stream/once.rs @@ -0,0 +1,52 @@ +use crate::stream::{self, Iter, Stream}; + +use core::option; +use core::pin::Pin; +use core::task::{Context, Poll}; + +/// Stream for the [`once`] function. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Once<T> { + iter: Iter<option::IntoIter<T>>, +} + +impl<I> Unpin for Once<I> {} + +/// Creates a stream that emits an element exactly once. +/// +/// The returned stream is immediately ready and emits the provided value once. +/// +/// # Examples +/// +/// ``` +/// use tokio::stream::{self, StreamExt}; +/// +/// #[tokio::main] +/// async fn main() { +/// // one is the loneliest number +/// let mut one = stream::once(1); +/// +/// assert_eq!(Some(1), one.next().await); +/// +/// // just one, that's all we get +/// assert_eq!(None, one.next().await); +/// } +/// ``` +pub fn once<T>(value: T) -> Once<T> { + Once { + iter: stream::iter(Some(value).into_iter()), + } +} + +impl<T> Stream for Once<T> { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { + Pin::new(&mut self.iter).poll_next(cx) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + self.iter.size_hint() + } +} diff --git a/tokio/tests/stream_once.rs b/tokio/tests/stream_once.rs new file mode 100644 index 00000000..bb4635ac --- /dev/null +++ b/tokio/tests/stream_once.rs @@ -0,0 +1,12 @@ +use tokio::stream::{self, Stream, StreamExt}; + +#[tokio::test] +async fn basic_usage() { + let mut one = stream::once(1); + + assert_eq!(one.size_hint(), (1, Some(1))); + assert_eq!(Some(1), one.next().await); + + assert_eq!(one.size_hint(), (0, Some(0))); + assert_eq!(None, one.next().await); +} |