diff options
Diffstat (limited to 'tokio/src/sync/broadcast.rs')
-rw-r--r-- | tokio/src/sync/broadcast.rs | 23 |
1 files changed, 21 insertions, 2 deletions
diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index fd9029a7..ce803724 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -207,7 +207,7 @@ pub struct SendError<T>(pub T); /// /// [`recv`]: crate::sync::broadcast::Receiver::recv /// [`Receiver`]: crate::sync::broadcast::Receiver -#[derive(Debug)] +#[derive(Debug, PartialEq)] pub enum RecvError { /// There are no more active senders implying no further messages will ever /// be sent. @@ -225,7 +225,7 @@ pub enum RecvError { /// /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv /// [`Receiver`]: crate::sync::broadcast::Receiver -#[derive(Debug)] +#[derive(Debug, PartialEq)] pub enum TryRecvError { /// The channel is currently empty. There are still active /// [`Sender`][Sender] handles, so data may yet become available. @@ -861,6 +861,25 @@ where } } +#[cfg(feature = "stream")] +impl<T> crate::stream::Stream for Receiver<T> +where + T: Clone, +{ + type Item = Result<T, RecvError>; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Option<Result<T, RecvError>>> { + self.poll_recv(cx).map(|v| match v { + Ok(v) => Some(Ok(v)), + lag @ Err(RecvError::Lagged(_)) => Some(lag), + Err(RecvError::Closed) => None, + }) + } +} + impl<T> Drop for Receiver<T> { fn drop(&mut self) { let mut tail = self.shared.tail.lock().unwrap(); |