diff options
author | Carl Lerche <me@carllerche.com> | 2019-12-18 10:13:15 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-12-18 10:13:15 -0800 |
commit | 7c010ed030d0084d38451bbebf727c6695c2dbac (patch) | |
tree | d2507c71f9b6c1cdb988097592daa9566ec2ec51 /tokio/src/sync/tests | |
parent | 42c942de1433c37946f81d185459b8ee54928d3d (diff) |
sync: add broadcast channel (#1943)
Adds a broadcast channel implementation. A broadcast channel is a
multi-producer, multi-consumer channel where each consumer receives a
clone of every value sent. This is useful for implementing pub / sub
style patterns.
Implemented as a ring buffer, a Vec of the specified capacity is
allocated on initialization of the channel. Values are pushed into
slots.
When the channel is full, a send overwrites the oldest value. Receivers
detect this and return an error on the next call to receive. This
prevents unbounded buffering and does not make the channel vulnerable to
the slowest consumer.
Closes: #1585
Diffstat (limited to 'tokio/src/sync/tests')
-rw-r--r-- | tokio/src/sync/tests/loom_broadcast.rs | 144 | ||||
-rw-r--r-- | tokio/src/sync/tests/mod.rs | 1 |
2 files changed, 145 insertions, 0 deletions
diff --git a/tokio/src/sync/tests/loom_broadcast.rs b/tokio/src/sync/tests/loom_broadcast.rs new file mode 100644 index 00000000..53f76a25 --- /dev/null +++ b/tokio/src/sync/tests/loom_broadcast.rs @@ -0,0 +1,144 @@ +use crate::sync::broadcast; +use crate::sync::broadcast::RecvError::{Closed, Lagged}; + +use loom::future::block_on; +use loom::sync::Arc; +use loom::thread; +use tokio_test::{assert_err, assert_ok}; + +// An `Arc` is used as the value in order to detect memory leaks. +#[test] +fn broadcast_two() { + loom::model(|| { + let (tx, mut rx1) = broadcast::channel::<Arc<&'static str>>(16); + let mut rx2 = tx.subscribe(); + + let th1 = thread::spawn(move || { + block_on(async { + let v = assert_ok!(rx1.recv().await); + assert_eq!(*v, "hello"); + + let v = assert_ok!(rx1.recv().await); + assert_eq!(*v, "world"); + + match assert_err!(rx1.recv().await) { + Closed => {} + _ => panic!(), + } + }); + }); + + let th2 = thread::spawn(move || { + block_on(async { + let v = assert_ok!(rx2.recv().await); + assert_eq!(*v, "hello"); + + let v = assert_ok!(rx2.recv().await); + assert_eq!(*v, "world"); + + match assert_err!(rx2.recv().await) { + Closed => {} + _ => panic!(), + } + }); + }); + + assert_ok!(tx.send(Arc::new("hello"))); + assert_ok!(tx.send(Arc::new("world"))); + drop(tx); + + assert_ok!(th1.join()); + assert_ok!(th2.join()); + }); +} + +#[test] +fn broadcast_wrap() { + loom::model(|| { + let (tx, mut rx1) = broadcast::channel(2); + let mut rx2 = tx.subscribe(); + + let th1 = thread::spawn(move || { + block_on(async { + let mut num = 0; + + loop { + match rx1.recv().await { + Ok(_) => num += 1, + Err(Closed) => break, + Err(Lagged(n)) => { + num += n as usize + }, + } + } + + assert_eq!(num, 3); + }); + }); + + let th2 = thread::spawn(move || { + block_on(async { + let mut num = 0; + + loop { + match rx2.recv().await { + Ok(_) => num += 1, + Err(Closed) => break, + Err(Lagged(n)) => { + num += n as usize + } + } + } + + assert_eq!(num, 3); + }); + }); + + assert_ok!(tx.send("one")); + assert_ok!(tx.send("two")); + assert_ok!(tx.send("three")); + + drop(tx); + + assert_ok!(th1.join()); + assert_ok!(th2.join()); + }); +} + +#[test] +fn drop_rx() { + loom::model(|| { + let (tx, mut rx1) = broadcast::channel(16); + let rx2 = tx.subscribe(); + + let th1 = thread::spawn(move || { + block_on(async { + let v = assert_ok!(rx1.recv().await); + assert_eq!(v, "one"); + + let v = assert_ok!(rx1.recv().await); + assert_eq!(v, "two"); + + let v = assert_ok!(rx1.recv().await); + assert_eq!(v, "three"); + + match assert_err!(rx1.recv().await) { + Closed => {} + _ => panic!(), + } + }); + }); + + let th2 = thread::spawn(move || { + drop(rx2); + }); + + assert_ok!(tx.send("one")); + assert_ok!(tx.send("two")); + assert_ok!(tx.send("three")); + drop(tx); + + assert_ok!(th1.join()); + assert_ok!(th2.join()); + }); +} diff --git a/tokio/src/sync/tests/mod.rs b/tokio/src/sync/tests/mod.rs index 0b50cc95..2ee140cb 100644 --- a/tokio/src/sync/tests/mod.rs +++ b/tokio/src/sync/tests/mod.rs @@ -5,6 +5,7 @@ cfg_not_loom! { cfg_loom! { mod loom_atomic_waker; + mod loom_broadcast; mod loom_list; mod loom_mpsc; mod loom_oneshot; |