From 0545b349e1fbf6663eb628cd4273c5baddf232fe Mon Sep 17 00:00:00 2001 From: Artem Vorotnikov Date: Thu, 23 Jan 2020 18:03:10 +0100 Subject: stream: add `StreamExt::fold()` (#2122) --- tokio/src/stream/fold.rs | 51 ++++++++++++++++++++++++++++++++++++++++++++++++ tokio/src/stream/mod.rs | 27 +++++++++++++++++++++++++ 2 files changed, 78 insertions(+) create mode 100644 tokio/src/stream/fold.rs (limited to 'tokio/src/stream') diff --git a/tokio/src/stream/fold.rs b/tokio/src/stream/fold.rs new file mode 100644 index 00000000..7b9fead3 --- /dev/null +++ b/tokio/src/stream/fold.rs @@ -0,0 +1,51 @@ +use crate::stream::Stream; + +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Future returned by the [`fold`](super::StreamExt::fold) method. + #[derive(Debug)] + pub struct FoldFuture { + #[pin] + stream: St, + acc: Option, + f: F, + } +} + +impl FoldFuture { + pub(super) fn new(stream: St, init: B, f: F) -> Self { + Self { + stream, + acc: Some(init), + f, + } + } +} + +impl Future for FoldFuture +where + St: Stream, + F: FnMut(B, St::Item) -> B, +{ + type Output = B; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut me = self.project(); + loop { + let next = ready!(me.stream.as_mut().poll_next(cx)); + + match next { + Some(v) => { + let old = me.acc.take().unwrap(); + let new = (me.f)(old, v); + *me.acc = Some(new); + } + None => return Poll::Ready(me.acc.take().unwrap()), + } + } + } +} diff --git a/tokio/src/stream/mod.rs b/tokio/src/stream/mod.rs index 081fe817..3c09b658 100644 --- a/tokio/src/stream/mod.rs +++ b/tokio/src/stream/mod.rs @@ -26,6 +26,9 @@ use filter::Filter; mod filter_map; use filter_map::FilterMap; +mod fold; +use fold::FoldFuture; + mod fuse; use fuse::Fuse; @@ -582,6 +585,30 @@ pub trait StreamExt: Stream { Chain::new(self, other) } + /// A combinator that applies a function to every element in a stream + /// producing a single, final value. + /// + /// # Examples + /// Basic usage: + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio::stream::{self, *}; + /// + /// let s = stream::iter(vec![1u8, 2, 3]); + /// let sum = s.fold(0, |acc, x| acc + x).await; + /// + /// assert_eq!(sum, 6); + /// # } + /// ``` + fn fold(self, init: B, f: F) -> FoldFuture + where + Self: Sized, + F: FnMut(B, Self::Item) -> B, + { + FoldFuture::new(self, init, f) + } + /// Drain stream pushing all emitted values into a collection. /// /// `collect` streams all values, awaiting as needed. Values are pushed into -- cgit v1.2.3