summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync/broadcast.rs
diff options
context:
space:
mode:
authorBhargav <bIgBV@users.noreply.github.com>2019-12-21 14:38:05 -0800
committerCarl Lerche <me@carllerche.com>2019-12-21 14:38:05 -0800
commita8540948254ec69c630bacd0b4a58a20d701b7ac (patch)
tree2ba875d544525b3d13bbed20b93fad65f14780b0 /tokio/src/sync/broadcast.rs
parent3d1b4b30585532e9708e322e1a0876d2933af71a (diff)
sync: impl `Stream` for broadcast::Receiver (#2012)
Diffstat (limited to 'tokio/src/sync/broadcast.rs')
-rw-r--r--tokio/src/sync/broadcast.rs23
1 files changed, 21 insertions, 2 deletions
diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs
index fd9029a7..ce803724 100644
--- a/tokio/src/sync/broadcast.rs
+++ b/tokio/src/sync/broadcast.rs
@@ -207,7 +207,7 @@ pub struct SendError<T>(pub T);
///
/// [`recv`]: crate::sync::broadcast::Receiver::recv
/// [`Receiver`]: crate::sync::broadcast::Receiver
-#[derive(Debug)]
+#[derive(Debug, PartialEq)]
pub enum RecvError {
/// There are no more active senders implying no further messages will ever
/// be sent.
@@ -225,7 +225,7 @@ pub enum RecvError {
///
/// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
/// [`Receiver`]: crate::sync::broadcast::Receiver
-#[derive(Debug)]
+#[derive(Debug, PartialEq)]
pub enum TryRecvError {
/// The channel is currently empty. There are still active
/// [`Sender`][Sender] handles, so data may yet become available.
@@ -861,6 +861,25 @@ where
}
}
+#[cfg(feature = "stream")]
+impl<T> crate::stream::Stream for Receiver<T>
+where
+ T: Clone,
+{
+ type Item = Result<T, RecvError>;
+
+ fn poll_next(
+ mut self: std::pin::Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Result<T, RecvError>>> {
+ self.poll_recv(cx).map(|v| match v {
+ Ok(v) => Some(Ok(v)),
+ lag @ Err(RecvError::Lagged(_)) => Some(lag),
+ Err(RecvError::Closed) => None,
+ })
+ }
+}
+
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
let mut tail = self.shared.tail.lock().unwrap();