diff options
author | Carl Lerche <me@carllerche.com> | 2020-01-11 12:31:59 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-01-11 12:31:59 -0800 |
commit | 0ba6e9abdbe1b42997d183adf5a39488c9543200 (patch) | |
tree | 66e10180e665d4598e256cfb61f2c19160f655cd /tokio/src/stream/mod.rs | |
parent | a939dc48b0da9b62e361f4e82e79f48e70caa4be (diff) |
stream: add `StreamExt::merge` (#2091)
Provides an equivalent to stream `select()` from futures-rs. `merge`
best describes the operation (vs. `select`). `futures-rs` named the
operation "select" for historical reasons and did not rename it back to
`merge` in 0.3. The operation is most commonly named `merge` else where
as well (e.g. ReactiveX).
Diffstat (limited to 'tokio/src/stream/mod.rs')
-rw-r--r-- | tokio/src/stream/mod.rs | 76 |
1 files changed, 76 insertions, 0 deletions
diff --git a/tokio/src/stream/mod.rs b/tokio/src/stream/mod.rs index a204af3c..2bbb6802 100644 --- a/tokio/src/stream/mod.rs +++ b/tokio/src/stream/mod.rs @@ -25,6 +25,9 @@ pub use iter::{iter, Iter}; mod map; use map::Map; +mod merge; +use merge::Merge; + mod next; use next::Next; @@ -149,6 +152,79 @@ pub trait StreamExt: Stream { Map::new(self, f) } + /// Combine two streams into one by interleaving the output of both as it + /// is produced. + /// + /// Values are produced from the merged stream in the order they arrive from + /// the two source streams. If both source streams provide values + /// simultaneously, the merge stream alternates between them. This provides + /// some level of fairness. + /// + /// The merged stream completes once **both** source streams complete. When + /// one source stream completes before the other, the merge stream + /// exclusively polls the remaining stream. + /// + /// # Examples + /// + /// ``` + /// use tokio::stream::StreamExt; + /// use tokio::sync::mpsc; + /// use tokio::time; + /// + /// use std::time::Duration; + /// + /// # /* + /// #[tokio::main] + /// # */ + /// # #[tokio::main(basic_scheduler)] + /// async fn main() { + /// # time::pause(); + /// let (mut tx1, rx1) = mpsc::channel(10); + /// let (mut tx2, rx2) = mpsc::channel(10); + /// + /// let mut rx = rx1.merge(rx2); + /// + /// tokio::spawn(async move { + /// // Send some values immediately + /// tx1.send(1).await.unwrap(); + /// tx1.send(2).await.unwrap(); + /// + /// // Let the other task send values + /// time::delay_for(Duration::from_millis(20)).await; + /// + /// tx1.send(4).await.unwrap(); + /// }); + /// + /// tokio::spawn(async move { + /// // Wait for the first task to send values + /// time::delay_for(Duration::from_millis(5)).await; + /// + /// tx2.send(3).await.unwrap(); + /// + /// time::delay_for(Duration::from_millis(25)).await; + /// + /// // Send the final value + /// tx2.send(5).await.unwrap(); + /// }); + /// + /// assert_eq!(1, rx.next().await.unwrap()); + /// assert_eq!(2, rx.next().await.unwrap()); + /// assert_eq!(3, rx.next().await.unwrap()); + /// assert_eq!(4, rx.next().await.unwrap()); + /// assert_eq!(5, rx.next().await.unwrap()); + /// + /// // The merged stream is consumed + /// assert!(rx.next().await.is_none()); + /// } + /// ``` + fn merge<U>(self, other: U) -> Merge<Self, U> + where + U: Stream<Item = Self::Item>, + Self: Sized, + { + Merge::new(self, other) + } + /// Filters the values produced by this stream according to the provided /// predicate. /// |