diff options
author | Carl Lerche <me@carllerche.com> | 2020-01-11 16:33:52 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-01-11 16:33:52 -0800 |
commit | 7c3f1cb4a3d6076cb5e1aedf2311f62c8a7a2fd7 (patch) | |
tree | 0d0a2eb5d8a60438b14fa191ddc933f1a1fbe15a /tokio/src/stream/chain.rs | |
parent | 64d23899118dfc8f1d4d7a9b60c015e43260df80 (diff) |
stream: add `StreamExt::chain` (#2093)
Asynchronous equivalent to `Iterator::chain`.
Diffstat (limited to 'tokio/src/stream/chain.rs')
-rw-r--r-- | tokio/src/stream/chain.rs | 57 |
1 files changed, 57 insertions, 0 deletions
diff --git a/tokio/src/stream/chain.rs b/tokio/src/stream/chain.rs new file mode 100644 index 00000000..5f0324a4 --- /dev/null +++ b/tokio/src/stream/chain.rs @@ -0,0 +1,57 @@ +use crate::stream::{Fuse, Stream}; + +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream returned by the [`chain`](super::StreamExt::chain) method. + pub struct Chain<T, U> { + #[pin] + a: Fuse<T>, + #[pin] + b: U, + } +} + +impl<T, U> Chain<T, U> { + pub(super) fn new(a: T, b: U) -> Chain<T, U> + where + T: Stream, + U: Stream, + { + Chain { a: Fuse::new(a), b } + } +} + +impl<T, U> Stream for Chain<T, U> +where + T: Stream, + U: Stream<Item = T::Item>, +{ + type Item = T::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> { + use Poll::Ready; + + let me = self.project(); + + if let Some(v) = ready!(me.a.poll_next(cx)) { + return Ready(Some(v)); + } + + me.b.poll_next(cx) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + let (a_lower, a_upper) = self.a.size_hint(); + let (b_lower, b_upper) = self.b.size_hint(); + + let upper = match (a_upper, b_upper) { + (Some(a_upper), Some(b_upper)) => Some(a_upper + b_upper), + _ => None, + }; + + (a_lower + b_lower, upper) + } +} |