summaryrefslogtreecommitdiffstats
path: root/tokio/src/stream/merge.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-01-11 12:31:59 -0800
committerGitHub <noreply@github.com>2020-01-11 12:31:59 -0800
commit0ba6e9abdbe1b42997d183adf5a39488c9543200 (patch)
tree66e10180e665d4598e256cfb61f2c19160f655cd /tokio/src/stream/merge.rs
parenta939dc48b0da9b62e361f4e82e79f48e70caa4be (diff)
stream: add `StreamExt::merge` (#2091)
Provides an equivalent to stream `select()` from futures-rs. `merge` best describes the operation (vs. `select`). `futures-rs` named the operation "select" for historical reasons and did not rename it back to `merge` in 0.3. The operation is most commonly named `merge` else where as well (e.g. ReactiveX).
Diffstat (limited to 'tokio/src/stream/merge.rs')
-rw-r--r--tokio/src/stream/merge.rs97
1 files changed, 97 insertions, 0 deletions
diff --git a/tokio/src/stream/merge.rs b/tokio/src/stream/merge.rs
new file mode 100644
index 00000000..4850cd40
--- /dev/null
+++ b/tokio/src/stream/merge.rs
@@ -0,0 +1,97 @@
+use crate::stream::{Fuse, Stream};
+
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream returned by the [`merge`](super::StreamExt::merge) method.
+ pub struct Merge<T, U> {
+ #[pin]
+ a: Fuse<T>,
+ #[pin]
+ b: Fuse<U>,
+ // When `true`, poll `a` first, otherwise, `poll` b`.
+ a_first: bool,
+ }
+}
+
+impl<T, U> Merge<T, U> {
+ pub(super) fn new(a: T, b: U) -> Merge<T, U>
+ where
+ T: Stream,
+ U: Stream,
+ {
+ Merge {
+ a: Fuse::new(a),
+ b: Fuse::new(b),
+ a_first: true,
+ }
+ }
+}
+
+impl<T, U> Stream for Merge<T, U>
+where
+ T: Stream,
+ U: Stream<Item = T::Item>,
+{
+ type Item = T::Item;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> {
+ let me = self.project();
+ let a_first = *me.a_first;
+
+ // Toggle the flag
+ *me.a_first = !a_first;
+
+ if a_first {
+ poll_next(me.a, me.b, cx)
+ } else {
+ poll_next(me.b, me.a, cx)
+ }
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ let (a_lower, a_upper) = self.a.size_hint();
+ let (b_lower, b_upper) = self.b.size_hint();
+
+ let upper = match (a_upper, b_upper) {
+ (Some(a_upper), Some(b_upper)) => Some(a_upper + b_upper),
+ _ => None,
+ };
+
+ (a_lower + b_lower, upper)
+ }
+}
+
+fn poll_next<T, U>(
+ first: Pin<&mut T>,
+ second: Pin<&mut U>,
+ cx: &mut Context<'_>,
+) -> Poll<Option<T::Item>>
+where
+ T: Stream,
+ U: Stream<Item = T::Item>,
+{
+ use Poll::*;
+
+ let mut done = true;
+
+ match first.poll_next(cx) {
+ Ready(Some(val)) => return Ready(Some(val)),
+ Ready(None) => {}
+ Pending => done = false,
+ }
+
+ match second.poll_next(cx) {
+ Ready(Some(val)) => return Ready(Some(val)),
+ Ready(None) => {}
+ Pending => done = false,
+ }
+
+ if done {
+ Ready(None)
+ } else {
+ Pending
+ }
+}