summaryrefslogtreecommitdiffstats
path: root/tokio/src/stream/mod.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-01-09 20:51:06 -0800
committerGitHub <noreply@github.com>2020-01-09 20:51:06 -0800
commitcfd9b36d89e6b665c11248a86de8934cb4a7bdff (patch)
treee1b31ca2b1d94e31b0d6c4de7bc6b2bdb75af7fa /tokio/src/stream/mod.rs
parentf5c20cd2280834191a86557e093389e6f1a983c3 (diff)
stream: add `StreamExt::fuse` (#2085)
Diffstat (limited to 'tokio/src/stream/mod.rs')
-rw-r--r--tokio/src/stream/mod.rs68
1 files changed, 68 insertions, 0 deletions
diff --git a/tokio/src/stream/mod.rs b/tokio/src/stream/mod.rs
index b427e3c4..a204af3c 100644
--- a/tokio/src/stream/mod.rs
+++ b/tokio/src/stream/mod.rs
@@ -16,6 +16,9 @@ use filter::Filter;
mod filter_map;
use filter_map::FilterMap;
+mod fuse;
+use fuse::Fuse;
+
mod iter;
pub use iter::{iter, Iter};
@@ -222,6 +225,71 @@ pub trait StreamExt: Stream {
FilterMap::new(self, f)
}
+ /// Creates a stream which ends after the first `None`.
+ ///
+ /// After a stream returns `None`, behavior is undefined. Future calls to
+ /// `poll_next` may or may not return `Some(T)` again or they may panic.
+ /// `fuse()` adapts a stream, ensuring that after `None` is given, it will
+ /// return `None` forever.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::stream::{Stream, StreamExt};
+ ///
+ /// use std::pin::Pin;
+ /// use std::task::{Context, Poll};
+ ///
+ /// // a stream which alternates between Some and None
+ /// struct Alternate {
+ /// state: i32,
+ /// }
+ ///
+ /// impl Stream for Alternate {
+ /// type Item = i32;
+ ///
+ /// fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
+ /// let val = self.state;
+ /// self.state = self.state + 1;
+ ///
+ /// // if it's even, Some(i32), else None
+ /// if val % 2 == 0 {
+ /// Poll::Ready(Some(val))
+ /// } else {
+ /// Poll::Ready(None)
+ /// }
+ /// }
+ /// }
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let mut stream = Alternate { state: 0 };
+ ///
+ /// // the stream goes back and forth
+ /// assert_eq!(stream.next().await, Some(0));
+ /// assert_eq!(stream.next().await, None);
+ /// assert_eq!(stream.next().await, Some(2));
+ /// assert_eq!(stream.next().await, None);
+ ///
+ /// // however, once it is fused
+ /// let mut stream = stream.fuse();
+ ///
+ /// assert_eq!(stream.next().await, Some(4));
+ /// assert_eq!(stream.next().await, None);
+ ///
+ /// // it will always return `None` after the first time.
+ /// assert_eq!(stream.next().await, None);
+ /// assert_eq!(stream.next().await, None);
+ /// assert_eq!(stream.next().await, None);
+ /// }
+ /// ```
+ fn fuse(self) -> Fuse<Self>
+ where
+ Self: Sized,
+ {
+ Fuse::new(self)
+ }
+
/// Creates a new stream of at most `n` items of the underlying stream.
///
/// Once `n` items have been yielded from this stream then it will always