summaryrefslogtreecommitdiffstats
path: root/tokio/tests/sync_broadcast.rs
diff options
context:
space:
mode:
authorKevin Leimkuhler <kevin@kleimkuhler.com>2020-04-27 21:04:47 -0700
committerGitHub <noreply@github.com>2020-04-27 21:04:47 -0700
commita81958484941ddcc2f1955fb6873c827f694ec9b (patch)
tree0cfa52747f808e3dfb7e3161c2096ece237670f3 /tokio/tests/sync_broadcast.rs
parent70ed3c7f0436079d798306820918d026819cb73d (diff)
sync: fix slow receivers in broadcast (#2448)
Broadcast uses a ring buffer to store values sent to the channel. In order to deal with slow receivers, the oldest values are overwritten with new values once the buffer wraps. A receiver should be able to calculate how many values it has missed. Additionally, when the broadcast closes, a final value of `None` is sent to the channel. If the buffer has wrapped, this value overwrites the oldest value. This is an issue mainly in a single capacity broadcast when a value is sent and then the sender is dropped. The original value is immediately overwritten with `None` meaning that receivers assume they have lagged behind. **Solution** A value of `None` is no longer sent to the channel when the final sender has been dropped. This solves the single capacity broadcast case by completely removing the behavior of overwriting values when the channel is closed. Now, when the final sender is dropped a closed bit is set on the next slot that the channel is supposed to send to. In the case of a fast receiver, if it finds a slot where the closed bit is set, it knows the channel is closed without locking the tail. In the case of a slow receiver, it must first find out if it has missed any values. This is similar to before, but must be able to account for channel closure. If the channel is not closed, the oldest value may be located at index `n`. If the channel is closed, the oldest value is located at index `n - 1`. Knowing the index where the oldest value is located, a receiver can calculate how many values it may have missed and starts to catch up. Closes #2425
Diffstat (limited to 'tokio/tests/sync_broadcast.rs')
-rw-r--r--tokio/tests/sync_broadcast.rs104
1 files changed, 103 insertions, 1 deletions
diff --git a/tokio/tests/sync_broadcast.rs b/tokio/tests/sync_broadcast.rs
index e9e7b366..4fb7c0aa 100644
--- a/tokio/tests/sync_broadcast.rs
+++ b/tokio/tests/sync_broadcast.rs
@@ -40,6 +40,15 @@ macro_rules! assert_lagged {
};
}
+macro_rules! assert_closed {
+ ($e:expr) => {
+ match assert_err!($e) {
+ broadcast::TryRecvError::Closed => {}
+ _ => panic!("did not lag"),
+ }
+ };
+}
+
trait AssertSend: Send {}
impl AssertSend for broadcast::Sender<i32> {}
impl AssertSend for broadcast::Receiver<i32> {}
@@ -229,7 +238,8 @@ fn lagging_rx() {
assert_ok!(tx.send("three"));
// Lagged too far
- assert_lagged!(rx2.try_recv(), 1);
+ let x = dbg!(rx2.try_recv());
+ assert_lagged!(x, 1);
// Calling again gets the next value
assert_eq!("two", assert_recv!(rx2));
@@ -349,6 +359,98 @@ fn unconsumed_messages_are_dropped() {
assert_eq!(1, Arc::strong_count(&msg));
}
+#[test]
+fn single_capacity_recvs() {
+ let (tx, mut rx) = broadcast::channel(1);
+
+ assert_ok!(tx.send(1));
+
+ assert_eq!(assert_recv!(rx), 1);
+ assert_empty!(rx);
+}
+
+#[test]
+fn single_capacity_recvs_after_drop_1() {
+ let (tx, mut rx) = broadcast::channel(1);
+
+ assert_ok!(tx.send(1));
+ drop(tx);
+
+ assert_eq!(assert_recv!(rx), 1);
+ assert_closed!(rx.try_recv());
+}
+
+#[test]
+fn single_capacity_recvs_after_drop_2() {
+ let (tx, mut rx) = broadcast::channel(1);
+
+ assert_ok!(tx.send(1));
+ assert_ok!(tx.send(2));
+ drop(tx);
+
+ assert_lagged!(rx.try_recv(), 1);
+ assert_eq!(assert_recv!(rx), 2);
+ assert_closed!(rx.try_recv());
+}
+
+#[test]
+fn dropping_sender_does_not_overwrite() {
+ let (tx, mut rx) = broadcast::channel(2);
+
+ assert_ok!(tx.send(1));
+ assert_ok!(tx.send(2));
+ drop(tx);
+
+ assert_eq!(assert_recv!(rx), 1);
+ assert_eq!(assert_recv!(rx), 2);
+ assert_closed!(rx.try_recv());
+}
+
+#[test]
+fn lagging_receiver_recovers_after_wrap_closed_1() {
+ let (tx, mut rx) = broadcast::channel(2);
+
+ assert_ok!(tx.send(1));
+ assert_ok!(tx.send(2));
+ assert_ok!(tx.send(3));
+ drop(tx);
+
+ assert_lagged!(rx.try_recv(), 1);
+ assert_eq!(assert_recv!(rx), 2);
+ assert_eq!(assert_recv!(rx), 3);
+ assert_closed!(rx.try_recv());
+}
+
+#[test]
+fn lagging_receiver_recovers_after_wrap_closed_2() {
+ let (tx, mut rx) = broadcast::channel(2);
+
+ assert_ok!(tx.send(1));
+ assert_ok!(tx.send(2));
+ assert_ok!(tx.send(3));
+ assert_ok!(tx.send(4));
+ drop(tx);
+
+ assert_lagged!(rx.try_recv(), 2);
+ assert_eq!(assert_recv!(rx), 3);
+ assert_eq!(assert_recv!(rx), 4);
+ assert_closed!(rx.try_recv());
+}
+
+#[test]
+fn lagging_receiver_recovers_after_wrap_open() {
+ let (tx, mut rx) = broadcast::channel(2);
+
+ assert_ok!(tx.send(1));
+ assert_ok!(tx.send(2));
+ assert_ok!(tx.send(3));
+
+ assert_lagged!(rx.try_recv(), 1);
+ assert_eq!(assert_recv!(rx), 2);
+ assert_eq!(assert_recv!(rx), 3);
+ assert_empty!(rx);
+}
+
fn is_closed(err: broadcast::RecvError) -> bool {
match err {
broadcast::RecvError::Closed => true,