From 3540c5b9ee23e29eb04bfefcf4500741555f2141 Mon Sep 17 00:00:00 2001 From: Artem Vorotnikov Date: Mon, 6 Jan 2020 21:26:53 +0300 Subject: stream: Add StreamExt::any (#2034) --- tokio/src/stream/any.rs | 45 +++++++++++++++++++++++++++++++++++++++ tokio/src/stream/mod.rs | 56 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 101 insertions(+) create mode 100644 tokio/src/stream/any.rs (limited to 'tokio/src/stream') diff --git a/tokio/src/stream/any.rs b/tokio/src/stream/any.rs new file mode 100644 index 00000000..f2ecad5e --- /dev/null +++ b/tokio/src/stream/any.rs @@ -0,0 +1,45 @@ +use crate::stream::Stream; + +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; + +/// Future for the [`any`](super::StreamExt::any) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct AnyFuture<'a, St: ?Sized, F> { + stream: &'a mut St, + f: F, +} + +impl<'a, St: ?Sized, F> AnyFuture<'a, St, F> { + pub(super) fn new(stream: &'a mut St, f: F) -> Self { + Self { stream, f } + } +} + +impl Unpin for AnyFuture<'_, St, F> {} + +impl Future for AnyFuture<'_, St, F> +where + St: ?Sized + Stream + Unpin, + F: FnMut(St::Item) -> bool, +{ + type Output = bool; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let next = futures_core::ready!(Pin::new(&mut self.stream).poll_next(cx)); + + match next { + Some(v) => { + if (&mut self.f)(v) { + Poll::Ready(true) + } else { + cx.waker().wake_by_ref(); + Poll::Pending + } + } + None => Poll::Ready(false), + } + } +} diff --git a/tokio/src/stream/mod.rs b/tokio/src/stream/mod.rs index a99dfca3..94ed6bd1 100644 --- a/tokio/src/stream/mod.rs +++ b/tokio/src/stream/mod.rs @@ -7,6 +7,9 @@ mod all; use all::AllFuture; +mod any; +use any::AnyFuture; + mod filter; use filter::Filter; @@ -322,6 +325,59 @@ pub trait StreamExt: Stream { { AllFuture::new(self, f) } + + /// Tests if any element of the stream matches a predicate. + /// + /// `any()` takes a closure that returns `true` or `false`. It applies + /// this closure to each element of the stream, and if any of them return + /// `true`, then so does `any()`. If they all return `false`, it + /// returns `false`. + /// + /// `any()` is short-circuiting; in other words, it will stop processing + /// as soon as it finds a `true`, given that no matter what else happens, + /// the result will also be `true`. + /// + /// An empty stream returns `false`. + /// + /// Basic usage: + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio::stream::{self, StreamExt}; + /// + /// let a = [1, 2, 3]; + /// + /// assert!(stream::iter(&a).any(|&x| x > 0).await); + /// + /// assert!(!stream::iter(&a).any(|&x| x > 5).await); + /// # } + /// ``` + /// + /// Stopping at the first `true`: + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio::stream::{self, StreamExt}; + /// + /// let a = [1, 2, 3]; + /// + /// let mut iter = stream::iter(&a); + /// + /// assert!(iter.any(|&x| x != 2).await); + /// + /// // we can still use `iter`, as there are more elements. + /// assert_eq!(iter.next().await, Some(&2)); + /// # } + /// ``` + fn any(&mut self, f: F) -> AnyFuture<'_, Self, F> + where + Self: Unpin, + F: FnMut(Self::Item) -> bool, + { + AnyFuture::new(self, f) + } } impl StreamExt for St where St: Stream {} -- cgit v1.2.3