summaryrefslogtreecommitdiffstats
path: root/tokio/src/stream/mod.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-01-11 12:31:59 -0800
committerGitHub <noreply@github.com>2020-01-11 12:31:59 -0800
commit0ba6e9abdbe1b42997d183adf5a39488c9543200 (patch)
tree66e10180e665d4598e256cfb61f2c19160f655cd /tokio/src/stream/mod.rs
parenta939dc48b0da9b62e361f4e82e79f48e70caa4be (diff)
stream: add `StreamExt::merge` (#2091)
Provides an equivalent to stream `select()` from futures-rs. `merge` best describes the operation (vs. `select`). `futures-rs` named the operation "select" for historical reasons and did not rename it back to `merge` in 0.3. The operation is most commonly named `merge` else where as well (e.g. ReactiveX).
Diffstat (limited to 'tokio/src/stream/mod.rs')
-rw-r--r--tokio/src/stream/mod.rs76
1 files changed, 76 insertions, 0 deletions
diff --git a/tokio/src/stream/mod.rs b/tokio/src/stream/mod.rs
index a204af3c..2bbb6802 100644
--- a/tokio/src/stream/mod.rs
+++ b/tokio/src/stream/mod.rs
@@ -25,6 +25,9 @@ pub use iter::{iter, Iter};
mod map;
use map::Map;
+mod merge;
+use merge::Merge;
+
mod next;
use next::Next;
@@ -149,6 +152,79 @@ pub trait StreamExt: Stream {
Map::new(self, f)
}
+ /// Combine two streams into one by interleaving the output of both as it
+ /// is produced.
+ ///
+ /// Values are produced from the merged stream in the order they arrive from
+ /// the two source streams. If both source streams provide values
+ /// simultaneously, the merge stream alternates between them. This provides
+ /// some level of fairness.
+ ///
+ /// The merged stream completes once **both** source streams complete. When
+ /// one source stream completes before the other, the merge stream
+ /// exclusively polls the remaining stream.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::stream::StreamExt;
+ /// use tokio::sync::mpsc;
+ /// use tokio::time;
+ ///
+ /// use std::time::Duration;
+ ///
+ /// # /*
+ /// #[tokio::main]
+ /// # */
+ /// # #[tokio::main(basic_scheduler)]
+ /// async fn main() {
+ /// # time::pause();
+ /// let (mut tx1, rx1) = mpsc::channel(10);
+ /// let (mut tx2, rx2) = mpsc::channel(10);
+ ///
+ /// let mut rx = rx1.merge(rx2);
+ ///
+ /// tokio::spawn(async move {
+ /// // Send some values immediately
+ /// tx1.send(1).await.unwrap();
+ /// tx1.send(2).await.unwrap();
+ ///
+ /// // Let the other task send values
+ /// time::delay_for(Duration::from_millis(20)).await;
+ ///
+ /// tx1.send(4).await.unwrap();
+ /// });
+ ///
+ /// tokio::spawn(async move {
+ /// // Wait for the first task to send values
+ /// time::delay_for(Duration::from_millis(5)).await;
+ ///
+ /// tx2.send(3).await.unwrap();
+ ///
+ /// time::delay_for(Duration::from_millis(25)).await;
+ ///
+ /// // Send the final value
+ /// tx2.send(5).await.unwrap();
+ /// });
+ ///
+ /// assert_eq!(1, rx.next().await.unwrap());
+ /// assert_eq!(2, rx.next().await.unwrap());
+ /// assert_eq!(3, rx.next().await.unwrap());
+ /// assert_eq!(4, rx.next().await.unwrap());
+ /// assert_eq!(5, rx.next().await.unwrap());
+ ///
+ /// // The merged stream is consumed
+ /// assert!(rx.next().await.is_none());
+ /// }
+ /// ```
+ fn merge<U>(self, other: U) -> Merge<Self, U>
+ where
+ U: Stream<Item = Self::Item>,
+ Self: Sized,
+ {
+ Merge::new(self, other)
+ }
+
/// Filters the values produced by this stream according to the provided
/// predicate.
///