diff options
Diffstat (limited to 'tokio/tests/sync_mpsc.rs')
-rw-r--r-- | tokio/tests/sync_mpsc.rs | 113 |
1 files changed, 42 insertions, 71 deletions
diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index f724c564..040904e4 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -1,6 +1,7 @@ #![warn(rust_2018_idioms)] use tokio::sync::mpsc; +use tokio::sync::mpsc::error::TrySendError; use tokio_test::task; use tokio_test::{ assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok, @@ -38,47 +39,33 @@ fn send_recv_with_buffer() { } #[tokio::test] -async fn async_send_recv_with_buffer() { - let (mut tx, mut rx) = mpsc::channel(16); +async fn send_recv_stream_with_buffer() { + use futures::StreamExt; + + let (mut tx, mut rx) = mpsc::channel::<i32>(16); tokio::spawn(async move { assert_ok!(tx.send(1).await); assert_ok!(tx.send(2).await); }); - assert_eq!(Some(1), rx.recv().await); - assert_eq!(Some(2), rx.recv().await); - assert_eq!(None, rx.recv().await); + assert_eq!(Some(1), rx.next().await); + assert_eq!(Some(2), rx.next().await); + assert_eq!(None, rx.next().await); } -#[test] -fn send_sink_recv_with_buffer() { - use futures_core::Stream; - use futures_sink::Sink; - - let (tx, rx) = mpsc::channel::<i32>(16); - - task::spawn(tx).enter(|cx, mut tx| { - assert_ready_ok!(tx.as_mut().poll_ready(cx)); - assert_ok!(tx.as_mut().start_send(1)); - - assert_ready_ok!(tx.as_mut().poll_ready(cx)); - assert_ok!(tx.as_mut().start_send(2)); +#[tokio::test] +async fn async_send_recv_with_buffer() { + let (mut tx, mut rx) = mpsc::channel(16); - assert_ready_ok!(tx.as_mut().poll_flush(cx)); - assert_ready_ok!(tx.as_mut().poll_close(cx)); + tokio::spawn(async move { + assert_ok!(tx.send(1).await); + assert_ok!(tx.send(2).await); }); - task::spawn(rx).enter(|cx, mut rx| { - let val = assert_ready!(rx.as_mut().poll_next(cx)); - assert_eq!(val, Some(1)); - - let val = assert_ready!(rx.as_mut().poll_next(cx)); - assert_eq!(val, Some(2)); - - let val = assert_ready!(rx.as_mut().poll_next(cx)); - assert!(val.is_none()); - }); + assert_eq!(Some(1), rx.recv().await); + assert_eq!(Some(2), rx.recv().await); + assert_eq!(None, rx.recv().await); } #[test] @@ -124,11 +111,11 @@ fn buffer_gteq_one() { fn send_recv_unbounded() { let mut t1 = task::spawn(()); - let (mut tx, mut rx) = mpsc::unbounded_channel::<i32>(); + let (tx, mut rx) = mpsc::unbounded_channel::<i32>(); // Using `try_send` - assert_ok!(tx.try_send(1)); - assert_ok!(tx.try_send(2)); + assert_ok!(tx.send(1)); + assert_ok!(tx.send(2)); let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); assert_eq!(val, Some(1)); @@ -144,11 +131,11 @@ fn send_recv_unbounded() { #[tokio::test] async fn async_send_recv_unbounded() { - let (mut tx, mut rx) = mpsc::unbounded_channel(); + let (tx, mut rx) = mpsc::unbounded_channel(); tokio::spawn(async move { - assert_ok!(tx.try_send(1)); - assert_ok!(tx.try_send(2)); + assert_ok!(tx.send(1)); + assert_ok!(tx.send(2)); }); assert_eq!(Some(1), rx.recv().await); @@ -156,41 +143,20 @@ async fn async_send_recv_unbounded() { assert_eq!(None, rx.recv().await); } -#[test] -fn sink_send_recv_unbounded() { - use futures_core::Stream; - use futures_sink::Sink; - use futures_util::pin_mut; - - let mut t1 = task::spawn(()); - - let (tx, rx) = mpsc::unbounded_channel::<i32>(); - - t1.enter(|cx, _| { - pin_mut!(tx); - - assert_ready_ok!(tx.as_mut().poll_ready(cx)); - assert_ok!(tx.as_mut().start_send(1)); +#[tokio::test] +async fn send_recv_stream_unbounded() { + use futures::StreamExt; - assert_ready_ok!(tx.as_mut().poll_ready(cx)); - assert_ok!(tx.as_mut().start_send(2)); + let (tx, mut rx) = mpsc::unbounded_channel::<i32>(); - assert_ready_ok!(tx.as_mut().poll_flush(cx)); - assert_ready_ok!(tx.as_mut().poll_close(cx)); + tokio::spawn(async move { + assert_ok!(tx.send(1)); + assert_ok!(tx.send(2)); }); - t1.enter(|cx, _| { - pin_mut!(rx); - - let val = assert_ready!(rx.as_mut().poll_next(cx)); - assert_eq!(val, Some(1)); - - let val = assert_ready!(rx.as_mut().poll_next(cx)); - assert_eq!(val, Some(2)); - - let val = assert_ready!(rx.as_mut().poll_next(cx)); - assert!(val.is_none()); - }); + assert_eq!(Some(1), rx.next().await); + assert_eq!(Some(2), rx.next().await); + assert_eq!(None, rx.next().await); } #[test] @@ -223,7 +189,7 @@ fn no_t_bounds_unbounded() { // same with Receiver println!("{:?}", rx); // and sender should be Clone even though T isn't Clone - assert!(tx.clone().try_send(NoImpls).is_ok()); + assert!(tx.clone().send(NoImpls).is_ok()); let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); assert!(val.is_some()); @@ -356,8 +322,10 @@ fn try_send_fail() { tx.try_send("hello").unwrap(); // This should fail - let err = assert_err!(tx.try_send("fail")); - assert!(err.is_full()); + match assert_err!(tx.try_send("fail")) { + TrySendError::Full(..) => {} + _ => panic!(), + } let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); assert_eq!(val, Some("hello")); @@ -421,7 +389,10 @@ fn dropping_rx_closes_channel_for_try() { { let err = assert_err!(tx.try_send(msg.clone())); - assert!(err.is_closed()); + match err { + TrySendError::Closed(..) => {} + _ => panic!(), + } } assert_eq!(1, Arc::strong_count(&msg)); |