From a8540948254ec69c630bacd0b4a58a20d701b7ac Mon Sep 17 00:00:00 2001 From: Bhargav Date: Sat, 21 Dec 2019 14:38:05 -0800 Subject: sync: impl `Stream` for broadcast::Receiver (#2012) --- tokio/src/sync/broadcast.rs | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) (limited to 'tokio/src/sync/broadcast.rs') diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index fd9029a7..ce803724 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -207,7 +207,7 @@ pub struct SendError(pub T); /// /// [`recv`]: crate::sync::broadcast::Receiver::recv /// [`Receiver`]: crate::sync::broadcast::Receiver -#[derive(Debug)] +#[derive(Debug, PartialEq)] pub enum RecvError { /// There are no more active senders implying no further messages will ever /// be sent. @@ -225,7 +225,7 @@ pub enum RecvError { /// /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv /// [`Receiver`]: crate::sync::broadcast::Receiver -#[derive(Debug)] +#[derive(Debug, PartialEq)] pub enum TryRecvError { /// The channel is currently empty. There are still active /// [`Sender`][Sender] handles, so data may yet become available. @@ -861,6 +861,25 @@ where } } +#[cfg(feature = "stream")] +impl crate::stream::Stream for Receiver +where + T: Clone, +{ + type Item = Result; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + self.poll_recv(cx).map(|v| match v { + Ok(v) => Some(Ok(v)), + lag @ Err(RecvError::Lagged(_)) => Some(lag), + Err(RecvError::Closed) => None, + }) + } +} + impl Drop for Receiver { fn drop(&mut self) { let mut tail = self.shared.tail.lock().unwrap(); -- cgit v1.2.3