summaryrefslogtreecommitdiffstats
path: root/tokio/tests/sync_broadcast.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-12-18 10:13:15 -0800
committerGitHub <noreply@github.com>2019-12-18 10:13:15 -0800
commit7c010ed030d0084d38451bbebf727c6695c2dbac (patch)
treed2507c71f9b6c1cdb988097592daa9566ec2ec51 /tokio/tests/sync_broadcast.rs
parent42c942de1433c37946f81d185459b8ee54928d3d (diff)
sync: add broadcast channel (#1943)
Adds a broadcast channel implementation. A broadcast channel is a multi-producer, multi-consumer channel where each consumer receives a clone of every value sent. This is useful for implementing pub / sub style patterns. Implemented as a ring buffer, a Vec of the specified capacity is allocated on initialization of the channel. Values are pushed into slots. When the channel is full, a send overwrites the oldest value. Receivers detect this and return an error on the next call to receive. This prevents unbounded buffering and does not make the channel vulnerable to the slowest consumer. Closes: #1585
Diffstat (limited to 'tokio/tests/sync_broadcast.rs')
-rw-r--r--tokio/tests/sync_broadcast.rs339
1 files changed, 339 insertions, 0 deletions
diff --git a/tokio/tests/sync_broadcast.rs b/tokio/tests/sync_broadcast.rs
new file mode 100644
index 00000000..257595ca
--- /dev/null
+++ b/tokio/tests/sync_broadcast.rs
@@ -0,0 +1,339 @@
+#![warn(rust_2018_idioms)]
+#![cfg(feature = "sync")]
+
+use tokio::sync::broadcast;
+use tokio_test::task;
+use tokio_test::{
+ assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
+};
+
+use std::sync::Arc;
+
+macro_rules! assert_recv {
+ ($e:expr) => {
+ match $e.try_recv() {
+ Ok(value) => value,
+ Err(e) => panic!("expected recv; got = {:?}", e),
+ }
+ };
+}
+
+macro_rules! assert_empty {
+ ($e:expr) => {
+ match $e.try_recv() {
+ Ok(value) => panic!("expected empty; got = {:?}", value),
+ Err(broadcast::TryRecvError::Empty) => {}
+ Err(e) => panic!("expected empty; got = {:?}", e),
+ }
+ };
+}
+
+macro_rules! assert_lagged {
+ ($e:expr, $n:expr) => {
+ match assert_err!($e) {
+ broadcast::TryRecvError::Lagged(n) => {
+ assert_eq!(n, $n);
+ }
+ _ => panic!("did not lag"),
+ }
+ };
+}
+
+trait AssertSend: Send {}
+impl AssertSend for broadcast::Sender<i32> {}
+impl AssertSend for broadcast::Receiver<i32> {}
+
+#[test]
+fn send_try_recv_bounded() {
+ let (tx, mut rx) = broadcast::channel(16);
+
+ assert_empty!(rx);
+
+ let n = assert_ok!(tx.send("hello"));
+ assert_eq!(n, 1);
+
+ let val = assert_recv!(rx);
+ assert_eq!(val, "hello");
+
+ assert_empty!(rx);
+}
+
+#[test]
+fn send_two_recv() {
+ let (tx, mut rx1) = broadcast::channel(16);
+ let mut rx2 = tx.subscribe();
+
+ assert_empty!(rx1);
+ assert_empty!(rx2);
+
+ let n = assert_ok!(tx.send("hello"));
+ assert_eq!(n, 2);
+
+ let val = assert_recv!(rx1);
+ assert_eq!(val, "hello");
+
+ let val = assert_recv!(rx2);
+ assert_eq!(val, "hello");
+
+ assert_empty!(rx1);
+ assert_empty!(rx2);
+}
+
+#[test]
+fn send_recv_bounded() {
+ let (tx, mut rx) = broadcast::channel(16);
+
+ let mut recv = task::spawn(rx.recv());
+
+ assert_pending!(recv.poll());
+
+ assert_ok!(tx.send("hello"));
+
+ assert!(recv.is_woken());
+ let val = assert_ready_ok!(recv.poll());
+ assert_eq!(val, "hello");
+}
+
+#[test]
+fn send_two_recv_bounded() {
+ let (tx, mut rx1) = broadcast::channel(16);
+ let mut rx2 = tx.subscribe();
+
+ let mut recv1 = task::spawn(rx1.recv());
+ let mut recv2 = task::spawn(rx2.recv());
+
+ assert_pending!(recv1.poll());
+ assert_pending!(recv2.poll());
+
+ assert_ok!(tx.send("hello"));
+
+ assert!(recv1.is_woken());
+ assert!(recv2.is_woken());
+
+ let val1 = assert_ready_ok!(recv1.poll());
+ let val2 = assert_ready_ok!(recv2.poll());
+ assert_eq!(val1, "hello");
+ assert_eq!(val2, "hello");
+
+ drop((recv1, recv2));
+
+ let mut recv1 = task::spawn(rx1.recv());
+ let mut recv2 = task::spawn(rx2.recv());
+
+ assert_pending!(recv1.poll());
+
+ assert_ok!(tx.send("world"));
+
+ assert!(recv1.is_woken());
+ assert!(!recv2.is_woken());
+
+ let val1 = assert_ready_ok!(recv1.poll());
+ let val2 = assert_ready_ok!(recv2.poll());
+ assert_eq!(val1, "world");
+ assert_eq!(val2, "world");
+}
+
+#[test]
+fn send_slow_rx() {
+ let (tx, mut rx1) = broadcast::channel(16);
+ let mut rx2 = tx.subscribe();
+
+ {
+ let mut recv2 = task::spawn(rx2.recv());
+
+ {
+ let mut recv1 = task::spawn(rx1.recv());
+
+ assert_pending!(recv1.poll());
+ assert_pending!(recv2.poll());
+
+ assert_ok!(tx.send("one"));
+
+ assert!(recv1.is_woken());
+ assert!(recv2.is_woken());
+
+ assert_ok!(tx.send("two"));
+
+ let val = assert_ready_ok!(recv1.poll());
+ assert_eq!(val, "one");
+ }
+
+ let val = assert_ready_ok!(task::spawn(rx1.recv()).poll());
+ assert_eq!(val, "two");
+
+ let mut recv1 = task::spawn(rx1.recv());
+
+ assert_pending!(recv1.poll());
+
+ assert_ok!(tx.send("three"));
+
+ assert!(recv1.is_woken());
+
+ let val = assert_ready_ok!(recv1.poll());
+ assert_eq!(val, "three");
+
+ let val = assert_ready_ok!(recv2.poll());
+ assert_eq!(val, "one");
+ }
+
+ let val = assert_recv!(rx2);
+ assert_eq!(val, "two");
+
+ let val = assert_recv!(rx2);
+ assert_eq!(val, "three");
+}
+
+#[test]
+fn drop_rx_while_values_remain() {
+ let (tx, mut rx1) = broadcast::channel(16);
+ let mut rx2 = tx.subscribe();
+
+ assert_ok!(tx.send("one"));
+ assert_ok!(tx.send("two"));
+
+ assert_recv!(rx1);
+ assert_recv!(rx2);
+
+ drop(rx2);
+ drop(rx1);
+}
+
+#[test]
+fn lagging_rx() {
+ let (tx, mut rx1) = broadcast::channel(2);
+ let mut rx2 = tx.subscribe();
+
+ assert_ok!(tx.send("one"));
+ assert_ok!(tx.send("two"));
+
+ assert_eq!("one", assert_recv!(rx1));
+
+ assert_ok!(tx.send("three"));
+
+ // Lagged too far
+ assert_lagged!(rx2.try_recv(), 1);
+
+ // Calling again gets the next value
+ assert_eq!("two", assert_recv!(rx2));
+
+ assert_eq!("two", assert_recv!(rx1));
+ assert_eq!("three", assert_recv!(rx1));
+
+ assert_ok!(tx.send("four"));
+ assert_ok!(tx.send("five"));
+
+ assert_lagged!(rx2.try_recv(), 1);
+
+ assert_ok!(tx.send("six"));
+
+ assert_lagged!(rx2.try_recv(), 1);
+}
+
+#[test]
+fn send_no_rx() {
+ let (tx, _) = broadcast::channel(16);
+
+ assert_err!(tx.send("hello"));
+
+ let mut rx = tx.subscribe();
+
+ assert_ok!(tx.send("world"));
+
+ let val = assert_recv!(rx);
+ assert_eq!("world", val);
+}
+
+#[test]
+#[should_panic]
+fn zero_capacity() {
+ broadcast::channel::<()>(0);
+}
+
+#[test]
+#[should_panic]
+fn capacity_too_big() {
+ use std::usize;
+
+ broadcast::channel::<()>(1 + (usize::MAX >> 1));
+}
+
+#[test]
+fn panic_in_clone() {
+ use std::panic::{self, AssertUnwindSafe};
+
+ #[derive(Eq, PartialEq, Debug)]
+ struct MyVal(usize);
+
+ impl Clone for MyVal {
+ fn clone(&self) -> MyVal {
+ assert_ne!(0, self.0);
+ MyVal(self.0)
+ }
+ }
+
+ let (tx, mut rx) = broadcast::channel(16);
+
+ assert_ok!(tx.send(MyVal(0)));
+ assert_ok!(tx.send(MyVal(1)));
+
+ let res = panic::catch_unwind(AssertUnwindSafe(|| {
+ let _ = rx.try_recv();
+ }));
+
+ assert_err!(res);
+
+ let val = assert_recv!(rx);
+ assert_eq!(val, MyVal(1));
+}
+
+#[test]
+fn dropping_tx_notifies_rx() {
+ let (tx, mut rx1) = broadcast::channel::<()>(16);
+ let mut rx2 = tx.subscribe();
+
+ let tx2 = tx.clone();
+
+ let mut recv1 = task::spawn(rx1.recv());
+ let mut recv2 = task::spawn(rx2.recv());
+
+ assert_pending!(recv1.poll());
+ assert_pending!(recv2.poll());
+
+ drop(tx);
+
+ assert_pending!(recv1.poll());
+ assert_pending!(recv2.poll());
+
+ drop(tx2);
+
+ assert!(recv1.is_woken());
+ assert!(recv2.is_woken());
+
+ let err = assert_ready_err!(recv1.poll());
+ assert!(is_closed(err));
+
+ let err = assert_ready_err!(recv2.poll());
+ assert!(is_closed(err));
+}
+
+#[test]
+fn unconsumed_messages_are_dropped() {
+ let (tx, rx) = broadcast::channel(16);
+
+ let msg = Arc::new(());
+
+ assert_ok!(tx.send(msg.clone()));
+
+ assert_eq!(2, Arc::strong_count(&msg));
+
+ drop(rx);
+
+ assert_eq!(1, Arc::strong_count(&msg));
+}
+
+fn is_closed(err: broadcast::RecvError) -> bool {
+ match err {
+ broadcast::RecvError::Closed => true,
+ _ => false,
+ }
+}