summaryrefslogtreecommitdiffstats
path: root/tokio/tests/sync_mpsc.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/tests/sync_mpsc.rs')
-rw-r--r--tokio/tests/sync_mpsc.rs113
1 files changed, 42 insertions, 71 deletions
diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs
index f724c564..040904e4 100644
--- a/tokio/tests/sync_mpsc.rs
+++ b/tokio/tests/sync_mpsc.rs
@@ -1,6 +1,7 @@
#![warn(rust_2018_idioms)]
use tokio::sync::mpsc;
+use tokio::sync::mpsc::error::TrySendError;
use tokio_test::task;
use tokio_test::{
assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
@@ -38,47 +39,33 @@ fn send_recv_with_buffer() {
}
#[tokio::test]
-async fn async_send_recv_with_buffer() {
- let (mut tx, mut rx) = mpsc::channel(16);
+async fn send_recv_stream_with_buffer() {
+ use futures::StreamExt;
+
+ let (mut tx, mut rx) = mpsc::channel::<i32>(16);
tokio::spawn(async move {
assert_ok!(tx.send(1).await);
assert_ok!(tx.send(2).await);
});
- assert_eq!(Some(1), rx.recv().await);
- assert_eq!(Some(2), rx.recv().await);
- assert_eq!(None, rx.recv().await);
+ assert_eq!(Some(1), rx.next().await);
+ assert_eq!(Some(2), rx.next().await);
+ assert_eq!(None, rx.next().await);
}
-#[test]
-fn send_sink_recv_with_buffer() {
- use futures_core::Stream;
- use futures_sink::Sink;
-
- let (tx, rx) = mpsc::channel::<i32>(16);
-
- task::spawn(tx).enter(|cx, mut tx| {
- assert_ready_ok!(tx.as_mut().poll_ready(cx));
- assert_ok!(tx.as_mut().start_send(1));
-
- assert_ready_ok!(tx.as_mut().poll_ready(cx));
- assert_ok!(tx.as_mut().start_send(2));
+#[tokio::test]
+async fn async_send_recv_with_buffer() {
+ let (mut tx, mut rx) = mpsc::channel(16);
- assert_ready_ok!(tx.as_mut().poll_flush(cx));
- assert_ready_ok!(tx.as_mut().poll_close(cx));
+ tokio::spawn(async move {
+ assert_ok!(tx.send(1).await);
+ assert_ok!(tx.send(2).await);
});
- task::spawn(rx).enter(|cx, mut rx| {
- let val = assert_ready!(rx.as_mut().poll_next(cx));
- assert_eq!(val, Some(1));
-
- let val = assert_ready!(rx.as_mut().poll_next(cx));
- assert_eq!(val, Some(2));
-
- let val = assert_ready!(rx.as_mut().poll_next(cx));
- assert!(val.is_none());
- });
+ assert_eq!(Some(1), rx.recv().await);
+ assert_eq!(Some(2), rx.recv().await);
+ assert_eq!(None, rx.recv().await);
}
#[test]
@@ -124,11 +111,11 @@ fn buffer_gteq_one() {
fn send_recv_unbounded() {
let mut t1 = task::spawn(());
- let (mut tx, mut rx) = mpsc::unbounded_channel::<i32>();
+ let (tx, mut rx) = mpsc::unbounded_channel::<i32>();
// Using `try_send`
- assert_ok!(tx.try_send(1));
- assert_ok!(tx.try_send(2));
+ assert_ok!(tx.send(1));
+ assert_ok!(tx.send(2));
let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
assert_eq!(val, Some(1));
@@ -144,11 +131,11 @@ fn send_recv_unbounded() {
#[tokio::test]
async fn async_send_recv_unbounded() {
- let (mut tx, mut rx) = mpsc::unbounded_channel();
+ let (tx, mut rx) = mpsc::unbounded_channel();
tokio::spawn(async move {
- assert_ok!(tx.try_send(1));
- assert_ok!(tx.try_send(2));
+ assert_ok!(tx.send(1));
+ assert_ok!(tx.send(2));
});
assert_eq!(Some(1), rx.recv().await);
@@ -156,41 +143,20 @@ async fn async_send_recv_unbounded() {
assert_eq!(None, rx.recv().await);
}
-#[test]
-fn sink_send_recv_unbounded() {
- use futures_core::Stream;
- use futures_sink::Sink;
- use futures_util::pin_mut;
-
- let mut t1 = task::spawn(());
-
- let (tx, rx) = mpsc::unbounded_channel::<i32>();
-
- t1.enter(|cx, _| {
- pin_mut!(tx);
-
- assert_ready_ok!(tx.as_mut().poll_ready(cx));
- assert_ok!(tx.as_mut().start_send(1));
+#[tokio::test]
+async fn send_recv_stream_unbounded() {
+ use futures::StreamExt;
- assert_ready_ok!(tx.as_mut().poll_ready(cx));
- assert_ok!(tx.as_mut().start_send(2));
+ let (tx, mut rx) = mpsc::unbounded_channel::<i32>();
- assert_ready_ok!(tx.as_mut().poll_flush(cx));
- assert_ready_ok!(tx.as_mut().poll_close(cx));
+ tokio::spawn(async move {
+ assert_ok!(tx.send(1));
+ assert_ok!(tx.send(2));
});
- t1.enter(|cx, _| {
- pin_mut!(rx);
-
- let val = assert_ready!(rx.as_mut().poll_next(cx));
- assert_eq!(val, Some(1));
-
- let val = assert_ready!(rx.as_mut().poll_next(cx));
- assert_eq!(val, Some(2));
-
- let val = assert_ready!(rx.as_mut().poll_next(cx));
- assert!(val.is_none());
- });
+ assert_eq!(Some(1), rx.next().await);
+ assert_eq!(Some(2), rx.next().await);
+ assert_eq!(None, rx.next().await);
}
#[test]
@@ -223,7 +189,7 @@ fn no_t_bounds_unbounded() {
// same with Receiver
println!("{:?}", rx);
// and sender should be Clone even though T isn't Clone
- assert!(tx.clone().try_send(NoImpls).is_ok());
+ assert!(tx.clone().send(NoImpls).is_ok());
let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
assert!(val.is_some());
@@ -356,8 +322,10 @@ fn try_send_fail() {
tx.try_send("hello").unwrap();
// This should fail
- let err = assert_err!(tx.try_send("fail"));
- assert!(err.is_full());
+ match assert_err!(tx.try_send("fail")) {
+ TrySendError::Full(..) => {}
+ _ => panic!(),
+ }
let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
assert_eq!(val, Some("hello"));
@@ -421,7 +389,10 @@ fn dropping_rx_closes_channel_for_try() {
{
let err = assert_err!(tx.try_send(msg.clone()));
- assert!(err.is_closed());
+ match err {
+ TrySendError::Closed(..) => {}
+ _ => panic!(),
+ }
}
assert_eq!(1, Arc::strong_count(&msg));