summaryrefslogtreecommitdiffstats
path: root/tokio
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-01-06 08:47:34 -0800
committerGitHub <noreply@github.com>2020-01-06 08:47:34 -0800
commitf0006006ed9938115011c42f26cff16842eb534f (patch)
tree6bd9d4acd2233dfe8c9fb604d88d535f0325a661 /tokio
parent84ff73e687932d77a1163167b938631b1104d54f (diff)
time: advance frozen time in `park_timeout` (#2059)
This patch improves the behavior of frozen time (a testing utility made available with the `test-util` feature flag). Instead of of requiring `time::advance` to be called in order to advance the value returned by `Instant::now`, calls to `time::Driver::park_timeout` will use the provided duration to advance the time. This is the desired behavior as the timeout is used to indicate when the next scheduled delay needs to be fired.
Diffstat (limited to 'tokio')
-rw-r--r--tokio/src/runtime/context.rs12
-rw-r--r--tokio/src/time/clock.rs14
-rw-r--r--tokio/src/time/driver/entry.rs4
-rw-r--r--tokio/src/time/driver/mod.rs20
-rw-r--r--tokio/src/time/driver/registration.rs6
-rw-r--r--tokio/src/time/tests/mod.rs2
-rw-r--r--tokio/src/time/tests/test_delay.rs567
-rw-r--r--tokio/src/time/tests/test_queue.rs369
-rw-r--r--tokio/tests/time_delay.rs160
-rw-r--r--tokio/tests/time_delay_queue.rs389
10 files changed, 849 insertions, 694 deletions
diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs
index 6ac60b07..d3bd6982 100644
--- a/tokio/src/runtime/context.rs
+++ b/tokio/src/runtime/context.rs
@@ -81,18 +81,6 @@ impl ThreadContext {
})
}
- #[cfg(all(feature = "test-util", feature = "time", test))]
- pub(crate) fn with_time_handle(mut self, handle: crate::runtime::time::Handle) -> Self {
- self.time_handle = handle;
- self
- }
-
- #[cfg(all(feature = "test-util", feature = "time", test))]
- pub(crate) fn with_clock(mut self, clock: crate::runtime::time::Clock) -> Self {
- self.clock.replace(clock);
- self
- }
-
#[cfg(all(feature = "io-driver", not(loom)))]
pub(crate) fn io_handle() -> crate::runtime::io::Handle {
CONTEXT.with(|ctx| match *ctx.borrow() {
diff --git a/tokio/src/time/clock.rs b/tokio/src/time/clock.rs
index 5ece7f5a..ae75740c 100644
--- a/tokio/src/time/clock.rs
+++ b/tokio/src/time/clock.rs
@@ -5,7 +5,7 @@
//! configurable.
cfg_not_test_util! {
- use crate::time::Instant;
+ use crate::time::{Duration, Instant};
#[derive(Debug, Clone)]
pub(crate) struct Clock {}
@@ -22,6 +22,14 @@ cfg_not_test_util! {
pub(crate) fn now(&self) -> Instant {
now()
}
+
+ pub(crate) fn is_frozen(&self) -> bool {
+ false
+ }
+
+ pub(crate) fn advance(&self, _dur: Duration) {
+ unreachable!();
+ }
}
}
@@ -137,6 +145,10 @@ cfg_test_util! {
}
}
+ pub(crate) fn is_frozen(&self) -> bool {
+ self.inner.frozen.lock().unwrap().is_some()
+ }
+
pub(crate) fn advance(&self, duration: Duration) {
let mut frozen = self.inner.frozen.lock().unwrap();
diff --git a/tokio/src/time/driver/entry.rs b/tokio/src/time/driver/entry.rs
index d5fab897..079ec7e8 100644
--- a/tokio/src/time/driver/entry.rs
+++ b/tokio/src/time/driver/entry.rs
@@ -104,8 +104,8 @@ const ERROR: u64 = u64::MAX;
// ===== impl Entry =====
impl Entry {
- pub(crate) fn new(deadline: Instant, duration: Duration) -> Arc<Entry> {
- let inner = Handle::current().inner().unwrap();
+ pub(crate) fn new(handle: &Handle, deadline: Instant, duration: Duration) -> Arc<Entry> {
+ let inner = handle.inner().unwrap();
let entry: Entry;
// Increment the number of active timeouts
diff --git a/tokio/src/time/driver/mod.rs b/tokio/src/time/driver/mod.rs
index 605d9bca..f74a4853 100644
--- a/tokio/src/time/driver/mod.rs
+++ b/tokio/src/time/driver/mod.rs
@@ -4,7 +4,7 @@ mod atomic_stack;
use self::atomic_stack::AtomicStack;
mod entry;
-use self::entry::Entry;
+pub(super) use self::entry::Entry;
mod handle;
pub(crate) use self::handle::Handle;
@@ -245,7 +245,14 @@ where
let deadline = self.expiration_instant(when);
if deadline > now {
- self.park.park_timeout(deadline - now)?;
+ let dur = deadline - now;
+
+ if self.clock.is_frozen() {
+ self.park.park_timeout(Duration::from_secs(0))?;
+ self.clock.advance(dur);
+ } else {
+ self.park.park_timeout(dur)?;
+ }
} else {
self.park.park_timeout(Duration::from_secs(0))?;
}
@@ -269,7 +276,14 @@ where
let deadline = self.expiration_instant(when);
if deadline > now {
- self.park.park_timeout(cmp::min(deadline - now, duration))?;
+ let duration = cmp::min(deadline - now, duration);
+
+ if self.clock.is_frozen() {
+ self.park.park_timeout(Duration::from_secs(0))?;
+ self.clock.advance(duration);
+ } else {
+ self.park.park_timeout(duration)?;
+ }
} else {
self.park.park_timeout(Duration::from_secs(0))?;
}
diff --git a/tokio/src/time/driver/registration.rs b/tokio/src/time/driver/registration.rs
index 728d2993..141ca015 100644
--- a/tokio/src/time/driver/registration.rs
+++ b/tokio/src/time/driver/registration.rs
@@ -1,4 +1,4 @@
-use crate::time::driver::Entry;
+use crate::time::driver::{Entry, Handle};
use crate::time::{Duration, Error, Instant};
use std::sync::Arc;
@@ -15,8 +15,10 @@ pub(crate) struct Registration {
impl Registration {
pub(crate) fn new(deadline: Instant, duration: Duration) -> Registration {
+ let handle = Handle::current();
+
Registration {
- entry: Entry::new(deadline, duration),
+ entry: Entry::new(&handle, deadline, duration),
}
}
diff --git a/tokio/src/time/tests/mod.rs b/tokio/src/time/tests/mod.rs
index 6a9f25da..4710d470 100644
--- a/tokio/src/time/tests/mod.rs
+++ b/tokio/src/time/tests/mod.rs
@@ -1,6 +1,4 @@
-mod mock_clock;
mod test_delay;
-mod test_queue;
use crate::time::{self, Instant};
use std::time::Duration;
diff --git a/tokio/src/time/tests/test_delay.rs b/tokio/src/time/tests/test_delay.rs
index 43bfe379..388f9b8b 100644
--- a/tokio/src/time/tests/test_delay.rs
+++ b/tokio/src/time/tests/test_delay.rs
@@ -1,461 +1,422 @@
#![warn(rust_2018_idioms)]
-use crate::time::tests::mock_clock::mock;
-use crate::time::{delay_until, Duration, Instant};
+use crate::park::{Park, Unpark};
+use crate::time::driver::{Driver, Entry, Handle};
+use crate::time::Clock;
+use crate::time::{Duration, Instant};
+
use tokio_test::task;
-use tokio_test::{assert_pending, assert_ready};
+use tokio_test::{assert_ok, assert_pending, assert_ready_ok};
+
+use std::sync::Arc;
+
+macro_rules! poll {
+ ($e:expr) => {
+ $e.enter(|cx, e| e.poll_elapsed(cx))
+ };
+}
#[test]
fn immediate_delay() {
- mock(|clock| {
- // Create `Delay` that elapsed immediately.
- let mut fut = task::spawn(delay_until(clock.now()));
+ let (mut driver, clock, handle) = setup();
+
+ let when = clock.now();
+ let mut e = task::spawn(delay_until(&handle, when));
- // Ready!
- assert_ready!(fut.poll());
+ assert_ready_ok!(poll!(e));
- // Turn the timer, it runs for the elapsed time
- clock.turn_for(ms(1000));
+ assert_ok!(driver.park_timeout(Duration::from_millis(1000)));
- // The time has not advanced. The `turn` completed immediately.
- assert_eq!(clock.advanced(), ms(1000));
- });
+ // The time has not advanced. The `turn` completed immediately.
+ assert_eq!(clock.advanced(), ms(1000));
}
#[test]
fn delayed_delay_level_0() {
+ let (mut driver, clock, handle) = setup();
+
+ let start = clock.now();
+
for &i in &[1, 10, 60] {
- mock(|clock| {
- // Create a `Delay` that elapses in the future
- let mut fut = task::spawn(delay_until(clock.now() + ms(i)));
+ // Create a `Delay` that elapses in the future
+ let mut e = task::spawn(delay_until(&handle, start + ms(i)));
- // The delay has not elapsed.
- assert_pending!(fut.poll());
+ // The delay has not elapsed.
+ assert_pending!(poll!(e));
- clock.turn();
- assert_eq!(clock.advanced(), ms(i));
+ assert_ok!(driver.park());
+ assert_eq!(clock.advanced(), ms(i));
- assert_ready!(fut.poll());
- });
+ assert_ready_ok!(poll!(e));
}
}
#[test]
fn sub_ms_delayed_delay() {
- mock(|clock| {
- for _ in 0..5 {
- let deadline = clock.now() + Duration::from_millis(1) + Duration::new(0, 1);
+ let (mut driver, clock, handle) = setup();
- let mut fut = task::spawn(delay_until(deadline));
+ for _ in 0..5 {
+ let deadline = clock.now() + ms(1) + Duration::new(0, 1);
- assert_pending!(fut.poll());
+ let mut e = task::spawn(delay_until(&handle, deadline));
- clock.turn();
- assert_ready!(fut.poll());
+ assert_pending!(poll!(e));
- assert!(clock.now() >= deadline);
+ assert_ok!(driver.park());
+ assert_ready_ok!(poll!(e));
- clock.advance(Duration::new(0, 1));
- }
- });
+ assert!(clock.now() >= deadline);
+
+ clock.advance(Duration::new(0, 1));
+ }
}
#[test]
fn delayed_delay_wrapping_level_0() {
- mock(|clock| {
- clock.turn_for(ms(5));
- assert_eq!(clock.advanced(), ms(5));
+ let (mut driver, clock, handle) = setup();
+
+ assert_ok!(driver.park_timeout(ms(5)));
+ assert_eq!(clock.advanced(), ms(5));
- let mut fut = task::spawn(delay_until(clock.now() + ms(60)));
+ let mut e = task::spawn(delay_until(&handle, clock.now() + ms(60)));
- assert_pending!(fut.poll());
+ assert_pending!(poll!(e));
- clock.turn();
- assert_eq!(clock.advanced(), ms(64));
- assert_pending!(fut.poll());
+ assert_ok!(driver.park());
+ assert_eq!(clock.advanced(), ms(64));
+ assert_pending!(poll!(e));
- clock.turn();
- assert_eq!(clock.advanced(), ms(65));
+ assert_ok!(driver.park());
+ assert_eq!(clock.advanced(), ms(65));
- assert_ready!(fut.poll());
- });
+ assert_ready_ok!(poll!(e));
}
#[test]
fn timer_wrapping_with_higher_levels() {
- mock(|clock| {
- // Set delay to hit level 1
- let mut s1 = task::spawn(delay_until(clock.now() + ms(64)));
- assert_pending!(s1.poll());
+ let (mut driver, clock, handle) = setup();
- // Turn a bit
- clock.turn_for(ms(5));
+ // Set delay to hit level 1
+ let mut e1 = task::spawn(delay_until(&handle, clock.now() + ms(64)));
+ assert_pending!(poll!(e1));
- // Set timeout such that it will hit level 0, but wrap
- let mut s2 = task::spawn(delay_until(clock.now() + ms(60)));
- assert_pending!(s2.poll());
+ // Turn a bit
+ assert_ok!(driver.park_timeout(ms(5)));
- // This should result in s1 firing
- clock.turn();
- assert_eq!(clock.advanced(), ms(64));
+ // Set timeout such that it will hit level 0, but wrap
+ let mut e2 = task::spawn(delay_until(&handle, clock.now() + ms(60)));
+ assert_pending!(poll!(e2));
- assert_ready!(s1.poll());
- assert_pending!(s2.poll());
+ // This should result in s1 firing
+ assert_ok!(driver.park());
+ assert_eq!(clock.advanced(), ms(64));
- clock.turn();
- assert_eq!(clock.advanced(), ms(65));
+ assert_ready_ok!(poll!(e1));
+ assert_pending!(poll!(e2));
- assert_ready!(s2.poll());
- });
+ assert_ok!(driver.park());
+ assert_eq!(clock.advanced(), ms(65));
+
+ assert_ready_ok!(poll!(e1));
}
#[test]
fn delay_with_deadline_in_past() {
- mock(|clock| {
- // Create `Delay` that elapsed immediately.
- let mut fut = task::spawn(delay_until(clock.now() - ms(100)));
+ let (mut driver, clock, handle) = setup();
+
+ // Create `Delay` that elapsed immediately.
+ let mut e = task::spawn(delay_until(&handle, clock.now() - ms(100)));
- // Even though the delay expires in the past, it is not ready yet
- // because the timer must observe it.
- assert_ready!(fut.poll());
+ // Even though the delay expires in the past, it is not ready yet
+ // because the timer must observe it.
+ assert_ready_ok!(poll!(e));
- // Turn the timer, it runs for the elapsed time
- clock.turn_for(ms(1000));
+ // Turn the timer, it runs for the elapsed time
+ assert_ok!(driver.park_timeout(ms(1000)));
- // The time has not advanced. The `turn` completed immediately.
- assert_eq!(clock.advanced(), ms(1000));
- });
+ // The time has not advanced. The `turn` completed immediately.
+ assert_eq!(clock.advanced(), ms(1000));
}
#[test]
fn delayed_delay_level_1() {
- mock(|clock| {
- // Create a `Delay` that elapses in the future
- let mut fut = task::spawn(delay_until(clock.now() + ms(234)));
+ let (mut driver, clock, handle) = setup();
- // The delay has not elapsed.
- assert_pending!(fut.poll());
+ // Create a `Delay` that elapses in the future
+ let mut e = task::spawn(delay_until(&handle, clock.now() + ms(234)));
- // Turn the timer, this will wake up to cascade the timer down.
- clock.turn_for(ms(1000));
- assert_eq!(clock.advanced(), ms(192));
+ // The delay has not elapsed.
+ assert_pending!(poll!(e));
- // The delay has not elapsed.
- assert_pending!(fut.poll());
+ // Turn the timer, this will wake up to cascade the timer down.
+ assert_ok!(driver.park_timeout(ms(1000)));
+ assert_eq!(clock.advanced(), ms(192));
- // Turn the timer again
- clock.turn_for(ms(1000));
- assert_eq!(clock.advanced(), ms(234));
+ // The delay has not elapsed.
+ assert_pending!(poll!(e));
- // The delay has elapsed.
- assert_ready!(fut.poll());
- });
+ // Turn the timer again
+ assert_ok!(driver.park_timeout(ms(1000)));
+ assert_eq!(clock.advanced(), ms(234));
- mock(|clock| {
- // Create a `Delay` that elapses in the future
- let mut fut = task::spawn(delay_until(clock.now() + ms(234)));
+ // The delay has elapsed.
+ assert_ready_ok!(poll!(e));
- // The delay has not elapsed.
- assert_pending!(fut.poll());
+ let (mut driver, clock, handle) = setup();
- // Turn the timer with a smaller timeout than the cascade.
- clock.turn_for(ms(100));
- assert_eq!(clock.advanced(), ms(100));
+ // Create a `Delay` that elapses in the future
+ let mut e = task::spawn(delay_until(&handle, clock.now() + ms(234)));
- assert_pending!(fut.poll());
+ // The delay has not elapsed.
+ assert_pending!(poll!(e));
- // Turn the timer, this will wake up to cascade the timer down.
- clock.turn_for(ms(1000));
- assert_eq!(clock.advanced(), ms(192));
+ // Turn the timer with a smaller timeout than the cascade.
+ assert_ok!(driver.park_timeout(ms(100)));
+ assert_eq!(clock.advanced(), ms(100));
- // The delay has not elapsed.
- assert_pending!(fut.poll());
+ assert_pending!(poll!(e));
- // Turn the timer again
- clock.turn_for(ms(1000));
- assert_eq!(clock.advanced(), ms(234));
+ // Turn the timer, this will wake up to cascade the timer down.
+ assert_ok!(driver.park_timeout(ms(1000)));
+ assert_eq!(clock.advanced(), ms(192));
- // The delay has elapsed.
- assert_ready!(fut.poll());
- });
-}
+ // The delay has not elapsed.
+ assert_pending!(poll!(e));
-#[test]
-#[should_panic]
-fn creating_delay_outside_of_context() {
- let now = Instant::now();
+ // Turn the timer again
+ assert_ok!(driver.park_timeout(ms(1000)));
+ assert_eq!(clock.advanced(), ms(234));
- // This creates a delay outside of the context of a mock timer. This tests
- // that it will panic.
- let _fut = task::spawn(delay_until(now + ms(500)));
+ // The delay has elapsed.
+ assert_ready_ok!(poll!(e));
}
#[test]
fn concurrently_set_two_timers_second_one_shorter() {
- mock(|clock| {
- let mut fut1 = task::spawn(delay_until(clock.now() + ms(500)));
- let mut fut2 = task::spawn(delay_until(clock.now() + ms(200)));
+ let (mut driver, clock, handle) = setup();
+
+ let mut e1 = task::spawn(delay_until(&handle, clock.now() + ms(500)));
+ let mut e2 = task::spawn(delay_until(&handle, clock.now() + ms(200)));
- // The delay has not elapsed
- assert_pending!(fut1.poll());
- assert_pending!(fut2.poll());
+ // The delay has not elapsed
+ assert_pending!(poll!(e1));
+ assert_pending!(poll!(e2));
- // Delay until a cascade
- clock.turn();
- assert_eq!(clock.advanced(), ms(192));
+ // Delay until a cascade
+ assert_ok!(driver.park());
+ assert_eq!(clock.advanced(), ms(192));
- // Delay until the second timer.
- clock.turn();
- assert_eq!(clock.advanced(), ms(200));
+ // Delay until the second timer.
+ assert_ok!(driver.park());
+ assert_eq!(clock.advanced(), ms(200));
- // The shorter delay fires
- assert_ready!(fut2.poll());
- assert_pending!(fut1.poll());
+ // The shorter delay fires
+ assert_ready_ok!(poll!(e2));
+ assert_pending!(poll!(e1));
- clock.turn();
- assert_eq!(clock.advanced(), ms(448));
+ assert_ok!(driver.park());
+ assert_eq!(clock.advanced(), ms(448));
- assert_pending!(fut1.poll());
+ assert_pending!(poll!(e1));
- // Turn again, this time the time will advance to the second delay
- clock.turn();
- assert_eq!(clock.advanced(), ms(500));
+ // Turn again, this time the time will advance to the second delay
+ assert_ok!(driver.park());
+ assert_eq!(clock.advanced(), ms(500));
- assert_ready!(fut1.poll());
- })
+ assert_ready_ok!(poll!(e1));
}
#[test]
fn short_delay() {
- mock(|clock| {
- // Create a `Delay` that elapses in the future
- let mut fut = task::spawn(delay_until(clock.now() + ms(1)));
+ let (mut driver, clock, handle) = setup();
- // The delay has not elapsed.
- assert_pending!(fut.poll());
+ // Create a `Delay` that elapses in the future
+ let mut e = task::spawn(delay_until(&handle, clock.now() + ms(1)));
+
+ // The delay has not elapsed.
+ assert_pending!(poll!(e));
- // Turn the timer, but not enough time will go by.
- clock.turn();
+ // Turn the timer, but not enough time will go by.
+ assert_ok!(driver.park());
- // The delay has elapsed.
- assert_ready!(fut.poll());
+ // The delay has elapsed.
+ assert_ready_ok!(poll!(e));
- // The time has advanced to the point of the delay elapsing.
- assert_eq!(clock.advanced(), ms(1));
- })
+ // The time has advanced to the point of the delay elapsing.
+ assert_eq!(clock.advanced(), ms(1));
}
#[test]
fn sorta_long_delay_until() {
const MIN_5: u64 = 5 * 60 * 1000;
- mock(|clock| {
- // Create a `Delay` that elapses in the future
- let mut fut = task::spawn(delay_until(clock.now() + ms(MIN_5)));
+ let (mut driver, clock, handle) = setup();
- // The delay has not elapsed.
- assert_pending!(fut.poll());
+ // Create a `Delay` that elapses in the future
+ let mut e = task::spawn(delay_until(&handle, clock.now() + ms(MIN_5)));
- let cascades = &[262_144, 262_144 + 9 * 4096, 262_144 + 9 * 4096 + 15 * 64];
+ // The delay has not elapsed.
+ assert_pending!(poll!(e));
- for &elapsed in cascades {
- clock.turn();
- assert_eq!(clock.advanced(), ms(elapsed));
+ let cascades = &[262_144, 262_144 + 9 * 4096, 262_144 + 9 * 4096 + 15 * 64];
- assert_pending!(fut.poll());
- }
+ for &elapsed in cascades {
+ assert_ok!(driver.park());
+ assert_eq!(clock.advanced(), ms(elapsed));
+
+ assert_pending!(poll!(e));
+ }
- clock.turn();
- assert_eq!(clock.advanced(), ms(MIN_5));
+ assert_ok!(driver.park());
+ assert_eq!(clock.advanced(), ms(MIN_5));
- // The delay has elapsed.
- assert_ready!(fut.poll());
- })
+ // The delay has elapsed.
+ assert_ready_ok!(poll!(e));
}
#[test]
fn very_long_delay() {
const MO_5: u64 = 5 * 30 * 24 * 60 * 60 * 1000;
- mock(|clock| {
- // Create a `Delay` that elapses in the future
- let mut fut = task::spawn(delay_until(clock.now() + ms(MO_5)));
+ let (mut driver, clock, handle) = setup();
- // The delay has not elapsed.
- assert_pending!(fut.poll());
+ // Create a `Delay` that elapses in the future
+ let mut e = task::spawn(delay_until(&handle, clock.now() + ms(MO_5)));
- let cascades = &[
- 12_884_901_888,
- 12_952_010_752,
- 12_959_875_072,
- 12_959_997_952,
- ];
+ // The delay has not elapsed.
+ assert_pending!(poll!(e));
- for &elapsed in cascades {
- clock.turn();
- assert_eq!(clock.advanced(), ms(elapsed));
+ let cascades = &[
+ 12_884_901_888,
+ 12_952_010_752,
+ 12_959_875_072,
+ 12_959_997_952,
+ ];
- assert_pending!(fut.poll());
- }
-
- // Turn the timer, but not enough time will go by.
- clock.turn();
+ for &elapsed in cascades {
+ assert_ok!(driver.park());
+ assert_eq!(clock.advanced(), ms(elapsed));
- // The time has advanced to the point of the delay elapsing.
- assert_eq!(clock.advanced(), ms(MO_5));
-
- // The delay has elapsed.
- assert_ready!(fut.poll());
- })
-}
-
-#[test]
-#[should_panic]
-fn greater_than_max() {
- const YR_5: u64 = 5 * 365 * 24 * 60 * 60 * 1000;
-
- mock(|clock| {
- // Create a `Delay` that elapses in the future
- let mut fut = task::spawn(delay_until(clock.now() + ms(YR_5)));
+ assert_pending!(poll!(e));
+ }
- assert_pending!(fut.poll());
+ // Turn the timer, but not enough time will go by.
+ assert_ok!(driver.park());
- clock.turn_for(ms(0));
+ // The time has advanced to the point of the delay elapsing.
+ assert_eq!(clock.advanced(), ms(MO_5));
- // boom
- let _ = fut.poll();
- })
+ // The delay has elapsed.
+ assert_ready_ok!(poll!(e));
}
#[test]
fn unpark_is_delayed() {
- mock(|clock| {
- let mut fut1 = task::spawn(delay_until(clock.now() + ms(100)));
- let mut fut2 = task::spawn(delay_until(clock.now() + ms(101)));
- let mut fut3 = task::spawn(delay_until(clock.now() + ms(200)));
+ // A special park that will take much longer than the requested duration
+ struct MockPark(Clock);
- assert_pending!(fut1.poll());
- assert_pending!(fut2.poll());
- assert_pending!(fut3.poll());
+ struct MockUnpark;
- clock.park_for(ms(500));
+ impl Park for MockPark {
+ type Unpark = MockUnpark;
+ type Error = ();
- assert_eq!(clock.advanced(), ms(500));
-
- assert_ready!(fut1.poll());
- assert_ready!(fut2.poll());
- assert_ready!(fut3.poll());
- })
-}
-
-#[test]
-fn set_timeout_at_deadline_greater_than_max_timer() {
- const YR_1: u64 = 365 * 24 * 60 * 60 * 1000;
- const YR_5: u64 = 5 * YR_1;
-
- mock(|clock| {
- for _ in 0..5 {
- clock.turn_for(ms(YR_1));
+ fn unpark(&self) -> Self::Unpark {
+ MockUnpark
}
- let mut fut = task::spawn(delay_until(clock.now() + ms(1)));
- assert_pending!(fut.poll());
-
- clock.turn_for(ms(1000));
- assert_eq!(clock.advanced(), ms(YR_5) + ms(1));
+ fn park(&mut self) -> Result<(), Self::Error> {
+ panic!("parking forever");
+ }
- assert_ready!(fut.poll());
- });
-}
+ fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
+ assert_eq!(duration, ms(0));
+ self.0.advance(ms(436));
+ Ok(())
+ }
+ }
-#[test]
-fn reset_future_delay_before_fire() {
- mock(|clock| {
- let mut fut = task::spawn(delay_until(clock.now() + ms(100)));
+ impl Unpark for MockUnpark {
+ fn unpark(&self) {}
+ }
- assert_pending!(fut.poll());
+ let clock = Clock::new_frozen();
+ let mut driver = Driver::new(MockPark(clock.clone()), clock.clone());
+ let handle = driver.handle();
- fut.reset(clock.now() + ms(200));
+ let mut e1 = task::spawn(delay_until(&handle, clock.now() + ms(100)));
+ let mut e2 = task::spawn(delay_until(&handle, clock.now() + ms(101)));
+ let mut e3 = task::spawn(delay_until(&handle, clock.now() + ms(200)));
- clock.turn();
- assert_eq!(clock.advanced(), ms(192));
+ assert_pending!(poll!(e1));
+ assert_pending!(poll!(e2));
+ assert_pending!(poll!(e3));
- assert_pending!(fut.poll());
+ assert_ok!(driver.park());
- clock.turn();
- assert_eq!(clock.advanced(), ms(200));
+ assert_eq!(clock.advanced(), ms(500));
- assert_ready!(fut.poll());
- });
+ assert_ready_ok!(poll!(e1));
+ assert_ready_ok!(poll!(e2));
+ assert_ready_ok!(poll!(e3));
}
#[test]
-fn reset_past_delay_before_turn() {
- mock(|clock| {
- let mut fut = task::spawn(delay_until(clock.now() + ms(100)));
-
- assert_pending!(fut.poll());
+fn set_timeout_at_deadline_greater_than_max_timer() {
+ const YR_1: u64 = 365 * 24 * 60 * 60 * 1000;
+ const YR_5: u64 = 5 * YR_1;
- fut.reset(clock.now() + ms(80));
+ let (mut driver, clock, handle) = setup();
- clock.turn();
- assert_eq!(clock.advanced(), ms(64));
+ for _ in 0..5 {
+ assert_ok!(driver.park_timeout(ms(YR_1)));
+ }
- assert_pending!(fut.poll());
+ let mut e = task::spawn(delay_until(&handle, clock.now() + ms(1)));
+ assert_pending!(poll!(e));
- clock.turn();
- assert_eq!(clock.advanced(), ms(80));
+ assert_ok!(driver.park_timeout(ms(1000)));
+ assert_eq!(clock.advanced(), ms(YR_5) + ms(1));
- assert_ready!(fut.poll());
- });
+ assert_ready_ok!(poll!(e));
}
-#[test]
-fn reset_past_delay_before_fire() {
- mock(|clock| {
- let mut fut = task::spawn(delay_until(clock.now() + ms(100)));
-
- assert_pending!(fut.poll());
- clock.turn_for(ms(10));
-
- assert_pending!(fut.poll());
- fut.reset(clock.now() + ms(80));
+fn setup() -> (Driver<MockPark>, Clock, Handle) {
+ let clock = Clock::new_frozen();
+ let driver = Driver::new(MockPark(clock.clone()), clock.clone());
+ let handle = driver.handle();
- clock.turn();
- assert_eq!(clock.advanced(), ms(64));
-
- assert_pending!(fut.poll());
-
- clock.turn();
- assert_eq!(clock.advanced(), ms(90));
-
- assert_ready!(fut.poll());
- });
+ (driver, clock, handle)
}
-#[test]
-fn reset_future_delay_after_fire() {
- mock(|clock| {
- let mut fut = task::spawn(delay_until(clock.now() + ms(100)));
+fn delay_until(handle: &Handle, when: Instant) -> Arc<Entry> {
+ Entry::new(&handle, when, ms(0))
+}
- assert_pending!(fut.poll());
+struct MockPark(Clock);
- clock.turn_for(ms(1000));
- assert_eq!(clock.advanced(), ms(64));
+struct MockUnpark;
- clock.turn();
- assert_eq!(clock.advanced(), ms(100));
+impl Park for MockPark {
+ type Unpark = MockUnpark;
+ type Error = ();
- assert_ready!(fut.poll());
+ fn unpark(&self) -> Self::Unpark {
+ MockUnpark
+ }
- fut.reset(clock.now() + ms(10));
- assert_pending!(fut.poll());
+ fn park(&mut self) -> Result<(), Self::Error> {
+ panic!("parking forever");
+ }
- clock.turn_for(ms(1000));
- assert_eq!(clock.advanced(), ms(110));
+ fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
+ self.0.advance(duration);
+ Ok(())
+ }
+}
- assert_ready!(fut.poll());
- });
+impl Unpark for MockUnpark {
+ fn unpark(&self) {}
}
fn ms(n: u64) -> Duration {
diff --git a/tokio/src/time/tests/test_queue.rs b/tokio/src/time/tests/test_queue.rs
deleted file mode 100644
index 34b9b7da..00000000
--- a/tokio/src/time/tests/test_queue.rs
+++ /dev/null
@@ -1,369 +0,0 @@
-#![warn(rust_2018_idioms)]
-
-use crate::time::tests::mock_clock::mock;
-use crate::time::{DelayQueue, Duration};
-use tokio_test::{assert_ok, assert_pending, assert_ready, task};
-
-macro_rules! poll {
- ($queue:ident) => {
- $queue.enter(|cx, mut queue| queue.poll_expired(cx))
- };
-}
-
-macro_rules! assert_ready_ok {
- ($e:expr) => {{
- assert_ok!(match assert_ready!($e) {
- Some(v) => v,
- None => panic!("None"),
- })
- }};
-}
-
-#[test]
-fn single_immediate_delay() {
- mock(|clock| {
- let mut queue = task::spawn(DelayQueue::new());
- let _key = queue.insert_at("foo", clock.now());
-
- let entry = assert_ready_ok!(poll!(queue));
- assert_eq!(*entry.get_ref(), "foo");
-
- let entry = assert_ready!(poll!(queue));
- assert!(entry.is_none())
- });
-}
-
-#[test]
-fn multi_immediate_delays() {
- mock(|clock| {
- let mut queue = task::spawn(DelayQueue::new());
-
- let _k = queue.insert_at("1", clock.now());
- let _k = queue.insert_at("2", clock.now());
- let _k = queue.insert_at("3", clock.now());
-
- let mut res = vec![];
-
- while res.len() < 3 {
- let entry = assert_ready_ok!(poll!(queue));
- res.push(entry.into_inner());
- }
-
- let entry = assert_ready!(poll!(queue));
- assert!(entry.is_none());
-
- res.sort();
-
- assert_eq!("1", res[0]);
- assert_eq!("2", res[1]);
- assert_eq!("3", res[2]);
- });
-}
-
-#[test]
-fn single_short_delay() {
- mock(|clock| {
- let mut queue = task::spawn(DelayQueue::new());
- let _key = queue.insert_at("foo", clock.now() + ms(5));
-
- assert_pending!(poll!(queue));
-
- clock.turn_for(ms(1));
-
- assert!(!queue.is_woken());
-
- clock.turn_for(ms(5));
-
- assert!(queue.is_woken());
-
- let entry = assert_ready_ok!(poll!(queue));
- assert_eq!(*entry.get_ref(), "foo");
-
- let entry = assert_ready!(poll!(queue));
- assert!(entry.is_none());
- });
-}
-
-#[test]
-fn multi_delay_at_start() {
- let long = 262_144 + 9 * 4096;
- let delays = &[1000, 2, 234, long, 60, 10];
-
- mock(|clock| {
- let mut queue = task::spawn(DelayQueue::new());
-
- // Setup the delays
- for &i in delays {
- let _key = queue.insert_at(i, clock.now() + ms(i));
- }
-
- assert_pending!(poll!(queue));
- assert!(!queue.is_woken());
-
- for elapsed in 0..1200 {
- clock.turn_for(ms(1));
- let elapsed = elapsed + 1;
-
- if delays.contains(&elapsed) {
- assert!(queue.is_woken());
- assert_ready!(poll!(queue));
- assert_pending!(poll!(queue));
- } else if queue.is_woken() {
- let cascade = &[192, 960];
- assert!(cascade.contains(&elapsed), "elapsed={}", elapsed);
-
- assert_pending!(poll!(queue));
- }
- }
- });
-}
-
-#[test]
-fn insert_in_past_fires_immediately() {
- mock(|clock| {
- let mut queue = task::spawn(DelayQueue::new());
-
- let now = clock.now();
-
- clock.turn_for(ms(10));
-
- queue.insert_at("foo", now);
-
- assert_ready!(poll!(queue));
- });
-}
-
-#[test]