diff options
Diffstat (limited to 'tokio/tests/sync_broadcast.rs')
-rw-r--r-- | tokio/tests/sync_broadcast.rs | 104 |
1 files changed, 103 insertions, 1 deletions
diff --git a/tokio/tests/sync_broadcast.rs b/tokio/tests/sync_broadcast.rs index e9e7b366..4fb7c0aa 100644 --- a/tokio/tests/sync_broadcast.rs +++ b/tokio/tests/sync_broadcast.rs @@ -40,6 +40,15 @@ macro_rules! assert_lagged { }; } +macro_rules! assert_closed { + ($e:expr) => { + match assert_err!($e) { + broadcast::TryRecvError::Closed => {} + _ => panic!("did not lag"), + } + }; +} + trait AssertSend: Send {} impl AssertSend for broadcast::Sender<i32> {} impl AssertSend for broadcast::Receiver<i32> {} @@ -229,7 +238,8 @@ fn lagging_rx() { assert_ok!(tx.send("three")); // Lagged too far - assert_lagged!(rx2.try_recv(), 1); + let x = dbg!(rx2.try_recv()); + assert_lagged!(x, 1); // Calling again gets the next value assert_eq!("two", assert_recv!(rx2)); @@ -349,6 +359,98 @@ fn unconsumed_messages_are_dropped() { assert_eq!(1, Arc::strong_count(&msg)); } +#[test] +fn single_capacity_recvs() { + let (tx, mut rx) = broadcast::channel(1); + + assert_ok!(tx.send(1)); + + assert_eq!(assert_recv!(rx), 1); + assert_empty!(rx); +} + +#[test] +fn single_capacity_recvs_after_drop_1() { + let (tx, mut rx) = broadcast::channel(1); + + assert_ok!(tx.send(1)); + drop(tx); + + assert_eq!(assert_recv!(rx), 1); + assert_closed!(rx.try_recv()); +} + +#[test] +fn single_capacity_recvs_after_drop_2() { + let (tx, mut rx) = broadcast::channel(1); + + assert_ok!(tx.send(1)); + assert_ok!(tx.send(2)); + drop(tx); + + assert_lagged!(rx.try_recv(), 1); + assert_eq!(assert_recv!(rx), 2); + assert_closed!(rx.try_recv()); +} + +#[test] +fn dropping_sender_does_not_overwrite() { + let (tx, mut rx) = broadcast::channel(2); + + assert_ok!(tx.send(1)); + assert_ok!(tx.send(2)); + drop(tx); + + assert_eq!(assert_recv!(rx), 1); + assert_eq!(assert_recv!(rx), 2); + assert_closed!(rx.try_recv()); +} + +#[test] +fn lagging_receiver_recovers_after_wrap_closed_1() { + let (tx, mut rx) = broadcast::channel(2); + + assert_ok!(tx.send(1)); + assert_ok!(tx.send(2)); + assert_ok!(tx.send(3)); + drop(tx); + + assert_lagged!(rx.try_recv(), 1); + assert_eq!(assert_recv!(rx), 2); + assert_eq!(assert_recv!(rx), 3); + assert_closed!(rx.try_recv()); +} + +#[test] +fn lagging_receiver_recovers_after_wrap_closed_2() { + let (tx, mut rx) = broadcast::channel(2); + + assert_ok!(tx.send(1)); + assert_ok!(tx.send(2)); + assert_ok!(tx.send(3)); + assert_ok!(tx.send(4)); + drop(tx); + + assert_lagged!(rx.try_recv(), 2); + assert_eq!(assert_recv!(rx), 3); + assert_eq!(assert_recv!(rx), 4); + assert_closed!(rx.try_recv()); +} + +#[test] +fn lagging_receiver_recovers_after_wrap_open() { + let (tx, mut rx) = broadcast::channel(2); + + assert_ok!(tx.send(1)); + assert_ok!(tx.send(2)); + assert_ok!(tx.send(3)); + + assert_lagged!(rx.try_recv(), 1); + assert_eq!(assert_recv!(rx), 2); + assert_eq!(assert_recv!(rx), 3); + assert_empty!(rx); +} + fn is_closed(err: broadcast::RecvError) -> bool { match err { broadcast::RecvError::Closed => true, |