summaryrefslogtreecommitdiffstats
path: root/tokio/src/stream
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-01-11 12:31:59 -0800
committerGitHub <noreply@github.com>2020-01-11 12:31:59 -0800
commit0ba6e9abdbe1b42997d183adf5a39488c9543200 (patch)
tree66e10180e665d4598e256cfb61f2c19160f655cd /tokio/src/stream
parenta939dc48b0da9b62e361f4e82e79f48e70caa4be (diff)
stream: add `StreamExt::merge` (#2091)
Provides an equivalent to stream `select()` from futures-rs. `merge` best describes the operation (vs. `select`). `futures-rs` named the operation "select" for historical reasons and did not rename it back to `merge` in 0.3. The operation is most commonly named `merge` else where as well (e.g. ReactiveX).
Diffstat (limited to 'tokio/src/stream')
-rw-r--r--tokio/src/stream/merge.rs97
-rw-r--r--tokio/src/stream/mod.rs76
2 files changed, 173 insertions, 0 deletions
diff --git a/tokio/src/stream/merge.rs b/tokio/src/stream/merge.rs
new file mode 100644
index 00000000..4850cd40
--- /dev/null
+++ b/tokio/src/stream/merge.rs
@@ -0,0 +1,97 @@
+use crate::stream::{Fuse, Stream};
+
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream returned by the [`merge`](super::StreamExt::merge) method.
+ pub struct Merge<T, U> {
+ #[pin]
+ a: Fuse<T>,
+ #[pin]
+ b: Fuse<U>,
+ // When `true`, poll `a` first, otherwise, `poll` b`.
+ a_first: bool,
+ }
+}
+
+impl<T, U> Merge<T, U> {
+ pub(super) fn new(a: T, b: U) -> Merge<T, U>
+ where
+ T: Stream,
+ U: Stream,
+ {
+ Merge {
+ a: Fuse::new(a),
+ b: Fuse::new(b),
+ a_first: true,
+ }
+ }
+}
+
+impl<T, U> Stream for Merge<T, U>
+where
+ T: Stream,
+ U: Stream<Item = T::Item>,
+{
+ type Item = T::Item;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> {
+ let me = self.project();
+ let a_first = *me.a_first;
+
+ // Toggle the flag
+ *me.a_first = !a_first;
+
+ if a_first {
+ poll_next(me.a, me.b, cx)
+ } else {
+ poll_next(me.b, me.a, cx)
+ }
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ let (a_lower, a_upper) = self.a.size_hint();
+ let (b_lower, b_upper) = self.b.size_hint();
+
+ let upper = match (a_upper, b_upper) {
+ (Some(a_upper), Some(b_upper)) => Some(a_upper + b_upper),
+ _ => None,
+ };
+
+ (a_lower + b_lower, upper)
+ }
+}
+
+fn poll_next<T, U>(
+ first: Pin<&mut T>,
+ second: Pin<&mut U>,
+ cx: &mut Context<'_>,
+) -> Poll<Option<T::Item>>
+where
+ T: Stream,
+ U: Stream<Item = T::Item>,
+{
+ use Poll::*;
+
+ let mut done = true;
+
+ match first.poll_next(cx) {
+ Ready(Some(val)) => return Ready(Some(val)),
+ Ready(None) => {}
+ Pending => done = false,
+ }
+
+ match second.poll_next(cx) {
+ Ready(Some(val)) => return Ready(Some(val)),
+ Ready(None) => {}
+ Pending => done = false,
+ }
+
+ if done {
+ Ready(None)
+ } else {
+ Pending
+ }
+}
diff --git a/tokio/src/stream/mod.rs b/tokio/src/stream/mod.rs
index a204af3c..2bbb6802 100644
--- a/tokio/src/stream/mod.rs
+++ b/tokio/src/stream/mod.rs
@@ -25,6 +25,9 @@ pub use iter::{iter, Iter};
mod map;
use map::Map;
+mod merge;
+use merge::Merge;
+
mod next;
use next::Next;
@@ -149,6 +152,79 @@ pub trait StreamExt: Stream {
Map::new(self, f)
}
+ /// Combine two streams into one by interleaving the output of both as it
+ /// is produced.
+ ///
+ /// Values are produced from the merged stream in the order they arrive from
+ /// the two source streams. If both source streams provide values
+ /// simultaneously, the merge stream alternates between them. This provides
+ /// some level of fairness.
+ ///
+ /// The merged stream completes once **both** source streams complete. When
+ /// one source stream completes before the other, the merge stream
+ /// exclusively polls the remaining stream.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::stream::StreamExt;
+ /// use tokio::sync::mpsc;
+ /// use tokio::time;
+ ///
+ /// use std::time::Duration;
+ ///
+ /// # /*
+ /// #[tokio::main]
+ /// # */
+ /// # #[tokio::main(basic_scheduler)]
+ /// async fn main() {
+ /// # time::pause();
+ /// let (mut tx1, rx1) = mpsc::channel(10);
+ /// let (mut tx2, rx2) = mpsc::channel(10);
+ ///
+ /// let mut rx = rx1.merge(rx2);
+ ///
+ /// tokio::spawn(async move {
+ /// // Send some values immediately
+ /// tx1.send(1).await.unwrap();
+ /// tx1.send(2).await.unwrap();
+ ///
+ /// // Let the other task send values
+ /// time::delay_for(Duration::from_millis(20)).await;
+ ///
+ /// tx1.send(4).await.unwrap();
+ /// });
+ ///
+ /// tokio::spawn(async move {
+ /// // Wait for the first task to send values
+ /// time::delay_for(Duration::from_millis(5)).await;
+ ///
+ /// tx2.send(3).await.unwrap();
+ ///
+ /// time::delay_for(Duration::from_millis(25)).await;
+ ///
+ /// // Send the final value
+ /// tx2.send(5).await.unwrap();
+ /// });
+ ///
+ /// assert_eq!(1, rx.next().await.unwrap());
+ /// assert_eq!(2, rx.next().await.unwrap());
+ /// assert_eq!(3, rx.next().await.unwrap());
+ /// assert_eq!(4, rx.next().await.unwrap());
+ /// assert_eq!(5, rx.next().await.unwrap());
+ ///
+ /// // The merged stream is consumed
+ /// assert!(rx.next().await.is_none());
+ /// }
+ /// ```
+ fn merge<U>(self, other: U) -> Merge<Self, U>
+ where
+ U: Stream<Item = Self::Item>,
+ Self: Sized,
+ {
+ Merge::new(self, other)
+ }
+
/// Filters the values produced by this stream according to the provided
/// predicate.
///