diff options
author | kalcutter <31195032+kalcutter@users.noreply.github.com> | 2020-01-22 22:59:05 +0100 |
---|---|---|
committer | Carl Lerche <me@carllerche.com> | 2020-01-22 13:59:05 -0800 |
commit | f9ea576ccae5beffeaa2f2c48c2c0d2f9449673b (patch) | |
tree | f726788538b57d83f5e223648d7d21622ec86c41 /tokio/src/sync/tests | |
parent | 7f580071f3e5d475db200d2101ff35be0b4f6efe (diff) |
sync: fix broadcast bugs (#2135)
Make sure the tail mutex is acquired when `condvar` is notified,
otherwise the wakeup may be lost and the sender could be left waiting.
Use `notify_all()` instead of `notify_one()` to ensure that the correct
sender is woken. Finally, only do any of this when there are no more
readers left.
Additionally, calling `send()` is buggy and may cause a panic when
the slot has another pending send.
Diffstat (limited to 'tokio/src/sync/tests')
-rw-r--r-- | tokio/src/sync/tests/loom_broadcast.rs | 40 |
1 files changed, 40 insertions, 0 deletions
diff --git a/tokio/src/sync/tests/loom_broadcast.rs b/tokio/src/sync/tests/loom_broadcast.rs index da61563b..da12fb9f 100644 --- a/tokio/src/sync/tests/loom_broadcast.rs +++ b/tokio/src/sync/tests/loom_broadcast.rs @@ -6,6 +6,46 @@ use loom::sync::Arc; use loom::thread; use tokio_test::{assert_err, assert_ok}; +#[test] +fn broadcast_send() { + loom::model(|| { + let (tx1, mut rx) = broadcast::channel(2); + let tx1 = Arc::new(tx1); + let tx2 = tx1.clone(); + + let th1 = thread::spawn(move || { + block_on(async { + assert_ok!(tx1.send("one")); + assert_ok!(tx1.send("two")); + assert_ok!(tx1.send("three")); + }); + }); + + let th2 = thread::spawn(move || { + block_on(async { + assert_ok!(tx2.send("eins")); + assert_ok!(tx2.send("zwei")); + assert_ok!(tx2.send("drei")); + }); + }); + + block_on(async { + let mut num = 0; + loop { + match rx.recv().await { + Ok(_) => num += 1, + Err(Closed) => break, + Err(Lagged(n)) => num += n as usize, + } + } + assert_eq!(num, 6); + }); + + assert_ok!(th1.join()); + assert_ok!(th2.join()); + }); +} + // An `Arc` is used as the value in order to detect memory leaks. #[test] fn broadcast_two() { |