use crate::stream::{Fuse, Stream}; use crate::time::{error::Elapsed, Instant, Sleep}; use core::future::Future; use core::pin::Pin; use core::task::{Context, Poll}; use pin_project_lite::pin_project; use std::time::Duration; pin_project! { /// Stream returned by the [`timeout`](super::StreamExt::timeout) method. #[must_use = "streams do nothing unless polled"] #[derive(Debug)] pub struct Timeout { #[pin] stream: Fuse, deadline: Sleep, duration: Duration, poll_deadline: bool, } } impl Timeout { pub(super) fn new(stream: S, duration: Duration) -> Self { let next = Instant::now() + duration; let deadline = Sleep::new_timeout(next); Timeout { stream: Fuse::new(stream), deadline, duration, poll_deadline: true, } } } impl Stream for Timeout { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match self.as_mut().project().stream.poll_next(cx) { Poll::Ready(v) => { if v.is_some() { let next = Instant::now() + self.duration; self.as_mut().project().deadline.reset(next); *self.as_mut().project().poll_deadline = true; } return Poll::Ready(v.map(Ok)); } Poll::Pending => {} }; if self.poll_deadline { ready!(Pin::new(self.as_mut().project().deadline).poll(cx)); *self.as_mut().project().poll_deadline = false; return Poll::Ready(Some(Err(Elapsed::new()))); } Poll::Pending } fn size_hint(&self) -> (usize, Option) { self.stream.size_hint() } }