diff options
Diffstat (limited to 'tokio/tests/sync_broadcast.rs')
-rw-r--r-- | tokio/tests/sync_broadcast.rs | 77 |
1 files changed, 75 insertions, 2 deletions
diff --git a/tokio/tests/sync_broadcast.rs b/tokio/tests/sync_broadcast.rs index 4d756f91..e37695b3 100644 --- a/tokio/tests/sync_broadcast.rs +++ b/tokio/tests/sync_broadcast.rs @@ -90,10 +90,13 @@ fn send_two_recv() { } #[tokio::test] -async fn send_recv_stream() { +async fn send_recv_into_stream_ready() { use tokio::stream::StreamExt; - let (tx, mut rx) = broadcast::channel::<i32>(8); + let (tx, rx) = broadcast::channel::<i32>(8); + tokio::pin! { + let rx = rx.into_stream(); + } assert_ok!(tx.send(1)); assert_ok!(tx.send(2)); @@ -106,6 +109,26 @@ async fn send_recv_stream() { assert_eq!(None, rx.next().await); } +#[tokio::test] +async fn send_recv_into_stream_pending() { + use tokio::stream::StreamExt; + + let (tx, rx) = broadcast::channel::<i32>(8); + + tokio::pin! { + let rx = rx.into_stream(); + } + + let mut recv = task::spawn(rx.next()); + assert_pending!(recv.poll()); + + assert_ok!(tx.send(1)); + + assert!(recv.is_woken()); + let val = assert_ready!(recv.poll()); + assert_eq!(val, Some(Ok(1))); +} + #[test] fn send_recv_bounded() { let (tx, mut rx) = broadcast::channel(16); @@ -161,6 +184,23 @@ fn send_two_recv_bounded() { } #[test] +fn change_tasks() { + let (tx, mut rx) = broadcast::channel(1); + + let mut recv = Box::pin(rx.recv()); + + let mut task1 = task::spawn(&mut recv); + assert_pending!(task1.poll()); + + let mut task2 = task::spawn(&mut recv); + assert_pending!(task2.poll()); + + tx.send("hello").unwrap(); + + assert!(task2.is_woken()); +} + +#[test] fn send_slow_rx() { let (tx, mut rx1) = broadcast::channel(16); let mut rx2 = tx.subscribe(); @@ -451,6 +491,39 @@ fn lagging_receiver_recovers_after_wrap_open() { assert_empty!(rx); } +#[tokio::test] +async fn send_recv_stream_ready_deprecated() { + use tokio::stream::StreamExt; + + let (tx, mut rx) = broadcast::channel::<i32>(8); + + assert_ok!(tx.send(1)); + assert_ok!(tx.send(2)); + + assert_eq!(Some(Ok(1)), rx.next().await); + assert_eq!(Some(Ok(2)), rx.next().await); + + drop(tx); + + assert_eq!(None, rx.next().await); +} + +#[tokio::test] +async fn send_recv_stream_pending_deprecated() { + use tokio::stream::StreamExt; + + let (tx, mut rx) = broadcast::channel::<i32>(8); + + let mut recv = task::spawn(rx.next()); + assert_pending!(recv.poll()); + + assert_ok!(tx.send(1)); + + assert!(recv.is_woken()); + let val = assert_ready!(recv.poll()); + assert_eq!(val, Some(Ok(1))); +} + fn is_closed(err: broadcast::RecvError) -> bool { match err { broadcast::RecvError::Closed => true, |