diff options
author | Tore Pettersen <toreskog@live.com> | 2020-02-02 20:36:41 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-02 11:36:41 -0800 |
commit | 513671f8dece002191feb4f2b1a97bd66306350c (patch) | |
tree | ee64583a3d0dc2b48d4e440fe7bf0de11cad216b /tokio/src/stream | |
parent | 79e4514283f08dfbf14efede84582b4c665c2aea (diff) |
stream: add StreamExt::skip_while (#2205)
async version of Iterator::skip_while
Refs: #2104
Diffstat (limited to 'tokio/src/stream')
-rw-r--r-- | tokio/src/stream/mod.rs | 34 | ||||
-rw-r--r-- | tokio/src/stream/skip_while.rs | 73 |
2 files changed, 107 insertions, 0 deletions
diff --git a/tokio/src/stream/mod.rs b/tokio/src/stream/mod.rs index 2ee278bf..307ead5f 100644 --- a/tokio/src/stream/mod.rs +++ b/tokio/src/stream/mod.rs @@ -56,6 +56,9 @@ pub use stream_map::StreamMap; mod skip; use skip::Skip; +mod skip_while; +use skip_while::SkipWhile; + mod try_next; use try_next::TryNext; @@ -479,6 +482,37 @@ pub trait StreamExt: Stream { Skip::new(self, n) } + /// Skip elements from the underlying stream while the provided predicate + /// resolves to `true`. + /// + /// This function, like [`Iterator::skip_while`], will ignore elemets from the + /// stream until the predicate `f` resolves to `false`. Once one element + /// returns false, the rest of the elements will be yielded. + /// + /// [`Iterator::skip_while`]: std::iter::Iterator::skip_while() + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio::stream::{self, StreamExt}; + /// let mut stream = stream::iter(vec![1,2,3,4,1]).skip_while(|x| *x < 3); + /// + /// assert_eq!(Some(3), stream.next().await); + /// assert_eq!(Some(4), stream.next().await); + /// assert_eq!(Some(1), stream.next().await); + /// assert_eq!(None, stream.next().await); + /// # } + /// ``` + fn skip_while<F>(self, f: F) -> SkipWhile<Self, F> + where + F: FnMut(&Self::Item) -> bool, + Self: Sized, + { + SkipWhile::new(self, f) + } + /// Tests if every element of the stream matches a predicate. /// /// `all()` takes a closure that returns `true` or `false`. It applies diff --git a/tokio/src/stream/skip_while.rs b/tokio/src/stream/skip_while.rs new file mode 100644 index 00000000..4e050070 --- /dev/null +++ b/tokio/src/stream/skip_while.rs @@ -0,0 +1,73 @@ +use crate::stream::Stream; + +use core::fmt; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`skip_while`](super::StreamExt::skip_while) method. + #[must_use = "streams do nothing unless polled"] + pub struct SkipWhile<St, F> { + #[pin] + stream: St, + predicate: Option<F>, + } +} + +impl<St, F> fmt::Debug for SkipWhile<St, F> +where + St: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SkipWhile") + .field("stream", &self.stream) + .finish() + } +} + +impl<St, F> SkipWhile<St, F> { + pub(super) fn new(stream: St, predicate: F) -> Self { + Self { + stream, + predicate: Some(predicate), + } + } +} + +impl<St, F> Stream for SkipWhile<St, F> +where + St: Stream, + F: FnMut(&St::Item) -> bool, +{ + type Item = St::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let mut this = self.project(); + if let Some(predicate) = this.predicate { + loop { + match ready!(this.stream.as_mut().poll_next(cx)) { + Some(item) => { + if !(predicate)(&item) { + *this.predicate = None; + return Poll::Ready(Some(item)); + } + } + None => return Poll::Ready(None), + } + } + } else { + this.stream.poll_next(cx) + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + let (lower, upper) = self.stream.size_hint(); + + if self.predicate.is_some() { + return (0, upper); + } + + (lower, upper) + } +} |