summaryrefslogtreecommitdiffstats
path: root/tokio
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-01-11 13:52:51 -0800
committerGitHub <noreply@github.com>2020-01-11 13:52:51 -0800
commit64d23899118dfc8f1d4d7a9b60c015e43260df80 (patch)
tree8379048fc077bdcab3df7890f32310e7d3ee4d09 /tokio
parent8471e0a0ee7f6c973fb517ccb7efcf6c7e2ddc6f (diff)
stream: add stream::once (#2094)
An async equivalent to `iter::once`
Diffstat (limited to 'tokio')
-rw-r--r--tokio/src/stream/mod.rs3
-rw-r--r--tokio/src/stream/once.rs52
-rw-r--r--tokio/tests/stream_once.rs12
3 files changed, 67 insertions, 0 deletions
diff --git a/tokio/src/stream/mod.rs b/tokio/src/stream/mod.rs
index b7b02d02..fada4442 100644
--- a/tokio/src/stream/mod.rs
+++ b/tokio/src/stream/mod.rs
@@ -34,6 +34,9 @@ use merge::Merge;
mod next;
use next::Next;
+mod once;
+pub use once::{once, Once};
+
mod pending;
pub use pending::{pending, Pending};
diff --git a/tokio/src/stream/once.rs b/tokio/src/stream/once.rs
new file mode 100644
index 00000000..04a642f3
--- /dev/null
+++ b/tokio/src/stream/once.rs
@@ -0,0 +1,52 @@
+use crate::stream::{self, Iter, Stream};
+
+use core::option;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+
+/// Stream for the [`once`] function.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct Once<T> {
+ iter: Iter<option::IntoIter<T>>,
+}
+
+impl<I> Unpin for Once<I> {}
+
+/// Creates a stream that emits an element exactly once.
+///
+/// The returned stream is immediately ready and emits the provided value once.
+///
+/// # Examples
+///
+/// ```
+/// use tokio::stream::{self, StreamExt};
+///
+/// #[tokio::main]
+/// async fn main() {
+/// // one is the loneliest number
+/// let mut one = stream::once(1);
+///
+/// assert_eq!(Some(1), one.next().await);
+///
+/// // just one, that's all we get
+/// assert_eq!(None, one.next().await);
+/// }
+/// ```
+pub fn once<T>(value: T) -> Once<T> {
+ Once {
+ iter: stream::iter(Some(value).into_iter()),
+ }
+}
+
+impl<T> Stream for Once<T> {
+ type Item = T;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ Pin::new(&mut self.iter).poll_next(cx)
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ self.iter.size_hint()
+ }
+}
diff --git a/tokio/tests/stream_once.rs b/tokio/tests/stream_once.rs
new file mode 100644
index 00000000..bb4635ac
--- /dev/null
+++ b/tokio/tests/stream_once.rs
@@ -0,0 +1,12 @@
+use tokio::stream::{self, Stream, StreamExt};
+
+#[tokio::test]
+async fn basic_usage() {
+ let mut one = stream::once(1);
+
+ assert_eq!(one.size_hint(), (1, Some(1)));
+ assert_eq!(Some(1), one.next().await);
+
+ assert_eq!(one.size_hint(), (0, Some(0)));
+ assert_eq!(None, one.next().await);
+}