summaryrefslogtreecommitdiffstats
path: root/tokio/tests/sync_mpsc.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/tests/sync_mpsc.rs')
-rw-r--r--tokio/tests/sync_mpsc.rs124
1 files changed, 58 insertions, 66 deletions
diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs
index 891e2361..f724c564 100644
--- a/tokio/tests/sync_mpsc.rs
+++ b/tokio/tests/sync_mpsc.rs
@@ -1,7 +1,7 @@
#![warn(rust_2018_idioms)]
use tokio::sync::mpsc;
-use tokio_test::task::MockTask;
+use tokio_test::task;
use tokio_test::{
assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
};
@@ -14,13 +14,12 @@ impl AssertSend for mpsc::Receiver<i32> {}
#[test]
fn send_recv_with_buffer() {
- let mut t1 = MockTask::new();
- let mut t2 = MockTask::new();
-
- let (mut tx, mut rx) = mpsc::channel::<i32>(16);
+ let (tx, rx) = mpsc::channel::<i32>(16);
+ let mut tx = task::spawn(tx);
+ let mut rx = task::spawn(rx);
// Using poll_ready / try_send
- assert_ready_ok!(t1.enter(|cx| tx.poll_ready(cx)));
+ assert_ready_ok!(tx.enter(|cx, mut tx| tx.poll_ready(cx)));
tx.try_send(1).unwrap();
// Without poll_ready
@@ -28,13 +27,13 @@ fn send_recv_with_buffer() {
drop(tx);
- let val = assert_ready!(t2.enter(|cx| rx.poll_recv(cx)));
+ let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx)));
assert_eq!(val, Some(1));
- let val = assert_ready!(t2.enter(|cx| rx.poll_recv(cx)));
+ let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx)));
assert_eq!(val, Some(2));
- let val = assert_ready!(t2.enter(|cx| rx.poll_recv(cx)));
+ let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx)));
assert!(val.is_none());
}
@@ -56,15 +55,10 @@ async fn async_send_recv_with_buffer() {
fn send_sink_recv_with_buffer() {
use futures_core::Stream;
use futures_sink::Sink;
- use futures_util::pin_mut;
-
- let mut t1 = MockTask::new();
let (tx, rx) = mpsc::channel::<i32>(16);
- t1.enter(|cx| {
- pin_mut!(tx);
-
+ task::spawn(tx).enter(|cx, mut tx| {
assert_ready_ok!(tx.as_mut().poll_ready(cx));
assert_ok!(tx.as_mut().start_send(1));
@@ -75,9 +69,7 @@ fn send_sink_recv_with_buffer() {
assert_ready_ok!(tx.as_mut().poll_close(cx));
});
- t1.enter(|cx| {
- pin_mut!(rx);
-
+ task::spawn(rx).enter(|cx, mut rx| {
let val = assert_ready!(rx.as_mut().poll_next(cx));
assert_eq!(val, Some(1));
@@ -91,26 +83,26 @@ fn send_sink_recv_with_buffer() {
#[test]
fn start_send_past_cap() {
- let mut t1 = MockTask::new();
- let mut t2 = MockTask::new();
- let mut t3 = MockTask::new();
+ let mut t1 = task::spawn(());
+ let mut t2 = task::spawn(());
+ let mut t3 = task::spawn(());
let (mut tx1, mut rx) = mpsc::channel(1);
let mut tx2 = tx1.clone();
assert_ok!(tx1.try_send(()));
- t1.enter(|cx| {
+ t1.enter(|cx, _| {
assert_pending!(tx1.poll_ready(cx));
});
- t2.enter(|cx| {
+ t2.enter(|cx, _| {
assert_pending!(tx2.poll_ready(cx));
});
drop(tx1);
- let val = t3.enter(|cx| assert_ready!(rx.poll_recv(cx)));
+ let val = t3.enter(|cx, _| assert_ready!(rx.poll_recv(cx)));
assert!(val.is_some());
assert!(t2.is_woken());
@@ -118,7 +110,7 @@ fn start_send_past_cap() {
drop(tx2);
- let val = t3.enter(|cx| assert_ready!(rx.poll_recv(cx)));
+ let val = t3.enter(|cx, _| assert_ready!(rx.poll_recv(cx)));
assert!(val.is_none());
}
@@ -130,7 +122,7 @@ fn buffer_gteq_one() {
#[test]
fn send_recv_unbounded() {
- let mut t1 = MockTask::new();
+ let mut t1 = task::spawn(());
let (mut tx, mut rx) = mpsc::unbounded_channel::<i32>();
@@ -138,15 +130,15 @@ fn send_recv_unbounded() {
assert_ok!(tx.try_send(1));
assert_ok!(tx.try_send(2));
- let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx)));
+ let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
assert_eq!(val, Some(1));
- let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx)));
+ let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
assert_eq!(val, Some(2));
drop(tx);
- let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx)));
+ let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
assert!(val.is_none());
}
@@ -170,11 +162,11 @@ fn sink_send_recv_unbounded() {
use futures_sink::Sink;
use futures_util::pin_mut;
- let mut t1 = MockTask::new();
+ let mut t1 = task::spawn(());
let (tx, rx) = mpsc::unbounded_channel::<i32>();
- t1.enter(|cx| {
+ t1.enter(|cx, _| {
pin_mut!(tx);
assert_ready_ok!(tx.as_mut().poll_ready(cx));
@@ -187,7 +179,7 @@ fn sink_send_recv_unbounded() {
assert_ready_ok!(tx.as_mut().poll_close(cx));
});
- t1.enter(|cx| {
+ t1.enter(|cx, _| {
pin_mut!(rx);
let val = assert_ready!(rx.as_mut().poll_next(cx));
@@ -205,7 +197,7 @@ fn sink_send_recv_unbounded() {
fn no_t_bounds_buffer() {
struct NoImpls;
- let mut t1 = MockTask::new();
+ let mut t1 = task::spawn(());
let (tx, mut rx) = mpsc::channel(100);
// sender should be Debug even though T isn't Debug
@@ -215,7 +207,7 @@ fn no_t_bounds_buffer() {
// and sender should be Clone even though T isn't Clone
assert!(tx.clone().try_send(NoImpls).is_ok());
- let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx)));
+ let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
assert!(val.is_some());
}
@@ -223,7 +215,7 @@ fn no_t_bounds_buffer() {
fn no_t_bounds_unbounded() {
struct NoImpls;
- let mut t1 = MockTask::new();
+ let mut t1 = task::spawn(());
let (tx, mut rx) = mpsc::unbounded_channel();
// sender should be Debug even though T isn't Debug
@@ -233,19 +225,19 @@ fn no_t_bounds_unbounded() {
// and sender should be Clone even though T isn't Clone
assert!(tx.clone().try_send(NoImpls).is_ok());
- let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx)));
+ let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
assert!(val.is_some());
}
#[test]
fn send_recv_buffer_limited() {
- let mut t1 = MockTask::new();
- let mut t2 = MockTask::new();
+ let mut t1 = task::spawn(());
+ let mut t2 = task::spawn(());
let (mut tx, mut rx) = mpsc::channel::<i32>(1);
// Run on a task context
- t1.enter(|cx| {
+ t1.enter(|cx, _| {
assert_ready_ok!(tx.poll_ready(cx));
// Send first message
@@ -258,7 +250,7 @@ fn send_recv_buffer_limited() {
assert_err!(tx.try_send(1337));
});
- t2.enter(|cx| {
+ t2.enter(|cx, _| {
// Take the value
let val = assert_ready!(rx.poll_recv(cx));
assert_eq!(Some(1), val);
@@ -266,7 +258,7 @@ fn send_recv_buffer_limited() {
assert!(t1.is_woken());
- t1.enter(|cx| {
+ t1.enter(|cx, _| {
assert_ready_ok!(tx.poll_ready(cx));
assert_ok!(tx.try_send(2));
@@ -275,26 +267,26 @@ fn send_recv_buffer_limited() {
assert_pending!(tx.poll_ready(cx));
});
- t2.enter(|cx| {
+ t2.enter(|cx, _| {
// Take the value
let val = assert_ready!(rx.poll_recv(cx));
assert_eq!(Some(2), val);
});
- t1.enter(|cx| {
+ t1.enter(|cx, _| {
assert_ready_ok!(tx.poll_ready(cx));
});
}
#[test]
fn recv_close_gets_none_idle() {
- let mut t1 = MockTask::new();
+ let mut t1 = task::spawn(());
let (mut tx, mut rx) = mpsc::channel::<i32>(10);
rx.close();
- t1.enter(|cx| {
+ t1.enter(|cx, _| {
let val = assert_ready!(rx.poll_recv(cx));
assert!(val.is_none());
assert_ready_err!(tx.poll_ready(cx));
@@ -303,16 +295,16 @@ fn recv_close_gets_none_idle() {
#[test]
fn recv_close_gets_none_reserved() {
- let mut t1 = MockTask::new();
- let mut t2 = MockTask::new();
- let mut t3 = MockTask::new();
+ let mut t1 = task::spawn(());
+ let mut t2 = task::spawn(());
+ let mut t3 = task::spawn(());
let (mut tx1, mut rx) = mpsc::channel::<i32>(1);
let mut tx2 = tx1.clone();
- assert_ready_ok!(t1.enter(|cx| tx1.poll_ready(cx)));
+ assert_ready_ok!(t1.enter(|cx, _| tx1.poll_ready(cx)));
- t2.enter(|cx| {
+ t2.enter(|cx, _| {
assert_pending!(tx2.poll_ready(cx));
});
@@ -320,11 +312,11 @@ fn recv_close_gets_none_reserved() {
assert!(t2.is_woken());
- t2.enter(|cx| {
+ t2.enter(|cx, _| {
assert_ready_err!(tx2.poll_ready(cx));
});
- t3.enter(|cx| assert_pending!(rx.poll_recv(cx)));
+ t3.enter(|cx, _| assert_pending!(rx.poll_recv(cx)));
assert!(!t1.is_woken());
assert!(!t2.is_woken());
@@ -333,7 +325,7 @@ fn recv_close_gets_none_reserved() {
assert!(t3.is_woken());
- t3.enter(|cx| {
+ t3.enter(|cx, _| {
let v = assert_ready!(rx.poll_recv(cx));
assert_eq!(v, Some(123));
@@ -344,12 +336,12 @@ fn recv_close_gets_none_reserved() {
#[test]
fn tx_close_gets_none() {
- let mut t1 = MockTask::new();
+ let mut t1 = task::spawn(());
let (_, mut rx) = mpsc::channel::<i32>(10);
// Run on a task context
- t1.enter(|cx| {
+ t1.enter(|cx, _| {
let v = assert_ready!(rx.poll_recv(cx));
assert!(v.is_none());
});
@@ -357,7 +349,7 @@ fn tx_close_gets_none() {
#[test]
fn try_send_fail() {
- let mut t1 = MockTask::new();
+ let mut t1 = task::spawn(());
let (mut tx, mut rx) = mpsc::channel(1);
@@ -367,32 +359,32 @@ fn try_send_fail() {
let err = assert_err!(tx.try_send("fail"));
assert!(err.is_full());
- let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx)));
+ let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
assert_eq!(val, Some("hello"));
assert_ok!(tx.try_send("goodbye"));
drop(tx);
- let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx)));
+ let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
assert_eq!(val, Some("goodbye"));
- let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx)));
+ let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
assert!(val.is_none());
}
#[test]
fn drop_tx_with_permit_releases_permit() {
- let mut t1 = MockTask::new();
- let mut t2 = MockTask::new();
+ let mut t1 = task::spawn(());
+ let mut t2 = task::spawn(());
// poll_ready reserves capacity, ensure that the capacity is released if tx
// is dropped w/o sending a value.
let (mut tx1, _rx) = mpsc::channel::<i32>(1);
let mut tx2 = tx1.clone();
- assert_ready_ok!(t1.enter(|cx| tx1.poll_ready(cx)));
+ assert_ready_ok!(t1.enter(|cx, _| tx1.poll_ready(cx)));
- t2.enter(|cx| {
+ t2.enter(|cx, _| {
assert_pending!(tx2.poll_ready(cx));
});
@@ -400,12 +392,12 @@ fn drop_tx_with_permit_releases_permit() {
assert!(t2.is_woken());
- assert_ready_ok!(t2.enter(|cx| tx2.poll_ready(cx)));
+ assert_ready_ok!(t2.enter(|cx, _| tx2.poll_ready(cx)));
}
#[test]
fn dropping_rx_closes_channel() {
- let mut t1 = MockTask::new();
+ let mut t1 = task::spawn(());
let (mut tx, rx) = mpsc::channel(100);
@@ -413,7 +405,7 @@ fn dropping_rx_closes_channel() {
assert_ok!(tx.try_send(msg.clone()));
drop(rx);
- assert_ready_err!(t1.enter(|cx| tx.poll_ready(cx)));
+ assert_ready_err!(t1.enter(|cx, _| tx.poll_ready(cx)));
assert_eq!(1, Arc::strong_count(&msg));
}