diff options
author | Carl Lerche <me@carllerche.com> | 2020-01-09 20:51:06 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-01-09 20:51:06 -0800 |
commit | cfd9b36d89e6b665c11248a86de8934cb4a7bdff (patch) | |
tree | e1b31ca2b1d94e31b0d6c4de7bc6b2bdb75af7fa /tokio/src/stream | |
parent | f5c20cd2280834191a86557e093389e6f1a983c3 (diff) |
stream: add `StreamExt::fuse` (#2085)
Diffstat (limited to 'tokio/src/stream')
-rw-r--r-- | tokio/src/stream/fuse.rs | 53 | ||||
-rw-r--r-- | tokio/src/stream/mod.rs | 68 |
2 files changed, 121 insertions, 0 deletions
diff --git a/tokio/src/stream/fuse.rs b/tokio/src/stream/fuse.rs new file mode 100644 index 00000000..6c9e02d6 --- /dev/null +++ b/tokio/src/stream/fuse.rs @@ -0,0 +1,53 @@ +use crate::stream::Stream; + +use pin_project_lite::pin_project; +use std::pin::Pin; +use std::task::{Context, Poll}; + +pin_project! { + /// Stream returned by [`fuse()`][super::StreamExt::fuse]. + #[derive(Debug)] + pub struct Fuse<T> { + #[pin] + stream: Option<T>, + } +} + +impl<T> Fuse<T> +where + T: Stream, +{ + pub(crate) fn new(stream: T) -> Fuse<T> { + Fuse { + stream: Some(stream), + } + } +} + +impl<T> Stream for Fuse<T> +where + T: Stream, +{ + type Item = T::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> { + let res = match Option::as_pin_mut(self.as_mut().project().stream) { + Some(stream) => ready!(stream.poll_next(cx)), + None => return Poll::Ready(None), + }; + + if res.is_none() { + // Do not poll the stream anymore + self.as_mut().project().stream.set(None); + } + + Poll::Ready(res) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + match self.stream { + Some(ref stream) => stream.size_hint(), + None => (0, Some(0)), + } + } +} diff --git a/tokio/src/stream/mod.rs b/tokio/src/stream/mod.rs index b427e3c4..a204af3c 100644 --- a/tokio/src/stream/mod.rs +++ b/tokio/src/stream/mod.rs @@ -16,6 +16,9 @@ use filter::Filter; mod filter_map; use filter_map::FilterMap; +mod fuse; +use fuse::Fuse; + mod iter; pub use iter::{iter, Iter}; @@ -222,6 +225,71 @@ pub trait StreamExt: Stream { FilterMap::new(self, f) } + /// Creates a stream which ends after the first `None`. + /// + /// After a stream returns `None`, behavior is undefined. Future calls to + /// `poll_next` may or may not return `Some(T)` again or they may panic. + /// `fuse()` adapts a stream, ensuring that after `None` is given, it will + /// return `None` forever. + /// + /// # Examples + /// + /// ``` + /// use tokio::stream::{Stream, StreamExt}; + /// + /// use std::pin::Pin; + /// use std::task::{Context, Poll}; + /// + /// // a stream which alternates between Some and None + /// struct Alternate { + /// state: i32, + /// } + /// + /// impl Stream for Alternate { + /// type Item = i32; + /// + /// fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> { + /// let val = self.state; + /// self.state = self.state + 1; + /// + /// // if it's even, Some(i32), else None + /// if val % 2 == 0 { + /// Poll::Ready(Some(val)) + /// } else { + /// Poll::Ready(None) + /// } + /// } + /// } + /// + /// #[tokio::main] + /// async fn main() { + /// let mut stream = Alternate { state: 0 }; + /// + /// // the stream goes back and forth + /// assert_eq!(stream.next().await, Some(0)); + /// assert_eq!(stream.next().await, None); + /// assert_eq!(stream.next().await, Some(2)); + /// assert_eq!(stream.next().await, None); + /// + /// // however, once it is fused + /// let mut stream = stream.fuse(); + /// + /// assert_eq!(stream.next().await, Some(4)); + /// assert_eq!(stream.next().await, None); + /// + /// // it will always return `None` after the first time. + /// assert_eq!(stream.next().await, None); + /// assert_eq!(stream.next().await, None); + /// assert_eq!(stream.next().await, None); + /// } + /// ``` + fn fuse(self) -> Fuse<Self> + where + Self: Sized, + { + Fuse::new(self) + } + /// Creates a new stream of at most `n` items of the underlying stream. /// /// Once `n` items have been yielded from this stream then it will always |