summaryrefslogtreecommitdiffstats
path: root/tokio-test
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-06-24 12:34:30 -0700
committerGitHub <noreply@github.com>2019-06-24 12:34:30 -0700
commit06c473e62842d257ed275497ce906710ea3f8e19 (patch)
tree4ca6d337a892aa23266a761b35dc61e988e57954 /tokio-test
parentaa99950b9c983b842bd2107bb771c277d09d495d (diff)
Update Tokio to use `std::future`. (#1120)
A first pass at updating Tokio to use `std::future`. Implementations of `Future` from the futures crate are updated to implement `Future` from std. Implementations of `Stream` are moved to a feature flag. This commits disables a number of crates that have not yet been updated.
Diffstat (limited to 'tokio-test')
-rw-r--r--tokio-test/Cargo.toml5
-rw-r--r--tokio-test/src/lib.rs6
-rw-r--r--tokio-test/src/macros.rs76
-rw-r--r--tokio-test/src/task.rs145
4 files changed, 155 insertions, 77 deletions
diff --git a/tokio-test/Cargo.toml b/tokio-test/Cargo.toml
index 52b0113e..3b090a28 100644
--- a/tokio-test/Cargo.toml
+++ b/tokio-test/Cargo.toml
@@ -22,6 +22,7 @@ categories = ["asynchronous", "testing"]
publish = false
[dependencies]
-futures = "0.1"
-tokio-timer = { version = "0.3.0", path = "../tokio-timer" }
+assertive = { git = "http://github.com/carllerche/assertive" }
+pin-convert = "0.1.0"
+# tokio-timer = { version = "0.3.0", path = "../tokio-timer" }
tokio-executor = { version = "0.2.0", path = "../tokio-executor" }
diff --git a/tokio-test/src/lib.rs b/tokio-test/src/lib.rs
index 256759ea..eca6762a 100644
--- a/tokio-test/src/lib.rs
+++ b/tokio-test/src/lib.rs
@@ -20,13 +20,17 @@
//! assert_ready!(fut.poll());
//! ```
-pub mod clock;
+// pub mod clock;
mod macros;
pub mod task;
+pub use assertive::{assert_err, assert_ok};
+
+/*
#[doc(hidden)]
pub mod codegen {
pub mod futures {
pub use futures::*;
}
}
+*/
diff --git a/tokio-test/src/macros.rs b/tokio-test/src/macros.rs
index 4dbabd29..722bbd68 100644
--- a/tokio-test/src/macros.rs
+++ b/tokio-test/src/macros.rs
@@ -1,59 +1,80 @@
//! A collection of useful macros for testing futures and tokio based code
-/// Assert if a poll is ready
+/// Assert a `Poll` is ready, returning the value.
#[macro_export]
macro_rules! assert_ready {
($e:expr) => {{
- use $crate::codegen::futures::Async::Ready;
+ use core::task::Poll::*;
match $e {
- Ok(Ready(v)) => v,
- Ok(_) => panic!("not ready"),
- Err(e) => panic!("error = {:?}", e),
+ Ready(v) => v,
+ Pending => panic!("pending"),
}
}};
($e:expr, $($msg:tt),+) => {{
- use $crate::codegen::futures::Async::Ready;
+ use core::task::Poll::*;
match $e {
- Ok(Ready(v)) => v,
- Ok(_) => {
+ Ready(v) => v,
+ Pending => {
let msg = format_args!($($msg),+);
- panic!("not ready; {}", msg)
- }
- Err(e) => {
- let msg = format!($($msg),+);
- panic!("error = {:?}; {}", e, msg)
+ panic!("pending; {}", msg)
}
}
}};
}
-/// Asset if the poll is not ready
+/// Assert a `Poll<Result<...>>` is ready and `Ok`, returning the value.
+#[macro_export]
+macro_rules! assert_ready_ok {
+ ($e:expr) => {{
+ use tokio_test::{assert_ready, assert_ok};
+ let val = assert_ready!($e);
+ assert_ok!(val)
+ }};
+ ($e:expr, $($msg:tt),+) => {{
+ use tokio_test::{assert_ready, assert_ok};
+ let val = assert_ready!($e, $($msg),*);
+ assert_ok!(val, $($msg),*)
+ }};
+}
+
+/// Assert a `Poll<Result<...>>` is ready and `Err`, returning the error.
#[macro_export]
-macro_rules! assert_not_ready {
+macro_rules! assert_ready_err {
($e:expr) => {{
- use $crate::codegen::futures::Async::{Ready, NotReady};
+ use tokio_test::{assert_ready, assert_err};
+ let val = assert_ready!($e);
+ assert_err!(val)
+ }};
+ ($e:expr, $($msg:tt),+) => {{
+ use tokio_test::{assert_ready, assert_err};
+ let val = assert_ready!($e, $($msg),*);
+ assert_err!(val, $($msg),*)
+ }};
+}
+
+/// Asset a `Poll` is pending.
+#[macro_export]
+macro_rules! assert_pending {
+ ($e:expr) => {{
+ use core::task::Poll::*;
match $e {
- Ok(NotReady) => {}
- Ok(Ready(v)) => panic!("ready; value = {:?}", v),
- Err(e) => panic!("error = {:?}", e),
+ Pending => {}
+ Ready(v) => panic!("ready; value = {:?}", v),
}
}};
($e:expr, $($msg:tt),+) => {{
- use $crate::codegen::futures::Async::{Ready, NotReady};
+ use core::task::Poll::*;
match $e {
- Ok(NotReady) => {}
- Ok(Ready(v)) => {
+ Pending => {}
+ Ready(v) => {
let msg = format_args!($($msg),+);
panic!("ready; value = {:?}; {}", v, msg)
}
- Err(e) => {
- let msg = format_args!($($msg),+);
- panic!("error = {:?}; {}", e, msg)
- }
}
}};
}
+/*
/// Assert if a poll is ready and check for equality on the value
#[macro_export]
macro_rules! assert_ready_eq {
@@ -76,7 +97,9 @@ macro_rules! assert_ready_eq {
}
};
}
+*/
+/*
/// Assert if the deadline has passed
#[macro_export]
macro_rules! assert_elapsed {
@@ -88,6 +111,7 @@ macro_rules! assert_elapsed {
assert!($e.unwrap_err().is_elapsed(), $msg);
};
}
+*/
#[cfg(test)]
mod tests {
diff --git a/tokio-test/src/task.rs b/tokio-test/src/task.rs
index 328d2064..f8944a74 100644
--- a/tokio-test/src/task.rs
+++ b/tokio-test/src/task.rs
@@ -17,115 +17,164 @@
//! assert_ready_eq!(task.enter(|| rx.poll()), Some(()));
//! ```
-use futures::executor::{spawn, Notify};
-use futures::{future, Async};
-use std::sync::atomic::{AtomicUsize, Ordering};
+use tokio_executor::enter;
+
+use pin_convert::AsPinMut;
+use std::future::Future;
+use std::mem;
use std::sync::{Arc, Condvar, Mutex};
+use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
/// Mock task
///
-/// A mock task is able to intercept and track notifications.
+/// A mock task is able to intercept and track wake notifications.
#[derive(Debug)]
pub struct MockTask {
- notify: Arc<ThreadNotify>,
+ waker: Arc<ThreadWaker>,
}
#[derive(Debug)]
-struct ThreadNotify {
- state: AtomicUsize,
- mutex: Mutex<()>,
+struct ThreadWaker {
+ state: Mutex<usize>,
condvar: Condvar,
}
const IDLE: usize = 0;
-const NOTIFY: usize = 1;
+const WAKE: usize = 1;
const SLEEP: usize = 2;
impl MockTask {
/// Create a new mock task
pub fn new() -> Self {
MockTask {
- notify: Arc::new(ThreadNotify::new()),
+ waker: Arc::new(ThreadWaker::new()),
}
}
+ /// Poll a future
+ pub fn poll<T, F>(&mut self, mut fut: T) -> Poll<F::Output>
+ where
+ T: AsPinMut<F>,
+ F: Future,
+ {
+ self.enter(|cx| fut.as_pin_mut().poll(cx))
+ }
+
/// Run a closure from the context of the task.
///
- /// Any notifications resulting from the execution of the closure are
+ /// Any wake notifications resulting from the execution of the closure are
/// tracked.
pub fn enter<F, R>(&mut self, f: F) -> R
where
- F: FnOnce() -> R,
+ F: FnOnce(&mut Context<'_>) -> R,
{
- self.notify.clear();
+ let _enter = enter().unwrap();
- let res = spawn(future::lazy(|| Ok::<_, ()>(f()))).poll_future_notify(&self.notify, 0);
+ self.waker.clear();
+ let waker = self.waker();
+ let mut cx = Context::from_waker(&waker);
- match res.unwrap() {
- Async::Ready(v) => v,
- _ => unreachable!(),
- }
+ f(&mut cx)
}
- /// Returns `true` if the inner future has received a readiness notification
+ /// Returns `true` if the inner future has received a wake notification
/// since the last call to `enter`.
- pub fn is_notified(&self) -> bool {
- self.notify.is_notified()
+ pub fn is_woken(&self) -> bool {
+ self.waker.is_woken()
}
- /// Returns the number of references to the task notifier
+ /// Returns the number of references to the task waker
///
/// The task itself holds a reference. The return value will never be zero.
- pub fn notifier_ref_count(&self) -> usize {
- Arc::strong_count(&self.notify)
+ pub fn waker_ref_count(&self) -> usize {
+ Arc::strong_count(&self.waker)
+ }
+
+ fn waker(&self) -> Waker {
+ unsafe {
+ let raw = to_raw(self.waker.clone());
+ Waker::from_raw(raw)
+ }
}
}
-impl ThreadNotify {
+impl ThreadWaker {
fn new() -> Self {
- ThreadNotify {
- state: AtomicUsize::new(IDLE),
- mutex: Mutex::new(()),
+ ThreadWaker {
+ state: Mutex::new(IDLE),
condvar: Condvar::new(),
}
}
- /// Clears any previously received notify, avoiding potential spurrious
- /// notifications. This should only be called immediately before running the
+ /// Clears any previously received wakes, avoiding potential spurrious
+ /// wake notifications. This should only be called immediately before running the
/// task.
fn clear(&self) {
- self.state.store(IDLE, Ordering::SeqCst);
+ *self.state.lock().unwrap() = IDLE;
}
- fn is_notified(&self) -> bool {
- match self.state.load(Ordering::SeqCst) {
+ fn is_woken(&self) -> bool {
+ match *self.state.lock().unwrap() {
IDLE => false,
- NOTIFY => true,
+ WAKE => true,
_ => unreachable!(),
}
}
-}
-impl Notify for ThreadNotify {
- fn notify(&self, _unpark_id: usize) {
+ fn wake(&self) {
// First, try transitioning from IDLE -> NOTIFY, this does not require a
// lock.
- match self.state.compare_and_swap(IDLE, NOTIFY, Ordering::SeqCst) {
- IDLE | NOTIFY => return,
- SLEEP => {}
- _ => unreachable!(),
+ let mut state = self.state.lock().unwrap();
+ let prev = *state;
+
+ if prev == WAKE {
+ return;
}
- // The other half is sleeping, this requires a lock
- let _m = self.mutex.lock().unwrap();
+ *state = WAKE;
- // Transition from SLEEP -> NOTIFY
- match self.state.compare_and_swap(SLEEP, NOTIFY, Ordering::SeqCst) {
- SLEEP => {}
- _ => return,
+ if prev == IDLE {
+ return;
}
- // Wakeup the sleeper
+ // The other half is sleeping, so we wake it up.
+ assert_eq!(prev, SLEEP);
self.condvar.notify_one();
}
}
+
+static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop);
+
+unsafe fn to_raw(waker: Arc<ThreadWaker>) -> RawWaker {
+ RawWaker::new(Arc::into_raw(waker) as *const (), &VTABLE)
+}
+
+unsafe fn from_raw(raw: *const ()) -> Arc<ThreadWaker> {
+ Arc::from_raw(raw as *const ThreadWaker)
+}
+
+unsafe fn clone(raw: *const ()) -> RawWaker {
+ let waker = from_raw(raw);
+
+ // Increment the ref count
+ mem::forget(waker.clone());
+
+ to_raw(waker)
+}
+
+unsafe fn wake(raw: *const ()) {
+ let waker = from_raw(raw);
+ waker.wake();
+}
+
+unsafe fn wake_by_ref(raw: *const ()) {
+ let waker = from_raw(raw);
+ waker.wake();
+
+ // We don't actually own a reference to the unparker
+ mem::forget(waker);
+}
+
+unsafe fn drop(raw: *const ()) {
+ let _ = from_raw(raw);
+}