summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync/tests
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-12-18 10:13:15 -0800
committerGitHub <noreply@github.com>2019-12-18 10:13:15 -0800
commit7c010ed030d0084d38451bbebf727c6695c2dbac (patch)
treed2507c71f9b6c1cdb988097592daa9566ec2ec51 /tokio/src/sync/tests
parent42c942de1433c37946f81d185459b8ee54928d3d (diff)
sync: add broadcast channel (#1943)
Adds a broadcast channel implementation. A broadcast channel is a multi-producer, multi-consumer channel where each consumer receives a clone of every value sent. This is useful for implementing pub / sub style patterns. Implemented as a ring buffer, a Vec of the specified capacity is allocated on initialization of the channel. Values are pushed into slots. When the channel is full, a send overwrites the oldest value. Receivers detect this and return an error on the next call to receive. This prevents unbounded buffering and does not make the channel vulnerable to the slowest consumer. Closes: #1585
Diffstat (limited to 'tokio/src/sync/tests')
-rw-r--r--tokio/src/sync/tests/loom_broadcast.rs144
-rw-r--r--tokio/src/sync/tests/mod.rs1
2 files changed, 145 insertions, 0 deletions
diff --git a/tokio/src/sync/tests/loom_broadcast.rs b/tokio/src/sync/tests/loom_broadcast.rs
new file mode 100644
index 00000000..53f76a25
--- /dev/null
+++ b/tokio/src/sync/tests/loom_broadcast.rs
@@ -0,0 +1,144 @@
+use crate::sync::broadcast;
+use crate::sync::broadcast::RecvError::{Closed, Lagged};
+
+use loom::future::block_on;
+use loom::sync::Arc;
+use loom::thread;
+use tokio_test::{assert_err, assert_ok};
+
+// An `Arc` is used as the value in order to detect memory leaks.
+#[test]
+fn broadcast_two() {
+ loom::model(|| {
+ let (tx, mut rx1) = broadcast::channel::<Arc<&'static str>>(16);
+ let mut rx2 = tx.subscribe();
+
+ let th1 = thread::spawn(move || {
+ block_on(async {
+ let v = assert_ok!(rx1.recv().await);
+ assert_eq!(*v, "hello");
+
+ let v = assert_ok!(rx1.recv().await);
+ assert_eq!(*v, "world");
+
+ match assert_err!(rx1.recv().await) {
+ Closed => {}
+ _ => panic!(),
+ }
+ });
+ });
+
+ let th2 = thread::spawn(move || {
+ block_on(async {
+ let v = assert_ok!(rx2.recv().await);
+ assert_eq!(*v, "hello");
+
+ let v = assert_ok!(rx2.recv().await);
+ assert_eq!(*v, "world");
+
+ match assert_err!(rx2.recv().await) {
+ Closed => {}
+ _ => panic!(),
+ }
+ });
+ });
+
+ assert_ok!(tx.send(Arc::new("hello")));
+ assert_ok!(tx.send(Arc::new("world")));
+ drop(tx);
+
+ assert_ok!(th1.join());
+ assert_ok!(th2.join());
+ });
+}
+
+#[test]
+fn broadcast_wrap() {
+ loom::model(|| {
+ let (tx, mut rx1) = broadcast::channel(2);
+ let mut rx2 = tx.subscribe();
+
+ let th1 = thread::spawn(move || {
+ block_on(async {
+ let mut num = 0;
+
+ loop {
+ match rx1.recv().await {
+ Ok(_) => num += 1,
+ Err(Closed) => break,
+ Err(Lagged(n)) => {
+ num += n as usize
+ },
+ }
+ }
+
+ assert_eq!(num, 3);
+ });
+ });
+
+ let th2 = thread::spawn(move || {
+ block_on(async {
+ let mut num = 0;
+
+ loop {
+ match rx2.recv().await {
+ Ok(_) => num += 1,
+ Err(Closed) => break,
+ Err(Lagged(n)) => {
+ num += n as usize
+ }
+ }
+ }
+
+ assert_eq!(num, 3);
+ });
+ });
+
+ assert_ok!(tx.send("one"));
+ assert_ok!(tx.send("two"));
+ assert_ok!(tx.send("three"));
+
+ drop(tx);
+
+ assert_ok!(th1.join());
+ assert_ok!(th2.join());
+ });
+}
+
+#[test]
+fn drop_rx() {
+ loom::model(|| {
+ let (tx, mut rx1) = broadcast::channel(16);
+ let rx2 = tx.subscribe();
+
+ let th1 = thread::spawn(move || {
+ block_on(async {
+ let v = assert_ok!(rx1.recv().await);
+ assert_eq!(v, "one");
+
+ let v = assert_ok!(rx1.recv().await);
+ assert_eq!(v, "two");
+
+ let v = assert_ok!(rx1.recv().await);
+ assert_eq!(v, "three");
+
+ match assert_err!(rx1.recv().await) {
+ Closed => {}
+ _ => panic!(),
+ }
+ });
+ });
+
+ let th2 = thread::spawn(move || {
+ drop(rx2);
+ });
+
+ assert_ok!(tx.send("one"));
+ assert_ok!(tx.send("two"));
+ assert_ok!(tx.send("three"));
+ drop(tx);
+
+ assert_ok!(th1.join());
+ assert_ok!(th2.join());
+ });
+}
diff --git a/tokio/src/sync/tests/mod.rs b/tokio/src/sync/tests/mod.rs
index 0b50cc95..2ee140cb 100644
--- a/tokio/src/sync/tests/mod.rs
+++ b/tokio/src/sync/tests/mod.rs
@@ -5,6 +5,7 @@ cfg_not_loom! {
cfg_loom! {
mod loom_atomic_waker;
+ mod loom_broadcast;
mod loom_list;
mod loom_mpsc;
mod loom_oneshot;