diff options
author | Carl Lerche <me@carllerche.com> | 2020-05-12 15:09:43 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-05-12 15:09:43 -0700 |
commit | fb7dfcf4322b5e60604815aea91266b88f0b7823 (patch) | |
tree | aeba04a918be8a00eb09f6001a4f7946bd188c66 /tokio/tests/sync_broadcast.rs | |
parent | a32f918671ef641affbfcc4d4005ab738da795df (diff) |
sync: use intrusive list strategy for broadcast (#2509)
Previously, in the broadcast channel, receiver wakers were passed to the
sender via an atomic stack with allocated nodes. When a message was
sent, the stack was drained. This caused a problem when many receivers
pushed a waiter node then dropped. The waiter node remained indefinitely
in cases where no values were sent.
This patch switches broadcast to use the intrusive linked-list waiter
strategy used by `Notify` and `Semaphore.
Diffstat (limited to 'tokio/tests/sync_broadcast.rs')
-rw-r--r-- | tokio/tests/sync_broadcast.rs | 77 |
1 files changed, 75 insertions, 2 deletions
diff --git a/tokio/tests/sync_broadcast.rs b/tokio/tests/sync_broadcast.rs index 4d756f91..e37695b3 100644 --- a/tokio/tests/sync_broadcast.rs +++ b/tokio/tests/sync_broadcast.rs @@ -90,10 +90,13 @@ fn send_two_recv() { } #[tokio::test] -async fn send_recv_stream() { +async fn send_recv_into_stream_ready() { use tokio::stream::StreamExt; - let (tx, mut rx) = broadcast::channel::<i32>(8); + let (tx, rx) = broadcast::channel::<i32>(8); + tokio::pin! { + let rx = rx.into_stream(); + } assert_ok!(tx.send(1)); assert_ok!(tx.send(2)); @@ -106,6 +109,26 @@ async fn send_recv_stream() { assert_eq!(None, rx.next().await); } +#[tokio::test] +async fn send_recv_into_stream_pending() { + use tokio::stream::StreamExt; + + let (tx, rx) = broadcast::channel::<i32>(8); + + tokio::pin! { + let rx = rx.into_stream(); + } + + let mut recv = task::spawn(rx.next()); + assert_pending!(recv.poll()); + + assert_ok!(tx.send(1)); + + assert!(recv.is_woken()); + let val = assert_ready!(recv.poll()); + assert_eq!(val, Some(Ok(1))); +} + #[test] fn send_recv_bounded() { let (tx, mut rx) = broadcast::channel(16); @@ -161,6 +184,23 @@ fn send_two_recv_bounded() { } #[test] +fn change_tasks() { + let (tx, mut rx) = broadcast::channel(1); + + let mut recv = Box::pin(rx.recv()); + + let mut task1 = task::spawn(&mut recv); + assert_pending!(task1.poll()); + + let mut task2 = task::spawn(&mut recv); + assert_pending!(task2.poll()); + + tx.send("hello").unwrap(); + + assert!(task2.is_woken()); +} + +#[test] fn send_slow_rx() { let (tx, mut rx1) = broadcast::channel(16); let mut rx2 = tx.subscribe(); @@ -451,6 +491,39 @@ fn lagging_receiver_recovers_after_wrap_open() { assert_empty!(rx); } +#[tokio::test] +async fn send_recv_stream_ready_deprecated() { + use tokio::stream::StreamExt; + + let (tx, mut rx) = broadcast::channel::<i32>(8); + + assert_ok!(tx.send(1)); + assert_ok!(tx.send(2)); + + assert_eq!(Some(Ok(1)), rx.next().await); + assert_eq!(Some(Ok(2)), rx.next().await); + + drop(tx); + + assert_eq!(None, rx.next().await); +} + +#[tokio::test] +async fn send_recv_stream_pending_deprecated() { + use tokio::stream::StreamExt; + + let (tx, mut rx) = broadcast::channel::<i32>(8); + + let mut recv = task::spawn(rx.next()); + assert_pending!(recv.poll()); + + assert_ok!(tx.send(1)); + + assert!(recv.is_woken()); + let val = assert_ready!(recv.poll()); + assert_eq!(val, Some(Ok(1))); +} + fn is_closed(err: broadcast::RecvError) -> bool { match err { broadcast::RecvError::Closed => true, |