summaryrefslogtreecommitdiffstats
path: root/tokio/tests/sync_broadcast.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-05-12 15:09:43 -0700
committerGitHub <noreply@github.com>2020-05-12 15:09:43 -0700
commitfb7dfcf4322b5e60604815aea91266b88f0b7823 (patch)
treeaeba04a918be8a00eb09f6001a4f7946bd188c66 /tokio/tests/sync_broadcast.rs
parenta32f918671ef641affbfcc4d4005ab738da795df (diff)
sync: use intrusive list strategy for broadcast (#2509)
Previously, in the broadcast channel, receiver wakers were passed to the sender via an atomic stack with allocated nodes. When a message was sent, the stack was drained. This caused a problem when many receivers pushed a waiter node then dropped. The waiter node remained indefinitely in cases where no values were sent. This patch switches broadcast to use the intrusive linked-list waiter strategy used by `Notify` and `Semaphore.
Diffstat (limited to 'tokio/tests/sync_broadcast.rs')
-rw-r--r--tokio/tests/sync_broadcast.rs77
1 files changed, 75 insertions, 2 deletions
diff --git a/tokio/tests/sync_broadcast.rs b/tokio/tests/sync_broadcast.rs
index 4d756f91..e37695b3 100644
--- a/tokio/tests/sync_broadcast.rs
+++ b/tokio/tests/sync_broadcast.rs
@@ -90,10 +90,13 @@ fn send_two_recv() {
}
#[tokio::test]
-async fn send_recv_stream() {
+async fn send_recv_into_stream_ready() {
use tokio::stream::StreamExt;
- let (tx, mut rx) = broadcast::channel::<i32>(8);
+ let (tx, rx) = broadcast::channel::<i32>(8);
+ tokio::pin! {
+ let rx = rx.into_stream();
+ }
assert_ok!(tx.send(1));
assert_ok!(tx.send(2));
@@ -106,6 +109,26 @@ async fn send_recv_stream() {
assert_eq!(None, rx.next().await);
}
+#[tokio::test]
+async fn send_recv_into_stream_pending() {
+ use tokio::stream::StreamExt;
+
+ let (tx, rx) = broadcast::channel::<i32>(8);
+
+ tokio::pin! {
+ let rx = rx.into_stream();
+ }
+
+ let mut recv = task::spawn(rx.next());
+ assert_pending!(recv.poll());
+
+ assert_ok!(tx.send(1));
+
+ assert!(recv.is_woken());
+ let val = assert_ready!(recv.poll());
+ assert_eq!(val, Some(Ok(1)));
+}
+
#[test]
fn send_recv_bounded() {
let (tx, mut rx) = broadcast::channel(16);
@@ -161,6 +184,23 @@ fn send_two_recv_bounded() {
}
#[test]
+fn change_tasks() {
+ let (tx, mut rx) = broadcast::channel(1);
+
+ let mut recv = Box::pin(rx.recv());
+
+ let mut task1 = task::spawn(&mut recv);
+ assert_pending!(task1.poll());
+
+ let mut task2 = task::spawn(&mut recv);
+ assert_pending!(task2.poll());
+
+ tx.send("hello").unwrap();
+
+ assert!(task2.is_woken());
+}
+
+#[test]
fn send_slow_rx() {
let (tx, mut rx1) = broadcast::channel(16);
let mut rx2 = tx.subscribe();
@@ -451,6 +491,39 @@ fn lagging_receiver_recovers_after_wrap_open() {
assert_empty!(rx);
}
+#[tokio::test]
+async fn send_recv_stream_ready_deprecated() {
+ use tokio::stream::StreamExt;
+
+ let (tx, mut rx) = broadcast::channel::<i32>(8);
+
+ assert_ok!(tx.send(1));
+ assert_ok!(tx.send(2));
+
+ assert_eq!(Some(Ok(1)), rx.next().await);
+ assert_eq!(Some(Ok(2)), rx.next().await);
+
+ drop(tx);
+
+ assert_eq!(None, rx.next().await);
+}
+
+#[tokio::test]
+async fn send_recv_stream_pending_deprecated() {
+ use tokio::stream::StreamExt;
+
+ let (tx, mut rx) = broadcast::channel::<i32>(8);
+
+ let mut recv = task::spawn(rx.next());
+ assert_pending!(recv.poll());
+
+ assert_ok!(tx.send(1));
+
+ assert!(recv.is_woken());
+ let val = assert_ready!(recv.poll());
+ assert_eq!(val, Some(Ok(1)));
+}
+
fn is_closed(err: broadcast::RecvError) -> bool {
match err {
broadcast::RecvError::Closed => true,