From cf025ba45f68934ae2138bb75ee2a5ee50506d1b Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 24 Sep 2020 17:26:38 -0700 Subject: sync: support mpsc send with `&self` (#2861) Updates the mpsc channel to use the intrusive waker based sempahore. This enables using `Sender` with `&self`. Instead of using `Sender::poll_ready` to ensure capacity and updating the `Sender` state, `async fn Sender::reserve()` is added. This function returns a `Permit` value representing the reserved capacity. Fixes: #2637 Refs: #2718 (intrusive waiters) --- tokio/tests/rt_threaded.rs | 10 +- tokio/tests/sync_mpsc.rs | 363 ++++++++++++++++++--------------------------- 2 files changed, 150 insertions(+), 223 deletions(-) (limited to 'tokio/tests') diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs index a67c090e..2c7cfb80 100644 --- a/tokio/tests/rt_threaded.rs +++ b/tokio/tests/rt_threaded.rs @@ -70,7 +70,7 @@ fn many_multishot_futures() { let (start_tx, mut chain_rx) = tokio::sync::mpsc::channel(10); for _ in 0..CHAIN { - let (mut next_tx, next_rx) = tokio::sync::mpsc::channel(10); + let (next_tx, next_rx) = tokio::sync::mpsc::channel(10); // Forward all the messages rt.spawn(async move { @@ -83,8 +83,8 @@ fn many_multishot_futures() { } // This final task cycles if needed - let (mut final_tx, final_rx) = tokio::sync::mpsc::channel(10); - let mut cycle_tx = start_tx.clone(); + let (final_tx, final_rx) = tokio::sync::mpsc::channel(10); + let cycle_tx = start_tx.clone(); let mut rem = CYCLES; rt.spawn(async move { @@ -107,7 +107,7 @@ fn many_multishot_futures() { { rt.block_on(async move { - for mut start_tx in start_txs { + for start_tx in start_txs { start_tx.send("ping").await.unwrap(); } @@ -340,7 +340,7 @@ fn coop_and_block_in_place() { .unwrap(); rt.block_on(async move { - let (mut tx, mut rx) = tokio::sync::mpsc::channel(1024); + let (tx, mut rx) = tokio::sync::mpsc::channel(1024); // Fill the channel for _ in 0..1024 { diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index 919bddbf..adefcb12 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -17,74 +17,72 @@ trait AssertSend: Send {} impl AssertSend for mpsc::Sender {} impl AssertSend for mpsc::Receiver {} -#[test] -fn send_recv_with_buffer() { - let (tx, rx) = mpsc::channel::(16); - let mut tx = task::spawn(tx); - let mut rx = task::spawn(rx); +#[tokio::test] +async fn send_recv_with_buffer() { + let (tx, mut rx) = mpsc::channel::(16); // Using poll_ready / try_send - assert_ready_ok!(tx.enter(|cx, mut tx| tx.poll_ready(cx))); - tx.try_send(1).unwrap(); + // let permit assert_ready_ok!(tx.reserve()); + let permit = tx.reserve().await.unwrap(); + permit.send(1); // Without poll_ready tx.try_send(2).unwrap(); drop(tx); - let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx))); + let val = rx.recv().await; assert_eq!(val, Some(1)); - let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx))); + let val = rx.recv().await; assert_eq!(val, Some(2)); - let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx))); + let val = rx.recv().await; assert!(val.is_none()); } -#[test] -fn disarm() { - let (tx, rx) = mpsc::channel::(2); - let mut tx1 = task::spawn(tx.clone()); - let mut tx2 = task::spawn(tx.clone()); - let mut tx3 = task::spawn(tx.clone()); - let mut tx4 = task::spawn(tx); - let mut rx = task::spawn(rx); +#[tokio::test] +async fn reserve_disarm() { + let (tx, mut rx) = mpsc::channel::(2); + let tx1 = tx.clone(); + let tx2 = tx.clone(); + let tx3 = tx.clone(); + let tx4 = tx; // We should be able to `poll_ready` two handles without problem - assert_ready_ok!(tx1.enter(|cx, mut tx| tx.poll_ready(cx))); - assert_ready_ok!(tx2.enter(|cx, mut tx| tx.poll_ready(cx))); + let permit1 = assert_ok!(tx1.reserve().await); + let permit2 = assert_ok!(tx2.reserve().await); // But a third should not be ready - assert_pending!(tx3.enter(|cx, mut tx| tx.poll_ready(cx))); + let mut r3 = task::spawn(tx3.reserve()); + assert_pending!(r3.poll()); + + let mut r4 = task::spawn(tx4.reserve()); + assert_pending!(r4.poll()); // Using one of the reserved slots should allow a new handle to become ready - tx1.try_send(1).unwrap(); + permit1.send(1); + // We also need to receive for the slot to be free - let _ = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx))).unwrap(); + assert!(!r3.is_woken()); + rx.recv().await.unwrap(); // Now there's a free slot! - assert_ready_ok!(tx3.enter(|cx, mut tx| tx.poll_ready(cx))); - assert_pending!(tx4.enter(|cx, mut tx| tx.poll_ready(cx))); + assert!(r3.is_woken()); + assert!(!r4.is_woken()); - // Dropping a ready handle should also open up a slot - drop(tx2); - assert_ready_ok!(tx4.enter(|cx, mut tx| tx.poll_ready(cx))); - assert_pending!(tx1.enter(|cx, mut tx| tx.poll_ready(cx))); - - // Explicitly disarming a handle should also open a slot - assert!(tx3.disarm()); - assert_ready_ok!(tx1.enter(|cx, mut tx| tx.poll_ready(cx))); + // Dropping a permit should also open up a slot + drop(permit2); + assert!(r4.is_woken()); - // Disarming a non-armed sender does not free up a slot - assert!(!tx3.disarm()); - assert_pending!(tx3.enter(|cx, mut tx| tx.poll_ready(cx))); + let mut r1 = task::spawn(tx1.reserve()); + assert_pending!(r1.poll()); } #[tokio::test] async fn send_recv_stream_with_buffer() { use tokio::stream::StreamExt; - let (mut tx, mut rx) = mpsc::channel::(16); + let (tx, mut rx) = mpsc::channel::(16); tokio::spawn(async move { assert_ok!(tx.send(1).await); @@ -98,7 +96,7 @@ async fn send_recv_stream_with_buffer() { #[tokio::test] async fn async_send_recv_with_buffer() { - let (mut tx, mut rx) = mpsc::channel(16); + let (tx, mut rx) = mpsc::channel(16); tokio::spawn(async move { assert_ok!(tx.send(1).await); @@ -110,37 +108,36 @@ async fn async_send_recv_with_buffer() { assert_eq!(None, rx.recv().await); } -#[test] -fn start_send_past_cap() { +#[tokio::test] +async fn start_send_past_cap() { + use std::future::Future; + let mut t1 = task::spawn(()); - let mut t2 = task::spawn(()); - let mut t3 = task::spawn(()); - let (mut tx1, mut rx) = mpsc::channel(1); - let mut tx2 = tx1.clone(); + let (tx1, mut rx) = mpsc::channel(1); + let tx2 = tx1.clone(); assert_ok!(tx1.try_send(())); - t1.enter(|cx, _| { - assert_pending!(tx1.poll_ready(cx)); - }); + let mut r1 = Box::pin(tx1.reserve()); + t1.enter(|cx, _| assert_pending!(r1.as_mut().poll(cx))); - t2.enter(|cx, _| { - assert_pending!(tx2.poll_ready(cx)); - }); + { + let mut r2 = task::spawn(tx2.reserve()); + assert_pending!(r2.poll()); - drop(tx1); + drop(r1); - let val = t3.enter(|cx, _| assert_ready!(rx.poll_recv(cx))); - assert!(val.is_some()); + assert!(rx.recv().await.is_some()); - assert!(t2.is_woken()); - assert!(!t1.is_woken()); + assert!(r2.is_woken()); + assert!(!t1.is_woken()); + } + drop(tx1); drop(tx2); - let val = t3.enter(|cx, _| assert_ready!(rx.poll_recv(cx))); - assert!(val.is_none()); + assert!(rx.recv().await.is_none()); } #[test] @@ -149,26 +146,20 @@ fn buffer_gteq_one() { mpsc::channel::(0); } -#[test] -fn send_recv_unbounded() { - let mut t1 = task::spawn(()); - +#[tokio::test] +async fn send_recv_unbounded() { let (tx, mut rx) = mpsc::unbounded_channel::(); // Using `try_send` assert_ok!(tx.send(1)); assert_ok!(tx.send(2)); - let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); - assert_eq!(val, Some(1)); - - let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); - assert_eq!(val, Some(2)); + assert_eq!(rx.recv().await, Some(1)); + assert_eq!(rx.recv().await, Some(2)); drop(tx); - let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); - assert!(val.is_none()); + assert!(rx.recv().await.is_none()); } #[tokio::test] @@ -201,11 +192,10 @@ async fn send_recv_stream_unbounded() { assert_eq!(None, rx.next().await); } -#[test] -fn no_t_bounds_buffer() { +#[tokio::test] +async fn no_t_bounds_buffer() { struct NoImpls; - let mut t1 = task::spawn(()); let (tx, mut rx) = mpsc::channel(100); // sender should be Debug even though T isn't Debug @@ -215,15 +205,13 @@ fn no_t_bounds_buffer() { // and sender should be Clone even though T isn't Clone assert!(tx.clone().try_send(NoImpls).is_ok()); - let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); - assert!(val.is_some()); + assert!(rx.recv().await.is_some()); } -#[test] -fn no_t_bounds_unbounded() { +#[tokio::test] +async fn no_t_bounds_unbounded() { struct NoImpls; - let mut t1 = task::spawn(()); let (tx, mut rx) = mpsc::unbounded_channel(); // sender should be Debug even though T isn't Debug @@ -233,133 +221,87 @@ fn no_t_bounds_unbounded() { // and sender should be Clone even though T isn't Clone assert!(tx.clone().send(NoImpls).is_ok()); - let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); - assert!(val.is_some()); + assert!(rx.recv().await.is_some()); } -#[test] -fn send_recv_buffer_limited() { - let mut t1 = task::spawn(()); - let mut t2 = task::spawn(()); - - let (mut tx, mut rx) = mpsc::channel::(1); - - // Run on a task context - t1.enter(|cx, _| { - assert_ready_ok!(tx.poll_ready(cx)); - - // Send first message - assert_ok!(tx.try_send(1)); +#[tokio::test] +async fn send_recv_buffer_limited() { + let (tx, mut rx) = mpsc::channel::(1); - // Not ready - assert_pending!(tx.poll_ready(cx)); + // Reserve capacity + let p1 = assert_ok!(tx.reserve().await); - // Send second message - assert_err!(tx.try_send(1337)); - }); + // Send first message + p1.send(1); - t2.enter(|cx, _| { - // Take the value - let val = assert_ready!(rx.poll_recv(cx)); - assert_eq!(Some(1), val); - }); + // Not ready + let mut p2 = task::spawn(tx.reserve()); + assert_pending!(p2.poll()); - assert!(t1.is_woken()); + // Take the value + assert!(rx.recv().await.is_some()); - t1.enter(|cx, _| { - assert_ready_ok!(tx.poll_ready(cx)); + // Notified + assert!(p2.is_woken()); - assert_ok!(tx.try_send(2)); + // Trying to send fails + assert_err!(tx.try_send(1337)); - // Not ready - assert_pending!(tx.poll_ready(cx)); - }); + // Send second + let permit = assert_ready_ok!(p2.poll()); + permit.send(2); - t2.enter(|cx, _| { - // Take the value - let val = assert_ready!(rx.poll_recv(cx)); - assert_eq!(Some(2), val); - }); - - t1.enter(|cx, _| { - assert_ready_ok!(tx.poll_ready(cx)); - }); + assert!(rx.recv().await.is_some()); } -#[test] -fn recv_close_gets_none_idle() { - let mut t1 = task::spawn(()); - - let (mut tx, mut rx) = mpsc::channel::(10); +#[tokio::test] +async fn recv_close_gets_none_idle() { + let (tx, mut rx) = mpsc::channel::(10); rx.close(); - t1.enter(|cx, _| { - let val = assert_ready!(rx.poll_recv(cx)); - assert!(val.is_none()); - assert_ready_err!(tx.poll_ready(cx)); - }); -} - -#[test] -fn recv_close_gets_none_reserved() { - let mut t1 = task::spawn(()); - let mut t2 = task::spawn(()); - let mut t3 = task::spawn(()); + assert!(rx.recv().await.is_none()); - let (mut tx1, mut rx) = mpsc::channel::(1); - let mut tx2 = tx1.clone(); + assert_err!(tx.send(1).await); +} - assert_ready_ok!(t1.enter(|cx, _| tx1.poll_ready(cx))); +#[tokio::test] +async fn recv_close_gets_none_reserved() { + let (tx1, mut rx) = mpsc::channel::(1); + let tx2 = tx1.clone(); - t2.enter(|cx, _| { - assert_pending!(tx2.poll_ready(cx)); - }); + let permit1 = assert_ok!(tx1.reserve().await); + let mut permit2 = task::spawn(tx2.reserve()); + assert_pending!(permit2.poll()); rx.close(); - assert!(t2.is_woken()); - - t2.enter(|cx, _| { - assert_ready_err!(tx2.poll_ready(cx)); - }); - - t3.enter(|cx, _| assert_pending!(rx.poll_recv(cx))); + assert!(permit2.is_woken()); + assert_ready_err!(permit2.poll()); - assert!(!t1.is_woken()); - assert!(!t2.is_woken()); - - assert_ok!(tx1.try_send(123)); + { + let mut recv = task::spawn(rx.recv()); + assert_pending!(recv.poll()); - assert!(t3.is_woken()); + permit1.send(123); + assert!(recv.is_woken()); - t3.enter(|cx, _| { - let v = assert_ready!(rx.poll_recv(cx)); + let v = assert_ready!(recv.poll()); assert_eq!(v, Some(123)); + } - let v = assert_ready!(rx.poll_recv(cx)); - assert!(v.is_none()); - }); + assert!(rx.recv().await.is_none()); } -#[test] -fn tx_close_gets_none() { - let mut t1 = task::spawn(()); - +#[tokio::test] +async fn tx_close_gets_none() { let (_, mut rx) = mpsc::channel::(10); - - // Run on a task context - t1.enter(|cx, _| { - let v = assert_ready!(rx.poll_recv(cx)); - assert!(v.is_none()); - }); + assert!(rx.recv().await.is_none()); } -#[test] -fn try_send_fail() { - let mut t1 = task::spawn(()); - - let (mut tx, mut rx) = mpsc::channel(1); +#[tokio::test] +async fn try_send_fail() { + let (tx, mut rx) = mpsc::channel(1); tx.try_send("hello").unwrap(); @@ -369,60 +311,48 @@ fn try_send_fail() { _ => panic!(), } - let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); - assert_eq!(val, Some("hello")); + assert_eq!(rx.recv().await, Some("hello")); assert_ok!(tx.try_send("goodbye")); drop(tx); - let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); - assert_eq!(val, Some("goodbye")); - - let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); - assert!(val.is_none()); + assert_eq!(rx.recv().await, Some("goodbye")); + assert!(rx.recv().await.is_none()); } -#[test] -fn drop_tx_with_permit_releases_permit() { - let mut t1 = task::spawn(()); - let mut t2 = task::spawn(()); - +#[tokio::test] +async fn drop_permit_releases_permit() { // poll_ready reserves capacity, ensure that the capacity is released if tx // is dropped w/o sending a value. - let (mut tx1, _rx) = mpsc::channel::(1); - let mut tx2 = tx1.clone(); - - assert_ready_ok!(t1.enter(|cx, _| tx1.poll_ready(cx))); + let (tx1, _rx) = mpsc::channel::(1); + let tx2 = tx1.clone(); - t2.enter(|cx, _| { - assert_pending!(tx2.poll_ready(cx)); - }); + let permit = assert_ok!(tx1.reserve().await); - drop(tx1); + let mut reserve2 = task::spawn(tx2.reserve()); + assert_pending!(reserve2.poll()); - assert!(t2.is_woken()); + drop(permit); - assert_ready_ok!(t2.enter(|cx, _| tx2.poll_ready(cx))); + assert!(reserve2.is_woken()); + assert_ready_ok!(reserve2.poll()); } -#[test] -fn dropping_rx_closes_channel() { - let mut t1 = task::spawn(()); - - let (mut tx, rx) = mpsc::channel(100); +#[tokio::test] +async fn dropping_rx_closes_channel() { + let (tx, rx) = mpsc::channel(100); let msg = Arc::new(()); assert_ok!(tx.try_send(msg.clone())); drop(rx); - assert_ready_err!(t1.enter(|cx, _| tx.poll_ready(cx))); - + assert_err!(tx.reserve().await); assert_eq!(1, Arc::strong_count(&msg)); } #[test] fn dropping_rx_closes_channel_for_try() { - let (mut tx, rx) = mpsc::channel(100); + let (tx, rx) = mpsc::channel(100); let msg = Arc::new(()); tx.try_send(msg.clone()).unwrap(); @@ -444,7 +374,7 @@ fn dropping_rx_closes_channel_for_try() { fn unconsumed_messages_are_dropped() { let msg = Arc::new(()); - let (mut tx, rx) = mpsc::channel(100); + let (tx, rx) = mpsc::channel(100); tx.try_send(msg.clone()).unwrap(); @@ -457,7 +387,7 @@ fn unconsumed_messages_are_dropped() { #[test] fn try_recv() { - let (mut tx, mut rx) = mpsc::channel(1); + let (tx, mut rx) = mpsc::channel(1); match rx.try_recv() { Err(TryRecvError::Empty) => {} _ => panic!(), @@ -495,7 +425,7 @@ fn try_recv_unbounded() { #[test] fn blocking_recv() { - let (mut tx, mut rx) = mpsc::channel::(1); + let (tx, mut rx) = mpsc::channel::(1); let sync_code = thread::spawn(move || { assert_eq!(Some(10), rx.blocking_recv()); @@ -516,7 +446,7 @@ async fn blocking_recv_async() { #[test] fn blocking_send() { - let (mut tx, mut rx) = mpsc::channel::(1); + let (tx, mut rx) = mpsc::channel::(1); let sync_code = thread::spawn(move || { tx.blocking_send(10).unwrap(); @@ -531,28 +461,25 @@ fn blocking_send() { #[tokio::test] #[should_panic] async fn blocking_send_async() { - let (mut tx, _rx) = mpsc::channel::<()>(1); + let (tx, _rx) = mpsc::channel::<()>(1); let _ = tx.blocking_send(()); } -#[test] -fn ready_close_cancel_bounded() { - use futures::future::poll_fn; - - let (mut tx, mut rx) = mpsc::channel::<()>(100); +#[tokio::test] +async fn ready_close_cancel_bounded() { + let (tx, mut rx) = mpsc::channel::<()>(100); let _tx2 = tx.clone(); - { - let mut ready = task::spawn(async { poll_fn(|cx| tx.poll_ready(cx)).await }); - assert_ready_ok!(ready.poll()); - } + let permit = assert_ok!(tx.reserve().await); rx.close(); - let mut recv = task::spawn(async { rx.recv().await }); + let mut recv = task::spawn(rx.recv()); assert_pending!(recv.poll()); - drop(tx); + drop(permit); assert!(recv.is_woken()); + let val = assert_ready!(recv.poll()); + assert!(val.is_none()); } -- cgit v1.2.3