diff options
Diffstat (limited to 'tokio')
-rw-r--r-- | tokio/src/runtime/context.rs | 12 | ||||
-rw-r--r-- | tokio/src/time/clock.rs | 14 | ||||
-rw-r--r-- | tokio/src/time/driver/entry.rs | 4 | ||||
-rw-r--r-- | tokio/src/time/driver/mod.rs | 20 | ||||
-rw-r--r-- | tokio/src/time/driver/registration.rs | 6 | ||||
-rw-r--r-- | tokio/src/time/tests/mod.rs | 2 | ||||
-rw-r--r-- | tokio/src/time/tests/test_delay.rs | 567 | ||||
-rw-r--r-- | tokio/src/time/tests/test_queue.rs | 369 | ||||
-rw-r--r-- | tokio/tests/time_delay.rs | 160 | ||||
-rw-r--r-- | tokio/tests/time_delay_queue.rs | 389 |
10 files changed, 849 insertions, 694 deletions
diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs index 6ac60b07..d3bd6982 100644 --- a/tokio/src/runtime/context.rs +++ b/tokio/src/runtime/context.rs @@ -81,18 +81,6 @@ impl ThreadContext { }) } - #[cfg(all(feature = "test-util", feature = "time", test))] - pub(crate) fn with_time_handle(mut self, handle: crate::runtime::time::Handle) -> Self { - self.time_handle = handle; - self - } - - #[cfg(all(feature = "test-util", feature = "time", test))] - pub(crate) fn with_clock(mut self, clock: crate::runtime::time::Clock) -> Self { - self.clock.replace(clock); - self - } - #[cfg(all(feature = "io-driver", not(loom)))] pub(crate) fn io_handle() -> crate::runtime::io::Handle { CONTEXT.with(|ctx| match *ctx.borrow() { diff --git a/tokio/src/time/clock.rs b/tokio/src/time/clock.rs index 5ece7f5a..ae75740c 100644 --- a/tokio/src/time/clock.rs +++ b/tokio/src/time/clock.rs @@ -5,7 +5,7 @@ //! configurable. cfg_not_test_util! { - use crate::time::Instant; + use crate::time::{Duration, Instant}; #[derive(Debug, Clone)] pub(crate) struct Clock {} @@ -22,6 +22,14 @@ cfg_not_test_util! { pub(crate) fn now(&self) -> Instant { now() } + + pub(crate) fn is_frozen(&self) -> bool { + false + } + + pub(crate) fn advance(&self, _dur: Duration) { + unreachable!(); + } } } @@ -137,6 +145,10 @@ cfg_test_util! { } } + pub(crate) fn is_frozen(&self) -> bool { + self.inner.frozen.lock().unwrap().is_some() + } + pub(crate) fn advance(&self, duration: Duration) { let mut frozen = self.inner.frozen.lock().unwrap(); diff --git a/tokio/src/time/driver/entry.rs b/tokio/src/time/driver/entry.rs index d5fab897..079ec7e8 100644 --- a/tokio/src/time/driver/entry.rs +++ b/tokio/src/time/driver/entry.rs @@ -104,8 +104,8 @@ const ERROR: u64 = u64::MAX; // ===== impl Entry ===== impl Entry { - pub(crate) fn new(deadline: Instant, duration: Duration) -> Arc<Entry> { - let inner = Handle::current().inner().unwrap(); + pub(crate) fn new(handle: &Handle, deadline: Instant, duration: Duration) -> Arc<Entry> { + let inner = handle.inner().unwrap(); let entry: Entry; // Increment the number of active timeouts diff --git a/tokio/src/time/driver/mod.rs b/tokio/src/time/driver/mod.rs index 605d9bca..f74a4853 100644 --- a/tokio/src/time/driver/mod.rs +++ b/tokio/src/time/driver/mod.rs @@ -4,7 +4,7 @@ mod atomic_stack; use self::atomic_stack::AtomicStack; mod entry; -use self::entry::Entry; +pub(super) use self::entry::Entry; mod handle; pub(crate) use self::handle::Handle; @@ -245,7 +245,14 @@ where let deadline = self.expiration_instant(when); if deadline > now { - self.park.park_timeout(deadline - now)?; + let dur = deadline - now; + + if self.clock.is_frozen() { + self.park.park_timeout(Duration::from_secs(0))?; + self.clock.advance(dur); + } else { + self.park.park_timeout(dur)?; + } } else { self.park.park_timeout(Duration::from_secs(0))?; } @@ -269,7 +276,14 @@ where let deadline = self.expiration_instant(when); if deadline > now { - self.park.park_timeout(cmp::min(deadline - now, duration))?; + let duration = cmp::min(deadline - now, duration); + + if self.clock.is_frozen() { + self.park.park_timeout(Duration::from_secs(0))?; + self.clock.advance(duration); + } else { + self.park.park_timeout(duration)?; + } } else { self.park.park_timeout(Duration::from_secs(0))?; } diff --git a/tokio/src/time/driver/registration.rs b/tokio/src/time/driver/registration.rs index 728d2993..141ca015 100644 --- a/tokio/src/time/driver/registration.rs +++ b/tokio/src/time/driver/registration.rs @@ -1,4 +1,4 @@ -use crate::time::driver::Entry; +use crate::time::driver::{Entry, Handle}; use crate::time::{Duration, Error, Instant}; use std::sync::Arc; @@ -15,8 +15,10 @@ pub(crate) struct Registration { impl Registration { pub(crate) fn new(deadline: Instant, duration: Duration) -> Registration { + let handle = Handle::current(); + Registration { - entry: Entry::new(deadline, duration), + entry: Entry::new(&handle, deadline, duration), } } diff --git a/tokio/src/time/tests/mod.rs b/tokio/src/time/tests/mod.rs index 6a9f25da..4710d470 100644 --- a/tokio/src/time/tests/mod.rs +++ b/tokio/src/time/tests/mod.rs @@ -1,6 +1,4 @@ -mod mock_clock; mod test_delay; -mod test_queue; use crate::time::{self, Instant}; use std::time::Duration; diff --git a/tokio/src/time/tests/test_delay.rs b/tokio/src/time/tests/test_delay.rs index 43bfe379..388f9b8b 100644 --- a/tokio/src/time/tests/test_delay.rs +++ b/tokio/src/time/tests/test_delay.rs @@ -1,461 +1,422 @@ #![warn(rust_2018_idioms)] -use crate::time::tests::mock_clock::mock; -use crate::time::{delay_until, Duration, Instant}; +use crate::park::{Park, Unpark}; +use crate::time::driver::{Driver, Entry, Handle}; +use crate::time::Clock; +use crate::time::{Duration, Instant}; + use tokio_test::task; -use tokio_test::{assert_pending, assert_ready}; +use tokio_test::{assert_ok, assert_pending, assert_ready_ok}; + +use std::sync::Arc; + +macro_rules! poll { + ($e:expr) => { + $e.enter(|cx, e| e.poll_elapsed(cx)) + }; +} #[test] fn immediate_delay() { - mock(|clock| { - // Create `Delay` that elapsed immediately. - let mut fut = task::spawn(delay_until(clock.now())); + let (mut driver, clock, handle) = setup(); + + let when = clock.now(); + let mut e = task::spawn(delay_until(&handle, when)); - // Ready! - assert_ready!(fut.poll()); + assert_ready_ok!(poll!(e)); - // Turn the timer, it runs for the elapsed time - clock.turn_for(ms(1000)); + assert_ok!(driver.park_timeout(Duration::from_millis(1000))); - // The time has not advanced. The `turn` completed immediately. - assert_eq!(clock.advanced(), ms(1000)); - }); + // The time has not advanced. The `turn` completed immediately. + assert_eq!(clock.advanced(), ms(1000)); } #[test] fn delayed_delay_level_0() { + let (mut driver, clock, handle) = setup(); + + let start = clock.now(); + for &i in &[1, 10, 60] { - mock(|clock| { - // Create a `Delay` that elapses in the future - let mut fut = task::spawn(delay_until(clock.now() + ms(i))); + // Create a `Delay` that elapses in the future + let mut e = task::spawn(delay_until(&handle, start + ms(i))); - // The delay has not elapsed. - assert_pending!(fut.poll()); + // The delay has not elapsed. + assert_pending!(poll!(e)); - clock.turn(); - assert_eq!(clock.advanced(), ms(i)); + assert_ok!(driver.park()); + assert_eq!(clock.advanced(), ms(i)); - assert_ready!(fut.poll()); - }); + assert_ready_ok!(poll!(e)); } } #[test] fn sub_ms_delayed_delay() { - mock(|clock| { - for _ in 0..5 { - let deadline = clock.now() + Duration::from_millis(1) + Duration::new(0, 1); + let (mut driver, clock, handle) = setup(); - let mut fut = task::spawn(delay_until(deadline)); + for _ in 0..5 { + let deadline = clock.now() + ms(1) + Duration::new(0, 1); - assert_pending!(fut.poll()); + let mut e = task::spawn(delay_until(&handle, deadline)); - clock.turn(); - assert_ready!(fut.poll()); + assert_pending!(poll!(e)); - assert!(clock.now() >= deadline); + assert_ok!(driver.park()); + assert_ready_ok!(poll!(e)); - clock.advance(Duration::new(0, 1)); - } - }); + assert!(clock.now() >= deadline); + + clock.advance(Duration::new(0, 1)); + } } #[test] fn delayed_delay_wrapping_level_0() { - mock(|clock| { - clock.turn_for(ms(5)); - assert_eq!(clock.advanced(), ms(5)); + let (mut driver, clock, handle) = setup(); + + assert_ok!(driver.park_timeout(ms(5))); + assert_eq!(clock.advanced(), ms(5)); - let mut fut = task::spawn(delay_until(clock.now() + ms(60))); + let mut e = task::spawn(delay_until(&handle, clock.now() + ms(60))); - assert_pending!(fut.poll()); + assert_pending!(poll!(e)); - clock.turn(); - assert_eq!(clock.advanced(), ms(64)); - assert_pending!(fut.poll()); + assert_ok!(driver.park()); + assert_eq!(clock.advanced(), ms(64)); + assert_pending!(poll!(e)); - clock.turn(); - assert_eq!(clock.advanced(), ms(65)); + assert_ok!(driver.park()); + assert_eq!(clock.advanced(), ms(65)); - assert_ready!(fut.poll()); - }); + assert_ready_ok!(poll!(e)); } #[test] fn timer_wrapping_with_higher_levels() { - mock(|clock| { - // Set delay to hit level 1 - let mut s1 = task::spawn(delay_until(clock.now() + ms(64))); - assert_pending!(s1.poll()); + let (mut driver, clock, handle) = setup(); - // Turn a bit - clock.turn_for(ms(5)); + // Set delay to hit level 1 + let mut e1 = task::spawn(delay_until(&handle, clock.now() + ms(64))); + assert_pending!(poll!(e1)); - // Set timeout such that it will hit level 0, but wrap - let mut s2 = task::spawn(delay_until(clock.now() + ms(60))); - assert_pending!(s2.poll()); + // Turn a bit + assert_ok!(driver.park_timeout(ms(5))); - // This should result in s1 firing - clock.turn(); - assert_eq!(clock.advanced(), ms(64)); + // Set timeout such that it will hit level 0, but wrap + let mut e2 = task::spawn(delay_until(&handle, clock.now() + ms(60))); + assert_pending!(poll!(e2)); - assert_ready!(s1.poll()); - assert_pending!(s2.poll()); + // This should result in s1 firing + assert_ok!(driver.park()); + assert_eq!(clock.advanced(), ms(64)); - clock.turn(); - assert_eq!(clock.advanced(), ms(65)); + assert_ready_ok!(poll!(e1)); + assert_pending!(poll!(e2)); - assert_ready!(s2.poll()); - }); + assert_ok!(driver.park()); + assert_eq!(clock.advanced(), ms(65)); + + assert_ready_ok!(poll!(e1)); } #[test] fn delay_with_deadline_in_past() { - mock(|clock| { - // Create `Delay` that elapsed immediately. - let mut fut = task::spawn(delay_until(clock.now() - ms(100))); + let (mut driver, clock, handle) = setup(); + + // Create `Delay` that elapsed immediately. + let mut e = task::spawn(delay_until(&handle, clock.now() - ms(100))); - // Even though the delay expires in the past, it is not ready yet - // because the timer must observe it. - assert_ready!(fut.poll()); + // Even though the delay expires in the past, it is not ready yet + // because the timer must observe it. + assert_ready_ok!(poll!(e)); - // Turn the timer, it runs for the elapsed time - clock.turn_for(ms(1000)); + // Turn the timer, it runs for the elapsed time + assert_ok!(driver.park_timeout(ms(1000))); - // The time has not advanced. The `turn` completed immediately. - assert_eq!(clock.advanced(), ms(1000)); - }); + // The time has not advanced. The `turn` completed immediately. + assert_eq!(clock.advanced(), ms(1000)); } #[test] fn delayed_delay_level_1() { - mock(|clock| { - // Create a `Delay` that elapses in the future - let mut fut = task::spawn(delay_until(clock.now() + ms(234))); + let (mut driver, clock, handle) = setup(); - // The delay has not elapsed. - assert_pending!(fut.poll()); + // Create a `Delay` that elapses in the future + let mut e = task::spawn(delay_until(&handle, clock.now() + ms(234))); - // Turn the timer, this will wake up to cascade the timer down. - clock.turn_for(ms(1000)); - assert_eq!(clock.advanced(), ms(192)); + // The delay has not elapsed. + assert_pending!(poll!(e)); - // The delay has not elapsed. - assert_pending!(fut.poll()); + // Turn the timer, this will wake up to cascade the timer down. + assert_ok!(driver.park_timeout(ms(1000))); + assert_eq!(clock.advanced(), ms(192)); - // Turn the timer again - clock.turn_for(ms(1000)); - assert_eq!(clock.advanced(), ms(234)); + // The delay has not elapsed. + assert_pending!(poll!(e)); - // The delay has elapsed. - assert_ready!(fut.poll()); - }); + // Turn the timer again + assert_ok!(driver.park_timeout(ms(1000))); + assert_eq!(clock.advanced(), ms(234)); - mock(|clock| { - // Create a `Delay` that elapses in the future - let mut fut = task::spawn(delay_until(clock.now() + ms(234))); + // The delay has elapsed. + assert_ready_ok!(poll!(e)); - // The delay has not elapsed. - assert_pending!(fut.poll()); + let (mut driver, clock, handle) = setup(); - // Turn the timer with a smaller timeout than the cascade. - clock.turn_for(ms(100)); - assert_eq!(clock.advanced(), ms(100)); + // Create a `Delay` that elapses in the future + let mut e = task::spawn(delay_until(&handle, clock.now() + ms(234))); - assert_pending!(fut.poll()); + // The delay has not elapsed. + assert_pending!(poll!(e)); - // Turn the timer, this will wake up to cascade the timer down. - clock.turn_for(ms(1000)); - assert_eq!(clock.advanced(), ms(192)); + // Turn the timer with a smaller timeout than the cascade. + assert_ok!(driver.park_timeout(ms(100))); + assert_eq!(clock.advanced(), ms(100)); - // The delay has not elapsed. - assert_pending!(fut.poll()); + assert_pending!(poll!(e)); - // Turn the timer again - clock.turn_for(ms(1000)); - assert_eq!(clock.advanced(), ms(234)); + // Turn the timer, this will wake up to cascade the timer down. + assert_ok!(driver.park_timeout(ms(1000))); + assert_eq!(clock.advanced(), ms(192)); - // The delay has elapsed. - assert_ready!(fut.poll()); - }); -} + // The delay has not elapsed. + assert_pending!(poll!(e)); -#[test] -#[should_panic] -fn creating_delay_outside_of_context() { - let now = Instant::now(); + // Turn the timer again + assert_ok!(driver.park_timeout(ms(1000))); + assert_eq!(clock.advanced(), ms(234)); - // This creates a delay outside of the context of a mock timer. This tests - // that it will panic. - let _fut = task::spawn(delay_until(now + ms(500))); + // The delay has elapsed. + assert_ready_ok!(poll!(e)); } #[test] fn concurrently_set_two_timers_second_one_shorter() { - mock(|clock| { - let mut fut1 = task::spawn(delay_until(clock.now() + ms(500))); - let mut fut2 = task::spawn(delay_until(clock.now() + ms(200))); + let (mut driver, clock, handle) = setup(); + + let mut e1 = task::spawn(delay_until(&handle, clock.now() + ms(500))); + let mut e2 = task::spawn(delay_until(&handle, clock.now() + ms(200))); - // The delay has not elapsed - assert_pending!(fut1.poll()); - assert_pending!(fut2.poll()); + // The delay has not elapsed + assert_pending!(poll!(e1)); + assert_pending!(poll!(e2)); - // Delay until a cascade - clock.turn(); - assert_eq!(clock.advanced(), ms(192)); + // Delay until a cascade + assert_ok!(driver.park()); + assert_eq!(clock.advanced(), ms(192)); - // Delay until the second timer. - clock.turn(); - assert_eq!(clock.advanced(), ms(200)); + // Delay until the second timer. + assert_ok!(driver.park()); + assert_eq!(clock.advanced(), ms(200)); - // The shorter delay fires - assert_ready!(fut2.poll()); - assert_pending!(fut1.poll()); + // The shorter delay fires + assert_ready_ok!(poll!(e2)); + assert_pending!(poll!(e1)); - clock.turn(); - assert_eq!(clock.advanced(), ms(448)); + assert_ok!(driver.park()); + assert_eq!(clock.advanced(), ms(448)); - assert_pending!(fut1.poll()); + assert_pending!(poll!(e1)); - // Turn again, this time the time will advance to the second delay - clock.turn(); - assert_eq!(clock.advanced(), ms(500)); + // Turn again, this time the time will advance to the second delay + assert_ok!(driver.park()); + assert_eq!(clock.advanced(), ms(500)); - assert_ready!(fut1.poll()); - }) + assert_ready_ok!(poll!(e1)); } #[test] fn short_delay() { - mock(|clock| { - // Create a `Delay` that elapses in the future - let mut fut = task::spawn(delay_until(clock.now() + ms(1))); + let (mut driver, clock, handle) = setup(); - // The delay has not elapsed. - assert_pending!(fut.poll()); + // Create a `Delay` that elapses in the future + let mut e = task::spawn(delay_until(&handle, clock.now() + ms(1))); + + // The delay has not elapsed. + assert_pending!(poll!(e)); - // Turn the timer, but not enough time will go by. - clock.turn(); + // Turn the timer, but not enough time will go by. + assert_ok!(driver.park()); - // The delay has elapsed. - assert_ready!(fut.poll()); + // The delay has elapsed. + assert_ready_ok!(poll!(e)); - // The time has advanced to the point of the delay elapsing. - assert_eq!(clock.advanced(), ms(1)); - }) + // The time has advanced to the point of the delay elapsing. + assert_eq!(clock.advanced(), ms(1)); } #[test] fn sorta_long_delay_until() { const MIN_5: u64 = 5 * 60 * 1000; - mock(|clock| { - // Create a `Delay` that elapses in the future - let mut fut = task::spawn(delay_until(clock.now() + ms(MIN_5))); + let (mut driver, clock, handle) = setup(); - // The delay has not elapsed. - assert_pending!(fut.poll()); + // Create a `Delay` that elapses in the future + let mut e = task::spawn(delay_until(&handle, clock.now() + ms(MIN_5))); - let cascades = &[262_144, 262_144 + 9 * 4096, 262_144 + 9 * 4096 + 15 * 64]; + // The delay has not elapsed. + assert_pending!(poll!(e)); - for &elapsed in cascades { - clock.turn(); - assert_eq!(clock.advanced(), ms(elapsed)); + let cascades = &[262_144, 262_144 + 9 * 4096, 262_144 + 9 * 4096 + 15 * 64]; - assert_pending!(fut.poll()); - } + for &elapsed in cascades { + assert_ok!(driver.park()); + assert_eq!(clock.advanced(), ms(elapsed)); + + assert_pending!(poll!(e)); + } - clock.turn(); - assert_eq!(clock.advanced(), ms(MIN_5)); + assert_ok!(driver.park()); + assert_eq!(clock.advanced(), ms(MIN_5)); - // The delay has elapsed. - assert_ready!(fut.poll()); - }) + // The delay has elapsed. + assert_ready_ok!(poll!(e)); } #[test] fn very_long_delay() { const MO_5: u64 = 5 * 30 * 24 * 60 * 60 * 1000; - mock(|clock| { - // Create a `Delay` that elapses in the future - let mut fut = task::spawn(delay_until(clock.now() + ms(MO_5))); + let (mut driver, clock, handle) = setup(); - // The delay has not elapsed. - assert_pending!(fut.poll()); + // Create a `Delay` that elapses in the future + let mut e = task::spawn(delay_until(&handle, clock.now() + ms(MO_5))); - let cascades = &[ - 12_884_901_888, - 12_952_010_752, - 12_959_875_072, - 12_959_997_952, - ]; + // The delay has not elapsed. + assert_pending!(poll!(e)); - for &elapsed in cascades { - clock.turn(); - assert_eq!(clock.advanced(), ms(elapsed)); + let cascades = &[ + 12_884_901_888, + 12_952_010_752, + 12_959_875_072, + 12_959_997_952, + ]; - assert_pending!(fut.poll()); - } - - // Turn the timer, but not enough time will go by. - clock.turn(); + for &elapsed in cascades { + assert_ok!(driver.park()); + assert_eq!(clock.advanced(), ms(elapsed)); - // The time has advanced to the point of the delay elapsing. - assert_eq!(clock.advanced(), ms(MO_5)); - - // The delay has elapsed. - assert_ready!(fut.poll()); - }) -} - -#[test] -#[should_panic] -fn greater_than_max() { - const YR_5: u64 = 5 * 365 * 24 * 60 * 60 * 1000; - - mock(|clock| { - // Create a `Delay` that elapses in the future - let mut fut = task::spawn(delay_until(clock.now() + ms(YR_5))); + assert_pending!(poll!(e)); + } - assert_pending!(fut.poll()); + // Turn the timer, but not enough time will go by. + assert_ok!(driver.park()); - clock.turn_for(ms(0)); + // The time has advanced to the point of the delay elapsing. + assert_eq!(clock.advanced(), ms(MO_5)); - // boom - let _ = fut.poll(); - }) + // The delay has elapsed. + assert_ready_ok!(poll!(e)); } #[test] fn unpark_is_delayed() { - mock(|clock| { - let mut fut1 = task::spawn(delay_until(clock.now() + ms(100))); - let mut fut2 = task::spawn(delay_until(clock.now() + ms(101))); - let mut fut3 = task::spawn(delay_until(clock.now() + ms(200))); + // A special park that will take much longer than the requested duration + struct MockPark(Clock); - assert_pending!(fut1.poll()); - assert_pending!(fut2.poll()); - assert_pending!(fut3.poll()); + struct MockUnpark; - clock.park_for(ms(500)); + impl Park for MockPark { + type Unpark = MockUnpark; + type Error = (); - assert_eq!(clock.advanced(), ms(500)); - - assert_ready!(fut1.poll()); - assert_ready!(fut2.poll()); - assert_ready!(fut3.poll()); - }) -} - -#[test] -fn set_timeout_at_deadline_greater_than_max_timer() { - const YR_1: u64 = 365 * 24 * 60 * 60 * 1000; - const YR_5: u64 = 5 * YR_1; - - mock(|clock| { - for _ in 0..5 { - clock.turn_for(ms(YR_1)); + fn unpark(&self) -> Self::Unpark { + MockUnpark } - let mut fut = task::spawn(delay_until(clock.now() + ms(1))); - assert_pending!(fut.poll()); - - clock.turn_for(ms(1000)); - assert_eq!(clock.advanced(), ms(YR_5) + ms(1)); + fn park(&mut self) -> Result<(), Self::Error> { + panic!("parking forever"); + } - assert_ready!(fut.poll()); - }); -} + fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + assert_eq!(duration, ms(0)); + self.0.advance(ms(436)); + Ok(()) + } + } -#[test] -fn reset_future_delay_before_fire() { - mock(|clock| { - let mut fut = task::spawn(delay_until(clock.now() + ms(100))); + impl Unpark for MockUnpark { + fn unpark(&self) {} + } - assert_pending!(fut.poll()); + let clock = Clock::new_frozen(); + let mut driver = Driver::new(MockPark(clock.clone()), clock.clone()); + let handle = driver.handle(); - fut.reset(clock.now() + ms(200)); + let mut e1 = task::spawn(delay_until(&handle, clock.now() + ms(100))); + let mut e2 = task::spawn(delay_until(&handle, clock.now() + ms(101))); + let mut e3 = task::spawn(delay_until(&handle, clock.now() + ms(200))); - clock.turn(); - assert_eq!(clock.advanced(), ms(192)); + assert_pending!(poll!(e1)); + assert_pending!(poll!(e2)); + assert_pending!(poll!(e3)); - assert_pending!(fut.poll()); + assert_ok!(driver.park()); - clock.turn(); - assert_eq!(clock.advanced(), ms(200)); + assert_eq!(clock.advanced(), ms(500)); - assert_ready!(fut.poll()); - }); + assert_ready_ok!(poll!(e1)); + assert_ready_ok!(poll!(e2)); + assert_ready_ok!(poll!(e3)); } #[test] -fn reset_past_delay_before_turn() { - mock(|clock| { - let mut fut = task::spawn(delay_until(clock.now() + ms(100))); - - assert_pending!(fut.poll()); +fn set_timeout_at_deadline_greater_than_max_timer() { + const YR_1: u64 = 365 * 24 * 60 * 60 * 1000; + const YR_5: u64 = 5 * YR_1; - fut.reset(clock.now() + ms(80)); + let (mut driver, clock, handle) = setup(); - clock.turn(); - assert_eq!(clock.advanced(), ms(64)); + for _ in 0..5 { + assert_ok!(driver.park_timeout(ms(YR_1))); + } - assert_pending!(fut.poll()); + let mut e = task::spawn(delay_until(&handle, clock.now() + ms(1))); + assert_pending!(poll!(e)); - clock.turn(); - assert_eq!(clock.advanced(), ms(80)); + assert_ok!(driver.park_timeout(ms(1000))); + assert_eq!(clock.advanced(), ms(YR_5) + ms(1)); - assert_ready!(fut.poll()); - }); + assert_ready_ok!(poll!(e)); } -#[test] -fn reset_past_delay_before_fire() { - mock(|clock| { - let mut fut = task::spawn(delay_until(clock.now() + ms(100))); - - assert_pending!(fut.poll()); - clock.turn_for(ms(10)); - - assert_pending!(fut.poll()); - fut.reset(clock.now() + ms(80)); +fn setup() -> (Driver<MockPark>, Clock, Handle) { + let clock = Clock::new_frozen(); + let driver = Driver::new(MockPark(clock.clone()), clock.clone()); + let handle = driver.handle(); - clock.turn(); - assert_eq!(clock.advanced(), ms(64)); - - assert_pending!(fut.poll()); - - clock.turn(); - assert_eq!(clock.advanced(), ms(90)); - - assert_ready!(fut.poll()); - }); + (driver, clock, handle) } -#[test] -fn reset_future_delay_after_fire() { - mock(|clock| { - let mut fut = task::spawn(delay_until(clock.now() + ms(100))); +fn delay_until(handle: &Handle, when: Instant) -> Arc<Entry> { + Entry::new(&handle, when, ms(0)) +} - assert_pending!(fut.poll()); +struct MockPark(Clock); - clock.turn_for(ms(1000)); - assert_eq!(clock.advanced(), ms(64)); +struct MockUnpark; - clock.turn(); - assert_eq!(clock.advanced(), ms(100)); +impl Park for MockPark { + type Unpark = MockUnpark; + type Error = (); - assert_ready!(fut.poll()); + fn unpark(&self) -> Self::Unpark { + MockUnpark + } - fut.reset(clock.now() + ms(10)); - assert_pending!(fut.poll()); + fn park(&mut self) -> Result<(), Self::Error> { + panic!("parking forever"); + } - clock.turn_for(ms(1000)); - assert_eq!(clock.advanced(), ms(110)); + fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + self.0.advance(duration); + Ok(()) + } +} - assert_ready!(fut.poll()); - }); +impl Unpark for MockUnpark { + fn unpark(&self) {} } fn ms(n: u64) -> Duration { diff --git a/tokio/src/time/tests/test_queue.rs b/tokio/src/time/tests/test_queue.rs deleted file mode 100644 index 34b9b7da..00000000 --- a/tokio/src/time/tests/test_queue.rs +++ /dev/null @@ -1,369 +0,0 @@ -#![warn(rust_2018_idioms)] - -use crate::time::tests::mock_clock::mock; -use crate::time::{DelayQueue, Duration}; -use tokio_test::{assert_ok, assert_pending, assert_ready, task}; - -macro_rules! poll { - ($queue:ident) => { - $queue.enter(|cx, mut queue| queue.poll_expired(cx)) - }; -} - -macro_rules! assert_ready_ok { - ($e:expr) => {{ - assert_ok!(match assert_ready!($e) { - Some(v) => v, - None => panic!("None"), - }) - }}; -} - -#[test] -fn single_immediate_delay() { - mock(|clock| { - let mut queue = task::spawn(DelayQueue::new()); - let _key = queue.insert_at("foo", clock.now()); - - let entry = assert_ready_ok!(poll!(queue)); - assert_eq!(*entry.get_ref(), "foo"); - - let entry = assert_ready!(poll!(queue)); - assert!(entry.is_none()) - }); -} - -#[test] -fn multi_immediate_delays() { - mock(|clock| { - let mut queue = task::spawn(DelayQueue::new()); - - let _k = queue.insert_at("1", clock.now()); - let _k = queue.insert_at("2", clock.now()); - let _k = queue.insert_at("3", clock.now()); - - let mut res = vec![]; - - while res.len() < 3 { - let entry = assert_ready_ok!(poll!(queue)); - res.push(entry.into_inner()); - } - - let entry = assert_ready!(poll!(queue)); - assert!(entry.is_none()); - - res.sort(); - - assert_eq!("1", res[0]); - assert_eq!("2", res[1]); - assert_eq!("3", res[2]); - }); -} - -#[test] -fn single_short_delay() { - mock(|clock| { - let mut queue = task::spawn(DelayQueue::new()); - let _key = queue.insert_at("foo", clock.now() + ms(5)); - - assert_pending!(poll!(queue)); - - clock.turn_for(ms(1)); - - assert!(!queue.is_woken()); - - clock.turn_for(ms(5)); - - assert!(queue.is_woken()); - - let entry = assert_ready_ok!(poll!(queue)); - assert_eq!(*entry.get_ref(), "foo"); - - let entry = assert_ready!(poll!(queue)); - assert!(entry.is_none()); - }); -} - -#[test] -fn multi_delay_at_start() { - let long = 262_144 + 9 * 4096; - let delays = &[1000, 2, 234, long, 60, 10]; - - mock(|clock| { - let mut queue = task::spawn(DelayQueue::new()); - - // Setup the delays - for &i in delays { - let _key = queue.insert_at(i, clock.now() + ms(i)); - } - - assert_pending!(poll!(queue)); - assert!(!queue.is_woken()); - - for elapsed in 0..1200 { - clock.turn_for(ms(1)); - let elapsed = elapsed + 1; - - if delays.contains(&elapsed) { - assert!(queue.is_woken()); - assert_ready!(poll!(queue)); - assert_pending!(poll!(queue)); - } else if queue.is_woken() { - let cascade = &[192, 960]; - assert!(cascade.contains(&elapsed), "elapsed={}", elapsed); - - assert_pending!(poll!(queue)); - } - } - }); -} - -#[test] -fn insert_in_past_fires_immediately() { - mock(|clock| { - let mut queue = task::spawn(DelayQueue::new()); - - let now = clock.now(); - - clock.turn_for(ms(10)); - - queue.insert_at("foo", now); - - assert_ready!(poll!(queue)); - }); -} - -#[test] -fn remove_entry() { - mock(|clock| { - let mut queue = task::spawn(DelayQueue::new()); - - let key = queue.insert_at("foo", clock.now() + ms(5)); - - assert_pending!(poll!(queue)); - - let entry = queue.remove(&key); - assert_eq!(entry.into_inner(), "foo"); - - clock.turn_for(ms(10)); - - let entry = assert_ready!(poll!(queue)); |