diff options
Diffstat (limited to 'tokio/src/stream/map.rs')
-rw-r--r-- | tokio/src/stream/map.rs | 57 |
1 files changed, 57 insertions, 0 deletions
diff --git a/tokio/src/stream/map.rs b/tokio/src/stream/map.rs new file mode 100644 index 00000000..a89769de --- /dev/null +++ b/tokio/src/stream/map.rs @@ -0,0 +1,57 @@ +use crate::stream::Stream; + +use core::fmt; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`map`](super::StreamExt::map) method. + #[must_use = "streams do nothing unless polled"] + pub struct Map<St, F> { + #[pin] + stream: St, + f: F, + } +} + +impl<St, F> fmt::Debug for Map<St, F> +where + St: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Map") + .field("stream", &self.stream) + .finish() + } +} + +impl<St, T, F> Map<St, F> + where St: Stream, + F: FnMut(St::Item) -> T, +{ + pub(super) fn new(stream: St, f: F) -> Map<St, F> { + Map { stream, f } + } +} + +impl<St, F, T> Stream for Map<St, F> + where St: Stream, + F: FnMut(St::Item) -> T, +{ + type Item = T; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Option<T>> { + self.as_mut() + .project().stream + .poll_next(cx) + .map(|opt| opt.map(|x| (self.as_mut().project().f)(x))) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + self.stream.size_hint() + } +} |