From 0ba6e9abdbe1b42997d183adf5a39488c9543200 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Sat, 11 Jan 2020 12:31:59 -0800 Subject: stream: add `StreamExt::merge` (#2091) Provides an equivalent to stream `select()` from futures-rs. `merge` best describes the operation (vs. `select`). `futures-rs` named the operation "select" for historical reasons and did not rename it back to `merge` in 0.3. The operation is most commonly named `merge` else where as well (e.g. ReactiveX). --- tokio/src/stream/merge.rs | 97 +++++++++++++++++++++++++++++++++++++++++++++++ tokio/src/stream/mod.rs | 76 +++++++++++++++++++++++++++++++++++++ 2 files changed, 173 insertions(+) create mode 100644 tokio/src/stream/merge.rs (limited to 'tokio/src/stream') diff --git a/tokio/src/stream/merge.rs b/tokio/src/stream/merge.rs new file mode 100644 index 00000000..4850cd40 --- /dev/null +++ b/tokio/src/stream/merge.rs @@ -0,0 +1,97 @@ +use crate::stream::{Fuse, Stream}; + +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream returned by the [`merge`](super::StreamExt::merge) method. + pub struct Merge { + #[pin] + a: Fuse, + #[pin] + b: Fuse, + // When `true`, poll `a` first, otherwise, `poll` b`. + a_first: bool, + } +} + +impl Merge { + pub(super) fn new(a: T, b: U) -> Merge + where + T: Stream, + U: Stream, + { + Merge { + a: Fuse::new(a), + b: Fuse::new(b), + a_first: true, + } + } +} + +impl Stream for Merge +where + T: Stream, + U: Stream, +{ + type Item = T::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let me = self.project(); + let a_first = *me.a_first; + + // Toggle the flag + *me.a_first = !a_first; + + if a_first { + poll_next(me.a, me.b, cx) + } else { + poll_next(me.b, me.a, cx) + } + } + + fn size_hint(&self) -> (usize, Option) { + let (a_lower, a_upper) = self.a.size_hint(); + let (b_lower, b_upper) = self.b.size_hint(); + + let upper = match (a_upper, b_upper) { + (Some(a_upper), Some(b_upper)) => Some(a_upper + b_upper), + _ => None, + }; + + (a_lower + b_lower, upper) + } +} + +fn poll_next( + first: Pin<&mut T>, + second: Pin<&mut U>, + cx: &mut Context<'_>, +) -> Poll> +where + T: Stream, + U: Stream, +{ + use Poll::*; + + let mut done = true; + + match first.poll_next(cx) { + Ready(Some(val)) => return Ready(Some(val)), + Ready(None) => {} + Pending => done = false, + } + + match second.poll_next(cx) { + Ready(Some(val)) => return Ready(Some(val)), + Ready(None) => {} + Pending => done = false, + } + + if done { + Ready(None) + } else { + Pending + } +} diff --git a/tokio/src/stream/mod.rs b/tokio/src/stream/mod.rs index a204af3c..2bbb6802 100644 --- a/tokio/src/stream/mod.rs +++ b/tokio/src/stream/mod.rs @@ -25,6 +25,9 @@ pub use iter::{iter, Iter}; mod map; use map::Map; +mod merge; +use merge::Merge; + mod next; use next::Next; @@ -149,6 +152,79 @@ pub trait StreamExt: Stream { Map::new(self, f) } + /// Combine two streams into one by interleaving the output of both as it + /// is produced. + /// + /// Values are produced from the merged stream in the order they arrive from + /// the two source streams. If both source streams provide values + /// simultaneously, the merge stream alternates between them. This provides + /// some level of fairness. + /// + /// The merged stream completes once **both** source streams complete. When + /// one source stream completes before the other, the merge stream + /// exclusively polls the remaining stream. + /// + /// # Examples + /// + /// ``` + /// use tokio::stream::StreamExt; + /// use tokio::sync::mpsc; + /// use tokio::time; + /// + /// use std::time::Duration; + /// + /// # /* + /// #[tokio::main] + /// # */ + /// # #[tokio::main(basic_scheduler)] + /// async fn main() { + /// # time::pause(); + /// let (mut tx1, rx1) = mpsc::channel(10); + /// let (mut tx2, rx2) = mpsc::channel(10); + /// + /// let mut rx = rx1.merge(rx2); + /// + /// tokio::spawn(async move { + /// // Send some values immediately + /// tx1.send(1).await.unwrap(); + /// tx1.send(2).await.unwrap(); + /// + /// // Let the other task send values + /// time::delay_for(Duration::from_millis(20)).await; + /// + /// tx1.send(4).await.unwrap(); + /// }); + /// + /// tokio::spawn(async move { + /// // Wait for the first task to send values + /// time::delay_for(Duration::from_millis(5)).await; + /// + /// tx2.send(3).await.unwrap(); + /// + /// time::delay_for(Duration::from_millis(25)).await; + /// + /// // Send the final value + /// tx2.send(5).await.unwrap(); + /// }); + /// + /// assert_eq!(1, rx.next().await.unwrap()); + /// assert_eq!(2, rx.next().await.unwrap()); + /// assert_eq!(3, rx.next().await.unwrap()); + /// assert_eq!(4, rx.next().await.unwrap()); + /// assert_eq!(5, rx.next().await.unwrap()); + /// + /// // The merged stream is consumed + /// assert!(rx.next().await.is_none()); + /// } + /// ``` + fn merge(self, other: U) -> Merge + where + U: Stream, + Self: Sized, + { + Merge::new(self, other) + } + /// Filters the values produced by this stream according to the provided /// predicate. /// -- cgit v1.2.3