diff options
author | Carl Lerche <me@carllerche.com> | 2019-06-24 12:34:30 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-06-24 12:34:30 -0700 |
commit | 06c473e62842d257ed275497ce906710ea3f8e19 (patch) | |
tree | 4ca6d337a892aa23266a761b35dc61e988e57954 /tokio-test | |
parent | aa99950b9c983b842bd2107bb771c277d09d495d (diff) |
Update Tokio to use `std::future`. (#1120)
A first pass at updating Tokio to use `std::future`.
Implementations of `Future` from the futures crate are updated to implement
`Future` from std. Implementations of `Stream` are moved to a feature flag.
This commits disables a number of crates that have not yet been updated.
Diffstat (limited to 'tokio-test')
-rw-r--r-- | tokio-test/Cargo.toml | 5 | ||||
-rw-r--r-- | tokio-test/src/lib.rs | 6 | ||||
-rw-r--r-- | tokio-test/src/macros.rs | 76 | ||||
-rw-r--r-- | tokio-test/src/task.rs | 145 |
4 files changed, 155 insertions, 77 deletions
diff --git a/tokio-test/Cargo.toml b/tokio-test/Cargo.toml index 52b0113e..3b090a28 100644 --- a/tokio-test/Cargo.toml +++ b/tokio-test/Cargo.toml @@ -22,6 +22,7 @@ categories = ["asynchronous", "testing"] publish = false [dependencies] -futures = "0.1" -tokio-timer = { version = "0.3.0", path = "../tokio-timer" } +assertive = { git = "http://github.com/carllerche/assertive" } +pin-convert = "0.1.0" +# tokio-timer = { version = "0.3.0", path = "../tokio-timer" } tokio-executor = { version = "0.2.0", path = "../tokio-executor" } diff --git a/tokio-test/src/lib.rs b/tokio-test/src/lib.rs index 256759ea..eca6762a 100644 --- a/tokio-test/src/lib.rs +++ b/tokio-test/src/lib.rs @@ -20,13 +20,17 @@ //! assert_ready!(fut.poll()); //! ``` -pub mod clock; +// pub mod clock; mod macros; pub mod task; +pub use assertive::{assert_err, assert_ok}; + +/* #[doc(hidden)] pub mod codegen { pub mod futures { pub use futures::*; } } +*/ diff --git a/tokio-test/src/macros.rs b/tokio-test/src/macros.rs index 4dbabd29..722bbd68 100644 --- a/tokio-test/src/macros.rs +++ b/tokio-test/src/macros.rs @@ -1,59 +1,80 @@ //! A collection of useful macros for testing futures and tokio based code -/// Assert if a poll is ready +/// Assert a `Poll` is ready, returning the value. #[macro_export] macro_rules! assert_ready { ($e:expr) => {{ - use $crate::codegen::futures::Async::Ready; + use core::task::Poll::*; match $e { - Ok(Ready(v)) => v, - Ok(_) => panic!("not ready"), - Err(e) => panic!("error = {:?}", e), + Ready(v) => v, + Pending => panic!("pending"), } }}; ($e:expr, $($msg:tt),+) => {{ - use $crate::codegen::futures::Async::Ready; + use core::task::Poll::*; match $e { - Ok(Ready(v)) => v, - Ok(_) => { + Ready(v) => v, + Pending => { let msg = format_args!($($msg),+); - panic!("not ready; {}", msg) - } - Err(e) => { - let msg = format!($($msg),+); - panic!("error = {:?}; {}", e, msg) + panic!("pending; {}", msg) } } }}; } -/// Asset if the poll is not ready +/// Assert a `Poll<Result<...>>` is ready and `Ok`, returning the value. +#[macro_export] +macro_rules! assert_ready_ok { + ($e:expr) => {{ + use tokio_test::{assert_ready, assert_ok}; + let val = assert_ready!($e); + assert_ok!(val) + }}; + ($e:expr, $($msg:tt),+) => {{ + use tokio_test::{assert_ready, assert_ok}; + let val = assert_ready!($e, $($msg),*); + assert_ok!(val, $($msg),*) + }}; +} + +/// Assert a `Poll<Result<...>>` is ready and `Err`, returning the error. #[macro_export] -macro_rules! assert_not_ready { +macro_rules! assert_ready_err { ($e:expr) => {{ - use $crate::codegen::futures::Async::{Ready, NotReady}; + use tokio_test::{assert_ready, assert_err}; + let val = assert_ready!($e); + assert_err!(val) + }}; + ($e:expr, $($msg:tt),+) => {{ + use tokio_test::{assert_ready, assert_err}; + let val = assert_ready!($e, $($msg),*); + assert_err!(val, $($msg),*) + }}; +} + +/// Asset a `Poll` is pending. +#[macro_export] +macro_rules! assert_pending { + ($e:expr) => {{ + use core::task::Poll::*; match $e { - Ok(NotReady) => {} - Ok(Ready(v)) => panic!("ready; value = {:?}", v), - Err(e) => panic!("error = {:?}", e), + Pending => {} + Ready(v) => panic!("ready; value = {:?}", v), } }}; ($e:expr, $($msg:tt),+) => {{ - use $crate::codegen::futures::Async::{Ready, NotReady}; + use core::task::Poll::*; match $e { - Ok(NotReady) => {} - Ok(Ready(v)) => { + Pending => {} + Ready(v) => { let msg = format_args!($($msg),+); panic!("ready; value = {:?}; {}", v, msg) } - Err(e) => { - let msg = format_args!($($msg),+); - panic!("error = {:?}; {}", e, msg) - } } }}; } +/* /// Assert if a poll is ready and check for equality on the value #[macro_export] macro_rules! assert_ready_eq { @@ -76,7 +97,9 @@ macro_rules! assert_ready_eq { } }; } +*/ +/* /// Assert if the deadline has passed #[macro_export] macro_rules! assert_elapsed { @@ -88,6 +111,7 @@ macro_rules! assert_elapsed { assert!($e.unwrap_err().is_elapsed(), $msg); }; } +*/ #[cfg(test)] mod tests { diff --git a/tokio-test/src/task.rs b/tokio-test/src/task.rs index 328d2064..f8944a74 100644 --- a/tokio-test/src/task.rs +++ b/tokio-test/src/task.rs @@ -17,115 +17,164 @@ //! assert_ready_eq!(task.enter(|| rx.poll()), Some(())); //! ``` -use futures::executor::{spawn, Notify}; -use futures::{future, Async}; -use std::sync::atomic::{AtomicUsize, Ordering}; +use tokio_executor::enter; + +use pin_convert::AsPinMut; +use std::future::Future; +use std::mem; use std::sync::{Arc, Condvar, Mutex}; +use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; /// Mock task /// -/// A mock task is able to intercept and track notifications. +/// A mock task is able to intercept and track wake notifications. #[derive(Debug)] pub struct MockTask { - notify: Arc<ThreadNotify>, + waker: Arc<ThreadWaker>, } #[derive(Debug)] -struct ThreadNotify { - state: AtomicUsize, - mutex: Mutex<()>, +struct ThreadWaker { + state: Mutex<usize>, condvar: Condvar, } const IDLE: usize = 0; -const NOTIFY: usize = 1; +const WAKE: usize = 1; const SLEEP: usize = 2; impl MockTask { /// Create a new mock task pub fn new() -> Self { MockTask { - notify: Arc::new(ThreadNotify::new()), + waker: Arc::new(ThreadWaker::new()), } } + /// Poll a future + pub fn poll<T, F>(&mut self, mut fut: T) -> Poll<F::Output> + where + T: AsPinMut<F>, + F: Future, + { + self.enter(|cx| fut.as_pin_mut().poll(cx)) + } + /// Run a closure from the context of the task. /// - /// Any notifications resulting from the execution of the closure are + /// Any wake notifications resulting from the execution of the closure are /// tracked. pub fn enter<F, R>(&mut self, f: F) -> R where - F: FnOnce() -> R, + F: FnOnce(&mut Context<'_>) -> R, { - self.notify.clear(); + let _enter = enter().unwrap(); - let res = spawn(future::lazy(|| Ok::<_, ()>(f()))).poll_future_notify(&self.notify, 0); + self.waker.clear(); + let waker = self.waker(); + let mut cx = Context::from_waker(&waker); - match res.unwrap() { - Async::Ready(v) => v, - _ => unreachable!(), - } + f(&mut cx) } - /// Returns `true` if the inner future has received a readiness notification + /// Returns `true` if the inner future has received a wake notification /// since the last call to `enter`. - pub fn is_notified(&self) -> bool { - self.notify.is_notified() + pub fn is_woken(&self) -> bool { + self.waker.is_woken() } - /// Returns the number of references to the task notifier + /// Returns the number of references to the task waker /// /// The task itself holds a reference. The return value will never be zero. - pub fn notifier_ref_count(&self) -> usize { - Arc::strong_count(&self.notify) + pub fn waker_ref_count(&self) -> usize { + Arc::strong_count(&self.waker) + } + + fn waker(&self) -> Waker { + unsafe { + let raw = to_raw(self.waker.clone()); + Waker::from_raw(raw) + } } } -impl ThreadNotify { +impl ThreadWaker { fn new() -> Self { - ThreadNotify { - state: AtomicUsize::new(IDLE), - mutex: Mutex::new(()), + ThreadWaker { + state: Mutex::new(IDLE), condvar: Condvar::new(), } } - /// Clears any previously received notify, avoiding potential spurrious - /// notifications. This should only be called immediately before running the + /// Clears any previously received wakes, avoiding potential spurrious + /// wake notifications. This should only be called immediately before running the /// task. fn clear(&self) { - self.state.store(IDLE, Ordering::SeqCst); + *self.state.lock().unwrap() = IDLE; } - fn is_notified(&self) -> bool { - match self.state.load(Ordering::SeqCst) { + fn is_woken(&self) -> bool { + match *self.state.lock().unwrap() { IDLE => false, - NOTIFY => true, + WAKE => true, _ => unreachable!(), } } -} -impl Notify for ThreadNotify { - fn notify(&self, _unpark_id: usize) { + fn wake(&self) { // First, try transitioning from IDLE -> NOTIFY, this does not require a // lock. - match self.state.compare_and_swap(IDLE, NOTIFY, Ordering::SeqCst) { - IDLE | NOTIFY => return, - SLEEP => {} - _ => unreachable!(), + let mut state = self.state.lock().unwrap(); + let prev = *state; + + if prev == WAKE { + return; } - // The other half is sleeping, this requires a lock - let _m = self.mutex.lock().unwrap(); + *state = WAKE; - // Transition from SLEEP -> NOTIFY - match self.state.compare_and_swap(SLEEP, NOTIFY, Ordering::SeqCst) { - SLEEP => {} - _ => return, + if prev == IDLE { + return; } - // Wakeup the sleeper + // The other half is sleeping, so we wake it up. + assert_eq!(prev, SLEEP); self.condvar.notify_one(); } } + +static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop); + +unsafe fn to_raw(waker: Arc<ThreadWaker>) -> RawWaker { + RawWaker::new(Arc::into_raw(waker) as *const (), &VTABLE) +} + +unsafe fn from_raw(raw: *const ()) -> Arc<ThreadWaker> { + Arc::from_raw(raw as *const ThreadWaker) +} + +unsafe fn clone(raw: *const ()) -> RawWaker { + let waker = from_raw(raw); + + // Increment the ref count + mem::forget(waker.clone()); + + to_raw(waker) +} + +unsafe fn wake(raw: *const ()) { + let waker = from_raw(raw); + waker.wake(); +} + +unsafe fn wake_by_ref(raw: *const ()) { + let waker = from_raw(raw); + waker.wake(); + + // We don't actually own a reference to the unparker + mem::forget(waker); +} + +unsafe fn drop(raw: *const ()) { + let _ = from_raw(raw); +} |