diff options
Diffstat (limited to 'tokio/tests/sync_mpsc.rs')
-rw-r--r-- | tokio/tests/sync_mpsc.rs | 124 |
1 files changed, 58 insertions, 66 deletions
diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index 891e2361..f724c564 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -1,7 +1,7 @@ #![warn(rust_2018_idioms)] use tokio::sync::mpsc; -use tokio_test::task::MockTask; +use tokio_test::task; use tokio_test::{ assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok, }; @@ -14,13 +14,12 @@ impl AssertSend for mpsc::Receiver<i32> {} #[test] fn send_recv_with_buffer() { - let mut t1 = MockTask::new(); - let mut t2 = MockTask::new(); - - let (mut tx, mut rx) = mpsc::channel::<i32>(16); + let (tx, rx) = mpsc::channel::<i32>(16); + let mut tx = task::spawn(tx); + let mut rx = task::spawn(rx); // Using poll_ready / try_send - assert_ready_ok!(t1.enter(|cx| tx.poll_ready(cx))); + assert_ready_ok!(tx.enter(|cx, mut tx| tx.poll_ready(cx))); tx.try_send(1).unwrap(); // Without poll_ready @@ -28,13 +27,13 @@ fn send_recv_with_buffer() { drop(tx); - let val = assert_ready!(t2.enter(|cx| rx.poll_recv(cx))); + let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx))); assert_eq!(val, Some(1)); - let val = assert_ready!(t2.enter(|cx| rx.poll_recv(cx))); + let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx))); assert_eq!(val, Some(2)); - let val = assert_ready!(t2.enter(|cx| rx.poll_recv(cx))); + let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx))); assert!(val.is_none()); } @@ -56,15 +55,10 @@ async fn async_send_recv_with_buffer() { fn send_sink_recv_with_buffer() { use futures_core::Stream; use futures_sink::Sink; - use futures_util::pin_mut; - - let mut t1 = MockTask::new(); let (tx, rx) = mpsc::channel::<i32>(16); - t1.enter(|cx| { - pin_mut!(tx); - + task::spawn(tx).enter(|cx, mut tx| { assert_ready_ok!(tx.as_mut().poll_ready(cx)); assert_ok!(tx.as_mut().start_send(1)); @@ -75,9 +69,7 @@ fn send_sink_recv_with_buffer() { assert_ready_ok!(tx.as_mut().poll_close(cx)); }); - t1.enter(|cx| { - pin_mut!(rx); - + task::spawn(rx).enter(|cx, mut rx| { let val = assert_ready!(rx.as_mut().poll_next(cx)); assert_eq!(val, Some(1)); @@ -91,26 +83,26 @@ fn send_sink_recv_with_buffer() { #[test] fn start_send_past_cap() { - let mut t1 = MockTask::new(); - let mut t2 = MockTask::new(); - let mut t3 = MockTask::new(); + let mut t1 = task::spawn(()); + let mut t2 = task::spawn(()); + let mut t3 = task::spawn(()); let (mut tx1, mut rx) = mpsc::channel(1); let mut tx2 = tx1.clone(); assert_ok!(tx1.try_send(())); - t1.enter(|cx| { + t1.enter(|cx, _| { assert_pending!(tx1.poll_ready(cx)); }); - t2.enter(|cx| { + t2.enter(|cx, _| { assert_pending!(tx2.poll_ready(cx)); }); drop(tx1); - let val = t3.enter(|cx| assert_ready!(rx.poll_recv(cx))); + let val = t3.enter(|cx, _| assert_ready!(rx.poll_recv(cx))); assert!(val.is_some()); assert!(t2.is_woken()); @@ -118,7 +110,7 @@ fn start_send_past_cap() { drop(tx2); - let val = t3.enter(|cx| assert_ready!(rx.poll_recv(cx))); + let val = t3.enter(|cx, _| assert_ready!(rx.poll_recv(cx))); assert!(val.is_none()); } @@ -130,7 +122,7 @@ fn buffer_gteq_one() { #[test] fn send_recv_unbounded() { - let mut t1 = MockTask::new(); + let mut t1 = task::spawn(()); let (mut tx, mut rx) = mpsc::unbounded_channel::<i32>(); @@ -138,15 +130,15 @@ fn send_recv_unbounded() { assert_ok!(tx.try_send(1)); assert_ok!(tx.try_send(2)); - let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx))); + let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); assert_eq!(val, Some(1)); - let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx))); + let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); assert_eq!(val, Some(2)); drop(tx); - let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx))); + let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); assert!(val.is_none()); } @@ -170,11 +162,11 @@ fn sink_send_recv_unbounded() { use futures_sink::Sink; use futures_util::pin_mut; - let mut t1 = MockTask::new(); + let mut t1 = task::spawn(()); let (tx, rx) = mpsc::unbounded_channel::<i32>(); - t1.enter(|cx| { + t1.enter(|cx, _| { pin_mut!(tx); assert_ready_ok!(tx.as_mut().poll_ready(cx)); @@ -187,7 +179,7 @@ fn sink_send_recv_unbounded() { assert_ready_ok!(tx.as_mut().poll_close(cx)); }); - t1.enter(|cx| { + t1.enter(|cx, _| { pin_mut!(rx); let val = assert_ready!(rx.as_mut().poll_next(cx)); @@ -205,7 +197,7 @@ fn sink_send_recv_unbounded() { fn no_t_bounds_buffer() { struct NoImpls; - let mut t1 = MockTask::new(); + let mut t1 = task::spawn(()); let (tx, mut rx) = mpsc::channel(100); // sender should be Debug even though T isn't Debug @@ -215,7 +207,7 @@ fn no_t_bounds_buffer() { // and sender should be Clone even though T isn't Clone assert!(tx.clone().try_send(NoImpls).is_ok()); - let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx))); + let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); assert!(val.is_some()); } @@ -223,7 +215,7 @@ fn no_t_bounds_buffer() { fn no_t_bounds_unbounded() { struct NoImpls; - let mut t1 = MockTask::new(); + let mut t1 = task::spawn(()); let (tx, mut rx) = mpsc::unbounded_channel(); // sender should be Debug even though T isn't Debug @@ -233,19 +225,19 @@ fn no_t_bounds_unbounded() { // and sender should be Clone even though T isn't Clone assert!(tx.clone().try_send(NoImpls).is_ok()); - let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx))); + let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); assert!(val.is_some()); } #[test] fn send_recv_buffer_limited() { - let mut t1 = MockTask::new(); - let mut t2 = MockTask::new(); + let mut t1 = task::spawn(()); + let mut t2 = task::spawn(()); let (mut tx, mut rx) = mpsc::channel::<i32>(1); // Run on a task context - t1.enter(|cx| { + t1.enter(|cx, _| { assert_ready_ok!(tx.poll_ready(cx)); // Send first message @@ -258,7 +250,7 @@ fn send_recv_buffer_limited() { assert_err!(tx.try_send(1337)); }); - t2.enter(|cx| { + t2.enter(|cx, _| { // Take the value let val = assert_ready!(rx.poll_recv(cx)); assert_eq!(Some(1), val); @@ -266,7 +258,7 @@ fn send_recv_buffer_limited() { assert!(t1.is_woken()); - t1.enter(|cx| { + t1.enter(|cx, _| { assert_ready_ok!(tx.poll_ready(cx)); assert_ok!(tx.try_send(2)); @@ -275,26 +267,26 @@ fn send_recv_buffer_limited() { assert_pending!(tx.poll_ready(cx)); }); - t2.enter(|cx| { + t2.enter(|cx, _| { // Take the value let val = assert_ready!(rx.poll_recv(cx)); assert_eq!(Some(2), val); }); - t1.enter(|cx| { + t1.enter(|cx, _| { assert_ready_ok!(tx.poll_ready(cx)); }); } #[test] fn recv_close_gets_none_idle() { - let mut t1 = MockTask::new(); + let mut t1 = task::spawn(()); let (mut tx, mut rx) = mpsc::channel::<i32>(10); rx.close(); - t1.enter(|cx| { + t1.enter(|cx, _| { let val = assert_ready!(rx.poll_recv(cx)); assert!(val.is_none()); assert_ready_err!(tx.poll_ready(cx)); @@ -303,16 +295,16 @@ fn recv_close_gets_none_idle() { #[test] fn recv_close_gets_none_reserved() { - let mut t1 = MockTask::new(); - let mut t2 = MockTask::new(); - let mut t3 = MockTask::new(); + let mut t1 = task::spawn(()); + let mut t2 = task::spawn(()); + let mut t3 = task::spawn(()); let (mut tx1, mut rx) = mpsc::channel::<i32>(1); let mut tx2 = tx1.clone(); - assert_ready_ok!(t1.enter(|cx| tx1.poll_ready(cx))); + assert_ready_ok!(t1.enter(|cx, _| tx1.poll_ready(cx))); - t2.enter(|cx| { + t2.enter(|cx, _| { assert_pending!(tx2.poll_ready(cx)); }); @@ -320,11 +312,11 @@ fn recv_close_gets_none_reserved() { assert!(t2.is_woken()); - t2.enter(|cx| { + t2.enter(|cx, _| { assert_ready_err!(tx2.poll_ready(cx)); }); - t3.enter(|cx| assert_pending!(rx.poll_recv(cx))); + t3.enter(|cx, _| assert_pending!(rx.poll_recv(cx))); assert!(!t1.is_woken()); assert!(!t2.is_woken()); @@ -333,7 +325,7 @@ fn recv_close_gets_none_reserved() { assert!(t3.is_woken()); - t3.enter(|cx| { + t3.enter(|cx, _| { let v = assert_ready!(rx.poll_recv(cx)); assert_eq!(v, Some(123)); @@ -344,12 +336,12 @@ fn recv_close_gets_none_reserved() { #[test] fn tx_close_gets_none() { - let mut t1 = MockTask::new(); + let mut t1 = task::spawn(()); let (_, mut rx) = mpsc::channel::<i32>(10); // Run on a task context - t1.enter(|cx| { + t1.enter(|cx, _| { let v = assert_ready!(rx.poll_recv(cx)); assert!(v.is_none()); }); @@ -357,7 +349,7 @@ fn tx_close_gets_none() { #[test] fn try_send_fail() { - let mut t1 = MockTask::new(); + let mut t1 = task::spawn(()); let (mut tx, mut rx) = mpsc::channel(1); @@ -367,32 +359,32 @@ fn try_send_fail() { let err = assert_err!(tx.try_send("fail")); assert!(err.is_full()); - let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx))); + let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); assert_eq!(val, Some("hello")); assert_ok!(tx.try_send("goodbye")); drop(tx); - let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx))); + let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); assert_eq!(val, Some("goodbye")); - let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx))); + let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); assert!(val.is_none()); } #[test] fn drop_tx_with_permit_releases_permit() { - let mut t1 = MockTask::new(); - let mut t2 = MockTask::new(); + let mut t1 = task::spawn(()); + let mut t2 = task::spawn(()); // poll_ready reserves capacity, ensure that the capacity is released if tx // is dropped w/o sending a value. let (mut tx1, _rx) = mpsc::channel::<i32>(1); let mut tx2 = tx1.clone(); - assert_ready_ok!(t1.enter(|cx| tx1.poll_ready(cx))); + assert_ready_ok!(t1.enter(|cx, _| tx1.poll_ready(cx))); - t2.enter(|cx| { + t2.enter(|cx, _| { assert_pending!(tx2.poll_ready(cx)); }); @@ -400,12 +392,12 @@ fn drop_tx_with_permit_releases_permit() { assert!(t2.is_woken()); - assert_ready_ok!(t2.enter(|cx| tx2.poll_ready(cx))); + assert_ready_ok!(t2.enter(|cx, _| tx2.poll_ready(cx))); } #[test] fn dropping_rx_closes_channel() { - let mut t1 = MockTask::new(); + let mut t1 = task::spawn(()); let (mut tx, rx) = mpsc::channel(100); @@ -413,7 +405,7 @@ fn dropping_rx_closes_channel() { assert_ok!(tx.try_send(msg.clone())); drop(rx); - assert_ready_err!(t1.enter(|cx| tx.poll_ready(cx))); + assert_ready_err!(t1.enter(|cx, _| tx.poll_ready(cx))); assert_eq!(1, Arc::strong_count(&msg)); } |