summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync/tests
diff options
context:
space:
mode:
authorkalcutter <31195032+kalcutter@users.noreply.github.com>2020-01-22 22:59:05 +0100
committerCarl Lerche <me@carllerche.com>2020-01-22 13:59:05 -0800
commitf9ea576ccae5beffeaa2f2c48c2c0d2f9449673b (patch)
treef726788538b57d83f5e223648d7d21622ec86c41 /tokio/src/sync/tests
parent7f580071f3e5d475db200d2101ff35be0b4f6efe (diff)
sync: fix broadcast bugs (#2135)
Make sure the tail mutex is acquired when `condvar` is notified, otherwise the wakeup may be lost and the sender could be left waiting. Use `notify_all()` instead of `notify_one()` to ensure that the correct sender is woken. Finally, only do any of this when there are no more readers left. Additionally, calling `send()` is buggy and may cause a panic when the slot has another pending send.
Diffstat (limited to 'tokio/src/sync/tests')
-rw-r--r--tokio/src/sync/tests/loom_broadcast.rs40
1 files changed, 40 insertions, 0 deletions
diff --git a/tokio/src/sync/tests/loom_broadcast.rs b/tokio/src/sync/tests/loom_broadcast.rs
index da61563b..da12fb9f 100644
--- a/tokio/src/sync/tests/loom_broadcast.rs
+++ b/tokio/src/sync/tests/loom_broadcast.rs
@@ -6,6 +6,46 @@ use loom::sync::Arc;
use loom::thread;
use tokio_test::{assert_err, assert_ok};
+#[test]
+fn broadcast_send() {
+ loom::model(|| {
+ let (tx1, mut rx) = broadcast::channel(2);
+ let tx1 = Arc::new(tx1);
+ let tx2 = tx1.clone();
+
+ let th1 = thread::spawn(move || {
+ block_on(async {
+ assert_ok!(tx1.send("one"));
+ assert_ok!(tx1.send("two"));
+ assert_ok!(tx1.send("three"));
+ });
+ });
+
+ let th2 = thread::spawn(move || {
+ block_on(async {
+ assert_ok!(tx2.send("eins"));
+ assert_ok!(tx2.send("zwei"));
+ assert_ok!(tx2.send("drei"));
+ });
+ });
+
+ block_on(async {
+ let mut num = 0;
+ loop {
+ match rx.recv().await {
+ Ok(_) => num += 1,
+ Err(Closed) => break,
+ Err(Lagged(n)) => num += n as usize,
+ }
+ }
+ assert_eq!(num, 6);
+ });
+
+ assert_ok!(th1.join());
+ assert_ok!(th2.join());
+ });
+}
+
// An `Arc` is used as the value in order to detect memory leaks.
#[test]
fn broadcast_two() {