summaryrefslogtreecommitdiffstats
path: root/tokio/tests/sync_broadcast.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/tests/sync_broadcast.rs')
-rw-r--r--tokio/tests/sync_broadcast.rs104
1 files changed, 103 insertions, 1 deletions
diff --git a/tokio/tests/sync_broadcast.rs b/tokio/tests/sync_broadcast.rs
index e9e7b366..4fb7c0aa 100644
--- a/tokio/tests/sync_broadcast.rs
+++ b/tokio/tests/sync_broadcast.rs
@@ -40,6 +40,15 @@ macro_rules! assert_lagged {
};
}
+macro_rules! assert_closed {
+ ($e:expr) => {
+ match assert_err!($e) {
+ broadcast::TryRecvError::Closed => {}
+ _ => panic!("did not lag"),
+ }
+ };
+}
+
trait AssertSend: Send {}
impl AssertSend for broadcast::Sender<i32> {}
impl AssertSend for broadcast::Receiver<i32> {}
@@ -229,7 +238,8 @@ fn lagging_rx() {
assert_ok!(tx.send("three"));
// Lagged too far
- assert_lagged!(rx2.try_recv(), 1);
+ let x = dbg!(rx2.try_recv());
+ assert_lagged!(x, 1);
// Calling again gets the next value
assert_eq!("two", assert_recv!(rx2));
@@ -349,6 +359,98 @@ fn unconsumed_messages_are_dropped() {
assert_eq!(1, Arc::strong_count(&msg));
}
+#[test]
+fn single_capacity_recvs() {
+ let (tx, mut rx) = broadcast::channel(1);
+
+ assert_ok!(tx.send(1));
+
+ assert_eq!(assert_recv!(rx), 1);
+ assert_empty!(rx);
+}
+
+#[test]
+fn single_capacity_recvs_after_drop_1() {
+ let (tx, mut rx) = broadcast::channel(1);
+
+ assert_ok!(tx.send(1));
+ drop(tx);
+
+ assert_eq!(assert_recv!(rx), 1);
+ assert_closed!(rx.try_recv());
+}
+
+#[test]
+fn single_capacity_recvs_after_drop_2() {
+ let (tx, mut rx) = broadcast::channel(1);
+
+ assert_ok!(tx.send(1));
+ assert_ok!(tx.send(2));
+ drop(tx);
+
+ assert_lagged!(rx.try_recv(), 1);
+ assert_eq!(assert_recv!(rx), 2);
+ assert_closed!(rx.try_recv());
+}
+
+#[test]
+fn dropping_sender_does_not_overwrite() {
+ let (tx, mut rx) = broadcast::channel(2);
+
+ assert_ok!(tx.send(1));
+ assert_ok!(tx.send(2));
+ drop(tx);
+
+ assert_eq!(assert_recv!(rx), 1);
+ assert_eq!(assert_recv!(rx), 2);
+ assert_closed!(rx.try_recv());
+}
+
+#[test]
+fn lagging_receiver_recovers_after_wrap_closed_1() {
+ let (tx, mut rx) = broadcast::channel(2);
+
+ assert_ok!(tx.send(1));
+ assert_ok!(tx.send(2));
+ assert_ok!(tx.send(3));
+ drop(tx);
+
+ assert_lagged!(rx.try_recv(), 1);
+ assert_eq!(assert_recv!(rx), 2);
+ assert_eq!(assert_recv!(rx), 3);
+ assert_closed!(rx.try_recv());
+}
+
+#[test]
+fn lagging_receiver_recovers_after_wrap_closed_2() {
+ let (tx, mut rx) = broadcast::channel(2);
+
+ assert_ok!(tx.send(1));
+ assert_ok!(tx.send(2));
+ assert_ok!(tx.send(3));
+ assert_ok!(tx.send(4));
+ drop(tx);
+
+ assert_lagged!(rx.try_recv(), 2);
+ assert_eq!(assert_recv!(rx), 3);
+ assert_eq!(assert_recv!(rx), 4);
+ assert_closed!(rx.try_recv());
+}
+
+#[test]
+fn lagging_receiver_recovers_after_wrap_open() {
+ let (tx, mut rx) = broadcast::channel(2);
+
+ assert_ok!(tx.send(1));
+ assert_ok!(tx.send(2));
+ assert_ok!(tx.send(3));
+
+ assert_lagged!(rx.try_recv(), 1);
+ assert_eq!(assert_recv!(rx), 2);
+ assert_eq!(assert_recv!(rx), 3);
+ assert_empty!(rx);
+}
+
fn is_closed(err: broadcast::RecvError) -> bool {
match err {
broadcast::RecvError::Closed => true,