diff options
author | Bhargav <bIgBV@users.noreply.github.com> | 2019-12-21 14:38:05 -0800 |
---|---|---|
committer | Carl Lerche <me@carllerche.com> | 2019-12-21 14:38:05 -0800 |
commit | a8540948254ec69c630bacd0b4a58a20d701b7ac (patch) | |
tree | 2ba875d544525b3d13bbed20b93fad65f14780b0 /tokio/src/sync/broadcast.rs | |
parent | 3d1b4b30585532e9708e322e1a0876d2933af71a (diff) |
sync: impl `Stream` for broadcast::Receiver (#2012)
Diffstat (limited to 'tokio/src/sync/broadcast.rs')
-rw-r--r-- | tokio/src/sync/broadcast.rs | 23 |
1 files changed, 21 insertions, 2 deletions
diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index fd9029a7..ce803724 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -207,7 +207,7 @@ pub struct SendError<T>(pub T); /// /// [`recv`]: crate::sync::broadcast::Receiver::recv /// [`Receiver`]: crate::sync::broadcast::Receiver -#[derive(Debug)] +#[derive(Debug, PartialEq)] pub enum RecvError { /// There are no more active senders implying no further messages will ever /// be sent. @@ -225,7 +225,7 @@ pub enum RecvError { /// /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv /// [`Receiver`]: crate::sync::broadcast::Receiver -#[derive(Debug)] +#[derive(Debug, PartialEq)] pub enum TryRecvError { /// The channel is currently empty. There are still active /// [`Sender`][Sender] handles, so data may yet become available. @@ -861,6 +861,25 @@ where } } +#[cfg(feature = "stream")] +impl<T> crate::stream::Stream for Receiver<T> +where + T: Clone, +{ + type Item = Result<T, RecvError>; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Option<Result<T, RecvError>>> { + self.poll_recv(cx).map(|v| match v { + Ok(v) => Some(Ok(v)), + lag @ Err(RecvError::Lagged(_)) => Some(lag), + Err(RecvError::Closed) => None, + }) + } +} + impl<T> Drop for Receiver<T> { fn drop(&mut self) { let mut tail = self.shared.tail.lock().unwrap(); |