diff options
author | Carl Lerche <me@carllerche.com> | 2020-01-09 20:51:06 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-01-09 20:51:06 -0800 |
commit | cfd9b36d89e6b665c11248a86de8934cb4a7bdff (patch) | |
tree | e1b31ca2b1d94e31b0d6c4de7bc6b2bdb75af7fa /tokio/src/stream/mod.rs | |
parent | f5c20cd2280834191a86557e093389e6f1a983c3 (diff) |
stream: add `StreamExt::fuse` (#2085)
Diffstat (limited to 'tokio/src/stream/mod.rs')
-rw-r--r-- | tokio/src/stream/mod.rs | 68 |
1 files changed, 68 insertions, 0 deletions
diff --git a/tokio/src/stream/mod.rs b/tokio/src/stream/mod.rs index b427e3c4..a204af3c 100644 --- a/tokio/src/stream/mod.rs +++ b/tokio/src/stream/mod.rs @@ -16,6 +16,9 @@ use filter::Filter; mod filter_map; use filter_map::FilterMap; +mod fuse; +use fuse::Fuse; + mod iter; pub use iter::{iter, Iter}; @@ -222,6 +225,71 @@ pub trait StreamExt: Stream { FilterMap::new(self, f) } + /// Creates a stream which ends after the first `None`. + /// + /// After a stream returns `None`, behavior is undefined. Future calls to + /// `poll_next` may or may not return `Some(T)` again or they may panic. + /// `fuse()` adapts a stream, ensuring that after `None` is given, it will + /// return `None` forever. + /// + /// # Examples + /// + /// ``` + /// use tokio::stream::{Stream, StreamExt}; + /// + /// use std::pin::Pin; + /// use std::task::{Context, Poll}; + /// + /// // a stream which alternates between Some and None + /// struct Alternate { + /// state: i32, + /// } + /// + /// impl Stream for Alternate { + /// type Item = i32; + /// + /// fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> { + /// let val = self.state; + /// self.state = self.state + 1; + /// + /// // if it's even, Some(i32), else None + /// if val % 2 == 0 { + /// Poll::Ready(Some(val)) + /// } else { + /// Poll::Ready(None) + /// } + /// } + /// } + /// + /// #[tokio::main] + /// async fn main() { + /// let mut stream = Alternate { state: 0 }; + /// + /// // the stream goes back and forth + /// assert_eq!(stream.next().await, Some(0)); + /// assert_eq!(stream.next().await, None); + /// assert_eq!(stream.next().await, Some(2)); + /// assert_eq!(stream.next().await, None); + /// + /// // however, once it is fused + /// let mut stream = stream.fuse(); + /// + /// assert_eq!(stream.next().await, Some(4)); + /// assert_eq!(stream.next().await, None); + /// + /// // it will always return `None` after the first time. + /// assert_eq!(stream.next().await, None); + /// assert_eq!(stream.next().await, None); + /// assert_eq!(stream.next().await, None); + /// } + /// ``` + fn fuse(self) -> Fuse<Self> + where + Self: Sized, + { + Fuse::new(self) + } + /// Creates a new stream of at most `n` items of the underlying stream. /// /// Once `n` items have been yielded from this stream then it will always |