summaryrefslogtreecommitdiffstats
path: root/tokio/tests
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-09-24 17:26:38 -0700
committerGitHub <noreply@github.com>2020-09-24 17:26:38 -0700
commitcf025ba45f68934ae2138bb75ee2a5ee50506d1b (patch)
tree39fa03f4b063402e84da4435ebca39bd21266ad2 /tokio/tests
parent4186b0aa38abbec7670d53882d5cdfd4b12add5c (diff)
sync: support mpsc send with `&self` (#2861)
Updates the mpsc channel to use the intrusive waker based sempahore. This enables using `Sender` with `&self`. Instead of using `Sender::poll_ready` to ensure capacity and updating the `Sender` state, `async fn Sender::reserve()` is added. This function returns a `Permit` value representing the reserved capacity. Fixes: #2637 Refs: #2718 (intrusive waiters)
Diffstat (limited to 'tokio/tests')
-rw-r--r--tokio/tests/rt_threaded.rs10
-rw-r--r--tokio/tests/sync_mpsc.rs363
2 files changed, 150 insertions, 223 deletions
diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs
index a67c090e..2c7cfb80 100644
--- a/tokio/tests/rt_threaded.rs
+++ b/tokio/tests/rt_threaded.rs
@@ -70,7 +70,7 @@ fn many_multishot_futures() {
let (start_tx, mut chain_rx) = tokio::sync::mpsc::channel(10);
for _ in 0..CHAIN {
- let (mut next_tx, next_rx) = tokio::sync::mpsc::channel(10);
+ let (next_tx, next_rx) = tokio::sync::mpsc::channel(10);
// Forward all the messages
rt.spawn(async move {
@@ -83,8 +83,8 @@ fn many_multishot_futures() {
}
// This final task cycles if needed
- let (mut final_tx, final_rx) = tokio::sync::mpsc::channel(10);
- let mut cycle_tx = start_tx.clone();
+ let (final_tx, final_rx) = tokio::sync::mpsc::channel(10);
+ let cycle_tx = start_tx.clone();
let mut rem = CYCLES;
rt.spawn(async move {
@@ -107,7 +107,7 @@ fn many_multishot_futures() {
{
rt.block_on(async move {
- for mut start_tx in start_txs {
+ for start_tx in start_txs {
start_tx.send("ping").await.unwrap();
}
@@ -340,7 +340,7 @@ fn coop_and_block_in_place() {
.unwrap();
rt.block_on(async move {
- let (mut tx, mut rx) = tokio::sync::mpsc::channel(1024);
+ let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
// Fill the channel
for _ in 0..1024 {
diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs
index 919bddbf..adefcb12 100644
--- a/tokio/tests/sync_mpsc.rs
+++ b/tokio/tests/sync_mpsc.rs
@@ -17,74 +17,72 @@ trait AssertSend: Send {}
impl AssertSend for mpsc::Sender<i32> {}
impl AssertSend for mpsc::Receiver<i32> {}
-#[test]
-fn send_recv_with_buffer() {
- let (tx, rx) = mpsc::channel::<i32>(16);
- let mut tx = task::spawn(tx);
- let mut rx = task::spawn(rx);
+#[tokio::test]
+async fn send_recv_with_buffer() {
+ let (tx, mut rx) = mpsc::channel::<i32>(16);
// Using poll_ready / try_send
- assert_ready_ok!(tx.enter(|cx, mut tx| tx.poll_ready(cx)));
- tx.try_send(1).unwrap();
+ // let permit assert_ready_ok!(tx.reserve());
+ let permit = tx.reserve().await.unwrap();
+ permit.send(1);
// Without poll_ready
tx.try_send(2).unwrap();
drop(tx);
- let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx)));
+ let val = rx.recv().await;
assert_eq!(val, Some(1));
- let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx)));
+ let val = rx.recv().await;
assert_eq!(val, Some(2));
- let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx)));
+ let val = rx.recv().await;
assert!(val.is_none());
}
-#[test]
-fn disarm() {
- let (tx, rx) = mpsc::channel::<i32>(2);
- let mut tx1 = task::spawn(tx.clone());
- let mut tx2 = task::spawn(tx.clone());
- let mut tx3 = task::spawn(tx.clone());
- let mut tx4 = task::spawn(tx);
- let mut rx = task::spawn(rx);
+#[tokio::test]
+async fn reserve_disarm() {
+ let (tx, mut rx) = mpsc::channel::<i32>(2);
+ let tx1 = tx.clone();
+ let tx2 = tx.clone();
+ let tx3 = tx.clone();
+ let tx4 = tx;
// We should be able to `poll_ready` two handles without problem
- assert_ready_ok!(tx1.enter(|cx, mut tx| tx.poll_ready(cx)));
- assert_ready_ok!(tx2.enter(|cx, mut tx| tx.poll_ready(cx)));
+ let permit1 = assert_ok!(tx1.reserve().await);
+ let permit2 = assert_ok!(tx2.reserve().await);
// But a third should not be ready
- assert_pending!(tx3.enter(|cx, mut tx| tx.poll_ready(cx)));
+ let mut r3 = task::spawn(tx3.reserve());
+ assert_pending!(r3.poll());
+
+ let mut r4 = task::spawn(tx4.reserve());
+ assert_pending!(r4.poll());
// Using one of the reserved slots should allow a new handle to become ready
- tx1.try_send(1).unwrap();
+ permit1.send(1);
+
// We also need to receive for the slot to be free
- let _ = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx))).unwrap();
+ assert!(!r3.is_woken());
+ rx.recv().await.unwrap();
// Now there's a free slot!
- assert_ready_ok!(tx3.enter(|cx, mut tx| tx.poll_ready(cx)));
- assert_pending!(tx4.enter(|cx, mut tx| tx.poll_ready(cx)));
+ assert!(r3.is_woken());
+ assert!(!r4.is_woken());
- // Dropping a ready handle should also open up a slot
- drop(tx2);
- assert_ready_ok!(tx4.enter(|cx, mut tx| tx.poll_ready(cx)));
- assert_pending!(tx1.enter(|cx, mut tx| tx.poll_ready(cx)));
-
- // Explicitly disarming a handle should also open a slot
- assert!(tx3.disarm());
- assert_ready_ok!(tx1.enter(|cx, mut tx| tx.poll_ready(cx)));
+ // Dropping a permit should also open up a slot
+ drop(permit2);
+ assert!(r4.is_woken());
- // Disarming a non-armed sender does not free up a slot
- assert!(!tx3.disarm());
- assert_pending!(tx3.enter(|cx, mut tx| tx.poll_ready(cx)));
+ let mut r1 = task::spawn(tx1.reserve());
+ assert_pending!(r1.poll());
}
#[tokio::test]
async fn send_recv_stream_with_buffer() {
use tokio::stream::StreamExt;
- let (mut tx, mut rx) = mpsc::channel::<i32>(16);
+ let (tx, mut rx) = mpsc::channel::<i32>(16);
tokio::spawn(async move {
assert_ok!(tx.send(1).await);
@@ -98,7 +96,7 @@ async fn send_recv_stream_with_buffer() {
#[tokio::test]
async fn async_send_recv_with_buffer() {
- let (mut tx, mut rx) = mpsc::channel(16);
+ let (tx, mut rx) = mpsc::channel(16);
tokio::spawn(async move {
assert_ok!(tx.send(1).await);
@@ -110,37 +108,36 @@ async fn async_send_recv_with_buffer() {
assert_eq!(None, rx.recv().await);
}
-#[test]
-fn start_send_past_cap() {
+#[tokio::test]
+async fn start_send_past_cap() {
+ use std::future::Future;
+
let mut t1 = task::spawn(());
- let mut t2 = task::spawn(());
- let mut t3 = task::spawn(());
- let (mut tx1, mut rx) = mpsc::channel(1);
- let mut tx2 = tx1.clone();
+ let (tx1, mut rx) = mpsc::channel(1);
+ let tx2 = tx1.clone();
assert_ok!(tx1.try_send(()));
- t1.enter(|cx, _| {
- assert_pending!(tx1.poll_ready(cx));
- });
+ let mut r1 = Box::pin(tx1.reserve());
+ t1.enter(|cx, _| assert_pending!(r1.as_mut().poll(cx)));
- t2.enter(|cx, _| {
- assert_pending!(tx2.poll_ready(cx));
- });
+ {
+ let mut r2 = task::spawn(tx2.reserve());
+ assert_pending!(r2.poll());
- drop(tx1);
+ drop(r1);
- let val = t3.enter(|cx, _| assert_ready!(rx.poll_recv(cx)));
- assert!(val.is_some());
+ assert!(rx.recv().await.is_some());
- assert!(t2.is_woken());
- assert!(!t1.is_woken());
+ assert!(r2.is_woken());
+ assert!(!t1.is_woken());
+ }
+ drop(tx1);
drop(tx2);
- let val = t3.enter(|cx, _| assert_ready!(rx.poll_recv(cx)));
- assert!(val.is_none());
+ assert!(rx.recv().await.is_none());
}
#[test]
@@ -149,26 +146,20 @@ fn buffer_gteq_one() {
mpsc::channel::<i32>(0);
}
-#[test]
-fn send_recv_unbounded() {
- let mut t1 = task::spawn(());
-
+#[tokio::test]
+async fn send_recv_unbounded() {
let (tx, mut rx) = mpsc::unbounded_channel::<i32>();
// Using `try_send`
assert_ok!(tx.send(1));
assert_ok!(tx.send(2));
- let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
- assert_eq!(val, Some(1));
-
- let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
- assert_eq!(val, Some(2));
+ assert_eq!(rx.recv().await, Some(1));
+ assert_eq!(rx.recv().await, Some(2));
drop(tx);
- let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
- assert!(val.is_none());
+ assert!(rx.recv().await.is_none());
}
#[tokio::test]
@@ -201,11 +192,10 @@ async fn send_recv_stream_unbounded() {
assert_eq!(None, rx.next().await);
}
-#[test]
-fn no_t_bounds_buffer() {
+#[tokio::test]
+async fn no_t_bounds_buffer() {
struct NoImpls;
- let mut t1 = task::spawn(());
let (tx, mut rx) = mpsc::channel(100);
// sender should be Debug even though T isn't Debug
@@ -215,15 +205,13 @@ fn no_t_bounds_buffer() {
// and sender should be Clone even though T isn't Clone
assert!(tx.clone().try_send(NoImpls).is_ok());
- let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
- assert!(val.is_some());
+ assert!(rx.recv().await.is_some());
}
-#[test]
-fn no_t_bounds_unbounded() {
+#[tokio::test]
+async fn no_t_bounds_unbounded() {
struct NoImpls;
- let mut t1 = task::spawn(());
let (tx, mut rx) = mpsc::unbounded_channel();
// sender should be Debug even though T isn't Debug
@@ -233,133 +221,87 @@ fn no_t_bounds_unbounded() {
// and sender should be Clone even though T isn't Clone
assert!(tx.clone().send(NoImpls).is_ok());
- let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
- assert!(val.is_some());
+ assert!(rx.recv().await.is_some());
}
-#[test]
-fn send_recv_buffer_limited() {
- let mut t1 = task::spawn(());
- let mut t2 = task::spawn(());
-
- let (mut tx, mut rx) = mpsc::channel::<i32>(1);
-
- // Run on a task context
- t1.enter(|cx, _| {
- assert_ready_ok!(tx.poll_ready(cx));
-
- // Send first message
- assert_ok!(tx.try_send(1));
+#[tokio::test]
+async fn send_recv_buffer_limited() {
+ let (tx, mut rx) = mpsc::channel::<i32>(1);
- // Not ready
- assert_pending!(tx.poll_ready(cx));
+ // Reserve capacity
+ let p1 = assert_ok!(tx.reserve().await);
- // Send second message
- assert_err!(tx.try_send(1337));
- });
+ // Send first message
+ p1.send(1);
- t2.enter(|cx, _| {
- // Take the value
- let val = assert_ready!(rx.poll_recv(cx));
- assert_eq!(Some(1), val);
- });
+ // Not ready
+ let mut p2 = task::spawn(tx.reserve());
+ assert_pending!(p2.poll());
- assert!(t1.is_woken());
+ // Take the value
+ assert!(rx.recv().await.is_some());
- t1.enter(|cx, _| {
- assert_ready_ok!(tx.poll_ready(cx));
+ // Notified
+ assert!(p2.is_woken());
- assert_ok!(tx.try_send(2));
+ // Trying to send fails
+ assert_err!(tx.try_send(1337));
- // Not ready
- assert_pending!(tx.poll_ready(cx));
- });
+ // Send second
+ let permit = assert_ready_ok!(p2.poll());
+ permit.send(2);
- t2.enter(|cx, _| {
- // Take the value
- let val = assert_ready!(rx.poll_recv(cx));
- assert_eq!(Some(2), val);
- });
-
- t1.enter(|cx, _| {
- assert_ready_ok!(tx.poll_ready(cx));
- });
+ assert!(rx.recv().await.is_some());
}
-#[test]
-fn recv_close_gets_none_idle() {
- let mut t1 = task::spawn(());
-
- let (mut tx, mut rx) = mpsc::channel::<i32>(10);
+#[tokio::test]
+async fn recv_close_gets_none_idle() {
+ let (tx, mut rx) = mpsc::channel::<i32>(10);
rx.close();
- t1.enter(|cx, _| {
- let val = assert_ready!(rx.poll_recv(cx));
- assert!(val.is_none());
- assert_ready_err!(tx.poll_ready(cx));
- });
-}
-
-#[test]
-fn recv_close_gets_none_reserved() {
- let mut t1 = task::spawn(());
- let mut t2 = task::spawn(());
- let mut t3 = task::spawn(());
+ assert!(rx.recv().await.is_none());
- let (mut tx1, mut rx) = mpsc::channel::<i32>(1);
- let mut tx2 = tx1.clone();
+ assert_err!(tx.send(1).await);
+}
- assert_ready_ok!(t1.enter(|cx, _| tx1.poll_ready(cx)));
+#[tokio::test]
+async fn recv_close_gets_none_reserved() {
+ let (tx1, mut rx) = mpsc::channel::<i32>(1);
+ let tx2 = tx1.clone();
- t2.enter(|cx, _| {
- assert_pending!(tx2.poll_ready(cx));
- });
+ let permit1 = assert_ok!(tx1.reserve().await);
+ let mut permit2 = task::spawn(tx2.reserve());
+ assert_pending!(permit2.poll());
rx.close();
- assert!(t2.is_woken());
-
- t2.enter(|cx, _| {
- assert_ready_err!(tx2.poll_ready(cx));
- });
-
- t3.enter(|cx, _| assert_pending!(rx.poll_recv(cx)));
+ assert!(permit2.is_woken());
+ assert_ready_err!(permit2.poll());
- assert!(!t1.is_woken());
- assert!(!t2.is_woken());
-
- assert_ok!(tx1.try_send(123));
+ {
+ let mut recv = task::spawn(rx.recv());
+ assert_pending!(recv.poll());
- assert!(t3.is_woken());
+ permit1.send(123);
+ assert!(recv.is_woken());
- t3.enter(|cx, _| {
- let v = assert_ready!(rx.poll_recv(cx));
+ let v = assert_ready!(recv.poll());
assert_eq!(v, Some(123));
+ }
- let v = assert_ready!(rx.poll_recv(cx));
- assert!(v.is_none());
- });
+ assert!(rx.recv().await.is_none());
}
-#[test]
-fn tx_close_gets_none() {
- let mut t1 = task::spawn(());
-
+#[tokio::test]
+async fn tx_close_gets_none() {
let (_, mut rx) = mpsc::channel::<i32>(10);
-
- // Run on a task context
- t1.enter(|cx, _| {
- let v = assert_ready!(rx.poll_recv(cx));
- assert!(v.is_none());
- });
+ assert!(rx.recv().await.is_none());
}
-#[test]
-fn try_send_fail() {
- let mut t1 = task::spawn(());
-
- let (mut tx, mut rx) = mpsc::channel(1);
+#[tokio::test]
+async fn try_send_fail() {
+ let (tx, mut rx) = mpsc::channel(1);
tx.try_send("hello").unwrap();
@@ -369,60 +311,48 @@ fn try_send_fail() {
_ => panic!(),
}
- let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
- assert_eq!(val, Some("hello"));
+ assert_eq!(rx.recv().await, Some("hello"));
assert_ok!(tx.try_send("goodbye"));
drop(tx);
- let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
- assert_eq!(val, Some("goodbye"));
-
- let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
- assert!(val.is_none());
+ assert_eq!(rx.recv().await, Some("goodbye"));
+ assert!(rx.recv().await.is_none());
}
-#[test]
-fn drop_tx_with_permit_releases_permit() {
- let mut t1 = task::spawn(());
- let mut t2 = task::spawn(());
-
+#[tokio::test]
+async fn drop_permit_releases_permit() {
// poll_ready reserves capacity, ensure that the capacity is released if tx
// is dropped w/o sending a value.
- let (mut tx1, _rx) = mpsc::channel::<i32>(1);
- let mut tx2 = tx1.clone();
-
- assert_ready_ok!(t1.enter(|cx, _| tx1.poll_ready(cx)));
+ let (tx1, _rx) = mpsc::channel::<i32>(1);
+ let tx2 = tx1.clone();
- t2.enter(|cx, _| {
- assert_pending!(tx2.poll_ready(cx));
- });
+ let permit = assert_ok!(tx1.reserve().await);
- drop(tx1);
+ let mut reserve2 = task::spawn(tx2.reserve());
+ assert_pending!(reserve2.poll());
- assert!(t2.is_woken());
+ drop(permit);
- assert_ready_ok!(t2.enter(|cx, _| tx2.poll_ready(cx)));
+ assert!(reserve2.is_woken());
+ assert_ready_ok!(reserve2.poll());
}
-#[test]
-fn dropping_rx_closes_channel() {
- let mut t1 = task::spawn(());
-
- let (mut tx, rx) = mpsc::channel(100);
+#[tokio::test]
+async fn dropping_rx_closes_channel() {
+ let (tx, rx) = mpsc::channel(100);
let msg = Arc::new(());
assert_ok!(tx.try_send(msg.clone()));
drop(rx);
- assert_ready_err!(t1.enter(|cx, _| tx.poll_ready(cx)));
-
+ assert_err!(tx.reserve().await);
assert_eq!(1, Arc::strong_count(&msg));
}
#[test]
fn dropping_rx_closes_channel_for_try() {
- let (mut tx, rx) = mpsc::channel(100);
+ let (tx, rx) = mpsc::channel(100);
let msg = Arc::new(());
tx.try_send(msg.clone()).unwrap();
@@ -444,7 +374,7 @@ fn dropping_rx_closes_channel_for_try() {
fn unconsumed_messages_are_dropped() {
let msg = Arc::new(());
- let (mut tx, rx) = mpsc::channel(100);
+ let (tx, rx) = mpsc::channel(100);
tx.try_send(msg.clone()).unwrap();
@@ -457,7 +387,7 @@ fn unconsumed_messages_are_dropped() {
#[test]
fn try_recv() {
- let (mut tx, mut rx) = mpsc::channel(1);
+ let (tx, mut rx) = mpsc::channel(1);
match rx.try_recv() {
Err(TryRecvError::Empty) => {}
_ => panic!(),
@@ -495,7 +425,7 @@ fn try_recv_unbounded() {
#[test]
fn blocking_recv() {
- let (mut tx, mut rx) = mpsc::channel::<u8>(1);
+ let (tx, mut rx) = mpsc::channel::<u8>(1);
let sync_code = thread::spawn(move || {
assert_eq!(Some(10), rx.blocking_recv());
@@ -516,7 +446,7 @@ async fn blocking_recv_async() {
#[test]
fn blocking_send() {
- let (mut tx, mut rx) = mpsc::channel::<u8>(1);
+ let (tx, mut rx) = mpsc::channel::<u8>(1);
let sync_code = thread::spawn(move || {
tx.blocking_send(10).unwrap();
@@ -531,28 +461,25 @@ fn blocking_send() {
#[tokio::test]
#[should_panic]
async fn blocking_send_async() {
- let (mut tx, _rx) = mpsc::channel::<()>(1);
+ let (tx, _rx) = mpsc::channel::<()>(1);
let _ = tx.blocking_send(());
}
-#[test]
-fn ready_close_cancel_bounded() {
- use futures::future::poll_fn;
-
- let (mut tx, mut rx) = mpsc::channel::<()>(100);
+#[tokio::test]
+async fn ready_close_cancel_bounded() {
+ let (tx, mut rx) = mpsc::channel::<()>(100);
let _tx2 = tx.clone();
- {
- let mut ready = task::spawn(async { poll_fn(|cx| tx.poll_ready(cx)).await });
- assert_ready_ok!(ready.poll());
- }
+ let permit = assert_ok!(tx.reserve().await);
rx.close();
- let mut recv = task::spawn(async { rx.recv().await });
+ let mut recv = task::spawn(rx.recv());
assert_pending!(recv.poll());
- drop(tx);
+ drop(permit);
assert!(recv.is_woken());
+ let val = assert_ready!(recv.poll());
+ assert!(val.is_none());
}