diff options
author | Tore Pettersen <toreskog@live.com> | 2020-02-01 23:03:34 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-01 14:03:34 -0800 |
commit | 1a5de2c79d9d825c367f7881646306fe7f9aaa0c (patch) | |
tree | 462a5de5f2059904fc4f3db9c30ed72ec1bbf7b2 /tokio/src/stream | |
parent | ab24a655adc1eb0d0c6951d5df2b815c671ac7d2 (diff) |
stream: add StreamExt::skip (#2204)
skip version of take
Refs: #2104
Diffstat (limited to 'tokio/src/stream')
-rw-r--r-- | tokio/src/stream/mod.rs | 28 | ||||
-rw-r--r-- | tokio/src/stream/skip.rs | 63 |
2 files changed, 91 insertions, 0 deletions
diff --git a/tokio/src/stream/mod.rs b/tokio/src/stream/mod.rs index 3cc7e68f..2ee278bf 100644 --- a/tokio/src/stream/mod.rs +++ b/tokio/src/stream/mod.rs @@ -53,6 +53,9 @@ pub use pending::{pending, Pending}; mod stream_map; pub use stream_map::StreamMap; +mod skip; +use skip::Skip; + mod try_next; use try_next::TryNext; @@ -451,6 +454,31 @@ pub trait StreamExt: Stream { TakeWhile::new(self, f) } + /// Creates a new stream that will skip the `n` first items of the + /// underlying stream. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio::stream::{self, StreamExt}; + /// + /// let mut stream = stream::iter(1..=10).skip(7); + /// + /// assert_eq!(Some(8), stream.next().await); + /// assert_eq!(Some(9), stream.next().await); + /// assert_eq!(Some(10), stream.next().await); + /// assert_eq!(None, stream.next().await); + /// # } + /// ``` + fn skip(self, n: usize) -> Skip<Self> + where + Self: Sized, + { + Skip::new(self, n) + } + /// Tests if every element of the stream matches a predicate. /// /// `all()` takes a closure that returns `true` or `false`. It applies diff --git a/tokio/src/stream/skip.rs b/tokio/src/stream/skip.rs new file mode 100644 index 00000000..39540cc9 --- /dev/null +++ b/tokio/src/stream/skip.rs @@ -0,0 +1,63 @@ +use crate::stream::Stream; + +use core::fmt; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`skip`](super::StreamExt::skip) method. + #[must_use = "streams do nothing unless polled"] + pub struct Skip<St> { + #[pin] + stream: St, + remaining: usize, + } +} + +impl<St> fmt::Debug for Skip<St> +where + St: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Skip") + .field("stream", &self.stream) + .finish() + } +} + +impl<St> Skip<St> { + pub(super) fn new(stream: St, remaining: usize) -> Self { + Self { stream, remaining } + } +} + +impl<St> Stream for Skip<St> +where + St: Stream, +{ + type Item = St::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + loop { + match ready!(self.as_mut().project().stream.poll_next(cx)) { + Some(e) => { + if self.remaining == 0 { + return Poll::Ready(Some(e)); + } + *self.as_mut().project().remaining -= 1; + } + None => return Poll::Ready(None), + } + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + let (lower, upper) = self.stream.size_hint(); + + let lower = lower.saturating_sub(self.remaining); + let upper = upper.map(|x| x.saturating_sub(self.remaining)); + + (lower, upper) + } +} |