summaryrefslogtreecommitdiffstats
path: root/tokio
diff options
context:
space:
mode:
Diffstat (limited to 'tokio')
-rw-r--r--tokio/src/runtime/context.rs12
-rw-r--r--tokio/src/time/clock.rs14
-rw-r--r--tokio/src/time/driver/entry.rs4
-rw-r--r--tokio/src/time/driver/mod.rs20
-rw-r--r--tokio/src/time/driver/registration.rs6
-rw-r--r--tokio/src/time/tests/mod.rs2
-rw-r--r--tokio/src/time/tests/test_delay.rs567
-rw-r--r--tokio/src/time/tests/test_queue.rs369
-rw-r--r--tokio/tests/time_delay.rs160
-rw-r--r--tokio/tests/time_delay_queue.rs389
10 files changed, 849 insertions, 694 deletions
diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs
index 6ac60b07..d3bd6982 100644
--- a/tokio/src/runtime/context.rs
+++ b/tokio/src/runtime/context.rs
@@ -81,18 +81,6 @@ impl ThreadContext {
})
}
- #[cfg(all(feature = "test-util", feature = "time", test))]
- pub(crate) fn with_time_handle(mut self, handle: crate::runtime::time::Handle) -> Self {
- self.time_handle = handle;
- self
- }
-
- #[cfg(all(feature = "test-util", feature = "time", test))]
- pub(crate) fn with_clock(mut self, clock: crate::runtime::time::Clock) -> Self {
- self.clock.replace(clock);
- self
- }
-
#[cfg(all(feature = "io-driver", not(loom)))]
pub(crate) fn io_handle() -> crate::runtime::io::Handle {
CONTEXT.with(|ctx| match *ctx.borrow() {
diff --git a/tokio/src/time/clock.rs b/tokio/src/time/clock.rs
index 5ece7f5a..ae75740c 100644
--- a/tokio/src/time/clock.rs
+++ b/tokio/src/time/clock.rs
@@ -5,7 +5,7 @@
//! configurable.
cfg_not_test_util! {
- use crate::time::Instant;
+ use crate::time::{Duration, Instant};
#[derive(Debug, Clone)]
pub(crate) struct Clock {}
@@ -22,6 +22,14 @@ cfg_not_test_util! {
pub(crate) fn now(&self) -> Instant {
now()
}
+
+ pub(crate) fn is_frozen(&self) -> bool {
+ false
+ }
+
+ pub(crate) fn advance(&self, _dur: Duration) {
+ unreachable!();
+ }
}
}
@@ -137,6 +145,10 @@ cfg_test_util! {
}
}
+ pub(crate) fn is_frozen(&self) -> bool {
+ self.inner.frozen.lock().unwrap().is_some()
+ }
+
pub(crate) fn advance(&self, duration: Duration) {
let mut frozen = self.inner.frozen.lock().unwrap();
diff --git a/tokio/src/time/driver/entry.rs b/tokio/src/time/driver/entry.rs
index d5fab897..079ec7e8 100644
--- a/tokio/src/time/driver/entry.rs
+++ b/tokio/src/time/driver/entry.rs
@@ -104,8 +104,8 @@ const ERROR: u64 = u64::MAX;
// ===== impl Entry =====
impl Entry {
- pub(crate) fn new(deadline: Instant, duration: Duration) -> Arc<Entry> {
- let inner = Handle::current().inner().unwrap();
+ pub(crate) fn new(handle: &Handle, deadline: Instant, duration: Duration) -> Arc<Entry> {
+ let inner = handle.inner().unwrap();
let entry: Entry;
// Increment the number of active timeouts
diff --git a/tokio/src/time/driver/mod.rs b/tokio/src/time/driver/mod.rs
index 605d9bca..f74a4853 100644
--- a/tokio/src/time/driver/mod.rs
+++ b/tokio/src/time/driver/mod.rs
@@ -4,7 +4,7 @@ mod atomic_stack;
use self::atomic_stack::AtomicStack;
mod entry;
-use self::entry::Entry;
+pub(super) use self::entry::Entry;
mod handle;
pub(crate) use self::handle::Handle;
@@ -245,7 +245,14 @@ where
let deadline = self.expiration_instant(when);
if deadline > now {
- self.park.park_timeout(deadline - now)?;
+ let dur = deadline - now;
+
+ if self.clock.is_frozen() {
+ self.park.park_timeout(Duration::from_secs(0))?;
+ self.clock.advance(dur);
+ } else {
+ self.park.park_timeout(dur)?;
+ }
} else {
self.park.park_timeout(Duration::from_secs(0))?;
}
@@ -269,7 +276,14 @@ where
let deadline = self.expiration_instant(when);
if deadline > now {
- self.park.park_timeout(cmp::min(deadline - now, duration))?;
+ let duration = cmp::min(deadline - now, duration);
+
+ if self.clock.is_frozen() {
+ self.park.park_timeout(Duration::from_secs(0))?;
+ self.clock.advance(duration);
+ } else {
+ self.park.park_timeout(duration)?;
+ }
} else {
self.park.park_timeout(Duration::from_secs(0))?;
}
diff --git a/tokio/src/time/driver/registration.rs b/tokio/src/time/driver/registration.rs
index 728d2993..141ca015 100644
--- a/tokio/src/time/driver/registration.rs
+++ b/tokio/src/time/driver/registration.rs
@@ -1,4 +1,4 @@
-use crate::time::driver::Entry;
+use crate::time::driver::{Entry, Handle};
use crate::time::{Duration, Error, Instant};
use std::sync::Arc;
@@ -15,8 +15,10 @@ pub(crate) struct Registration {
impl Registration {
pub(crate) fn new(deadline: Instant, duration: Duration) -> Registration {
+ let handle = Handle::current();
+
Registration {
- entry: Entry::new(deadline, duration),
+ entry: Entry::new(&handle, deadline, duration),
}
}
diff --git a/tokio/src/time/tests/mod.rs b/tokio/src/time/tests/mod.rs
index 6a9f25da..4710d470 100644
--- a/tokio/src/time/tests/mod.rs
+++ b/tokio/src/time/tests/mod.rs
@@ -1,6 +1,4 @@
-mod mock_clock;
mod test_delay;
-mod test_queue;
use crate::time::{self, Instant};
use std::time::Duration;
diff --git a/tokio/src/time/tests/test_delay.rs b/tokio/src/time/tests/test_delay.rs
index 43bfe379..388f9b8b 100644
--- a/tokio/src/time/tests/test_delay.rs
+++ b/tokio/src/time/tests/test_delay.rs
@@ -1,461 +1,422 @@
#![warn(rust_2018_idioms)]
-use crate::time::tests::mock_clock::mock;
-use crate::time::{delay_until, Duration, Instant};
+use crate::park::{Park, Unpark};
+use crate::time::driver::{Driver, Entry, Handle};
+use crate::time::Clock;
+use crate::time::{Duration, Instant};
+
use tokio_test::task;
-use tokio_test::{assert_pending, assert_ready};
+use tokio_test::{assert_ok, assert_pending, assert_ready_ok};
+
+use std::sync::Arc;
+
+macro_rules! poll {
+ ($e:expr) => {
+ $e.enter(|cx, e| e.poll_elapsed(cx))
+ };
+}
#[test]
fn immediate_delay() {
- mock(|clock| {
- // Create `Delay` that elapsed immediately.
- let mut fut = task::spawn(delay_until(clock.now()));
+ let (mut driver, clock, handle) = setup();
+
+ let when = clock.now();
+ let mut e = task::spawn(delay_until(&handle, when));
- // Ready!
- assert_ready!(fut.poll());
+ assert_ready_ok!(poll!(e));
- // Turn the timer, it runs for the elapsed time
- clock.turn_for(ms(1000));
+ assert_ok!(driver.park_timeout(Duration::from_millis(1000)));
- // The time has not advanced. The `turn` completed immediately.
- assert_eq!(clock.advanced(), ms(1000));
- });
+ // The time has not advanced. The `turn` completed immediately.
+ assert_eq!(clock.advanced(), ms(1000));
}
#[test]
fn delayed_delay_level_0() {
+ let (mut driver, clock, handle) = setup();
+
+ let start = clock.now();
+
for &i in &[1, 10, 60] {
- mock(|clock| {
- // Create a `Delay` that elapses in the future
- let mut fut = task::spawn(delay_until(clock.now() + ms(i)));
+ // Create a `Delay` that elapses in the future
+ let mut e = task::spawn(delay_until(&handle, start + ms(i)));
- // The delay has not elapsed.
- assert_pending!(fut.poll());
+ // The delay has not elapsed.
+ assert_pending!(poll!(e));
- clock.turn();
- assert_eq!(clock.advanced(), ms(i));
+ assert_ok!(driver.park());
+ assert_eq!(clock.advanced(), ms(i));
- assert_ready!(fut.poll());
- });
+ assert_ready_ok!(poll!(e));
}
}
#[test]
fn sub_ms_delayed_delay() {
- mock(|clock| {
- for _ in 0..5 {
- let deadline = clock.now() + Duration::from_millis(1) + Duration::new(0, 1);
+ let (mut driver, clock, handle) = setup();
- let mut fut = task::spawn(delay_until(deadline));
+ for _ in 0..5 {
+ let deadline = clock.now() + ms(1) + Duration::new(0, 1);
- assert_pending!(fut.poll());
+ let mut e = task::spawn(delay_until(&handle, deadline));
- clock.turn();
- assert_ready!(fut.poll());
+ assert_pending!(poll!(e));
- assert!(clock.now() >= deadline);
+ assert_ok!(driver.park());
+ assert_ready_ok!(poll!(e));
- clock.advance(Duration::new(0, 1));
- }
- });
+ assert!(clock.now() >= deadline);
+
+ clock.advance(Duration::new(0, 1));
+ }
}
#[test]
fn delayed_delay_wrapping_level_0() {
- mock(|clock| {
- clock.turn_for(ms(5));
- assert_eq!(clock.advanced(), ms(5));
+ let (mut driver, clock, handle) = setup();
+
+ assert_ok!(driver.park_timeout(ms(5)));
+ assert_eq!(clock.advanced(), ms(5));
- let mut fut = task::spawn(delay_until(clock.now() + ms(60)));
+ let mut e = task::spawn(delay_until(&handle, clock.now() + ms(60)));
- assert_pending!(fut.poll());
+ assert_pending!(poll!(e));
- clock.turn();
- assert_eq!(clock.advanced(), ms(64));
- assert_pending!(fut.poll());
+ assert_ok!(driver.park());
+ assert_eq!(clock.advanced(), ms(64));
+ assert_pending!(poll!(e));
- clock.turn();
- assert_eq!(clock.advanced(), ms(65));
+ assert_ok!(driver.park());
+ assert_eq!(clock.advanced(), ms(65));
- assert_ready!(fut.poll());
- });
+ assert_ready_ok!(poll!(e));
}
#[test]
fn timer_wrapping_with_higher_levels() {
- mock(|clock| {
- // Set delay to hit level 1
- let mut s1 = task::spawn(delay_until(clock.now() + ms(64)));
- assert_pending!(s1.poll());
+ let (mut driver, clock, handle) = setup();
- // Turn a bit
- clock.turn_for(ms(5));
+ // Set delay to hit level 1
+ let mut e1 = task::spawn(delay_until(&handle, clock.now() + ms(64)));
+ assert_pending!(poll!(e1));
- // Set timeout such that it will hit level 0, but wrap
- let mut s2 = task::spawn(delay_until(clock.now() + ms(60)));
- assert_pending!(s2.poll());
+ // Turn a bit
+ assert_ok!(driver.park_timeout(ms(5)));
- // This should result in s1 firing
- clock.turn();
- assert_eq!(clock.advanced(), ms(64));
+ // Set timeout such that it will hit level 0, but wrap
+ let mut e2 = task::spawn(delay_until(&handle, clock.now() + ms(60)));
+ assert_pending!(poll!(e2));
- assert_ready!(s1.poll());
- assert_pending!(s2.poll());
+ // This should result in s1 firing
+ assert_ok!(driver.park());
+ assert_eq!(clock.advanced(), ms(64));
- clock.turn();
- assert_eq!(clock.advanced(), ms(65));
+ assert_ready_ok!(poll!(e1));
+ assert_pending!(poll!(e2));
- assert_ready!(s2.poll());
- });
+ assert_ok!(driver.park());
+ assert_eq!(clock.advanced(), ms(65));
+
+ assert_ready_ok!(poll!(e1));
}
#[test]
fn delay_with_deadline_in_past() {
- mock(|clock| {
- // Create `Delay` that elapsed immediately.
- let mut fut = task::spawn(delay_until(clock.now() - ms(100)));
+ let (mut driver, clock, handle) = setup();
+
+ // Create `Delay` that elapsed immediately.
+ let mut e = task::spawn(delay_until(&handle, clock.now() - ms(100)));
- // Even though the delay expires in the past, it is not ready yet
- // because the timer must observe it.
- assert_ready!(fut.poll());
+ // Even though the delay expires in the past, it is not ready yet
+ // because the timer must observe it.
+ assert_ready_ok!(poll!(e));
- // Turn the timer, it runs for the elapsed time
- clock.turn_for(ms(1000));
+ // Turn the timer, it runs for the elapsed time
+ assert_ok!(driver.park_timeout(ms(1000)));
- // The time has not advanced. The `turn` completed immediately.
- assert_eq!(clock.advanced(), ms(1000));
- });
+ // The time has not advanced. The `turn` completed immediately.
+ assert_eq!(clock.advanced(), ms(1000));
}
#[test]
fn delayed_delay_level_1() {
- mock(|clock| {
- // Create a `Delay` that elapses in the future
- let mut fut = task::spawn(delay_until(clock.now() + ms(234)));
+ let (mut driver, clock, handle) = setup();
- // The delay has not elapsed.
- assert_pending!(fut.poll());
+ // Create a `Delay` that elapses in the future
+ let mut e = task::spawn(delay_until(&handle, clock.now() + ms(234)));
- // Turn the timer, this will wake up to cascade the timer down.
- clock.turn_for(ms(1000));
- assert_eq!(clock.advanced(), ms(192));
+ // The delay has not elapsed.
+ assert_pending!(poll!(e));
- // The delay has not elapsed.
- assert_pending!(fut.poll());
+ // Turn the timer, this will wake up to cascade the timer down.
+ assert_ok!(driver.park_timeout(ms(1000)));
+ assert_eq!(clock.advanced(), ms(192));
- // Turn the timer again
- clock.turn_for(ms(1000));
- assert_eq!(clock.advanced(), ms(234));
+ // The delay has not elapsed.
+ assert_pending!(poll!(e));
- // The delay has elapsed.
- assert_ready!(fut.poll());
- });
+ // Turn the timer again
+ assert_ok!(driver.park_timeout(ms(1000)));
+ assert_eq!(clock.advanced(), ms(234));
- mock(|clock| {
- // Create a `Delay` that elapses in the future
- let mut fut = task::spawn(delay_until(clock.now() + ms(234)));
+ // The delay has elapsed.
+ assert_ready_ok!(poll!(e));
- // The delay has not elapsed.
- assert_pending!(fut.poll());
+ let (mut driver, clock, handle) = setup();
- // Turn the timer with a smaller timeout than the cascade.
- clock.turn_for(ms(100));
- assert_eq!(clock.advanced(), ms(100));
+ // Create a `Delay` that elapses in the future
+ let mut e = task::spawn(delay_until(&handle, clock.now() + ms(234)));
- assert_pending!(fut.poll());
+ // The delay has not elapsed.
+ assert_pending!(poll!(e));
- // Turn the timer, this will wake up to cascade the timer down.
- clock.turn_for(ms(1000));
- assert_eq!(clock.advanced(), ms(192));
+ // Turn the timer with a smaller timeout than the cascade.
+ assert_ok!(driver.park_timeout(ms(100)));
+ assert_eq!(clock.advanced(), ms(100));
- // The delay has not elapsed.
- assert_pending!(fut.poll());
+ assert_pending!(poll!(e));
- // Turn the timer again
- clock.turn_for(ms(1000));
- assert_eq!(clock.advanced(), ms(234));
+ // Turn the timer, this will wake up to cascade the timer down.
+ assert_ok!(driver.park_timeout(ms(1000)));
+ assert_eq!(clock.advanced(), ms(192));
- // The delay has elapsed.
- assert_ready!(fut.poll());
- });
-}
+ // The delay has not elapsed.
+ assert_pending!(poll!(e));
-#[test]
-#[should_panic]
-fn creating_delay_outside_of_context() {
- let now = Instant::now();
+ // Turn the timer again
+ assert_ok!(driver.park_timeout(ms(1000)));
+ assert_eq!(clock.advanced(), ms(234));
- // This creates a delay outside of the context of a mock timer. This tests
- // that it will panic.
- let _fut = task::spawn(delay_until(now + ms(500)));
+ // The delay has elapsed.
+ assert_ready_ok!(poll!(e));
}
#[test]
fn concurrently_set_two_timers_second_one_shorter() {
- mock(|clock| {
- let mut fut1 = task::spawn(delay_until(clock.now() + ms(500)));
- let mut fut2 = task::spawn(delay_until(clock.now() + ms(200)));
+ let (mut driver, clock, handle) = setup();
+
+ let mut e1 = task::spawn(delay_until(&handle, clock.now() + ms(500)));
+ let mut e2 = task::spawn(delay_until(&handle, clock.now() + ms(200)));
- // The delay has not elapsed
- assert_pending!(fut1.poll());
- assert_pending!(fut2.poll());
+ // The delay has not elapsed
+ assert_pending!(poll!(e1));
+ assert_pending!(poll!(e2));
- // Delay until a cascade
- clock.turn();
- assert_eq!(clock.advanced(), ms(192));
+ // Delay until a cascade
+ assert_ok!(driver.park());
+ assert_eq!(clock.advanced(), ms(192));
- // Delay until the second timer.
- clock.turn();
- assert_eq!(clock.advanced(), ms(200));
+ // Delay until the second timer.
+ assert_ok!(driver.park());
+ assert_eq!(clock.advanced(), ms(200));
- // The shorter delay fires
- assert_ready!(fut2.poll());
- assert_pending!(fut1.poll());
+ // The shorter delay fires
+ assert_ready_ok!(poll!(e2));
+ assert_pending!(poll!(e1));
- clock.turn();
- assert_eq!(clock.advanced(), ms(448));
+ assert_ok!(driver.park());
+ assert_eq!(clock.advanced(), ms(448));
- assert_pending!(fut1.poll());
+ assert_pending!(poll!(e1));
- // Turn again, this time the time will advance to the second delay
- clock.turn();
- assert_eq!(clock.advanced(), ms(500));
+ // Turn again, this time the time will advance to the second delay
+ assert_ok!(driver.park());
+ assert_eq!(clock.advanced(), ms(500));
- assert_ready!(fut1.poll());
- })
+ assert_ready_ok!(poll!(e1));
}
#[test]
fn short_delay() {
- mock(|clock| {
- // Create a `Delay` that elapses in the future
- let mut fut = task::spawn(delay_until(clock.now() + ms(1)));
+ let (mut driver, clock, handle) = setup();
- // The delay has not elapsed.
- assert_pending!(fut.poll());
+ // Create a `Delay` that elapses in the future
+ let mut e = task::spawn(delay_until(&handle, clock.now() + ms(1)));
+
+ // The delay has not elapsed.
+ assert_pending!(poll!(e));
- // Turn the timer, but not enough time will go by.
- clock.turn();
+ // Turn the timer, but not enough time will go by.
+ assert_ok!(driver.park());
- // The delay has elapsed.
- assert_ready!(fut.poll());
+ // The delay has elapsed.
+ assert_ready_ok!(poll!(e));
- // The time has advanced to the point of the delay elapsing.
- assert_eq!(clock.advanced(), ms(1));
- })
+ // The time has advanced to the point of the delay elapsing.
+ assert_eq!(clock.advanced(), ms(1));
}
#[test]
fn sorta_long_delay_until() {
const MIN_5: u64 = 5 * 60 * 1000;
- mock(|clock| {
- // Create a `Delay` that elapses in the future
- let mut fut = task::spawn(delay_until(clock.now() + ms(MIN_5)));
+ let (mut driver, clock, handle) = setup();
- // The delay has not elapsed.
- assert_pending!(fut.poll());
+ // Create a `Delay` that elapses in the future
+ let mut e = task::spawn(delay_until(&handle, clock.now() + ms(MIN_5)));
- let cascades = &[262_144, 262_144 + 9 * 4096, 262_144 + 9 * 4096 + 15 * 64];
+ // The delay has not elapsed.
+ assert_pending!(poll!(e));
- for &elapsed in cascades {
- clock.turn();
- assert_eq!(clock.advanced(), ms(elapsed));
+ let cascades = &[262_144, 262_144 + 9 * 4096, 262_144 + 9 * 4096 + 15 * 64];
- assert_pending!(fut.poll());
- }
+ for &elapsed in cascades {
+ assert_ok!(driver.park());
+ assert_eq!(clock.advanced(), ms(elapsed));
+
+ assert_pending!(poll!(e));
+ }
- clock.turn();
- assert_eq!(clock.advanced(), ms(MIN_5));
+ assert_ok!(driver.park());
+ assert_eq!(clock.advanced(), ms(MIN_5));
- // The delay has elapsed.
- assert_ready!(fut.poll());
- })
+ // The delay has elapsed.
+ assert_ready_ok!(poll!(e));
}
#[test]
fn very_long_delay() {
const MO_5: u64 = 5 * 30 * 24 * 60 * 60 * 1000;
- mock(|clock| {
- // Create a `Delay` that elapses in the future
- let mut fut = task::spawn(delay_until(clock.now() + ms(MO_5)));
+ let (mut driver, clock, handle) = setup();
- // The delay has not elapsed.
- assert_pending!(fut.poll());
+ // Create a `Delay` that elapses in the future
+ let mut e = task::spawn(delay_until(&handle, clock.now() + ms(MO_5)));
- let cascades = &[
- 12_884_901_888,
- 12_952_010_752,
- 12_959_875_072,
- 12_959_997_952,
- ];
+ // The delay has not elapsed.
+ assert_pending!(poll!(e));
- for &elapsed in cascades {
- clock.turn();
- assert_eq!(clock.advanced(), ms(elapsed));
+ let cascades = &[
+ 12_884_901_888,
+ 12_952_010_752,
+ 12_959_875_072,
+ 12_959_997_952,
+ ];
- assert_pending!(fut.poll());
- }
-
- // Turn the timer, but not enough time will go by.
- clock.turn();
+ for &elapsed in cascades {
+ assert_ok!(driver.park());
+ assert_eq!(clock.advanced(), ms(elapsed));
- // The time has advanced to the point of the delay elapsing.
- assert_eq!(clock.advanced(), ms(MO_5));
-
- // The delay has elapsed.
- assert_ready!(fut.poll());
- })
-}
-
-#[test]
-#[should_panic]
-fn greater_than_max() {
- const YR_5: u64 = 5 * 365 * 24 * 60 * 60 * 1000;
-
- mock(|clock| {
- // Create a `Delay` that elapses in the future
- let mut fut = task::spawn(delay_until(clock.now() + ms(YR_5)));
+ assert_pending!(poll!(e));
+ }
- assert_pending!(fut.poll());
+ // Turn the timer, but not enough time will go by.
+ assert_ok!(driver.park());
- clock.turn_for(ms(0));
+ // The time has advanced to the point of the delay elapsing.
+ assert_eq!(clock.advanced(), ms(MO_5));
- // boom
- let _ = fut.poll();
- })
+ // The delay has elapsed.
+ assert_ready_ok!(poll!(e));
}
#[test]
fn unpark_is_delayed() {
- mock(|clock| {
- let mut fut1 = task::spawn(delay_until(clock.now() + ms(100)));
- let mut fut2 = task::spawn(delay_until(clock.now() + ms(101)));
- let mut fut3 = task::spawn(delay_until(clock.now() + ms(200)));
+ // A special park that will take much longer than the requested duration
+ struct MockPark(Clock);
- assert_pending!(fut1.poll());
- assert_pending!(fut2.poll());
- assert_pending!(fut3.poll());
+ struct MockUnpark;
- clock.park_for(ms(500));
+ impl Park for MockPark {
+ type Unpark = MockUnpark;
+ type Error = ();
- assert_eq!(clock.advanced(), ms(500));
-
- assert_ready!(fut1.poll());
- assert_ready!(fut2.poll());
- assert_ready!(fut3.poll());
- })
-}
-
-#[test]
-fn set_timeout_at_deadline_greater_than_max_timer() {
- const YR_1: u64 = 365 * 24 * 60 * 60 * 1000;
- const YR_5: u64 = 5 * YR_1;
-
- mock(|clock| {
- for _ in 0..5 {
- clock.turn_for(ms(YR_1));
+ fn unpark(&self) -> Self::Unpark {
+ MockUnpark
}
- let mut fut = task::spawn(delay_until(clock.now() + ms(1)));
- assert_pending!(fut.poll());
-
- clock.turn_for(ms(1000));
- assert_eq!(clock.advanced(), ms(YR_5) + ms(1));
+ fn park(&mut self) -> Result<(), Self::Error> {
+ panic!("parking forever");
+ }
- assert_ready!(fut.poll());
- });
-}
+ fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
+ assert_eq!(duration, ms(0));
+ self.0.advance(ms(436));
+ Ok(())
+ }
+ }
-#[test]
-fn reset_future_delay_before_fire() {
- mock(|clock| {
- let mut fut = task::spawn(delay_until(clock.now() + ms(100)));
+ impl Unpark for MockUnpark {
+ fn unpark(&self) {}
+ }
- assert_pending!(fut.poll());
+ let clock = Clock::new_frozen();
+ let mut driver = Driver::new(MockPark(clock.clone()), clock.clone());
+ let handle = driver.handle();
- fut.reset(clock.now() + ms(200));
+ let mut e1 = task::spawn(delay_until(&handle, clock.now() + ms(100)));
+ let mut e2 = task::spawn(delay_until(&handle, clock.now() + ms(101)));
+ let mut e3 = task::spawn(delay_until(&handle, clock.now() + ms(200)));
- clock.turn();
- assert_eq!(clock.advanced(), ms(192));
+ assert_pending!(poll!(e1));
+ assert_pending!(poll!(e2));
+ assert_pending!(poll!(e3));
- assert_pending!(fut.poll());
+ assert_ok!(driver.park());
- clock.turn();
- assert_eq!(clock.advanced(), ms(200));
+ assert_eq!(clock.advanced(), ms(500));
- assert_ready!(fut.poll());
- });
+ assert_ready_ok!(poll!(e1));
+ assert_ready_ok!(poll!(e2));
+ assert_ready_ok!(poll!(e3));
}
#[test]
-fn reset_past_delay_before_turn() {
- mock(|clock| {
- let mut fut = task::spawn(delay_until(clock.now() + ms(100)));
-
- assert_pending!(fut.poll());
+fn set_timeout_at_deadline_greater_than_max_timer() {
+ const YR_1: u64 = 365 * 24 * 60 * 60 * 1000;
+ const YR_5: u64 = 5 * YR_1;
- fut.reset(clock.now() + ms(80));
+ let (mut driver, clock, handle) = setup();
- clock.turn();
- assert_eq!(clock.advanced(), ms(64));
+ for _ in 0..5 {
+ assert_ok!(driver.park_timeout(ms(YR_1)));
+ }
- assert_pending!(fut.poll());
+ let mut e = task::spawn(delay_until(&handle, clock.now() + ms(1)));
+ assert_pending!(poll!(e));
- clock.turn();
- assert_eq!(clock.advanced(), ms(80));
+ assert_ok!(driver.park_timeout(ms(1000)));
+ assert_eq!(clock.advanced(), ms(YR_5) + ms(1));
- assert_ready!(fut.poll());
- });
+ assert_ready_ok!(poll!(e));
}
-#[test]
-fn reset_past_delay_before_fire() {
- mock(|clock| {
- let mut fut = task::spawn(delay_until(clock.now() + ms(100)));
-
- assert_pending!(fut.poll());
- clock.turn_for(ms(10));
-
- assert_pending!(fut.poll());
- fut.reset(clock.now() + ms(80));
+fn setup() -> (Driver<MockPark>, Clock, Handle) {
+ let clock = Clock::new_frozen();
+ let driver = Driver::new(MockPark(clock.clone()), clock.clone());
+ let handle = driver.handle();
- clock.turn();
- assert_eq!(clock.advanced(), ms(64));
-
- assert_pending!(fut.poll());
-
- clock.turn();
- assert_eq!(clock.advanced(), ms(90));
-
- assert_ready!(fut.poll());
- });
+ (driver, clock, handle)
}
-#[test]
-fn reset_future_delay_after_fire() {
- mock(|clock| {
- let mut fut = task::spawn(delay_until(clock.now() + ms(100)));
+fn delay_until(handle: &Handle, when: Instant) -> Arc<Entry> {
+ Entry::new(&handle, when, ms(0))
+}
- assert_pending!(fut.poll());
+struct MockPark(Clock);
- clock.turn_for(ms(1000));
- assert_eq!(clock.advanced(), ms(64));
+struct MockUnpark;
- clock.turn();
- assert_eq!(clock.advanced(), ms(100));
+impl Park for MockPark {
+ type Unpark = MockUnpark;
+ type Error = ();
- assert_ready!(fut.poll());
+ fn unpark(&self) -> Self::Unpark {
+ MockUnpark
+ }
- fut.reset(clock.now() + ms(10));
- assert_pending!(fut.poll());
+ fn park(&mut self) -> Result<(), Self::Error> {
+ panic!("parking forever");
+ }
- clock.turn_for(ms(1000));
- assert_eq!(clock.advanced(), ms(110));
+ fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
+ self.0.advance(duration);
+ Ok(())
+ }
+}
- assert_ready!(fut.poll());
- });
+impl Unpark for MockUnpark {
+ fn unpark(&self) {}
}
fn ms(n: u64) -> Duration {
diff --git a/tokio/src/time/tests/test_queue.rs b/tokio/src/time/tests/test_queue.rs
deleted file mode 100644
index 34b9b7da..00000000
--- a/tokio/src/time/tests/test_queue.rs
+++ /dev/null
@@ -1,369 +0,0 @@
-#![warn(rust_2018_idioms)]
-
-use crate::time::tests::mock_clock::mock;
-use crate::time::{DelayQueue, Duration};
-use tokio_test::{assert_ok, assert_pending, assert_ready, task};
-
-macro_rules! poll {
- ($queue:ident) => {
- $queue.enter(|cx, mut queue| queue.poll_expired(cx))
- };
-}
-
-macro_rules! assert_ready_ok {
- ($e:expr) => {{
- assert_ok!(match assert_ready!($e) {
- Some(v) => v,
- None => panic!("None"),
- })
- }};
-}
-
-#[test]
-fn single_immediate_delay() {
- mock(|clock| {
- let mut queue = task::spawn(DelayQueue::new());
- let _key = queue.insert_at("foo", clock.now());
-
- let entry = assert_ready_ok!(poll!(queue));
- assert_eq!(*entry.get_ref(), "foo");
-
- let entry = assert_ready!(poll!(queue));
- assert!(entry.is_none())
- });
-}
-
-#[test]
-fn multi_immediate_delays() {
- mock(|clock| {
- let mut queue = task::spawn(DelayQueue::new());
-
- let _k = queue.insert_at("1", clock.now());
- let _k = queue.insert_at("2", clock.now());
- let _k = queue.insert_at("3", clock.now());
-
- let mut res = vec![];
-
- while res.len() < 3 {
- let entry = assert_ready_ok!(poll!(queue));
- res.push(entry.into_inner());
- }
-
- let entry = assert_ready!(poll!(queue));
- assert!(entry.is_none());
-
- res.sort();
-
- assert_eq!("1", res[0]);
- assert_eq!("2", res[1]);
- assert_eq!("3", res[2]);
- });
-}
-
-#[test]
-fn single_short_delay() {
- mock(|clock| {
- let mut queue = task::spawn(DelayQueue::new());
- let _key = queue.insert_at("foo", clock.now() + ms(5));
-
- assert_pending!(poll!(queue));
-
- clock.turn_for(ms(1));
-
- assert!(!queue.is_woken());
-
- clock.turn_for(ms(5));
-
- assert!(queue.is_woken());
-
- let entry = assert_ready_ok!(poll!(queue));
- assert_eq!(*entry.get_ref(), "foo");
-
- let entry = assert_ready!(poll!(queue));
- assert!(entry.is_none());
- });
-}
-
-#[test]
-fn multi_delay_at_start() {
- let long = 262_144 + 9 * 4096;
- let delays = &[1000, 2, 234, long, 60, 10];
-
- mock(|clock| {
- let mut queue = task::spawn(DelayQueue::new());
-
- // Setup the delays
- for &i in delays {
- let _key = queue.insert_at(i, clock.now() + ms(i));
- }
-
- assert_pending!(poll!(queue));
- assert!(!queue.is_woken());
-
- for elapsed in 0..1200 {
- clock.turn_for(ms(1));
- let elapsed = elapsed + 1;
-
- if delays.contains(&elapsed) {
- assert!(queue.is_woken());
- assert_ready!(poll!(queue));
- assert_pending!(poll!(queue));
- } else if queue.is_woken() {
- let cascade = &[192, 960];
- assert!(cascade.contains(&elapsed), "elapsed={}", elapsed);
-
- assert_pending!(poll!(queue));
- }
- }
- });
-}
-
-#[test]
-fn insert_in_past_fires_immediately() {
- mock(|clock| {
- let mut queue = task::spawn(DelayQueue::new());
-
- let now = clock.now();
-
- clock.turn_for(ms(10));
-
- queue.insert_at("foo", now);
-
- assert_ready!(poll!(queue));
- });
-}
-
-#[test]
-fn remove_entry() {
- mock(|clock| {
- let mut queue = task::spawn(DelayQueue::new());
-
- let key = queue.insert_at("foo", clock.now() + ms(5));
-
- assert_pending!(poll!(queue));
-
- let entry = queue.remove(&key);
- assert_eq!(entry.into_inner(), "foo");
-
- clock.turn_for(ms(10));
-
- let entry = assert_ready!(poll!(queue));