summaryrefslogtreecommitdiffstats
path: root/tokio/src/stream/chain.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-01-11 16:33:52 -0800
committerGitHub <noreply@github.com>2020-01-11 16:33:52 -0800
commit7c3f1cb4a3d6076cb5e1aedf2311f62c8a7a2fd7 (patch)
tree0d0a2eb5d8a60438b14fa191ddc933f1a1fbe15a /tokio/src/stream/chain.rs
parent64d23899118dfc8f1d4d7a9b60c015e43260df80 (diff)
stream: add `StreamExt::chain` (#2093)
Asynchronous equivalent to `Iterator::chain`.
Diffstat (limited to 'tokio/src/stream/chain.rs')
-rw-r--r--tokio/src/stream/chain.rs57
1 files changed, 57 insertions, 0 deletions
diff --git a/tokio/src/stream/chain.rs b/tokio/src/stream/chain.rs
new file mode 100644
index 00000000..5f0324a4
--- /dev/null
+++ b/tokio/src/stream/chain.rs
@@ -0,0 +1,57 @@
+use crate::stream::{Fuse, Stream};
+
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream returned by the [`chain`](super::StreamExt::chain) method.
+ pub struct Chain<T, U> {
+ #[pin]
+ a: Fuse<T>,
+ #[pin]
+ b: U,
+ }
+}
+
+impl<T, U> Chain<T, U> {
+ pub(super) fn new(a: T, b: U) -> Chain<T, U>
+ where
+ T: Stream,
+ U: Stream,
+ {
+ Chain { a: Fuse::new(a), b }
+ }
+}
+
+impl<T, U> Stream for Chain<T, U>
+where
+ T: Stream,
+ U: Stream<Item = T::Item>,
+{
+ type Item = T::Item;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> {
+ use Poll::Ready;
+
+ let me = self.project();
+
+ if let Some(v) = ready!(me.a.poll_next(cx)) {
+ return Ready(Some(v));
+ }
+
+ me.b.poll_next(cx)
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ let (a_lower, a_upper) = self.a.size_hint();
+ let (b_lower, b_upper) = self.b.size_hint();
+
+ let upper = match (a_upper, b_upper) {
+ (Some(a_upper), Some(b_upper)) => Some(a_upper + b_upper),
+ _ => None,
+ };
+
+ (a_lower + b_lower, upper)
+ }
+}