diff options
author | Carl Lerche <me@carllerche.com> | 2019-12-18 10:13:15 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-12-18 10:13:15 -0800 |
commit | 7c010ed030d0084d38451bbebf727c6695c2dbac (patch) | |
tree | d2507c71f9b6c1cdb988097592daa9566ec2ec51 /tokio/tests/sync_broadcast.rs | |
parent | 42c942de1433c37946f81d185459b8ee54928d3d (diff) |
sync: add broadcast channel (#1943)
Adds a broadcast channel implementation. A broadcast channel is a
multi-producer, multi-consumer channel where each consumer receives a
clone of every value sent. This is useful for implementing pub / sub
style patterns.
Implemented as a ring buffer, a Vec of the specified capacity is
allocated on initialization of the channel. Values are pushed into
slots.
When the channel is full, a send overwrites the oldest value. Receivers
detect this and return an error on the next call to receive. This
prevents unbounded buffering and does not make the channel vulnerable to
the slowest consumer.
Closes: #1585
Diffstat (limited to 'tokio/tests/sync_broadcast.rs')
-rw-r--r-- | tokio/tests/sync_broadcast.rs | 339 |
1 files changed, 339 insertions, 0 deletions
diff --git a/tokio/tests/sync_broadcast.rs b/tokio/tests/sync_broadcast.rs new file mode 100644 index 00000000..257595ca --- /dev/null +++ b/tokio/tests/sync_broadcast.rs @@ -0,0 +1,339 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "sync")] + +use tokio::sync::broadcast; +use tokio_test::task; +use tokio_test::{ + assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok, +}; + +use std::sync::Arc; + +macro_rules! assert_recv { + ($e:expr) => { + match $e.try_recv() { + Ok(value) => value, + Err(e) => panic!("expected recv; got = {:?}", e), + } + }; +} + +macro_rules! assert_empty { + ($e:expr) => { + match $e.try_recv() { + Ok(value) => panic!("expected empty; got = {:?}", value), + Err(broadcast::TryRecvError::Empty) => {} + Err(e) => panic!("expected empty; got = {:?}", e), + } + }; +} + +macro_rules! assert_lagged { + ($e:expr, $n:expr) => { + match assert_err!($e) { + broadcast::TryRecvError::Lagged(n) => { + assert_eq!(n, $n); + } + _ => panic!("did not lag"), + } + }; +} + +trait AssertSend: Send {} +impl AssertSend for broadcast::Sender<i32> {} +impl AssertSend for broadcast::Receiver<i32> {} + +#[test] +fn send_try_recv_bounded() { + let (tx, mut rx) = broadcast::channel(16); + + assert_empty!(rx); + + let n = assert_ok!(tx.send("hello")); + assert_eq!(n, 1); + + let val = assert_recv!(rx); + assert_eq!(val, "hello"); + + assert_empty!(rx); +} + +#[test] +fn send_two_recv() { + let (tx, mut rx1) = broadcast::channel(16); + let mut rx2 = tx.subscribe(); + + assert_empty!(rx1); + assert_empty!(rx2); + + let n = assert_ok!(tx.send("hello")); + assert_eq!(n, 2); + + let val = assert_recv!(rx1); + assert_eq!(val, "hello"); + + let val = assert_recv!(rx2); + assert_eq!(val, "hello"); + + assert_empty!(rx1); + assert_empty!(rx2); +} + +#[test] +fn send_recv_bounded() { + let (tx, mut rx) = broadcast::channel(16); + + let mut recv = task::spawn(rx.recv()); + + assert_pending!(recv.poll()); + + assert_ok!(tx.send("hello")); + + assert!(recv.is_woken()); + let val = assert_ready_ok!(recv.poll()); + assert_eq!(val, "hello"); +} + +#[test] +fn send_two_recv_bounded() { + let (tx, mut rx1) = broadcast::channel(16); + let mut rx2 = tx.subscribe(); + + let mut recv1 = task::spawn(rx1.recv()); + let mut recv2 = task::spawn(rx2.recv()); + + assert_pending!(recv1.poll()); + assert_pending!(recv2.poll()); + + assert_ok!(tx.send("hello")); + + assert!(recv1.is_woken()); + assert!(recv2.is_woken()); + + let val1 = assert_ready_ok!(recv1.poll()); + let val2 = assert_ready_ok!(recv2.poll()); + assert_eq!(val1, "hello"); + assert_eq!(val2, "hello"); + + drop((recv1, recv2)); + + let mut recv1 = task::spawn(rx1.recv()); + let mut recv2 = task::spawn(rx2.recv()); + + assert_pending!(recv1.poll()); + + assert_ok!(tx.send("world")); + + assert!(recv1.is_woken()); + assert!(!recv2.is_woken()); + + let val1 = assert_ready_ok!(recv1.poll()); + let val2 = assert_ready_ok!(recv2.poll()); + assert_eq!(val1, "world"); + assert_eq!(val2, "world"); +} + +#[test] +fn send_slow_rx() { + let (tx, mut rx1) = broadcast::channel(16); + let mut rx2 = tx.subscribe(); + + { + let mut recv2 = task::spawn(rx2.recv()); + + { + let mut recv1 = task::spawn(rx1.recv()); + + assert_pending!(recv1.poll()); + assert_pending!(recv2.poll()); + + assert_ok!(tx.send("one")); + + assert!(recv1.is_woken()); + assert!(recv2.is_woken()); + + assert_ok!(tx.send("two")); + + let val = assert_ready_ok!(recv1.poll()); + assert_eq!(val, "one"); + } + + let val = assert_ready_ok!(task::spawn(rx1.recv()).poll()); + assert_eq!(val, "two"); + + let mut recv1 = task::spawn(rx1.recv()); + + assert_pending!(recv1.poll()); + + assert_ok!(tx.send("three")); + + assert!(recv1.is_woken()); + + let val = assert_ready_ok!(recv1.poll()); + assert_eq!(val, "three"); + + let val = assert_ready_ok!(recv2.poll()); + assert_eq!(val, "one"); + } + + let val = assert_recv!(rx2); + assert_eq!(val, "two"); + + let val = assert_recv!(rx2); + assert_eq!(val, "three"); +} + +#[test] +fn drop_rx_while_values_remain() { + let (tx, mut rx1) = broadcast::channel(16); + let mut rx2 = tx.subscribe(); + + assert_ok!(tx.send("one")); + assert_ok!(tx.send("two")); + + assert_recv!(rx1); + assert_recv!(rx2); + + drop(rx2); + drop(rx1); +} + +#[test] +fn lagging_rx() { + let (tx, mut rx1) = broadcast::channel(2); + let mut rx2 = tx.subscribe(); + + assert_ok!(tx.send("one")); + assert_ok!(tx.send("two")); + + assert_eq!("one", assert_recv!(rx1)); + + assert_ok!(tx.send("three")); + + // Lagged too far + assert_lagged!(rx2.try_recv(), 1); + + // Calling again gets the next value + assert_eq!("two", assert_recv!(rx2)); + + assert_eq!("two", assert_recv!(rx1)); + assert_eq!("three", assert_recv!(rx1)); + + assert_ok!(tx.send("four")); + assert_ok!(tx.send("five")); + + assert_lagged!(rx2.try_recv(), 1); + + assert_ok!(tx.send("six")); + + assert_lagged!(rx2.try_recv(), 1); +} + +#[test] +fn send_no_rx() { + let (tx, _) = broadcast::channel(16); + + assert_err!(tx.send("hello")); + + let mut rx = tx.subscribe(); + + assert_ok!(tx.send("world")); + + let val = assert_recv!(rx); + assert_eq!("world", val); +} + +#[test] +#[should_panic] +fn zero_capacity() { + broadcast::channel::<()>(0); +} + +#[test] +#[should_panic] +fn capacity_too_big() { + use std::usize; + + broadcast::channel::<()>(1 + (usize::MAX >> 1)); +} + +#[test] +fn panic_in_clone() { + use std::panic::{self, AssertUnwindSafe}; + + #[derive(Eq, PartialEq, Debug)] + struct MyVal(usize); + + impl Clone for MyVal { + fn clone(&self) -> MyVal { + assert_ne!(0, self.0); + MyVal(self.0) + } + } + + let (tx, mut rx) = broadcast::channel(16); + + assert_ok!(tx.send(MyVal(0))); + assert_ok!(tx.send(MyVal(1))); + + let res = panic::catch_unwind(AssertUnwindSafe(|| { + let _ = rx.try_recv(); + })); + + assert_err!(res); + + let val = assert_recv!(rx); + assert_eq!(val, MyVal(1)); +} + +#[test] +fn dropping_tx_notifies_rx() { + let (tx, mut rx1) = broadcast::channel::<()>(16); + let mut rx2 = tx.subscribe(); + + let tx2 = tx.clone(); + + let mut recv1 = task::spawn(rx1.recv()); + let mut recv2 = task::spawn(rx2.recv()); + + assert_pending!(recv1.poll()); + assert_pending!(recv2.poll()); + + drop(tx); + + assert_pending!(recv1.poll()); + assert_pending!(recv2.poll()); + + drop(tx2); + + assert!(recv1.is_woken()); + assert!(recv2.is_woken()); + + let err = assert_ready_err!(recv1.poll()); + assert!(is_closed(err)); + + let err = assert_ready_err!(recv2.poll()); + assert!(is_closed(err)); +} + +#[test] +fn unconsumed_messages_are_dropped() { + let (tx, rx) = broadcast::channel(16); + + let msg = Arc::new(()); + + assert_ok!(tx.send(msg.clone())); + + assert_eq!(2, Arc::strong_count(&msg)); + + drop(rx); + + assert_eq!(1, Arc::strong_count(&msg)); +} + +fn is_closed(err: broadcast::RecvError) -> bool { + match err { + broadcast::RecvError::Closed => true, + _ => false, + } +} |