summaryrefslogtreecommitdiffstats
path: root/tokio/src/stream
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-01-09 20:51:06 -0800
committerGitHub <noreply@github.com>2020-01-09 20:51:06 -0800
commitcfd9b36d89e6b665c11248a86de8934cb4a7bdff (patch)
treee1b31ca2b1d94e31b0d6c4de7bc6b2bdb75af7fa /tokio/src/stream
parentf5c20cd2280834191a86557e093389e6f1a983c3 (diff)
stream: add `StreamExt::fuse` (#2085)
Diffstat (limited to 'tokio/src/stream')
-rw-r--r--tokio/src/stream/fuse.rs53
-rw-r--r--tokio/src/stream/mod.rs68
2 files changed, 121 insertions, 0 deletions
diff --git a/tokio/src/stream/fuse.rs b/tokio/src/stream/fuse.rs
new file mode 100644
index 00000000..6c9e02d6
--- /dev/null
+++ b/tokio/src/stream/fuse.rs
@@ -0,0 +1,53 @@
+use crate::stream::Stream;
+
+use pin_project_lite::pin_project;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+pin_project! {
+ /// Stream returned by [`fuse()`][super::StreamExt::fuse].
+ #[derive(Debug)]
+ pub struct Fuse<T> {
+ #[pin]
+ stream: Option<T>,
+ }
+}
+
+impl<T> Fuse<T>
+where
+ T: Stream,
+{
+ pub(crate) fn new(stream: T) -> Fuse<T> {
+ Fuse {
+ stream: Some(stream),
+ }
+ }
+}
+
+impl<T> Stream for Fuse<T>
+where
+ T: Stream,
+{
+ type Item = T::Item;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> {
+ let res = match Option::as_pin_mut(self.as_mut().project().stream) {
+ Some(stream) => ready!(stream.poll_next(cx)),
+ None => return Poll::Ready(None),
+ };
+
+ if res.is_none() {
+ // Do not poll the stream anymore
+ self.as_mut().project().stream.set(None);
+ }
+
+ Poll::Ready(res)
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ match self.stream {
+ Some(ref stream) => stream.size_hint(),
+ None => (0, Some(0)),
+ }
+ }
+}
diff --git a/tokio/src/stream/mod.rs b/tokio/src/stream/mod.rs
index b427e3c4..a204af3c 100644
--- a/tokio/src/stream/mod.rs
+++ b/tokio/src/stream/mod.rs
@@ -16,6 +16,9 @@ use filter::Filter;
mod filter_map;
use filter_map::FilterMap;
+mod fuse;
+use fuse::Fuse;
+
mod iter;
pub use iter::{iter, Iter};
@@ -222,6 +225,71 @@ pub trait StreamExt: Stream {
FilterMap::new(self, f)
}
+ /// Creates a stream which ends after the first `None`.
+ ///
+ /// After a stream returns `None`, behavior is undefined. Future calls to
+ /// `poll_next` may or may not return `Some(T)` again or they may panic.
+ /// `fuse()` adapts a stream, ensuring that after `None` is given, it will
+ /// return `None` forever.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::stream::{Stream, StreamExt};
+ ///
+ /// use std::pin::Pin;
+ /// use std::task::{Context, Poll};
+ ///
+ /// // a stream which alternates between Some and None
+ /// struct Alternate {
+ /// state: i32,
+ /// }
+ ///
+ /// impl Stream for Alternate {
+ /// type Item = i32;
+ ///
+ /// fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
+ /// let val = self.state;
+ /// self.state = self.state + 1;
+ ///
+ /// // if it's even, Some(i32), else None
+ /// if val % 2 == 0 {
+ /// Poll::Ready(Some(val))
+ /// } else {
+ /// Poll::Ready(None)
+ /// }
+ /// }
+ /// }
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let mut stream = Alternate { state: 0 };
+ ///
+ /// // the stream goes back and forth
+ /// assert_eq!(stream.next().await, Some(0));
+ /// assert_eq!(stream.next().await, None);
+ /// assert_eq!(stream.next().await, Some(2));
+ /// assert_eq!(stream.next().await, None);
+ ///
+ /// // however, once it is fused
+ /// let mut stream = stream.fuse();
+ ///
+ /// assert_eq!(stream.next().await, Some(4));
+ /// assert_eq!(stream.next().await, None);
+ ///
+ /// // it will always return `None` after the first time.
+ /// assert_eq!(stream.next().await, None);
+ /// assert_eq!(stream.next().await, None);
+ /// assert_eq!(stream.next().await, None);
+ /// }
+ /// ```
+ fn fuse(self) -> Fuse<Self>
+ where
+ Self: Sized,
+ {
+ Fuse::new(self)
+ }
+
/// Creates a new stream of at most `n` items of the underlying stream.
///
/// Once `n` items have been yielded from this stream then it will always