diff options
author | Jon Gjengset <jon@thesquareplanet.com> | 2020-04-02 11:27:37 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-04-02 11:27:37 -0400 |
commit | 7fb1698e8d8fa9d4ce295b63a17c461b3a40dddd (patch) | |
tree | 4154588e7a026dedaa2f8a36858f9e91bbd53a3e /tokio/tests/sync_mpsc.rs | |
parent | fa4fe9ef6feea7c8c88c81559797e57da7368b36 (diff) |
sync: Add disarm to mpsc::Sender (#2358)
Fixes #898.
Diffstat (limited to 'tokio/tests/sync_mpsc.rs')
-rw-r--r-- | tokio/tests/sync_mpsc.rs | 38 |
1 files changed, 38 insertions, 0 deletions
diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index 7e5c60e2..f02d90aa 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -40,6 +40,44 @@ fn send_recv_with_buffer() { assert!(val.is_none()); } +#[test] +fn disarm() { + let (tx, rx) = mpsc::channel::<i32>(2); + let mut tx1 = task::spawn(tx.clone()); + let mut tx2 = task::spawn(tx.clone()); + let mut tx3 = task::spawn(tx.clone()); + let mut tx4 = task::spawn(tx); + let mut rx = task::spawn(rx); + + // We should be able to `poll_ready` two handles without problem + assert_ready_ok!(tx1.enter(|cx, mut tx| tx.poll_ready(cx))); + assert_ready_ok!(tx2.enter(|cx, mut tx| tx.poll_ready(cx))); + + // But a third should not be ready + assert_pending!(tx3.enter(|cx, mut tx| tx.poll_ready(cx))); + + // Using one of the reserved slots should allow a new handle to become ready + tx1.try_send(1).unwrap(); + // We also need to receive for the slot to be free + let _ = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx))).unwrap(); + // Now there's a free slot! + assert_ready_ok!(tx3.enter(|cx, mut tx| tx.poll_ready(cx))); + assert_pending!(tx4.enter(|cx, mut tx| tx.poll_ready(cx))); + + // Dropping a ready handle should also open up a slot + drop(tx2); + assert_ready_ok!(tx4.enter(|cx, mut tx| tx.poll_ready(cx))); + assert_pending!(tx1.enter(|cx, mut tx| tx.poll_ready(cx))); + + // Explicitly disarming a handle should also open a slot + assert!(tx3.disarm()); + assert_ready_ok!(tx1.enter(|cx, mut tx| tx.poll_ready(cx))); + + // Disarming a non-armed sender does not free up a slot + assert!(!tx3.disarm()); + assert_pending!(tx3.enter(|cx, mut tx| tx.poll_ready(cx))); +} + #[tokio::test] async fn send_recv_stream_with_buffer() { use tokio::stream::StreamExt; |