diff options
author | Carl Lerche <me@carllerche.com> | 2020-01-11 12:31:59 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-01-11 12:31:59 -0800 |
commit | 0ba6e9abdbe1b42997d183adf5a39488c9543200 (patch) | |
tree | 66e10180e665d4598e256cfb61f2c19160f655cd /tokio/src/stream/merge.rs | |
parent | a939dc48b0da9b62e361f4e82e79f48e70caa4be (diff) |
stream: add `StreamExt::merge` (#2091)
Provides an equivalent to stream `select()` from futures-rs. `merge`
best describes the operation (vs. `select`). `futures-rs` named the
operation "select" for historical reasons and did not rename it back to
`merge` in 0.3. The operation is most commonly named `merge` else where
as well (e.g. ReactiveX).
Diffstat (limited to 'tokio/src/stream/merge.rs')
-rw-r--r-- | tokio/src/stream/merge.rs | 97 |
1 files changed, 97 insertions, 0 deletions
diff --git a/tokio/src/stream/merge.rs b/tokio/src/stream/merge.rs new file mode 100644 index 00000000..4850cd40 --- /dev/null +++ b/tokio/src/stream/merge.rs @@ -0,0 +1,97 @@ +use crate::stream::{Fuse, Stream}; + +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream returned by the [`merge`](super::StreamExt::merge) method. + pub struct Merge<T, U> { + #[pin] + a: Fuse<T>, + #[pin] + b: Fuse<U>, + // When `true`, poll `a` first, otherwise, `poll` b`. + a_first: bool, + } +} + +impl<T, U> Merge<T, U> { + pub(super) fn new(a: T, b: U) -> Merge<T, U> + where + T: Stream, + U: Stream, + { + Merge { + a: Fuse::new(a), + b: Fuse::new(b), + a_first: true, + } + } +} + +impl<T, U> Stream for Merge<T, U> +where + T: Stream, + U: Stream<Item = T::Item>, +{ + type Item = T::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> { + let me = self.project(); + let a_first = *me.a_first; + + // Toggle the flag + *me.a_first = !a_first; + + if a_first { + poll_next(me.a, me.b, cx) + } else { + poll_next(me.b, me.a, cx) + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + let (a_lower, a_upper) = self.a.size_hint(); + let (b_lower, b_upper) = self.b.size_hint(); + + let upper = match (a_upper, b_upper) { + (Some(a_upper), Some(b_upper)) => Some(a_upper + b_upper), + _ => None, + }; + + (a_lower + b_lower, upper) + } +} + +fn poll_next<T, U>( + first: Pin<&mut T>, + second: Pin<&mut U>, + cx: &mut Context<'_>, +) -> Poll<Option<T::Item>> +where + T: Stream, + U: Stream<Item = T::Item>, +{ + use Poll::*; + + let mut done = true; + + match first.poll_next(cx) { + Ready(Some(val)) => return Ready(Some(val)), + Ready(None) => {} + Pending => done = false, + } + + match second.poll_next(cx) { + Ready(Some(val)) => return Ready(Some(val)), + Ready(None) => {} + Pending => done = false, + } + + if done { + Ready(None) + } else { + Pending + } +} |