diff options
-rw-r--r-- | tokio-test/Cargo.toml | 1 | ||||
-rw-r--r-- | tokio-test/src/macros.rs | 55 | ||||
-rw-r--r-- | tokio-test/src/task.rs | 137 | ||||
-rw-r--r-- | tokio-util/tests/framed_read.rs | 42 | ||||
-rw-r--r-- | tokio-util/tests/framed_write.rs | 11 | ||||
-rw-r--r-- | tokio-util/tests/length_delimited.rs | 34 | ||||
-rw-r--r-- | tokio/tests/io_async_read.rs | 34 | ||||
-rw-r--r-- | tokio/tests/sync_atomic_waker.rs | 16 | ||||
-rw-r--r-- | tokio/tests/sync_mpsc.rs | 124 | ||||
-rw-r--r-- | tokio/tests/sync_oneshot.rs | 113 | ||||
-rw-r--r-- | tokio/tests/sync_semaphore.rs | 85 | ||||
-rw-r--r-- | tokio/tests/timer_delay.rs | 221 | ||||
-rw-r--r-- | tokio/tests/timer_interval.rs | 32 | ||||
-rw-r--r-- | tokio/tests/timer_queue.rs | 152 | ||||
-rw-r--r-- | tokio/tests/timer_throttle.rs | 32 | ||||
-rw-r--r-- | tokio/tests/timer_timeout.rs | 78 |
16 files changed, 500 insertions, 667 deletions
diff --git a/tokio-test/Cargo.toml b/tokio-test/Cargo.toml index 95dc5d7f..6d7ebc34 100644 --- a/tokio-test/Cargo.toml +++ b/tokio-test/Cargo.toml @@ -24,7 +24,6 @@ tokio = { version = "=0.2.0-alpha.6", path = "../tokio" } bytes = "0.4" futures-core-preview = "=0.3.0-alpha.19" -pin-convert = "0.1.0" [dev-dependencies] futures-util-preview = "=0.3.0-alpha.19" diff --git a/tokio-test/src/macros.rs b/tokio-test/src/macros.rs index b39b4a02..dbe2280f 100644 --- a/tokio-test/src/macros.rs +++ b/tokio-test/src/macros.rs @@ -13,16 +13,11 @@ /// # Examples /// /// ``` -/// use std::future::Future; -/// use futures_util::{future, pin_mut}; +/// use futures_util::future; /// use tokio_test::{assert_ready, task}; /// -/// task::mock(|cx| { -/// let fut = future::ready(()); -/// -/// pin_mut!(fut); -/// assert_ready!(fut.poll(cx)); -/// }) +/// let mut fut = task::spawn(future::ready(())); +/// assert_ready!(fut.poll()); /// ``` #[macro_export] macro_rules! assert_ready { @@ -57,16 +52,11 @@ macro_rules! assert_ready { /// # Examples /// /// ``` -/// use std::future::Future; -/// use futures_util::{future, pin_mut}; +/// use futures_util::future; /// use tokio_test::{assert_ready_ok, task}; /// -/// task::mock(|cx| { -/// let fut = future::ok::<_, ()>(()); -/// -/// pin_mut!(fut); -/// assert_ready_ok!(fut.poll(cx)); -/// }) +/// let mut fut = task::spawn(future::ok::<_, ()>(())); +/// assert_ready_ok!(fut.poll()); /// ``` #[macro_export] macro_rules! assert_ready_ok { @@ -95,16 +85,11 @@ macro_rules! assert_ready_ok { /// # Examples /// /// ``` -/// use std::future::Future; -/// use futures_util::{future, pin_mut}; +/// use futures_util::future; /// use tokio_test::{assert_ready_err, task}; /// -/// task::mock(|cx| { -/// let fut = future::err::<(), _>(()); -/// -/// pin_mut!(fut); -/// assert_ready_err!(fut.poll(cx)); -/// }) +/// let mut fut = task::spawn(future::err::<(), _>(())); +/// assert_ready_err!(fut.poll()); /// ``` #[macro_export] macro_rules! assert_ready_err { @@ -133,16 +118,11 @@ macro_rules! assert_ready_err { /// # Examples /// /// ``` -/// use std::future::Future; -/// use futures_util::{future, pin_mut}; +/// use futures_util::future; /// use tokio_test::{assert_pending, task}; /// -/// task::mock(|cx| { -/// let fut = future::pending::<()>(); -/// -/// pin_mut!(fut); -/// assert_pending!(fut.poll(cx)); -/// }) +/// let mut fut = task::spawn(future::pending::<()>()); +/// assert_pending!(fut.poll()); /// ``` #[macro_export] macro_rules! assert_pending { @@ -177,16 +157,11 @@ macro_rules! assert_pending { /// # Examples /// /// ``` -/// use std::future::Future; -/// use futures_util::{future, pin_mut}; +/// use futures_util::future; /// use tokio_test::{assert_ready_eq, task}; /// -/// task::mock(|cx| { -/// let fut = future::ready(42); -/// -/// pin_mut!(fut); -/// assert_ready_eq!(fut.poll(cx), 42); -/// }) +/// let mut fut = task::spawn(future::ready(42)); +/// assert_ready_eq!(fut.poll(), 42); /// ``` #[macro_export] macro_rules! assert_ready_eq { diff --git a/tokio-test/src/task.rs b/tokio-test/src/task.rs index 5468749a..c21e31a5 100644 --- a/tokio-test/src/task.rs +++ b/tokio-test/src/task.rs @@ -1,44 +1,19 @@ //! Futures task based helpers -use tokio::executor::enter; - -use pin_convert::AsPinMut; +use futures_core::Stream; use std::future::Future; use std::mem; +use std::ops; use std::pin::Pin; use std::sync::{Arc, Condvar, Mutex}; use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; -/// Run the provided closure in a `MockTask` context. -/// -/// # Examples -/// -/// ``` -/// use std::future::Future; -/// use futures_util::{future, pin_mut}; -/// use tokio_test::task; -/// -/// task::mock(|cx| { -/// let fut = future::ready(()); -/// -/// pin_mut!(fut); -/// assert!(fut.poll(cx).is_ready()); -/// }) -/// ``` -pub fn mock<F, R>(f: F) -> R -where - F: Fn(&mut Context<'_>) -> R, -{ - let mut task = MockTask::new(); - task.enter(|cx| f(cx)) -} - -/// Mock task -/// -/// A mock task is able to intercept and track wake notifications. -#[derive(Debug, Clone)] -pub struct MockTask { - waker: Arc<ThreadWaker>, +/// TOOD: dox +pub fn spawn<T>(task: T) -> Spawn<T> { + Spawn { + task: MockTask::new(), + future: Box::pin(task), + } } /// Future spawned on a mock task @@ -48,12 +23,12 @@ pub struct Spawn<T> { future: Pin<Box<T>>, } -/// TOOD: dox -pub fn spawn<T>(task: T) -> Spawn<T> { - Spawn { - task: MockTask::new(), - future: Box::pin(task), - } +/// Mock task +/// +/// A mock task is able to intercept and track wake notifications. +#[derive(Debug, Clone)] +struct MockTask { + waker: Arc<ThreadWaker>, } #[derive(Debug)] @@ -66,11 +41,23 @@ const IDLE: usize = 0; const WAKE: usize = 1; const SLEEP: usize = 2; -impl<T: Future> Spawn<T> { - /// Poll a future - pub fn poll(&mut self) -> Poll<T::Output> { - let fut = self.future.as_mut(); - self.task.enter(|cx| fut.poll(cx)) +impl<T> Spawn<T> { + /// Consume `self` returning the inner value + pub fn into_inner(mut self) -> T + where + T: Unpin, + { + drop(self.task); + + // Pin::into_inner is unstable, so we work around it + // + // Safety: `T` is bound by `Unpin`. + unsafe { + let ptr = Pin::get_mut(self.future.as_mut()) as *mut T; + let future = Box::from_raw(ptr); + mem::forget(self.future); + *future + } } /// Returns `true` if the inner future has received a wake notification @@ -85,35 +72,63 @@ impl<T: Future> Spawn<T> { pub fn waker_ref_count(&self) -> usize { self.task.waker_ref_count() } + + /// Enter the task context + pub fn enter<F, R>(&mut self, f: F) -> R + where + F: FnOnce(&mut Context<'_>, Pin<&mut T>) -> R, + { + let fut = self.future.as_mut(); + self.task.enter(|cx| f(cx, fut)) + } +} + +impl<T: Unpin> ops::Deref for Spawn<T> { + type Target = T; + + fn deref(&self) -> &T { + &self.future + } +} + +impl<T: Unpin> ops::DerefMut for Spawn<T> { + fn deref_mut(&mut self) -> &mut T { + &mut self.future + } +} + +impl<T: Future> Spawn<T> { + /// Poll a future + pub fn poll(&mut self) -> Poll<T::Output> { + let fut = self.future.as_mut(); + self.task.enter(|cx| fut.poll(cx)) + } +} + +impl<T: Stream> Spawn<T> { + /// Poll a stream + pub fn poll_next(&mut self) -> Poll<Option<T::Item>> { + let stream = self.future.as_mut(); + self.task.enter(|cx| stream.poll_next(cx)) + } } impl MockTask { /// Create a new mock task - pub fn new() -> Self { + fn new() -> Self { MockTask { waker: Arc::new(ThreadWaker::new()), } } - /// Poll a future - pub fn poll<T, F>(&mut self, mut fut: T) -> Poll<F::Output> - where - T: AsPinMut<F>, - F: Future, - { - self.enter(|cx| fut.as_pin_mut().poll(cx)) - } - /// Run a closure from the context of the task. /// /// Any wake notifications resulting from the execution of the closure are /// tracked. - pub fn enter<F, R>(&mut self, f: F) -> R + fn enter<F, R>(&mut self, f: F) -> R where F: FnOnce(&mut Context<'_>) -> R, { - let _enter = enter().unwrap(); - self.waker.clear(); let waker = self.waker(); let mut cx = Context::from_waker(&waker); @@ -123,14 +138,14 @@ impl MockTask { /// Returns `true` if the inner future has received a wake notification /// since the last call to `enter`. - pub fn is_woken(&self) -> bool { + fn is_woken(&self) -> bool { self.waker.is_woken() } /// Returns the number of references to the task waker /// /// The task itself holds a reference. The return value will never be zero. - pub fn waker_ref_count(&self) -> usize { + fn waker_ref_count(&self) -> usize { Arc::strong_count(&self.waker) } @@ -193,7 +208,7 @@ impl ThreadWaker { } } -static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop); +static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker); unsafe fn to_raw(waker: Arc<ThreadWaker>) -> RawWaker { RawWaker::new(Arc::into_raw(waker) as *const (), &VTABLE) @@ -225,6 +240,6 @@ unsafe fn wake_by_ref(raw: *const ()) { mem::forget(waker); } -unsafe fn drop(raw: *const ()) { +unsafe fn drop_waker(raw: *const ()) { let _ = from_raw(raw); } diff --git a/tokio-util/tests/framed_read.rs b/tokio-util/tests/framed_read.rs index 2064b9fb..6636b880 100644 --- a/tokio-util/tests/framed_read.rs +++ b/tokio-util/tests/framed_read.rs @@ -2,7 +2,7 @@ use tokio::prelude::*; use tokio_test::assert_ready; -use tokio_test::task::MockTask; +use tokio_test::task; use tokio_util::codec::{Decoder, FramedRead}; use bytes::{Buf, BytesMut, IntoBuf}; @@ -51,13 +51,13 @@ impl Decoder for U32Decoder { #[test] fn read_multi_frame_in_packet() { - let mut task = MockTask::new(); + let mut task = task::spawn(()); let mock = mock! { Ok(b"\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()), }; let mut framed = FramedRead::new(mock, U32Decoder); - task.enter(|cx| { + task.enter(|cx, _| { assert_read!(pin!(framed).poll_next(cx), 0); assert_read!(pin!(framed).poll_next(cx), 1); assert_read!(pin!(framed).poll_next(cx), 2); @@ -67,7 +67,7 @@ fn read_multi_frame_in_packet() { #[test] fn read_multi_frame_across_packets() { - let mut task = MockTask::new(); + let mut task = task::spawn(()); let mock = mock! { Ok(b"\x00\x00\x00\x00".to_vec()), Ok(b"\x00\x00\x00\x01".to_vec()), @@ -75,7 +75,7 @@ fn read_multi_frame_across_packets() { }; let mut framed = FramedRead::new(mock, U32Decoder); - task.enter(|cx| { + task.enter(|cx, _| { assert_read!(pin!(framed).poll_next(cx), 0); assert_read!(pin!(framed).poll_next(cx), 1); assert_read!(pin!(framed).poll_next(cx), 2); @@ -85,7 +85,7 @@ fn read_multi_frame_across_packets() { #[test] fn read_not_ready() { - let mut task = MockTask::new(); + let mut task = task::spawn(()); let mock = mock! { Err(io::Error::new(io::ErrorKind::WouldBlock, "")), Ok(b"\x00\x00\x00\x00".to_vec()), @@ -93,7 +93,7 @@ fn read_not_ready() { }; let mut framed = FramedRead::new(mock, U32Decoder); - task.enter(|cx| { + task.enter(|cx, _| { assert!(pin!(framed).poll_next(cx).is_pending()); assert_read!(pin!(framed).poll_next(cx), 0); assert_read!(pin!(framed).poll_next(cx), 1); @@ -103,7 +103,7 @@ fn read_not_ready() { #[test] fn read_partial_then_not_ready() { - let mut task = MockTask::new(); + let mut task = task::spawn(()); let mock = mock! { Ok(b"\x00\x00".to_vec()), Err(io::Error::new(io::ErrorKind::WouldBlock, "")), @@ -111,7 +111,7 @@ fn read_partial_then_not_ready() { }; let mut framed = FramedRead::new(mock, U32Decoder); - task.enter(|cx| { + task.enter(|cx, _| { assert!(pin!(framed).poll_next(cx).is_pending()); assert_read!(pin!(framed).poll_next(cx), 0); assert_read!(pin!(framed).poll_next(cx), 1); @@ -122,13 +122,13 @@ fn read_partial_then_not_ready() { #[test] fn read_err() { - let mut task = MockTask::new(); + let mut task = task::spawn(()); let mock = mock! { Err(io::Error::new(io::ErrorKind::Other, "")), }; let mut framed = FramedRead::new(mock, U32Decoder); - task.enter(|cx| { + task.enter(|cx, _| { assert_eq!( io::ErrorKind::Other, assert_ready!(pin!(framed).poll_next(cx)) @@ -141,14 +141,14 @@ fn read_err() { #[test] fn read_partial_then_err() { - let mut task = MockTask::new(); + let mut task = task::spawn(()); let mock = mock! { Ok(b"\x00\x00".to_vec()), Err(io::Error::new(io::ErrorKind::Other, "")), }; let mut framed = FramedRead::new(mock, U32Decoder); - task.enter(|cx| { + task.enter(|cx, _| { assert_eq!( io::ErrorKind::Other, assert_ready!(pin!(framed).poll_next(cx)) @@ -161,7 +161,7 @@ fn read_partial_then_err() { #[test] fn read_partial_would_block_then_err() { - let mut task = MockTask::new(); + let mut task = task::spawn(()); let mock = mock! { Ok(b"\x00\x00".to_vec()), Err(io::Error::new(io::ErrorKind::WouldBlock, "")), @@ -169,7 +169,7 @@ fn read_partial_would_block_then_err() { }; let mut framed = FramedRead::new(mock, U32Decoder); - task.enter(|cx| { + task.enter(|cx, _| { assert!(pin!(framed).poll_next(cx).is_pending()); assert_eq!( io::ErrorKind::Other, @@ -183,11 +183,11 @@ fn read_partial_would_block_then_err() { #[test] fn huge_size() { - let mut task = MockTask::new(); + let mut task = task::spawn(()); let data = [0; 32 * 1024]; let mut framed = FramedRead::new(Slice(&data[..]), BigDecoder); - task.enter(|cx| { + task.enter(|cx, _| { assert_read!(pin!(framed).poll_next(cx), 0); assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none()); }); @@ -210,11 +210,11 @@ fn huge_size() { #[test] fn data_remaining_is_error() { - let mut task = MockTask::new(); + let mut task = task::spawn(()); let slice = Slice(&[0; 5]); let mut framed = FramedRead::new(slice, U32Decoder); - task.enter(|cx| { + task.enter(|cx, _| { assert_read!(pin!(framed).poll_next(cx), 0); assert!(assert_ready!(pin!(framed).poll_next(cx)).unwrap().is_err()); }); @@ -222,7 +222,7 @@ fn data_remaining_is_error() { #[test] fn multi_frames_on_eof() { - let mut task = MockTask::new(); + let mut task = task::spawn(()); struct MyDecoder(Vec<u32>); impl Decoder for MyDecoder { @@ -244,7 +244,7 @@ fn multi_frames_on_eof() { let mut framed = FramedRead::new(mock!(), MyDecoder(vec![0, 1, 2, 3])); - task.enter(|cx| { + task.enter(|cx, _| { assert_read!(pin!(framed).poll_next(cx), 0); assert_read!(pin!(framed).poll_next(cx), 1); assert_read!(pin!(framed).poll_next(cx), 2); diff --git a/tokio-util/tests/framed_write.rs b/tokio-util/tests/framed_write.rs index 03f0f08b..9d760766 100644 --- a/tokio-util/tests/framed_write.rs +++ b/tokio-util/tests/framed_write.rs @@ -1,8 +1,7 @@ #![warn(rust_2018_idioms)] use tokio::io::AsyncWrite; -use tokio_test::assert_ready; -use tokio_test::task::MockTask; +use tokio_test::{assert_ready, task}; use tokio_util::codec::{Encoder, FramedWrite}; use bytes::{BufMut, BytesMut}; @@ -43,13 +42,13 @@ impl Encoder for U32Encoder { #[test] fn write_multi_frame_in_packet() { - let mut task = MockTask::new(); + let mut task = task::spawn(()); let mock = mock! { Ok(b"\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()), }; let mut framed = FramedWrite::new(mock, U32Encoder); - task.enter(|cx| { + task.enter(|cx, _| { assert!(assert_ready!(pin!(framed).poll_ready(cx)).is_ok()); assert!(pin!(framed).start_send(0).is_ok()); assert!(assert_ready!(pin!(framed).poll_ready(cx)).is_ok()); @@ -99,9 +98,9 @@ fn write_hits_backpressure() { // 1 'wouldblock', 4 * 2KB buffers, 1 b-byte buffer assert_eq!(mock.calls.len(), 6); - let mut task = MockTask::new(); + let mut task = task::spawn(()); let mut framed = FramedWrite::new(mock, U32Encoder); - task.enter(|cx| { + task.enter(|cx, _| { // Send 8KB. This fills up FramedWrite2 buffer for i in 0..ITER { assert!(assert_ready!(pin!(framed).poll_ready(cx)).is_ok()); diff --git a/tokio-util/tests/length_delimited.rs b/tokio-util/tests/length_delimited.rs index c78dfac8..b287bb35 100644 --- a/tokio-util/tests/length_delimited.rs +++ b/tokio-util/tests/length_delimited.rs @@ -2,7 +2,7 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tokio::prelude::*; -use tokio_test::task::MockTask; +use tokio_test::task; use tokio_test::{ assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok, }; @@ -26,7 +26,7 @@ macro_rules! mock { macro_rules! assert_next_eq { ($io:ident, $expect:expr) => {{ - MockTask::new().enter(|cx| { + task::spawn(()).enter(|cx, _| { let res = assert_ready!($io.as_mut().poll_next(cx)); match res { Some(Ok(v)) => assert_eq!(v, $expect.as_ref()), @@ -39,7 +39,7 @@ macro_rules! assert_next_eq { macro_rules! assert_next_pending { ($io:ident) => {{ - MockTask::new().enter(|cx| match $io.as_mut().poll_next(cx) { + task::spawn(()).enter(|cx, _| match $io.as_mut().poll_next(cx) { Ready(Some(Ok(v))) => panic!("value = {:?}", v), Ready(Some(Err(e))) => panic!("error = {:?}", e), Ready(None) => panic!("done"), @@ -50,7 +50,7 @@ macro_rules! assert_next_pending { macro_rules! assert_next_err { ($io:ident) => {{ - MockTask::new().enter(|cx| match $io.as_mut().poll_next(cx) { + task::spawn(()).enter(|cx, _| match $io.as_mut().poll_next(cx) { Ready(Some(Ok(v))) => panic!("value = {:?}", v), Ready(Some(Err(_))) => {} Ready(None) => panic!("done"), @@ -61,7 +61,7 @@ macro_rules! assert_next_err { macro_rules! assert_done { ($io:ident) => {{ - MockTask::new().enter(|cx| { + task::spawn(()).enter(|cx, _| { let res = assert_ready!($io.as_mut().poll_next(cx)); match res { Some(Ok(v)) => panic!("value = {:?}", v), @@ -405,7 +405,7 @@ fn write_single_frame_length_adjusted() { }); pin_mut!(io); - MockTask::new().enter(|cx| { + task::spawn(()).enter(|cx, _| { assert_ready_ok!(io.as_mut().poll_ready(cx)); assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi"))); assert_ready_ok!(io.as_mut().poll_flush(cx)); @@ -418,7 +418,7 @@ fn write_nothing_yields_nothing() { let io = FramedWrite::new(mock!(), LengthDelimitedCodec::new()); pin_mut!(io); - MockTask::new().enter(|cx| { + task::spawn(()).enter(|cx, _| { assert_ready_ok!(io.poll_flush(cx)); }); } @@ -435,7 +435,7 @@ fn write_single_frame_one_packet() { ); pin_mut!(io); - MockTask::new().enter(|cx| { + task::spawn(()).enter(|cx, _| { assert_ready_ok!(io.as_mut().poll_ready(cx)); assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi"))); assert_ready_ok!(io.as_mut().poll_flush(cx)); @@ -459,7 +459,7 @@ fn write_single_multi_frame_one_packet() { ); pin_mut!(io); - MockTask::new().enter(|cx| { + task::spawn(()).enter(|cx, _| { assert_ready_ok!(io.as_mut().poll_ready(cx)); assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi"))); @@ -492,7 +492,7 @@ fn write_single_multi_frame_multi_packet() { ); pin_mut!(io); - MockTask::new().enter(|cx| { + task::spawn(()).enter(|cx, _| { assert_ready_ok!(io.as_mut().poll_ready(cx)); assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi"))); @@ -526,7 +526,7 @@ fn write_single_frame_would_block() { ); pin_mut!(io); - MockTask::new().enter(|cx| { + task::spawn(()).enter(|cx, _| { assert_ready_ok!(io.as_mut().poll_ready(cx)); assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi"))); @@ -549,7 +549,7 @@ fn write_single_frame_little_endian() { }); pin_mut!(io); - MockTask::new().enter(|cx| { + task::spawn(()).enter(|cx, _| { assert_ready_ok!(io.as_mut().poll_ready(cx)); assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi"))); @@ -569,7 +569,7 @@ fn write_single_frame_with_short_length_field() { }); pin_mut!(io); - MockTask::new().enter(|cx| { + task::spawn(()).enter(|cx, _| { assert_ready_ok!(io.as_mut().poll_ready(cx)); assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi"))); @@ -586,7 +586,7 @@ fn write_max_frame_len() { .new_write(mock! {}); pin_mut!(io); - MockTask::new().enter(|cx| { + task::spawn(()).enter(|cx, _| { assert_ready_ok!(io.as_mut().poll_ready(cx)); assert_err!(io.as_mut().start_send(Bytes::from("abcdef"))); @@ -603,7 +603,7 @@ fn write_update_max_frame_len_at_rest() { }); pin_mut!(io); - MockTask::new().enter(|cx| { + task::spawn(()).enter(|cx, _| { assert_ready_ok!(io.as_mut().poll_ready(cx)); assert_ok!(io.as_mut().start_send(Bytes::from("abcdef"))); @@ -628,7 +628,7 @@ fn write_update_max_frame_len_in_flight() { }); pin_mut!(io); - MockTask::new().enter(|cx| { + task::spawn(()).enter(|cx, _| { assert_ready_ok!(io.as_mut().poll_ready(cx)); assert_ok!(io.as_mut().start_send(Bytes::from("abcdef"))); @@ -648,7 +648,7 @@ fn write_zero() { let io = length_delimited::Builder::new().new_write(mock! {}); pin_mut!(io); - MockTask::new().enter(|cx| { + task::spawn(()).enter(|cx, _| { assert_ready_ok!(io.as_mut().poll_ready(cx)); assert_ok!(io.as_mut().start_send(Bytes::from("abcdef"))); diff --git a/tokio/tests/io_async_read.rs b/tokio/tests/io_async_read.rs index b34fcba3..007e5403 100644 --- a/tokio/tests/io_async_read.rs +++ b/tokio/tests/io_async_read.rs @@ -1,9 +1,8 @@ use tokio::io::AsyncRead; -use tokio_test::task::MockTask; +use tokio_test::task; use tokio_test::{assert_ready_err, assert_ready_ok}; use bytes::{BufMut, BytesMut}; -use futures_util::pin_mut; use std::io; use std::pin::Pin; use std::task::{Context, Poll}; @@ -30,12 +29,8 @@ fn read_buf_success() { } let mut buf = BytesMut::with_capacity(65); - let mut task = MockTask::new(); - - task.enter(|cx| { - let rd = Rd; - pin_mut!(rd); + task::spawn(Rd).enter(|cx, rd| { let n = assert_ready_ok!(rd.poll_read_buf(cx, &mut buf)); assert_eq!(11, n); @@ -59,12 +54,8 @@ fn read_buf_error() { } let mut buf = BytesMut::with_capacity(65); - let mut task = MockTask::new(); - - task.enter(|cx| { - let rd = Rd; - pin_mut!(rd); + task::spawn(Rd).enter(|cx, rd| { let err = assert_ready_err!(rd.poll_read_buf(cx, &mut buf)); assert_eq!(err.kind(), io::ErrorKind::Other); }); @@ -86,14 +77,9 @@ fn read_buf_no_capacity() { // Can't create BytesMut w/ zero capacity, so fill it up let mut buf = BytesMut::with_capacity(64); - let mut task = MockTask::new(); - buf.put(&[0; 64][..]); - task.enter(|cx| { - let rd = Rd; - pin_mut!(rd); - + task::spawn(Rd).enter(|cx, rd| { let n = assert_ready_ok!(rd.poll_read_buf(cx, &mut buf)); assert_eq!(0, n); }); @@ -118,12 +104,8 @@ fn read_buf_no_uninitialized() { } let mut buf = BytesMut::with_capacity(64); - let mut task = MockTask::new(); - - task.enter(|cx| { - let rd = Rd; - pin_mut!(rd); + task::spawn(Rd).enter(|cx, rd| { let n = assert_ready_ok!(rd.poll_read_buf(cx, &mut buf)); assert_eq!(0, n); }); @@ -150,16 +132,12 @@ fn read_buf_uninitialized_ok() { // Can't create BytesMut w/ zero capacity, so fill it up let mut buf = BytesMut::with_capacity(64); - let mut task = MockTask::new(); unsafe { buf.bytes_mut()[0..11].copy_from_slice(b"hello world"); } - task.enter(|cx| { - let rd = Rd; - pin_mut!(rd); - + task::spawn(Rd).enter(|cx, rd| { let n = assert_ready_ok!(rd.poll_read_buf(cx, &mut buf)); assert_eq!(0, n); }); diff --git a/tokio/tests/sync_atomic_waker.rs b/tokio/tests/sync_atomic_waker.rs index 77d6a73d..8e725526 100644 --- a/tokio/tests/sync_atomic_waker.rs +++ b/tokio/tests/sync_atomic_waker.rs @@ -1,7 +1,7 @@ #![warn(rust_2018_idioms)] use tokio::sync::AtomicWaker; -use tokio_test::task::MockTask; +use tokio_test::task; use std::task::Waker; @@ -16,23 +16,21 @@ impl AssertSync for Waker {} #[test] fn basic_usage() { - let waker = AtomicWaker::new(); - let mut task = MockTask::new(); + let mut waker = task::spawn(AtomicWaker::new()); - task.enter(|cx| waker.register_by_ref(cx.waker())); + waker.enter(|cx, waker| waker.register_by_ref(cx.waker())); waker.wake(); - assert!(task.is_woken()); + assert!(waker.is_woken()); } #[test] fn wake_without_register() { - let waker = AtomicWaker::new(); + let mut waker = task::spawn(AtomicWaker::new()); waker.wake(); // Registering should not result in a notification - let mut task = MockTask::new(); - task.enter(|cx| waker.register_by_ref(cx.waker())); + waker.enter(|cx, waker| waker.register_by_ref(cx.waker())); - assert!(!task.is_woken()); + assert!(!waker.is_woken()); } diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index 891e2361..f724c564 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -1,7 +1,7 @@ #![warn(rust_2018_idioms)] use tokio::sync::mpsc; -use tokio_test::task::MockTask; +use tokio_test::task; use tokio_test::{ assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok, }; @@ -14,13 +14,12 @@ impl AssertSend for mpsc::Receiver<i32> {} #[test] fn send_recv_with_buffer() { - let mut t1 = MockTask::new(); - let mut t2 = MockTask::new(); - - let (mut tx, mut rx) = mpsc::channel::<i32>(16); + let (tx, rx) = mpsc::channel::<i32>(16); + let mut tx = task::spawn(tx); + let mut rx = task::spawn(rx); // Using poll_ready / try_send - assert_ready_ok!(t1.enter(|cx| tx.poll_ready(cx))); + assert_ready_ok!(tx.enter(|cx, mut tx| tx.poll_ready(cx))); tx.try_send(1).unwrap(); // Without poll_ready @@ -28,13 +27,13 @@ fn send_recv_with_buffer() { drop(tx); - let val = assert_ready!(t2.enter(|cx| rx.poll_recv(cx))); + let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx))); assert_eq!(val, Some(1)); - let val = assert_ready!(t2.enter(|cx| rx.poll_recv(cx))); + let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx))); assert_eq!(val, Some(2)); - let val = assert_ready!(t2.enter(|cx| rx.poll_recv(cx))); + let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx))); assert!(val.is_none()); } @@ -56,15 +55,10 @@ async fn async_send_recv_with_buffer() { fn send_sink_recv_with_buffer() { use futures_core::Stream; use futures_sink::Sink; - use futures_util::pin_mut; - - let mut t1 = MockTask::new(); let (tx, rx) = mpsc::channel::<i32>(16); - t1.enter(|cx| { - pin_mut!(tx); - + task::spawn(tx).enter(|cx, mut tx| { assert_ready_ok!(tx.as_mut().poll_ready(cx)); assert_ok!(tx.as_mut().start_send(1)); @@ -75,9 +69,7 @@ fn send_sink_recv_with_buffer() { assert_ready_ok!(tx.as_mut().poll_close(cx)); }); - t1.enter(|cx| { - pin_mut!(rx); - + task::spawn(rx).enter(|cx, mut rx| { let val = assert_ready!(rx.as_mut().poll_next(cx)); assert_eq!(val, Some(1)); @@ -91,26 +83,26 @@ fn send_sink_recv_with_buffer() { #[test] fn start_send_past_cap() { - let mut t1 = MockTask::new(); - let mut t2 = MockTask::new(); - let mut t3 = MockTask::new(); + let mut t1 = task::spawn(()); + let mut t2 = task::spawn(()); + let mut t3 = task::spawn(()); let (mut tx1, mut rx) = mpsc::channel(1); let mut tx2 = tx1.clone(); assert_ok!(tx1.try_send(())); - t1.enter(|cx| { + t1.enter(|cx, _| { assert_pending!(tx1.poll_ready(cx)); }); - t2.enter(|cx| { + t2.enter(|cx, _| { assert_pending!(tx2.poll_ready(cx)); }); drop(tx1); - let val = t3.enter(|cx| assert_ready!(rx.poll_recv(cx))); + let val = t3.enter(|cx, _| assert_ready!(rx.poll_recv(cx))); assert!(val.is_some()); assert!(t2.is_woken()); @@ -118,7 +110,7 @@ fn start_send_past_cap() { drop(tx2); - let val = t3.enter(|cx| assert_ready!(rx.poll_recv(cx))); + let val = t3.enter(|cx, _| assert_ready!(rx.poll_recv(cx))); assert!(val.is_none()); } @@ -130,7 +122,7 @@ fn buffer_gteq_one() { #[test] fn send_recv_unbounded() { - let mut t1 = MockTask::new(); + let mut t1 = task::spawn(()); let (mut tx, mut rx) = mpsc::unbounded_channel::<i32>(); @@ -138,15 +130,15 @@ fn send_recv_unbounded() { assert_ok!(tx.try_send(1)); assert_ok!(tx.try_send(2)); - let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx))); + let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); assert_eq!(val, Some(1)); - let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx))); + let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); assert_eq!(val, Some(2)); drop(tx); - let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx))); + let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); assert!(val.is_none()); } @@ -170,11 +162,11 @@ fn sink_send_recv_unbounded() { use futures_sink::Sink; use futures_util::pin_mut; - let mut t1 = MockTask::new(); + let mut t1 = task::spawn(()); let (tx, rx) = mpsc::unbounded_channel::<i32>(); - t1.enter(|cx| { + t1.enter(|cx, _| { pin_mut!(tx); assert_ready_ok!(tx.as_mut().poll_ready(cx)); @@ -187,7 +179,7 @@ fn sink_send_recv_unbounded() { assert_ready_ok!(tx.as_mut().poll_close(cx)); }); - t1.enter(|cx| { + t1.enter(|cx, _| { pin_mut!(rx); let val = assert_ready!(rx.as_mut().poll_next(cx)); @@ -205,7 +197,7 @@ fn sink_send_recv_unbounded() { fn no_t_bounds_buffer() { struct NoImpls; - let mut t1 = MockTask::new(); + let mut t1 = task::spawn(( |