use crate::stream::Stream; use core::future::Future; use core::marker::PhantomPinned; use core::mem; use core::pin::Pin; use core::task::{Context, Poll}; use pin_project_lite::pin_project; // Do not export this struct until `FromStream` can be unsealed. pin_project! { /// Future returned by the [`collect`](super::StreamExt::collect) method. #[must_use = "futures do nothing unless you `.await` or poll them"] #[derive(Debug)] pub struct Collect where T: Stream, U: FromStream, { #[pin] stream: T, collection: U::InternalCollection, // Make this future `!Unpin` for compatibility with async trait methods. #[pin] _pin: PhantomPinned, } } /// Convert from a [`Stream`](crate::stream::Stream). /// /// This trait is not intended to be used directly. Instead, call /// [`StreamExt::collect()`](super::StreamExt::collect). /// /// # Implementing /// /// Currently, this trait may not be implemented by third parties. The trait is /// sealed in order to make changes in the future. Stabilization is pending /// enhancements to the Rust language. pub trait FromStream: sealed::FromStreamPriv {} impl Collect where T: Stream, U: FromStream, { pub(super) fn new(stream: T) -> Collect { let (lower, upper) = stream.size_hint(); let collection = U::initialize(sealed::Internal, lower, upper); Collect { stream, collection, _pin: PhantomPinned, } } } impl Future for Collect where T: Stream, U: FromStream, { type Output = U; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { use Poll::Ready; loop { let mut me = self.as_mut().project(); let item = match ready!(me.stream.poll_next(cx)) { Some(item) => item, None => { return Ready(U::finalize(sealed::Internal, &mut me.collection)); } }; if !U::extend(sealed::Internal, &mut me.collection, item) { return Ready(U::finalize(sealed::Internal, &mut me.collection)); } } } } // ===== FromStream implementations impl FromStream<()> for () {} impl sealed::FromStreamPriv<()> for () { type InternalCollection = (); fn initialize(_: sealed::Internal, _lower: usize, _upper: Option) {} fn extend(_: sealed::Internal, _collection: &mut (), _item: ()) -> bool { true } fn finalize(_: sealed::Internal, _collection: &mut ()) {} } impl> FromStream for String {} impl> sealed::FromStreamPriv for String { type InternalCollection = String; fn initialize(_: sealed::Internal, _lower: usize, _upper: Option) -> String { String::new() } fn extend(_: sealed::Internal, collection: &mut String, item: T) -> bool { collection.push_str(item.as_ref()); true } fn finalize(_: sealed::Internal, collection: &mut String) -> String { mem::replace(collection, String::new()) } } impl FromStream for Vec {} impl sealed::FromStreamPriv for Vec { type InternalCollection = Vec; fn initialize(_: sealed::Internal, lower: usize, _upper: Option) -> Vec { Vec::with_capacity(lower) } fn extend(_: sealed::Internal, collection: &mut Vec, item: T) -> bool { collection.push(item); true } fn finalize(_: sealed::Internal, collection: &mut Vec) -> Vec { mem::replace(collection, vec![]) } } impl FromStream for Box<[T]> {} impl sealed::FromStreamPriv for Box<[T]> { type InternalCollection = Vec; fn initialize(_: sealed::Internal, lower: usize, upper: Option) -> Vec { as sealed::FromStreamPriv>::initialize(sealed::Internal, lower, upper) } fn extend(_: sealed::Internal, collection: &mut Vec, item: T) -> bool { as sealed::FromStreamPriv>::extend(sealed::Internal, collection, item) } fn finalize(_: sealed::Internal, collection: &mut Vec) -> Box<[T]> { as sealed::FromStreamPriv>::finalize(sealed::Internal, collection) .into_boxed_slice() } } impl FromStream> for Result where U: FromStream {} impl sealed::FromStreamPriv> for Result where U: FromStream, { type InternalCollection = Result; fn initialize( _: sealed::Internal, lower: usize, upper: Option, ) -> Result { Ok(U::initialize(sealed::Internal, lower, upper)) } fn extend( _: sealed::Internal, collection: &mut Self::InternalCollection, item: Result, ) -> bool { assert!(collection.is_ok()); match item { Ok(item) => { let collection = collection.as_mut().ok().expect("invalid state"); U::extend(sealed::Internal, collection, item) } Err(err) => { *collection = Err(err); false } } } fn finalize(_: sealed::Internal, collection: &mut Self::InternalCollection) -> Result { if let Ok(collection) = collection.as_mut() { Ok(U::finalize(sealed::Internal, collection)) } else { let res = mem::replace(collection, Ok(U::initialize(sealed::Internal, 0, Some(0)))); if let Err(err) = res { Err(err) } else { unreachable!(); } } } } pub(crate) mod sealed { #[doc(hidden)] pub trait FromStreamPriv { /// Intermediate type used during collection process /// /// The name of this type is internal and cannot be relied upon. type InternalCollection; /// Initialize the collection fn initialize( internal: Internal, lower: usize, upper: Option, ) -> Self::InternalCollection; /// Extend the collection with the received item /// /// Return `true` to continue streaming, `false` complete collection. fn extend(internal: Internal, collection: &mut Self::InternalCollection, item: T) -> bool; /// Finalize collection into target type. fn finalize(internal: Internal, collection: &mut Self::InternalCollection) -> Self; } #[allow(missing_debug_implementations)] pub struct Internal; }