summaryrefslogtreecommitdiffstats
path: root/tokio/src/stream
diff options
context:
space:
mode:
authorArtem Vorotnikov <artem@vorotnikov.me>2020-01-06 21:26:53 +0300
committerCarl Lerche <me@carllerche.com>2020-01-06 10:26:53 -0800
commit3540c5b9ee23e29eb04bfefcf4500741555f2141 (patch)
tree52fe6843704e6f3d4c365e2523771c6577d459ae /tokio/src/stream
parent188fc6e0d24acf2cf1b51209e058a5c1a1d50dca (diff)
stream: Add StreamExt::any (#2034)
Diffstat (limited to 'tokio/src/stream')
-rw-r--r--tokio/src/stream/any.rs45
-rw-r--r--tokio/src/stream/mod.rs56
2 files changed, 101 insertions, 0 deletions
diff --git a/tokio/src/stream/any.rs b/tokio/src/stream/any.rs
new file mode 100644
index 00000000..f2ecad5e
--- /dev/null
+++ b/tokio/src/stream/any.rs
@@ -0,0 +1,45 @@
+use crate::stream::Stream;
+
+use core::future::Future;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+
+/// Future for the [`any`](super::StreamExt::any) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct AnyFuture<'a, St: ?Sized, F> {
+ stream: &'a mut St,
+ f: F,
+}
+
+impl<'a, St: ?Sized, F> AnyFuture<'a, St, F> {
+ pub(super) fn new(stream: &'a mut St, f: F) -> Self {
+ Self { stream, f }
+ }
+}
+
+impl<St: ?Sized + Unpin, F> Unpin for AnyFuture<'_, St, F> {}
+
+impl<St, F> Future for AnyFuture<'_, St, F>
+where
+ St: ?Sized + Stream + Unpin,
+ F: FnMut(St::Item) -> bool,
+{
+ type Output = bool;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let next = futures_core::ready!(Pin::new(&mut self.stream).poll_next(cx));
+
+ match next {
+ Some(v) => {
+ if (&mut self.f)(v) {
+ Poll::Ready(true)
+ } else {
+ cx.waker().wake_by_ref();
+ Poll::Pending
+ }
+ }
+ None => Poll::Ready(false),
+ }
+ }
+}
diff --git a/tokio/src/stream/mod.rs b/tokio/src/stream/mod.rs
index a99dfca3..94ed6bd1 100644
--- a/tokio/src/stream/mod.rs
+++ b/tokio/src/stream/mod.rs
@@ -7,6 +7,9 @@
mod all;
use all::AllFuture;
+mod any;
+use any::AnyFuture;
+
mod filter;
use filter::Filter;
@@ -322,6 +325,59 @@ pub trait StreamExt: Stream {
{
AllFuture::new(self, f)
}
+
+ /// Tests if any element of the stream matches a predicate.
+ ///
+ /// `any()` takes a closure that returns `true` or `false`. It applies
+ /// this closure to each element of the stream, and if any of them return
+ /// `true`, then so does `any()`. If they all return `false`, it
+ /// returns `false`.
+ ///
+ /// `any()` is short-circuiting; in other words, it will stop processing
+ /// as soon as it finds a `true`, given that no matter what else happens,
+ /// the result will also be `true`.
+ ///
+ /// An empty stream returns `false`.
+ ///
+ /// Basic usage:
+ ///
+ /// ```
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// use tokio::stream::{self, StreamExt};
+ ///
+ /// let a = [1, 2, 3];
+ ///
+ /// assert!(stream::iter(&a).any(|&x| x > 0).await);
+ ///
+ /// assert!(!stream::iter(&a).any(|&x| x > 5).await);
+ /// # }
+ /// ```
+ ///
+ /// Stopping at the first `true`:
+ ///
+ /// ```
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// use tokio::stream::{self, StreamExt};
+ ///
+ /// let a = [1, 2, 3];
+ ///
+ /// let mut iter = stream::iter(&a);
+ ///
+ /// assert!(iter.any(|&x| x != 2).await);
+ ///
+ /// // we can still use `iter`, as there are more elements.
+ /// assert_eq!(iter.next().await, Some(&2));
+ /// # }
+ /// ```
+ fn any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F>
+ where
+ Self: Unpin,
+ F: FnMut(Self::Item) -> bool,
+ {
+ AnyFuture::new(self, f)
+ }
}
impl<St: ?Sized> StreamExt for St where St: Stream {}