summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--tokio-test/Cargo.toml1
-rw-r--r--tokio-test/src/macros.rs55
-rw-r--r--tokio-test/src/task.rs137
-rw-r--r--tokio-util/tests/framed_read.rs42
-rw-r--r--tokio-util/tests/framed_write.rs11
-rw-r--r--tokio-util/tests/length_delimited.rs34
-rw-r--r--tokio/tests/io_async_read.rs34
-rw-r--r--tokio/tests/sync_atomic_waker.rs16
-rw-r--r--tokio/tests/sync_mpsc.rs124
-rw-r--r--tokio/tests/sync_oneshot.rs113
-rw-r--r--tokio/tests/sync_semaphore.rs85
-rw-r--r--tokio/tests/timer_delay.rs221
-rw-r--r--tokio/tests/timer_interval.rs32
-rw-r--r--tokio/tests/timer_queue.rs152
-rw-r--r--tokio/tests/timer_throttle.rs32
-rw-r--r--tokio/tests/timer_timeout.rs78
16 files changed, 500 insertions, 667 deletions
diff --git a/tokio-test/Cargo.toml b/tokio-test/Cargo.toml
index 95dc5d7f..6d7ebc34 100644
--- a/tokio-test/Cargo.toml
+++ b/tokio-test/Cargo.toml
@@ -24,7 +24,6 @@ tokio = { version = "=0.2.0-alpha.6", path = "../tokio" }
bytes = "0.4"
futures-core-preview = "=0.3.0-alpha.19"
-pin-convert = "0.1.0"
[dev-dependencies]
futures-util-preview = "=0.3.0-alpha.19"
diff --git a/tokio-test/src/macros.rs b/tokio-test/src/macros.rs
index b39b4a02..dbe2280f 100644
--- a/tokio-test/src/macros.rs
+++ b/tokio-test/src/macros.rs
@@ -13,16 +13,11 @@
/// # Examples
///
/// ```
-/// use std::future::Future;
-/// use futures_util::{future, pin_mut};
+/// use futures_util::future;
/// use tokio_test::{assert_ready, task};
///
-/// task::mock(|cx| {
-/// let fut = future::ready(());
-///
-/// pin_mut!(fut);
-/// assert_ready!(fut.poll(cx));
-/// })
+/// let mut fut = task::spawn(future::ready(()));
+/// assert_ready!(fut.poll());
/// ```
#[macro_export]
macro_rules! assert_ready {
@@ -57,16 +52,11 @@ macro_rules! assert_ready {
/// # Examples
///
/// ```
-/// use std::future::Future;
-/// use futures_util::{future, pin_mut};
+/// use futures_util::future;
/// use tokio_test::{assert_ready_ok, task};
///
-/// task::mock(|cx| {
-/// let fut = future::ok::<_, ()>(());
-///
-/// pin_mut!(fut);
-/// assert_ready_ok!(fut.poll(cx));
-/// })
+/// let mut fut = task::spawn(future::ok::<_, ()>(()));
+/// assert_ready_ok!(fut.poll());
/// ```
#[macro_export]
macro_rules! assert_ready_ok {
@@ -95,16 +85,11 @@ macro_rules! assert_ready_ok {
/// # Examples
///
/// ```
-/// use std::future::Future;
-/// use futures_util::{future, pin_mut};
+/// use futures_util::future;
/// use tokio_test::{assert_ready_err, task};
///
-/// task::mock(|cx| {
-/// let fut = future::err::<(), _>(());
-///
-/// pin_mut!(fut);
-/// assert_ready_err!(fut.poll(cx));
-/// })
+/// let mut fut = task::spawn(future::err::<(), _>(()));
+/// assert_ready_err!(fut.poll());
/// ```
#[macro_export]
macro_rules! assert_ready_err {
@@ -133,16 +118,11 @@ macro_rules! assert_ready_err {
/// # Examples
///
/// ```
-/// use std::future::Future;
-/// use futures_util::{future, pin_mut};
+/// use futures_util::future;
/// use tokio_test::{assert_pending, task};
///
-/// task::mock(|cx| {
-/// let fut = future::pending::<()>();
-///
-/// pin_mut!(fut);
-/// assert_pending!(fut.poll(cx));
-/// })
+/// let mut fut = task::spawn(future::pending::<()>());
+/// assert_pending!(fut.poll());
/// ```
#[macro_export]
macro_rules! assert_pending {
@@ -177,16 +157,11 @@ macro_rules! assert_pending {
/// # Examples
///
/// ```
-/// use std::future::Future;
-/// use futures_util::{future, pin_mut};
+/// use futures_util::future;
/// use tokio_test::{assert_ready_eq, task};
///
-/// task::mock(|cx| {
-/// let fut = future::ready(42);
-///
-/// pin_mut!(fut);
-/// assert_ready_eq!(fut.poll(cx), 42);
-/// })
+/// let mut fut = task::spawn(future::ready(42));
+/// assert_ready_eq!(fut.poll(), 42);
/// ```
#[macro_export]
macro_rules! assert_ready_eq {
diff --git a/tokio-test/src/task.rs b/tokio-test/src/task.rs
index 5468749a..c21e31a5 100644
--- a/tokio-test/src/task.rs
+++ b/tokio-test/src/task.rs
@@ -1,44 +1,19 @@
//! Futures task based helpers
-use tokio::executor::enter;
-
-use pin_convert::AsPinMut;
+use futures_core::Stream;
use std::future::Future;
use std::mem;
+use std::ops;
use std::pin::Pin;
use std::sync::{Arc, Condvar, Mutex};
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
-/// Run the provided closure in a `MockTask` context.
-///
-/// # Examples
-///
-/// ```
-/// use std::future::Future;
-/// use futures_util::{future, pin_mut};
-/// use tokio_test::task;
-///
-/// task::mock(|cx| {
-/// let fut = future::ready(());
-///
-/// pin_mut!(fut);
-/// assert!(fut.poll(cx).is_ready());
-/// })
-/// ```
-pub fn mock<F, R>(f: F) -> R
-where
- F: Fn(&mut Context<'_>) -> R,
-{
- let mut task = MockTask::new();
- task.enter(|cx| f(cx))
-}
-
-/// Mock task
-///
-/// A mock task is able to intercept and track wake notifications.
-#[derive(Debug, Clone)]
-pub struct MockTask {
- waker: Arc<ThreadWaker>,
+/// TOOD: dox
+pub fn spawn<T>(task: T) -> Spawn<T> {
+ Spawn {
+ task: MockTask::new(),
+ future: Box::pin(task),
+ }
}
/// Future spawned on a mock task
@@ -48,12 +23,12 @@ pub struct Spawn<T> {
future: Pin<Box<T>>,
}
-/// TOOD: dox
-pub fn spawn<T>(task: T) -> Spawn<T> {
- Spawn {
- task: MockTask::new(),
- future: Box::pin(task),
- }
+/// Mock task
+///
+/// A mock task is able to intercept and track wake notifications.
+#[derive(Debug, Clone)]
+struct MockTask {
+ waker: Arc<ThreadWaker>,
}
#[derive(Debug)]
@@ -66,11 +41,23 @@ const IDLE: usize = 0;
const WAKE: usize = 1;
const SLEEP: usize = 2;
-impl<T: Future> Spawn<T> {
- /// Poll a future
- pub fn poll(&mut self) -> Poll<T::Output> {
- let fut = self.future.as_mut();
- self.task.enter(|cx| fut.poll(cx))
+impl<T> Spawn<T> {
+ /// Consume `self` returning the inner value
+ pub fn into_inner(mut self) -> T
+ where
+ T: Unpin,
+ {
+ drop(self.task);
+
+ // Pin::into_inner is unstable, so we work around it
+ //
+ // Safety: `T` is bound by `Unpin`.
+ unsafe {
+ let ptr = Pin::get_mut(self.future.as_mut()) as *mut T;
+ let future = Box::from_raw(ptr);
+ mem::forget(self.future);
+ *future
+ }
}
/// Returns `true` if the inner future has received a wake notification
@@ -85,35 +72,63 @@ impl<T: Future> Spawn<T> {
pub fn waker_ref_count(&self) -> usize {
self.task.waker_ref_count()
}
+
+ /// Enter the task context
+ pub fn enter<F, R>(&mut self, f: F) -> R
+ where
+ F: FnOnce(&mut Context<'_>, Pin<&mut T>) -> R,
+ {
+ let fut = self.future.as_mut();
+ self.task.enter(|cx| f(cx, fut))
+ }
+}
+
+impl<T: Unpin> ops::Deref for Spawn<T> {
+ type Target = T;
+
+ fn deref(&self) -> &T {
+ &self.future
+ }
+}
+
+impl<T: Unpin> ops::DerefMut for Spawn<T> {
+ fn deref_mut(&mut self) -> &mut T {
+ &mut self.future
+ }
+}
+
+impl<T: Future> Spawn<T> {
+ /// Poll a future
+ pub fn poll(&mut self) -> Poll<T::Output> {
+ let fut = self.future.as_mut();
+ self.task.enter(|cx| fut.poll(cx))
+ }
+}
+
+impl<T: Stream> Spawn<T> {
+ /// Poll a stream
+ pub fn poll_next(&mut self) -> Poll<Option<T::Item>> {
+ let stream = self.future.as_mut();
+ self.task.enter(|cx| stream.poll_next(cx))
+ }
}
impl MockTask {
/// Create a new mock task
- pub fn new() -> Self {
+ fn new() -> Self {
MockTask {
waker: Arc::new(ThreadWaker::new()),
}
}
- /// Poll a future
- pub fn poll<T, F>(&mut self, mut fut: T) -> Poll<F::Output>
- where
- T: AsPinMut<F>,
- F: Future,
- {
- self.enter(|cx| fut.as_pin_mut().poll(cx))
- }
-
/// Run a closure from the context of the task.
///
/// Any wake notifications resulting from the execution of the closure are
/// tracked.
- pub fn enter<F, R>(&mut self, f: F) -> R
+ fn enter<F, R>(&mut self, f: F) -> R
where
F: FnOnce(&mut Context<'_>) -> R,
{
- let _enter = enter().unwrap();
-
self.waker.clear();
let waker = self.waker();
let mut cx = Context::from_waker(&waker);
@@ -123,14 +138,14 @@ impl MockTask {
/// Returns `true` if the inner future has received a wake notification
/// since the last call to `enter`.
- pub fn is_woken(&self) -> bool {
+ fn is_woken(&self) -> bool {
self.waker.is_woken()
}
/// Returns the number of references to the task waker
///
/// The task itself holds a reference. The return value will never be zero.
- pub fn waker_ref_count(&self) -> usize {
+ fn waker_ref_count(&self) -> usize {
Arc::strong_count(&self.waker)
}
@@ -193,7 +208,7 @@ impl ThreadWaker {
}
}
-static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop);
+static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker);
unsafe fn to_raw(waker: Arc<ThreadWaker>) -> RawWaker {
RawWaker::new(Arc::into_raw(waker) as *const (), &VTABLE)
@@ -225,6 +240,6 @@ unsafe fn wake_by_ref(raw: *const ()) {
mem::forget(waker);
}
-unsafe fn drop(raw: *const ()) {
+unsafe fn drop_waker(raw: *const ()) {
let _ = from_raw(raw);
}
diff --git a/tokio-util/tests/framed_read.rs b/tokio-util/tests/framed_read.rs
index 2064b9fb..6636b880 100644
--- a/tokio-util/tests/framed_read.rs
+++ b/tokio-util/tests/framed_read.rs
@@ -2,7 +2,7 @@
use tokio::prelude::*;
use tokio_test::assert_ready;
-use tokio_test::task::MockTask;
+use tokio_test::task;
use tokio_util::codec::{Decoder, FramedRead};
use bytes::{Buf, BytesMut, IntoBuf};
@@ -51,13 +51,13 @@ impl Decoder for U32Decoder {
#[test]
fn read_multi_frame_in_packet() {
- let mut task = MockTask::new();
+ let mut task = task::spawn(());
let mock = mock! {
Ok(b"\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()),
};
let mut framed = FramedRead::new(mock, U32Decoder);
- task.enter(|cx| {
+ task.enter(|cx, _| {
assert_read!(pin!(framed).poll_next(cx), 0);
assert_read!(pin!(framed).poll_next(cx), 1);
assert_read!(pin!(framed).poll_next(cx), 2);
@@ -67,7 +67,7 @@ fn read_multi_frame_in_packet() {
#[test]
fn read_multi_frame_across_packets() {
- let mut task = MockTask::new();
+ let mut task = task::spawn(());
let mock = mock! {
Ok(b"\x00\x00\x00\x00".to_vec()),
Ok(b"\x00\x00\x00\x01".to_vec()),
@@ -75,7 +75,7 @@ fn read_multi_frame_across_packets() {
};
let mut framed = FramedRead::new(mock, U32Decoder);
- task.enter(|cx| {
+ task.enter(|cx, _| {
assert_read!(pin!(framed).poll_next(cx), 0);
assert_read!(pin!(framed).poll_next(cx), 1);
assert_read!(pin!(framed).poll_next(cx), 2);
@@ -85,7 +85,7 @@ fn read_multi_frame_across_packets() {
#[test]
fn read_not_ready() {
- let mut task = MockTask::new();
+ let mut task = task::spawn(());
let mock = mock! {
Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
Ok(b"\x00\x00\x00\x00".to_vec()),
@@ -93,7 +93,7 @@ fn read_not_ready() {
};
let mut framed = FramedRead::new(mock, U32Decoder);
- task.enter(|cx| {
+ task.enter(|cx, _| {
assert!(pin!(framed).poll_next(cx).is_pending());
assert_read!(pin!(framed).poll_next(cx), 0);
assert_read!(pin!(framed).poll_next(cx), 1);
@@ -103,7 +103,7 @@ fn read_not_ready() {
#[test]
fn read_partial_then_not_ready() {
- let mut task = MockTask::new();
+ let mut task = task::spawn(());
let mock = mock! {
Ok(b"\x00\x00".to_vec()),
Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
@@ -111,7 +111,7 @@ fn read_partial_then_not_ready() {
};
let mut framed = FramedRead::new(mock, U32Decoder);
- task.enter(|cx| {
+ task.enter(|cx, _| {
assert!(pin!(framed).poll_next(cx).is_pending());
assert_read!(pin!(framed).poll_next(cx), 0);
assert_read!(pin!(framed).poll_next(cx), 1);
@@ -122,13 +122,13 @@ fn read_partial_then_not_ready() {
#[test]
fn read_err() {
- let mut task = MockTask::new();
+ let mut task = task::spawn(());
let mock = mock! {
Err(io::Error::new(io::ErrorKind::Other, "")),
};
let mut framed = FramedRead::new(mock, U32Decoder);
- task.enter(|cx| {
+ task.enter(|cx, _| {
assert_eq!(
io::ErrorKind::Other,
assert_ready!(pin!(framed).poll_next(cx))
@@ -141,14 +141,14 @@ fn read_err() {
#[test]
fn read_partial_then_err() {
- let mut task = MockTask::new();
+ let mut task = task::spawn(());
let mock = mock! {
Ok(b"\x00\x00".to_vec()),
Err(io::Error::new(io::ErrorKind::Other, "")),
};
let mut framed = FramedRead::new(mock, U32Decoder);
- task.enter(|cx| {
+ task.enter(|cx, _| {
assert_eq!(
io::ErrorKind::Other,
assert_ready!(pin!(framed).poll_next(cx))
@@ -161,7 +161,7 @@ fn read_partial_then_err() {
#[test]
fn read_partial_would_block_then_err() {
- let mut task = MockTask::new();
+ let mut task = task::spawn(());
let mock = mock! {
Ok(b"\x00\x00".to_vec()),
Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
@@ -169,7 +169,7 @@ fn read_partial_would_block_then_err() {
};
let mut framed = FramedRead::new(mock, U32Decoder);
- task.enter(|cx| {
+ task.enter(|cx, _| {
assert!(pin!(framed).poll_next(cx).is_pending());
assert_eq!(
io::ErrorKind::Other,
@@ -183,11 +183,11 @@ fn read_partial_would_block_then_err() {
#[test]
fn huge_size() {
- let mut task = MockTask::new();
+ let mut task = task::spawn(());
let data = [0; 32 * 1024];
let mut framed = FramedRead::new(Slice(&data[..]), BigDecoder);
- task.enter(|cx| {
+ task.enter(|cx, _| {
assert_read!(pin!(framed).poll_next(cx), 0);
assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
});
@@ -210,11 +210,11 @@ fn huge_size() {
#[test]
fn data_remaining_is_error() {
- let mut task = MockTask::new();
+ let mut task = task::spawn(());
let slice = Slice(&[0; 5]);
let mut framed = FramedRead::new(slice, U32Decoder);
- task.enter(|cx| {
+ task.enter(|cx, _| {
assert_read!(pin!(framed).poll_next(cx), 0);
assert!(assert_ready!(pin!(framed).poll_next(cx)).unwrap().is_err());
});
@@ -222,7 +222,7 @@ fn data_remaining_is_error() {
#[test]
fn multi_frames_on_eof() {
- let mut task = MockTask::new();
+ let mut task = task::spawn(());
struct MyDecoder(Vec<u32>);
impl Decoder for MyDecoder {
@@ -244,7 +244,7 @@ fn multi_frames_on_eof() {
let mut framed = FramedRead::new(mock!(), MyDecoder(vec![0, 1, 2, 3]));
- task.enter(|cx| {
+ task.enter(|cx, _| {
assert_read!(pin!(framed).poll_next(cx), 0);
assert_read!(pin!(framed).poll_next(cx), 1);
assert_read!(pin!(framed).poll_next(cx), 2);
diff --git a/tokio-util/tests/framed_write.rs b/tokio-util/tests/framed_write.rs
index 03f0f08b..9d760766 100644
--- a/tokio-util/tests/framed_write.rs
+++ b/tokio-util/tests/framed_write.rs
@@ -1,8 +1,7 @@
#![warn(rust_2018_idioms)]
use tokio::io::AsyncWrite;
-use tokio_test::assert_ready;
-use tokio_test::task::MockTask;
+use tokio_test::{assert_ready, task};
use tokio_util::codec::{Encoder, FramedWrite};
use bytes::{BufMut, BytesMut};
@@ -43,13 +42,13 @@ impl Encoder for U32Encoder {
#[test]
fn write_multi_frame_in_packet() {
- let mut task = MockTask::new();
+ let mut task = task::spawn(());
let mock = mock! {
Ok(b"\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()),
};
let mut framed = FramedWrite::new(mock, U32Encoder);
- task.enter(|cx| {
+ task.enter(|cx, _| {
assert!(assert_ready!(pin!(framed).poll_ready(cx)).is_ok());
assert!(pin!(framed).start_send(0).is_ok());
assert!(assert_ready!(pin!(framed).poll_ready(cx)).is_ok());
@@ -99,9 +98,9 @@ fn write_hits_backpressure() {
// 1 'wouldblock', 4 * 2KB buffers, 1 b-byte buffer
assert_eq!(mock.calls.len(), 6);
- let mut task = MockTask::new();
+ let mut task = task::spawn(());
let mut framed = FramedWrite::new(mock, U32Encoder);
- task.enter(|cx| {
+ task.enter(|cx, _| {
// Send 8KB. This fills up FramedWrite2 buffer
for i in 0..ITER {
assert!(assert_ready!(pin!(framed).poll_ready(cx)).is_ok());
diff --git a/tokio-util/tests/length_delimited.rs b/tokio-util/tests/length_delimited.rs
index c78dfac8..b287bb35 100644
--- a/tokio-util/tests/length_delimited.rs
+++ b/tokio-util/tests/length_delimited.rs
@@ -2,7 +2,7 @@
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::prelude::*;
-use tokio_test::task::MockTask;
+use tokio_test::task;
use tokio_test::{
assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
};
@@ -26,7 +26,7 @@ macro_rules! mock {
macro_rules! assert_next_eq {
($io:ident, $expect:expr) => {{
- MockTask::new().enter(|cx| {
+ task::spawn(()).enter(|cx, _| {
let res = assert_ready!($io.as_mut().poll_next(cx));
match res {
Some(Ok(v)) => assert_eq!(v, $expect.as_ref()),
@@ -39,7 +39,7 @@ macro_rules! assert_next_eq {
macro_rules! assert_next_pending {
($io:ident) => {{
- MockTask::new().enter(|cx| match $io.as_mut().poll_next(cx) {
+ task::spawn(()).enter(|cx, _| match $io.as_mut().poll_next(cx) {
Ready(Some(Ok(v))) => panic!("value = {:?}", v),
Ready(Some(Err(e))) => panic!("error = {:?}", e),
Ready(None) => panic!("done"),
@@ -50,7 +50,7 @@ macro_rules! assert_next_pending {
macro_rules! assert_next_err {
($io:ident) => {{
- MockTask::new().enter(|cx| match $io.as_mut().poll_next(cx) {
+ task::spawn(()).enter(|cx, _| match $io.as_mut().poll_next(cx) {
Ready(Some(Ok(v))) => panic!("value = {:?}", v),
Ready(Some(Err(_))) => {}
Ready(None) => panic!("done"),
@@ -61,7 +61,7 @@ macro_rules! assert_next_err {
macro_rules! assert_done {
($io:ident) => {{
- MockTask::new().enter(|cx| {
+ task::spawn(()).enter(|cx, _| {
let res = assert_ready!($io.as_mut().poll_next(cx));
match res {
Some(Ok(v)) => panic!("value = {:?}", v),
@@ -405,7 +405,7 @@ fn write_single_frame_length_adjusted() {
});
pin_mut!(io);
- MockTask::new().enter(|cx| {
+ task::spawn(()).enter(|cx, _| {
assert_ready_ok!(io.as_mut().poll_ready(cx));
assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
assert_ready_ok!(io.as_mut().poll_flush(cx));
@@ -418,7 +418,7 @@ fn write_nothing_yields_nothing() {
let io = FramedWrite::new(mock!(), LengthDelimitedCodec::new());
pin_mut!(io);
- MockTask::new().enter(|cx| {
+ task::spawn(()).enter(|cx, _| {
assert_ready_ok!(io.poll_flush(cx));
});
}
@@ -435,7 +435,7 @@ fn write_single_frame_one_packet() {
);
pin_mut!(io);
- MockTask::new().enter(|cx| {
+ task::spawn(()).enter(|cx, _| {
assert_ready_ok!(io.as_mut().poll_ready(cx));
assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
assert_ready_ok!(io.as_mut().poll_flush(cx));
@@ -459,7 +459,7 @@ fn write_single_multi_frame_one_packet() {
);
pin_mut!(io);
- MockTask::new().enter(|cx| {
+ task::spawn(()).enter(|cx, _| {
assert_ready_ok!(io.as_mut().poll_ready(cx));
assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
@@ -492,7 +492,7 @@ fn write_single_multi_frame_multi_packet() {
);
pin_mut!(io);
- MockTask::new().enter(|cx| {
+ task::spawn(()).enter(|cx, _| {
assert_ready_ok!(io.as_mut().poll_ready(cx));
assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
@@ -526,7 +526,7 @@ fn write_single_frame_would_block() {
);
pin_mut!(io);
- MockTask::new().enter(|cx| {
+ task::spawn(()).enter(|cx, _| {
assert_ready_ok!(io.as_mut().poll_ready(cx));
assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
@@ -549,7 +549,7 @@ fn write_single_frame_little_endian() {
});
pin_mut!(io);
- MockTask::new().enter(|cx| {
+ task::spawn(()).enter(|cx, _| {
assert_ready_ok!(io.as_mut().poll_ready(cx));
assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
@@ -569,7 +569,7 @@ fn write_single_frame_with_short_length_field() {
});
pin_mut!(io);
- MockTask::new().enter(|cx| {
+ task::spawn(()).enter(|cx, _| {
assert_ready_ok!(io.as_mut().poll_ready(cx));
assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
@@ -586,7 +586,7 @@ fn write_max_frame_len() {
.new_write(mock! {});
pin_mut!(io);
- MockTask::new().enter(|cx| {
+ task::spawn(()).enter(|cx, _| {
assert_ready_ok!(io.as_mut().poll_ready(cx));
assert_err!(io.as_mut().start_send(Bytes::from("abcdef")));
@@ -603,7 +603,7 @@ fn write_update_max_frame_len_at_rest() {
});
pin_mut!(io);
- MockTask::new().enter(|cx| {
+ task::spawn(()).enter(|cx, _| {
assert_ready_ok!(io.as_mut().poll_ready(cx));
assert_ok!(io.as_mut().start_send(Bytes::from("abcdef")));
@@ -628,7 +628,7 @@ fn write_update_max_frame_len_in_flight() {
});
pin_mut!(io);
- MockTask::new().enter(|cx| {
+ task::spawn(()).enter(|cx, _| {
assert_ready_ok!(io.as_mut().poll_ready(cx));
assert_ok!(io.as_mut().start_send(Bytes::from("abcdef")));
@@ -648,7 +648,7 @@ fn write_zero() {
let io = length_delimited::Builder::new().new_write(mock! {});
pin_mut!(io);
- MockTask::new().enter(|cx| {
+ task::spawn(()).enter(|cx, _| {
assert_ready_ok!(io.as_mut().poll_ready(cx));
assert_ok!(io.as_mut().start_send(Bytes::from("abcdef")));
diff --git a/tokio/tests/io_async_read.rs b/tokio/tests/io_async_read.rs
index b34fcba3..007e5403 100644
--- a/tokio/tests/io_async_read.rs
+++ b/tokio/tests/io_async_read.rs
@@ -1,9 +1,8 @@
use tokio::io::AsyncRead;
-use tokio_test::task::MockTask;
+use tokio_test::task;
use tokio_test::{assert_ready_err, assert_ready_ok};
use bytes::{BufMut, BytesMut};
-use futures_util::pin_mut;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
@@ -30,12 +29,8 @@ fn read_buf_success() {
}
let mut buf = BytesMut::with_capacity(65);
- let mut task = MockTask::new();
-
- task.enter(|cx| {
- let rd = Rd;
- pin_mut!(rd);
+ task::spawn(Rd).enter(|cx, rd| {
let n = assert_ready_ok!(rd.poll_read_buf(cx, &mut buf));
assert_eq!(11, n);
@@ -59,12 +54,8 @@ fn read_buf_error() {
}
let mut buf = BytesMut::with_capacity(65);
- let mut task = MockTask::new();
-
- task.enter(|cx| {
- let rd = Rd;
- pin_mut!(rd);
+ task::spawn(Rd).enter(|cx, rd| {
let err = assert_ready_err!(rd.poll_read_buf(cx, &mut buf));
assert_eq!(err.kind(), io::ErrorKind::Other);
});
@@ -86,14 +77,9 @@ fn read_buf_no_capacity() {
// Can't create BytesMut w/ zero capacity, so fill it up
let mut buf = BytesMut::with_capacity(64);
- let mut task = MockTask::new();
-
buf.put(&[0; 64][..]);
- task.enter(|cx| {
- let rd = Rd;
- pin_mut!(rd);
-
+ task::spawn(Rd).enter(|cx, rd| {
let n = assert_ready_ok!(rd.poll_read_buf(cx, &mut buf));
assert_eq!(0, n);
});
@@ -118,12 +104,8 @@ fn read_buf_no_uninitialized() {
}
let mut buf = BytesMut::with_capacity(64);
- let mut task = MockTask::new();
-
- task.enter(|cx| {
- let rd = Rd;
- pin_mut!(rd);
+ task::spawn(Rd).enter(|cx, rd| {
let n = assert_ready_ok!(rd.poll_read_buf(cx, &mut buf));
assert_eq!(0, n);
});
@@ -150,16 +132,12 @@ fn read_buf_uninitialized_ok() {
// Can't create BytesMut w/ zero capacity, so fill it up
let mut buf = BytesMut::with_capacity(64);
- let mut task = MockTask::new();
unsafe {
buf.bytes_mut()[0..11].copy_from_slice(b"hello world");
}
- task.enter(|cx| {
- let rd = Rd;
- pin_mut!(rd);
-
+ task::spawn(Rd).enter(|cx, rd| {
let n = assert_ready_ok!(rd.poll_read_buf(cx, &mut buf));
assert_eq!(0, n);
});
diff --git a/tokio/tests/sync_atomic_waker.rs b/tokio/tests/sync_atomic_waker.rs
index 77d6a73d..8e725526 100644
--- a/tokio/tests/sync_atomic_waker.rs
+++ b/tokio/tests/sync_atomic_waker.rs
@@ -1,7 +1,7 @@
#![warn(rust_2018_idioms)]
use tokio::sync::AtomicWaker;
-use tokio_test::task::MockTask;
+use tokio_test::task;
use std::task::Waker;
@@ -16,23 +16,21 @@ impl AssertSync for Waker {}
#[test]
fn basic_usage() {
- let waker = AtomicWaker::new();
- let mut task = MockTask::new();
+ let mut waker = task::spawn(AtomicWaker::new());
- task.enter(|cx| waker.register_by_ref(cx.waker()));
+ waker.enter(|cx, waker| waker.register_by_ref(cx.waker()));
waker.wake();
- assert!(task.is_woken());
+ assert!(waker.is_woken());
}
#[test]
fn wake_without_register() {
- let waker = AtomicWaker::new();
+ let mut waker = task::spawn(AtomicWaker::new());
waker.wake();
// Registering should not result in a notification
- let mut task = MockTask::new();
- task.enter(|cx| waker.register_by_ref(cx.waker()));
+ waker.enter(|cx, waker| waker.register_by_ref(cx.waker()));
- assert!(!task.is_woken());
+ assert!(!waker.is_woken());
}
diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs
index 891e2361..f724c564 100644
--- a/tokio/tests/sync_mpsc.rs
+++ b/tokio/tests/sync_mpsc.rs
@@ -1,7 +1,7 @@
#![warn(rust_2018_idioms)]
use tokio::sync::mpsc;
-use tokio_test::task::MockTask;
+use tokio_test::task;
use tokio_test::{
assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
};
@@ -14,13 +14,12 @@ impl AssertSend for mpsc::Receiver<i32> {}
#[test]
fn send_recv_with_buffer() {
- let mut t1 = MockTask::new();
- let mut t2 = MockTask::new();
-
- let (mut tx, mut rx) = mpsc::channel::<i32>(16);
+ let (tx, rx) = mpsc::channel::<i32>(16);
+ let mut tx = task::spawn(tx);
+ let mut rx = task::spawn(rx);
// Using poll_ready / try_send
- assert_ready_ok!(t1.enter(|cx| tx.poll_ready(cx)));
+ assert_ready_ok!(tx.enter(|cx, mut tx| tx.poll_ready(cx)));
tx.try_send(1).unwrap();
// Without poll_ready
@@ -28,13 +27,13 @@ fn send_recv_with_buffer() {
drop(tx);
- let val = assert_ready!(t2.enter(|cx| rx.poll_recv(cx)));
+ let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx)));
assert_eq!(val, Some(1));
- let val = assert_ready!(t2.enter(|cx| rx.poll_recv(cx)));
+ let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx)));
assert_eq!(val, Some(2));
- let val = assert_ready!(t2.enter(|cx| rx.poll_recv(cx)));
+ let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx)));
assert!(val.is_none());
}
@@ -56,15 +55,10 @@ async fn async_send_recv_with_buffer() {
fn send_sink_recv_with_buffer() {
use futures_core::Stream;
use futures_sink::Sink;
- use futures_util::pin_mut;
-
- let mut t1 = MockTask::new();
let (tx, rx) = mpsc::channel::<i32>(16);
- t1.enter(|cx| {
- pin_mut!(tx);
-
+ task::spawn(tx).enter(|cx, mut tx| {
assert_ready_ok!(tx.as_mut().poll_ready(cx));
assert_ok!(tx.as_mut().start_send(1));
@@ -75,9 +69,7 @@ fn send_sink_recv_with_buffer() {
assert_ready_ok!(tx.as_mut().poll_close(cx));
});
- t1.enter(|cx| {
- pin_mut!(rx);
-
+ task::spawn(rx).enter(|cx, mut rx| {
let val = assert_ready!(rx.as_mut().poll_next(cx));
assert_eq!(val, Some(1));
@@ -91,26 +83,26 @@ fn send_sink_recv_with_buffer() {
#[test]
fn start_send_past_cap() {
- let mut t1 = MockTask::new();
- let mut t2 = MockTask::new();
- let mut t3 = MockTask::new();
+ let mut t1 = task::spawn(());
+ let mut t2 = task::spawn(());
+ let mut t3 = task::spawn(());
let (mut tx1, mut rx) = mpsc::channel(1);
let mut tx2 = tx1.clone();
assert_ok!(tx1.try_send(()));
- t1.enter(|cx| {
+ t1.enter(|cx, _| {
assert_pending!(tx1.poll_ready(cx));
});
- t2.enter(|cx| {
+ t2.enter(|cx, _| {
assert_pending!(tx2.poll_ready(cx));
});
drop(tx1);
- let val = t3.enter(|cx| assert_ready!(rx.poll_recv(cx)));
+ let val = t3.enter(|cx, _| assert_ready!(rx.poll_recv(cx)));
assert!(val.is_some());
assert!(t2.is_woken());
@@ -118,7 +110,7 @@ fn start_send_past_cap() {
drop(tx2);
- let val = t3.enter(|cx| assert_ready!(rx.poll_recv(cx)));
+ let val = t3.enter(|cx, _| assert_ready!(rx.poll_recv(cx)));
assert!(val.is_none());
}
@@ -130,7 +122,7 @@ fn buffer_gteq_one() {
#[test]
fn send_recv_unbounded() {
- let mut t1 = MockTask::new();
+ let mut t1 = task::spawn(());
let (mut tx, mut rx) = mpsc::unbounded_channel::<i32>();
@@ -138,15 +130,15 @@ fn send_recv_unbounded() {
assert_ok!(tx.try_send(1));
assert_ok!(tx.try_send(2));
- let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx)));
+ let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
assert_eq!(val, Some(1));
- let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx)));
+ let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
assert_eq!(val, Some(2));
drop(tx);
- let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx)));
+ let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
assert!(val.is_none());
}
@@ -170,11 +162,11 @@ fn sink_send_recv_unbounded() {
use futures_sink::Sink;
use futures_util::pin_mut;
- let mut t1 = MockTask::new();
+ let mut t1 = task::spawn(());
let (tx, rx) = mpsc::unbounded_channel::<i32>();
- t1.enter(|cx| {
+ t1.enter(|cx, _| {
pin_mut!(tx);
assert_ready_ok!(tx.as_mut().poll_ready(cx));
@@ -187,7 +179,7 @@ fn sink_send_recv_unbounded() {
assert_ready_ok!(tx.as_mut().poll_close(cx));
});
- t1.enter(|cx| {
+ t1.enter(|cx, _| {
pin_mut!(rx);
let val = assert_ready!(rx.as_mut().poll_next(cx));
@@ -205,7 +197,7 @@ fn sink_send_recv_unbounded() {
fn no_t_bounds_buffer() {
struct NoImpls;
- let mut t1 = MockTask::new();
+ let mut t1 = task::spawn((