summaryrefslogtreecommitdiffstats
path: root/tokio/src/stream/collect.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-01-13 14:44:06 -0800
committerGitHub <noreply@github.com>2020-01-13 14:44:06 -0800
commiteb1a8e1792b2c4b296be47a0681421c90bbdbf7a (patch)
tree602f3642f60167f8753c90cb170dcbaa34247ffb /tokio/src/stream/collect.rs
parent5b091fa3f0c3a06047d02ca6892f75c3e15040df (diff)
stream: add `StreamExt::collect()` (#2109)
Provides an asynchronous equivalent to `Iterator::collect()`. A sealed `FromStream` trait is added. Stabilization is pending Rust supporting `async` trait fns.
Diffstat (limited to 'tokio/src/stream/collect.rs')
-rw-r--r--tokio/src/stream/collect.rs246
1 files changed, 246 insertions, 0 deletions
diff --git a/tokio/src/stream/collect.rs b/tokio/src/stream/collect.rs
new file mode 100644
index 00000000..e8a58147
--- /dev/null
+++ b/tokio/src/stream/collect.rs
@@ -0,0 +1,246 @@
+use crate::stream::Stream;
+
+use bytes::{Buf, BufMut, Bytes, BytesMut};
+use core::future::Future;
+use core::mem;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+// Do not export this struct until `FromStream` can be unsealed.
+pin_project! {
+ /// Stream returned by the [`collect`](super::StreamExt::collect) method.
+ #[must_use = "streams do nothing unless polled"]
+ #[derive(Debug)]
+ pub struct Collect<T, U>
+ where
+ T: Stream,
+ U: FromStream<T::Item>,
+ {
+ #[pin]
+ stream: T,
+ collection: U::Collection,
+ }
+}
+
+/// Convert from a [`Stream`](crate::stream::Stream).
+///
+/// This trait is not intended to be used directly. Instead, call
+/// [`StreamExt::collect()`](super::StreamExt::collect).
+///
+/// # Implementing
+///
+/// Currently, this trait may not be implemented by third parties. The trait is
+/// sealed in order to make changes in the future. Stabilization is pending
+/// enhancements to the Rust langague.
+pub trait FromStream<T>: sealed::FromStreamPriv<T> {}
+
+impl<T, U> Collect<T, U>
+where
+ T: Stream,
+ U: FromStream<T::Item>,
+{
+ pub(super) fn new(stream: T) -> Collect<T, U> {
+ let (lower, upper) = stream.size_hint();
+ let collection = U::initialize(lower, upper);
+
+ Collect { stream, collection }
+ }
+}
+
+impl<T, U> Future for Collect<T, U>
+where
+ T: Stream,
+ U: FromStream<T::Item>,
+{
+ type Output = U;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<U> {
+ use Poll::Ready;
+
+ loop {
+ let mut me = self.as_mut().project();
+
+ let item = match ready!(me.stream.poll_next(cx)) {
+ Some(item) => item,
+ None => {
+ return Ready(U::finalize(&mut me.collection));
+ }
+ };
+
+ if !U::extend(&mut me.collection, item) {
+ return Ready(U::finalize(&mut me.collection));
+ }
+ }
+ }
+}
+
+// ===== FromStream implementations
+
+impl FromStream<()> for () {}
+
+impl sealed::FromStreamPriv<()> for () {
+ type Collection = ();
+
+ fn initialize(_lower: usize, _upper: Option<usize>) {}
+
+ fn extend(_collection: &mut (), _item: ()) -> bool {
+ true
+ }
+
+ fn finalize(_collection: &mut ()) {}
+}
+
+impl<T: AsRef<str>> FromStream<T> for String {}
+
+impl<T: AsRef<str>> sealed::FromStreamPriv<T> for String {
+ type Collection = String;
+
+ fn initialize(_lower: usize, _upper: Option<usize>) -> String {
+ String::new()
+ }
+
+ fn extend(collection: &mut String, item: T) -> bool {
+ collection.push_str(item.as_ref());
+ true
+ }
+
+ fn finalize(collection: &mut String) -> String {
+ mem::replace(collection, String::new())
+ }
+}
+
+impl<T> FromStream<T> for Vec<T> {}
+
+impl<T> sealed::FromStreamPriv<T> for Vec<T> {
+ type Collection = Vec<T>;
+
+ fn initialize(lower: usize, _upper: Option<usize>) -> Vec<T> {
+ Vec::with_capacity(lower)
+ }
+
+ fn extend(collection: &mut Vec<T>, item: T) -> bool {
+ collection.push(item);
+ true
+ }
+
+ fn finalize(collection: &mut Vec<T>) -> Vec<T> {
+ mem::replace(collection, vec![])
+ }
+}
+
+impl<T> FromStream<T> for Box<[T]> {}
+
+impl<T> sealed::FromStreamPriv<T> for Box<[T]> {
+ type Collection = Vec<T>;
+
+ fn initialize(lower: usize, upper: Option<usize>) -> Vec<T> {
+ <Vec<T> as sealed::FromStreamPriv<T>>::initialize(lower, upper)
+ }
+
+ fn extend(collection: &mut Vec<T>, item: T) -> bool {
+ <Vec<T> as sealed::FromStreamPriv<T>>::extend(collection, item)
+ }
+
+ fn finalize(collection: &mut Vec<T>) -> Box<[T]> {
+ <Vec<T> as sealed::FromStreamPriv<T>>::finalize(collection).into_boxed_slice()
+ }
+}
+
+impl<T, U, E> FromStream<Result<T, E>> for Result<U, E> where U: FromStream<T> {}
+
+impl<T, U, E> sealed::FromStreamPriv<Result<T, E>> for Result<U, E>
+where
+ U: FromStream<T>,
+{
+ type Collection = Result<U::Collection, E>;
+
+ fn initialize(lower: usize, upper: Option<usize>) -> Result<U::Collection, E> {
+ Ok(U::initialize(lower, upper))
+ }
+
+ fn extend(collection: &mut Self::Collection, item: Result<T, E>) -> bool {
+ assert!(collection.is_ok());
+ match item {
+ Ok(item) => {
+ let collection = collection.as_mut().ok().expect("invalid state");
+ U::extend(collection, item)
+ }
+ Err(err) => {
+ *collection = Err(err);
+ false
+ }
+ }
+ }
+
+ fn finalize(collection: &mut Self::Collection) -> Result<U, E> {
+ if let Ok(collection) = collection.as_mut() {
+ Ok(U::finalize(collection))
+ } else {
+ let res = mem::replace(collection, Ok(U::initialize(0, Some(0))));
+
+ if let Err(err) = res {
+ Err(err)
+ } else {
+ unreachable!();
+ }
+ }
+ }
+}
+
+impl<T: Buf> FromStream<T> for Bytes {}
+
+impl<T: Buf> sealed::FromStreamPriv<T> for Bytes {
+ type Collection = BytesMut;
+
+ fn initialize(_lower: usize, _upper: Option<usize>) -> BytesMut {
+ BytesMut::new()
+ }
+
+ fn extend(collection: &mut BytesMut, item: T) -> bool {
+ collection.put(item);
+ true
+ }
+
+ fn finalize(collection: &mut BytesMut) -> Bytes {
+ mem::replace(collection, BytesMut::new()).freeze()
+ }
+}
+
+impl<T: Buf> FromStream<T> for BytesMut {}
+
+impl<T: Buf> sealed::FromStreamPriv<T> for BytesMut {
+ type Collection = BytesMut;
+
+ fn initialize(_lower: usize, _upper: Option<usize>) -> BytesMut {
+ BytesMut::new()
+ }
+
+ fn extend(collection: &mut BytesMut, item: T) -> bool {
+ collection.put(item);
+ true
+ }
+
+ fn finalize(collection: &mut BytesMut) -> BytesMut {
+ mem::replace(collection, BytesMut::new())
+ }
+}
+
+pub(crate) mod sealed {
+ #[doc(hidden)]
+ pub trait FromStreamPriv<T> {
+ /// Intermediate type used during collection process
+ type Collection;
+
+ /// Initialize the collection
+ fn initialize(lower: usize, upper: Option<usize>) -> Self::Collection;
+
+ /// Extend the collection with the received item
+ ///
+ /// Return `true` to continue streaming, `false` complete collection.
+ fn extend(collection: &mut Self::Collection, item: T) -> bool;
+
+ /// Finalize collection into target type.
+ fn finalize(collection: &mut Self::Collection) -> Self;
+ }
+}