diff options
Diffstat (limited to 'tokio/tests/timer_queue.rs')
-rw-r--r-- | tokio/tests/timer_queue.rs | 401 |
1 files changed, 401 insertions, 0 deletions
diff --git a/tokio/tests/timer_queue.rs b/tokio/tests/timer_queue.rs new file mode 100644 index 00000000..7e1d7f6a --- /dev/null +++ b/tokio/tests/timer_queue.rs @@ -0,0 +1,401 @@ +#![warn(rust_2018_idioms)] + +use tokio::timer::*; +use tokio_test::task::MockTask; +use tokio_test::{assert_ok, assert_pending, assert_ready, clock}; + +use std::time::Duration; + +macro_rules! poll { + ($task:ident, $queue:ident) => { + $task.enter(|cx| $queue.poll_next(cx)) + }; +} + +macro_rules! assert_ready_ok { + ($e:expr) => {{ + assert_ok!(match assert_ready!($e) { + Some(v) => v, + None => panic!("None"), + }) + }}; +} + +#[test] +fn single_immediate_delay() { + let mut t = MockTask::new(); + + clock::mock(|clock| { + let mut queue = DelayQueue::new(); + let _key = queue.insert_at("foo", clock.now()); + + let entry = assert_ready_ok!(poll!(t, queue)); + assert_eq!(*entry.get_ref(), "foo"); + + let entry = assert_ready!(poll!(t, queue)); + assert!(entry.is_none()) + }); +} + +#[test] +fn multi_immediate_delays() { + let mut t = MockTask::new(); + + clock::mock(|clock| { + let mut queue = DelayQueue::new(); + + let _k = queue.insert_at("1", clock.now()); + let _k = queue.insert_at("2", clock.now()); + let _k = queue.insert_at("3", clock.now()); + + let mut res = vec![]; + + while res.len() < 3 { + let entry = assert_ready_ok!(poll!(t, queue)); + res.push(entry.into_inner()); + } + + let entry = assert_ready!(poll!(t, queue)); + assert!(entry.is_none()); + + res.sort(); + + assert_eq!("1", res[0]); + assert_eq!("2", res[1]); + assert_eq!("3", res[2]); + }); +} + +#[test] +fn single_short_delay() { + let mut t = MockTask::new(); + + clock::mock(|clock| { + let mut queue = DelayQueue::new(); + let _key = queue.insert_at("foo", clock.now() + ms(5)); + + assert_pending!(poll!(t, queue)); + + clock.turn_for(ms(1)); + + assert!(!t.is_woken()); + + clock.turn_for(ms(5)); + + assert!(t.is_woken()); + + let entry = assert_ready_ok!(poll!(t, queue)); + assert_eq!(*entry.get_ref(), "foo"); + + let entry = assert_ready!(poll!(t, queue)); + assert!(entry.is_none()); + }); +} + +#[test] +fn multi_delay_at_start() { + let long = 262_144 + 9 * 4096; + let delays = &[1000, 2, 234, long, 60, 10]; + + let mut t = MockTask::new(); + + clock::mock(|clock| { + let mut queue = DelayQueue::new(); + + // Setup the delays + for &i in delays { + let _key = queue.insert_at(i, clock.now() + ms(i)); + } + + assert_pending!(poll!(t, queue)); + assert!(!t.is_woken()); + + for elapsed in 0..1200 { + clock.turn_for(ms(1)); + let elapsed = elapsed + 1; + + if delays.contains(&elapsed) { + assert!(t.is_woken()); + assert_ready!(poll!(t, queue)); + assert_pending!(poll!(t, queue)); + } else { + if t.is_woken() { + let cascade = &[192, 960]; + assert!(cascade.contains(&elapsed), "elapsed={}", elapsed); + + assert_pending!(poll!(t, queue)); + } + } + } + }); +} + +#[test] +fn insert_in_past_fires_immediately() { + let mut t = MockTask::new(); + + clock::mock(|clock| { + let mut queue = DelayQueue::new(); + + let now = clock.now(); + + clock.turn_for(ms(10)); + + queue.insert_at("foo", now); + + assert_ready!(poll!(t, queue)); + }); +} + +#[test] +fn remove_entry() { + let mut t = MockTask::new(); + + clock::mock(|clock| { + let mut queue = DelayQueue::new(); + + let key = queue.insert_at("foo", clock.now() + ms(5)); + + assert_pending!(poll!(t, queue)); + + let entry = queue.remove(&key); + assert_eq!(entry.into_inner(), "foo"); + + clock.turn_for(ms(10)); + + let entry = assert_ready!(poll!(t, queue)); + assert!(entry.is_none()); + }); +} + +#[test] +fn reset_entry() { + let mut t = MockTask::new(); + + clock::mock(|clock| { + let mut queue = DelayQueue::new(); + + let now = clock.now(); + let key = queue.insert_at("foo", now + ms(5)); + + assert_pending!(poll!(t, queue)); + clock.turn_for(ms(1)); + + queue.reset_at(&key, now + ms(10)); + + assert_pending!(poll!(t, queue)); + + clock.turn_for(ms(7)); + + assert!(!t.is_woken()); + + assert_pending!(poll!(t, queue)); + + clock.turn_for(ms(3)); + + assert!(t.is_woken()); + + let entry = assert_ready_ok!(poll!(t, queue)); + assert_eq!(*entry.get_ref(), "foo"); + + let entry = assert_ready!(poll!(t, queue)); + assert!(entry.is_none()) + }); +} + +#[test] +fn reset_much_later() { + let mut t = MockTask::new(); + + // Reproduces tokio-rs/tokio#849. + clock::mock(|clock| { + let mut queue = DelayQueue::new(); + + let epoch = clock.now(); + + clock.turn_for(ms(1)); + + let key = queue.insert_at("foo", epoch + ms(200)); + + assert_pending!(poll!(t, queue)); + + clock.turn_for(ms(3)); + + queue.reset_at(&key, epoch + ms(5)); + + clock.turn_for(ms(20)); + + assert!(t.is_woken()); + }); +} + +#[test] +fn reset_twice() { + let mut t = MockTask::new(); + + // Reproduces tokio-rs/tokio#849. + clock::mock(|clock| { + let mut queue = DelayQueue::new(); + + let epoch = clock.now(); + + clock.turn_for(ms(1)); + + let key = queue.insert_at("foo", epoch + ms(200)); + + assert_pending!(poll!(t, queue)); + + clock.turn_for(ms(3)); + + queue.reset_at(&key, epoch + ms(50)); + + clock.turn_for(ms(20)); + + queue.reset_at(&key, epoch + ms(40)); + + clock.turn_for(ms(20)); + + assert!(t.is_woken()); + }); +} + +#[test] +fn remove_expired_item() { + clock::mock(|clock| { + let mut queue = DelayQueue::new(); + + let now = clock.now(); + + clock.turn_for(ms(10)); + + let key = queue.insert_at("foo", now); + + let entry = queue.remove(&key); + assert_eq!(entry.into_inner(), "foo"); + }) +} + +#[test] +fn expires_before_last_insert() { + let mut t = MockTask::new(); + + clock::mock(|clock| { + let mut queue = DelayQueue::new(); + + let epoch = clock.now(); + + queue.insert_at("foo", epoch + ms(10_000)); + + // Delay should be set to 8.192s here. + assert_pending!(poll!(t, queue)); + + // Delay should be set to the delay of the new item here + queue.insert_at("bar", epoch + ms(600)); + + assert_pending!(poll!(t, queue)); + + clock.advance(ms(600)); + + assert!(t.is_woken()); + + let entry = assert_ready_ok!(poll!(t, queue)).into_inner(); + assert_eq!(entry, "bar"); + }) +} + +#[test] +fn multi_reset() { + let mut t = MockTask::new(); + + clock::mock(|clock| { + let mut queue = DelayQueue::new(); + + let epoch = clock.now(); + + let foo = queue.insert_at("foo", epoch + ms(200)); + let bar = queue.insert_at("bar", epoch + ms(250)); + + assert_pending!(poll!(t, queue)); + + queue.reset_at(&foo, epoch + ms(300)); + queue.reset_at(&bar, epoch + ms(350)); + queue.reset_at(&foo, epoch + ms(400)); + }) +} + +#[test] +fn expire_first_key_when_reset_to_expire_earlier() { + let mut t = MockTask::new(); + + clock::mock(|clock| { + let mut queue = DelayQueue::new(); + + let epoch = clock.now(); + + let foo = queue.insert_at("foo", epoch + ms(200)); + queue.insert_at("bar", epoch + ms(250)); + + assert_pending!(poll!(t, queue)); + + queue.reset_at(&foo, epoch + ms(100)); + + clock.advance(ms(100)); + + assert!(t.is_woken()); + + let entry = assert_ready_ok!(poll!(t, queue)).into_inner(); + assert_eq!(entry, "foo"); + }) +} + +#[test] +fn expire_second_key_when_reset_to_expire_earlier() { + let mut t = MockTask::new(); + + clock::mock(|clock| { + let mut queue = DelayQueue::new(); + + let epoch = clock.now(); + + queue.insert_at("foo", epoch + ms(200)); + let bar = queue.insert_at("bar", epoch + ms(250)); + + assert_pending!(poll!(t, queue)); + + queue.reset_at(&bar, epoch + ms(100)); + + clock.advance(ms(100)); + + assert!(t.is_woken()); + let entry = assert_ready_ok!(poll!(t, queue)).into_inner(); + assert_eq!(entry, "bar"); + }) +} + +#[test] +fn reset_first_expiring_item_to_expire_later() { + let mut t = MockTask::new(); + + clock::mock(|clock| { + let mut queue = DelayQueue::new(); + + let epoch = clock.now(); + + let foo = queue.insert_at("foo", epoch + ms(200)); + let _bar = queue.insert_at("bar", epoch + ms(250)); + + assert_pending!(poll!(t, queue)); + + queue.reset_at(&foo, epoch + ms(300)); + clock.advance(ms(250)); + + assert!(t.is_woken()); + + let entry = assert_ready_ok!(poll!(t, queue)).into_inner(); + assert_eq!(entry, "bar"); + }) +} + +fn ms(n: u64) -> Duration { + Duration::from_millis(n) +} |