#![allow(clippy::cognitive_complexity)]
#![warn(rust_2018_idioms)]
#![cfg(feature = "sync")]

// Tests for `tokio::sync::broadcast`: delivery to one or several receivers,
// waking of pending `recv` futures, lag reporting when a receiver falls
// behind, and the closed state observed once every sender is gone.

use tokio::sync::broadcast;
use tokio_test::task;
use tokio_test::{
    assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
};

use std::sync::Arc;

/// Calls `try_recv()` on the receiver and evaluates to the received value,
/// panicking if the call returns an error.
macro_rules! assert_recv {
    ($e:expr) => {
        match $e.try_recv() {
            Ok(value) => value,
            Err(e) => panic!("expected recv; got = {:?}", e),
        }
    };
}

/// Calls `try_recv()` on the receiver and panics unless the result is
/// `TryRecvError::Empty`.
macro_rules! assert_empty {
    ($e:expr) => {
        match $e.try_recv() {
            Ok(value) => panic!("expected empty; got = {:?}", value),
            Err(broadcast::error::TryRecvError::Empty) => {}
            Err(e) => panic!("expected empty; got = {:?}", e),
        }
    };
}

/// Asserts that `$e` (a `try_recv()` result) is `Err(TryRecvError::Lagged(n))`
/// with `n == $n`.
macro_rules! assert_lagged {
    ($e:expr, $n:expr) => {
        match assert_err!($e) {
            broadcast::error::TryRecvError::Lagged(n) => {
                assert_eq!(n, $n);
            }
            _ => panic!("did not lag"),
        }
    };
}

/// Asserts that `$e` (a `try_recv()` result) is `Err(TryRecvError::Closed)`.
macro_rules! assert_closed {
    ($e:expr) => {
        match assert_err!($e) {
            broadcast::error::TryRecvError::Closed => {}
            _ => panic!("did not close"),
        }
    };
}

// Compile-time check that both channel halves are `Send + Sync`.
// NOTE(review): the payload type argument was missing from this file and has
// been restored as `i32`; confirm against the upstream test if it matters.
trait AssertSend: Send + Sync {}
impl AssertSend for broadcast::Sender<i32> {}
impl AssertSend for broadcast::Receiver<i32> {}

/// A single receiver sees exactly the one value sent, then is empty again.
#[test]
fn send_try_recv_bounded() {
    let (tx, mut rx) = broadcast::channel(16);

    assert_empty!(rx);

    let n = assert_ok!(tx.send("hello"));
    assert_eq!(n, 1);

    let val = assert_recv!(rx);
    assert_eq!(val, "hello");

    assert_empty!(rx);
}

/// With two receivers, `send` reports 2 and each receiver gets the value.
#[test]
fn send_two_recv() {
    let (tx, mut rx1) = broadcast::channel(16);
    let mut rx2 = tx.subscribe();

    assert_empty!(rx1);
    assert_empty!(rx2);

    let n = assert_ok!(tx.send("hello"));
    assert_eq!(n, 2);

    let val = assert_recv!(rx1);
    assert_eq!(val, "hello");

    let val = assert_recv!(rx2);
    assert_eq!(val, "hello");

    assert_empty!(rx1);
    assert_empty!(rx2);
}

/// The stream adapter yields already-sent values in order and then `None`
/// once the sender has been dropped.
#[tokio::test]
async fn send_recv_into_stream_ready() {
    use tokio::stream::StreamExt;

    let (tx, rx) = broadcast::channel::<i32>(8);
    tokio::pin! {
        let rx = rx.into_stream();
    }

    assert_ok!(tx.send(1));
    assert_ok!(tx.send(2));

    assert_eq!(Some(Ok(1)), rx.next().await);
    assert_eq!(Some(Ok(2)), rx.next().await);

    drop(tx);

    assert_eq!(None, rx.next().await);
}

/// A pending `next()` on the stream adapter is woken by a send and then
/// resolves to the sent value.
#[tokio::test]
async fn send_recv_into_stream_pending() {
    use tokio::stream::StreamExt;

    let (tx, rx) = broadcast::channel::<i32>(8);

    tokio::pin! {
        let rx = rx.into_stream();
    }

    let mut recv = task::spawn(rx.next());
    assert_pending!(recv.poll());

    assert_ok!(tx.send(1));

    assert!(recv.is_woken());
    let val = assert_ready!(recv.poll());
    assert_eq!(val, Some(Ok(1)));
}

/// A pending `recv()` future is woken by a send and resolves to the value.
#[test]
fn send_recv_bounded() {
    let (tx, mut rx) = broadcast::channel(16);

    let mut recv = task::spawn(rx.recv());

    assert_pending!(recv.poll());

    assert_ok!(tx.send("hello"));

    assert!(recv.is_woken());
    let val = assert_ready_ok!(recv.poll());
    assert_eq!(val, "hello");
}

/// Two pending receivers are both woken by one send. In the second round only
/// `recv1` is polled before the send, so only `recv1` is woken, yet both
/// futures still resolve to the value.
#[test]
fn send_two_recv_bounded() {
    let (tx, mut rx1) = broadcast::channel(16);
    let mut rx2 = tx.subscribe();

    let mut recv1 = task::spawn(rx1.recv());
    let mut recv2 = task::spawn(rx2.recv());

    assert_pending!(recv1.poll());
    assert_pending!(recv2.poll());

    assert_ok!(tx.send("hello"));

    assert!(recv1.is_woken());
    assert!(recv2.is_woken());

    let val1 = assert_ready_ok!(recv1.poll());
    let val2 = assert_ready_ok!(recv2.poll());
    assert_eq!(val1, "hello");
    assert_eq!(val2, "hello");

    // Release the borrows on rx1/rx2 so fresh `recv()` futures can be created.
    drop((recv1, recv2));

    let mut recv1 = task::spawn(rx1.recv());
    let mut recv2 = task::spawn(rx2.recv());

    // Only recv1 registers interest before the send.
    assert_pending!(recv1.poll());

    assert_ok!(tx.send("world"));

    assert!(recv1.is_woken());
    assert!(!recv2.is_woken());

    let val1 = assert_ready_ok!(recv1.poll());
    let val2 = assert_ready_ok!(recv2.poll());
    assert_eq!(val1, "world");
    assert_eq!(val2, "world");
}

/// The same `recv()` future is polled from two different spawned tasks; after
/// a send, the task that polled last is the one that is woken.
#[test]
fn change_tasks() {
    let (tx, mut rx) = broadcast::channel(1);

    let mut recv = Box::pin(rx.recv());

    let mut task1 = task::spawn(&mut recv);
    assert_pending!(task1.poll());

    let mut task2 = task::spawn(&mut recv);
    assert_pending!(task2.poll());

    assert_ok!(tx.send("hello"));

    assert!(task2.is_woken());
}

/// `rx1` consumes three values promptly while `rx2` lags behind (within
/// capacity) and still observes all three, in order.
#[test]
fn send_slow_rx() {
    let (tx, mut rx1) = broadcast::channel(16);
    let mut rx2 = tx.subscribe();

    {
        let mut recv2 = task::spawn(rx2.recv());

        {
            let mut recv1 = task::spawn(rx1.recv());

            assert_pending!(recv1.poll());
            assert_pending!(recv2.poll());

            assert_ok!(tx.send("one"));

            assert!(recv1.is_woken());
            assert!(recv2.is_woken());

            assert_ok!(tx.send("two"));

            let val = assert_ready_ok!(recv1.poll());
            assert_eq!(val, "one");
        }

        let val = assert_ready_ok!(task::spawn(rx1.recv()).poll());
        assert_eq!(val, "two");

        let mut recv1 = task::spawn(rx1.recv());

        assert_pending!(recv1.poll());

        assert_ok!(tx.send("three"));

        assert!(recv1.is_woken());

        let val = assert_ready_ok!(recv1.poll());
        assert_eq!(val, "three");

        // The slow receiver only now picks up the first value.
        let val = assert_ready_ok!(recv2.poll());
        assert_eq!(val, "one");
    }

    let val = assert_recv!(rx2);
    assert_eq!(val, "two");

    let val = assert_recv!(rx2);
    assert_eq!(val, "three");
}

/// Receivers can be dropped while unread values are still queued for them.
#[test]
fn drop_rx_while_values_remain() {
    let (tx, mut rx1) = broadcast::channel(16);
    let mut rx2 = tx.subscribe();

    assert_ok!(tx.send("one"));
    assert_ok!(tx.send("two"));

    assert_recv!(rx1);
    assert_recv!(rx2);

    drop(rx2);
    drop(rx1);
}

/// With capacity 2, a receiver that has not read before a third send gets
/// `Lagged(1)` and then continues from the oldest retained value.
#[test]
fn lagging_rx() {
    let (tx, mut rx1) = broadcast::channel(2);
    let mut rx2 = tx.subscribe();

    assert_ok!(tx.send("one"));
    assert_ok!(tx.send("two"));

    assert_eq!("one", assert_recv!(rx1));

    assert_ok!(tx.send("three"));

    // Lagged too far
    assert_lagged!(rx2.try_recv(), 1);

    // Calling again gets the next value
    assert_eq!("two", assert_recv!(rx2));

    assert_eq!("two", assert_recv!(rx1));
    assert_eq!("three", assert_recv!(rx1));

    assert_ok!(tx.send("four"));
    assert_ok!(tx.send("five"));

    assert_lagged!(rx2.try_recv(), 1);

    assert_ok!(tx.send("six"));

    assert_lagged!(rx2.try_recv(), 1);
}

/// Sending with no receivers is an error; after subscribing, a later send
/// succeeds and the new receiver sees only that later value.
#[test]
fn send_no_rx() {
    let (tx, _) = broadcast::channel(16);

    assert_err!(tx.send("hello"));

    let mut rx = tx.subscribe();

    assert_ok!(tx.send("world"));

    let val = assert_recv!(rx);
    assert_eq!("world", val);
}

/// Creating a channel with capacity 0 panics.
#[test]
#[should_panic]
fn zero_capacity() {
    broadcast::channel::<()>(0);
}

/// Creating a channel with capacity `1 + (usize::MAX >> 1)` panics.
#[test]
#[should_panic]
fn capacity_too_big() {
    use std::usize;

    broadcast::channel::<()>(1 + (usize::MAX >> 1));
}

/// A panic inside the value's `Clone` during `try_recv` is caught, and the
/// receiver can afterwards still receive the following value.
#[test]
fn panic_in_clone() {
    use std::panic::{self, AssertUnwindSafe};

    #[derive(Eq, PartialEq, Debug)]
    struct MyVal(usize);

    impl Clone for MyVal {
        fn clone(&self) -> MyVal {
            // Cloning the value 0 panics on purpose.
            assert_ne!(0, self.0);
            MyVal(self.0)
        }
    }

    let (tx, mut rx) = broadcast::channel(16);

    assert_ok!(tx.send(MyVal(0)));
    assert_ok!(tx.send(MyVal(1)));

    let res = panic::catch_unwind(AssertUnwindSafe(|| {
        let _ = rx.try_recv();
    }));

    assert_err!(res);

    let val = assert_recv!(rx);
    assert_eq!(val, MyVal(1));
}

/// Pending receivers stay pending while any sender clone is alive and are
/// woken with `RecvError::Closed` once the last sender is dropped.
#[test]
fn dropping_tx_notifies_rx() {
    let (tx, mut rx1) = broadcast::channel::<()>(16);
    let mut rx2 = tx.subscribe();

    let tx2 = tx.clone();

    let mut recv1 = task::spawn(rx1.recv());
    let mut recv2 = task::spawn(rx2.recv());

    assert_pending!(recv1.poll());
    assert_pending!(recv2.poll());

    // One sender (tx2) is still alive, so nothing is closed yet.
    drop(tx);

    assert_pending!(recv1.poll());
    assert_pending!(recv2.poll());

    drop(tx2);

    assert!(recv1.is_woken());
    assert!(recv2.is_woken());

    let err = assert_ready_err!(recv1.poll());
    assert!(is_closed(err));

    let err = assert_ready_err!(recv2.poll());
    assert!(is_closed(err));
}

/// Dropping the only receiver releases the queued message (observed through
/// the `Arc` strong count going from 2 back to 1).
#[test]
fn unconsumed_messages_are_dropped() {
    let (tx, rx) = broadcast::channel(16);

    let msg = Arc::new(());

    assert_ok!(tx.send(msg.clone()));

    assert_eq!(2, Arc::strong_count(&msg));

    drop(rx);

    assert_eq!(1, Arc::strong_count(&msg));
}

/// A capacity-1 channel delivers a single value and is then empty.
#[test]
fn single_capacity_recvs() {
    let (tx, mut rx) = broadcast::channel(1);

    assert_ok!(tx.send(1));

    assert_eq!(assert_recv!(rx), 1);
    assert_empty!(rx);
}

/// Capacity 1: a value sent before the sender is dropped is still received,
/// after which the receiver reports closed.
#[test]
fn single_capacity_recvs_after_drop_1() {
    let (tx, mut rx) = broadcast::channel(1);

    assert_ok!(tx.send(1));
    drop(tx);

    assert_eq!(assert_recv!(rx), 1);
    assert_closed!(rx.try_recv());
}

/// Capacity 1 with two sends before the drop: the receiver reports
/// `Lagged(1)`, then yields the second value, then reports closed.
#[test]
fn single_capacity_recvs_after_drop_2() {
    let (tx, mut rx) = broadcast::channel(1);

    assert_ok!(tx.send(1));
    assert_ok!(tx.send(2));
    drop(tx);

    assert_lagged!(rx.try_recv(), 1);
    assert_eq!(assert_recv!(rx), 2);
    assert_closed!(rx.try_recv());
}

/// Capacity 2 filled exactly: dropping the sender does not displace either
/// queued value; both are received before closed is reported.
#[test]
fn dropping_sender_does_not_overwrite() {
    let (tx, mut rx) = broadcast::channel(2);

    assert_ok!(tx.send(1));
    assert_ok!(tx.send(2));
    drop(tx);

    assert_eq!(assert_recv!(rx), 1);
    assert_eq!(assert_recv!(rx), 2);
    assert_closed!(rx.try_recv());
}

/// Capacity 2, three sends, sender dropped: `Lagged(1)`, then 2 and 3, then
/// closed.
#[test]
fn lagging_receiver_recovers_after_wrap_closed_1() {
    let (tx, mut rx) = broadcast::channel(2);

    assert_ok!(tx.send(1));
    assert_ok!(tx.send(2));
    assert_ok!(tx.send(3));
    drop(tx);

    assert_lagged!(rx.try_recv(), 1);
    assert_eq!(assert_recv!(rx), 2);
    assert_eq!(assert_recv!(rx), 3);
    assert_closed!(rx.try_recv());
}

/// Capacity 2, four sends, sender dropped: `Lagged(2)`, then 3 and 4, then
/// closed.
#[test]
fn lagging_receiver_recovers_after_wrap_closed_2() {
    let (tx, mut rx) = broadcast::channel(2);

    assert_ok!(tx.send(1));
    assert_ok!(tx.send(2));
    assert_ok!(tx.send(3));
    assert_ok!(tx.send(4));
    drop(tx);

    assert_lagged!(rx.try_recv(), 2);
    assert_eq!(assert_recv!(rx), 3);
    assert_eq!(assert_recv!(rx), 4);
    assert_closed!(rx.try_recv());
}

/// Capacity 2, three sends, sender still alive: `Lagged(1)`, then 2 and 3,
/// then empty (not closed).
#[test]
fn lagging_receiver_recovers_after_wrap_open() {
    let (tx, mut rx) = broadcast::channel(2);

    assert_ok!(tx.send(1));
    assert_ok!(tx.send(2));
    assert_ok!(tx.send(3));

    assert_lagged!(rx.try_recv(), 1);
    assert_eq!(assert_recv!(rx), 2);
    assert_eq!(assert_recv!(rx), 3);
    assert_empty!(rx);
}

/// Returns `true` when `err` is `RecvError::Closed`.
fn is_closed(err: broadcast::error::RecvError) -> bool {
    matches!(err, broadcast::error::RecvError::Closed)
}