summaryrefslogtreecommitdiffstats
path: root/tokio/tests/sync_mpsc.rs
diff options
context:
space:
mode:
authorJon Gjengset <jon@thesquareplanet.com>2020-04-02 11:27:37 -0400
committerGitHub <noreply@github.com>2020-04-02 11:27:37 -0400
commit7fb1698e8d8fa9d4ce295b63a17c461b3a40dddd (patch)
tree4154588e7a026dedaa2f8a36858f9e91bbd53a3e /tokio/tests/sync_mpsc.rs
parentfa4fe9ef6feea7c8c88c81559797e57da7368b36 (diff)
sync: Add disarm to mpsc::Sender (#2358)
Fixes #898.
Diffstat (limited to 'tokio/tests/sync_mpsc.rs')
-rw-r--r--tokio/tests/sync_mpsc.rs38
1 files changed, 38 insertions, 0 deletions
diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs
index 7e5c60e2..f02d90aa 100644
--- a/tokio/tests/sync_mpsc.rs
+++ b/tokio/tests/sync_mpsc.rs
@@ -40,6 +40,44 @@ fn send_recv_with_buffer() {
assert!(val.is_none());
}
+#[test]
+fn disarm() {
+ let (tx, rx) = mpsc::channel::<i32>(2);
+ let mut tx1 = task::spawn(tx.clone());
+ let mut tx2 = task::spawn(tx.clone());
+ let mut tx3 = task::spawn(tx.clone());
+ let mut tx4 = task::spawn(tx);
+ let mut rx = task::spawn(rx);
+
+ // We should be able to `poll_ready` two handles without problem
+ assert_ready_ok!(tx1.enter(|cx, mut tx| tx.poll_ready(cx)));
+ assert_ready_ok!(tx2.enter(|cx, mut tx| tx.poll_ready(cx)));
+
+ // But a third should not be ready
+ assert_pending!(tx3.enter(|cx, mut tx| tx.poll_ready(cx)));
+
+ // Using one of the reserved slots should allow a new handle to become ready
+ tx1.try_send(1).unwrap();
+ // We also need to receive for the slot to be free
+ let _ = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx))).unwrap();
+ // Now there's a free slot!
+ assert_ready_ok!(tx3.enter(|cx, mut tx| tx.poll_ready(cx)));
+ assert_pending!(tx4.enter(|cx, mut tx| tx.poll_ready(cx)));
+
+ // Dropping a ready handle should also open up a slot
+ drop(tx2);
+ assert_ready_ok!(tx4.enter(|cx, mut tx| tx.poll_ready(cx)));
+ assert_pending!(tx1.enter(|cx, mut tx| tx.poll_ready(cx)));
+
+ // Explicitly disarming a handle should also open a slot
+ assert!(tx3.disarm());
+ assert_ready_ok!(tx1.enter(|cx, mut tx| tx.poll_ready(cx)));
+
+ // Disarming a non-armed sender does not free up a slot
+ assert!(!tx3.disarm());
+ assert_pending!(tx3.enter(|cx, mut tx| tx.poll_ready(cx)));
+}
+
#[tokio::test]
async fn send_recv_stream_with_buffer() {
use tokio::stream::StreamExt;