use crate::stream::{Fuse, Stream}; use core::pin::Pin; use core::task::{Context, Poll}; use pin_project_lite::pin_project; pin_project! { /// Stream returned by the [`merge`](super::StreamExt::merge) method. pub struct Merge { #[pin] a: Fuse, #[pin] b: Fuse, // When `true`, poll `a` first, otherwise, `poll` b`. a_first: bool, } } impl Merge { pub(super) fn new(a: T, b: U) -> Merge where T: Stream, U: Stream, { Merge { a: Fuse::new(a), b: Fuse::new(b), a_first: true, } } } impl Stream for Merge where T: Stream, U: Stream, { type Item = T::Item; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let me = self.project(); let a_first = *me.a_first; // Toggle the flag *me.a_first = !a_first; if a_first { poll_next(me.a, me.b, cx) } else { poll_next(me.b, me.a, cx) } } fn size_hint(&self) -> (usize, Option) { super::merge_size_hints(self.a.size_hint(), self.b.size_hint()) } } fn poll_next( first: Pin<&mut T>, second: Pin<&mut U>, cx: &mut Context<'_>, ) -> Poll> where T: Stream, U: Stream, { use Poll::*; let mut done = true; match first.poll_next(cx) { Ready(Some(val)) => return Ready(Some(val)), Ready(None) => {} Pending => done = false, } match second.poll_next(cx) { Ready(Some(val)) => return Ready(Some(val)), Ready(None) => {} Pending => done = false, } if done { Ready(None) } else { Pending } }