diff options
Diffstat (limited to 'tokio/src/time/driver/tests/mod.rs')
-rw-r--r-- | tokio/src/time/driver/tests/mod.rs | 246 |
1 files changed, 239 insertions, 7 deletions
diff --git a/tokio/src/time/driver/tests/mod.rs b/tokio/src/time/driver/tests/mod.rs index 88ff5525..e6af798f 100644 --- a/tokio/src/time/driver/tests/mod.rs +++ b/tokio/src/time/driver/tests/mod.rs @@ -1,18 +1,249 @@ -use crate::park::Unpark; -use crate::time::driver::Inner; -use crate::time::Instant; +use std::{task::Context, time::Duration}; -use loom::thread; +#[cfg(not(loom))] +use futures::task::noop_waker_ref; -use std::sync::atomic::Ordering; -use std::sync::Arc; +use crate::loom::sync::{Arc, Mutex}; +use crate::loom::thread; +use crate::{ + loom::sync::atomic::{AtomicBool, Ordering}, + park::Unpark, +}; -struct MockUnpark; +use super::{Handle, TimerEntry}; +struct MockUnpark {} impl Unpark for MockUnpark { fn unpark(&self) {} } +impl MockUnpark { + fn mock() -> Box<dyn Unpark> { + Box::new(Self {}) + } +} + +fn block_on<T>(f: impl std::future::Future<Output = T>) -> T { + #[cfg(loom)] + return loom::future::block_on(f); + + #[cfg(not(loom))] + return futures::executor::block_on(f); +} + +fn model(f: impl Fn() + Send + Sync + 'static) { + #[cfg(loom)] + loom::model(f); + + #[cfg(not(loom))] + f(); +} + +#[test] +fn single_timer() { + model(|| { + let clock = crate::time::clock::Clock::new(); + let time_source = super::ClockTime::new(clock.clone()); + + let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); + let handle = Handle::new(Arc::new(Mutex::new(inner))); + + let handle_ = handle.clone(); + let jh = thread::spawn(move || { + let entry = TimerEntry::new(&handle_, clock.now() + Duration::from_secs(1)); + pin!(entry); + + block_on(futures::future::poll_fn(|cx| { + entry.as_mut().poll_elapsed(cx) + })) + .unwrap(); + }); + + thread::yield_now(); + + // This may or may not return Some (depending on how it races with the + // thread). If it does return None, however, the timer should complete + // synchronously. + handle.process_at_time(time_source.now() + 2_000_000_000); + + jh.join().unwrap(); + }) +} + +#[test] +fn drop_timer() { + model(|| { + let clock = crate::time::clock::Clock::new(); + let time_source = super::ClockTime::new(clock.clone()); + + let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); + let handle = Handle::new(Arc::new(Mutex::new(inner))); + + let handle_ = handle.clone(); + let jh = thread::spawn(move || { + let entry = TimerEntry::new(&handle_, clock.now() + Duration::from_secs(1)); + pin!(entry); + + let _ = entry + .as_mut() + .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref())); + let _ = entry + .as_mut() + .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref())); + }); + + thread::yield_now(); + + // advance 2s in the future. + handle.process_at_time(time_source.now() + 2_000_000_000); + + jh.join().unwrap(); + }) +} + +#[test] +fn change_waker() { + model(|| { + let clock = crate::time::clock::Clock::new(); + let time_source = super::ClockTime::new(clock.clone()); + + let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); + let handle = Handle::new(Arc::new(Mutex::new(inner))); + + let handle_ = handle.clone(); + let jh = thread::spawn(move || { + let entry = TimerEntry::new(&handle_, clock.now() + Duration::from_secs(1)); + pin!(entry); + + let _ = entry + .as_mut() + .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref())); + + block_on(futures::future::poll_fn(|cx| { + entry.as_mut().poll_elapsed(cx) + })) + .unwrap(); + }); + + thread::yield_now(); + + // advance 2s + handle.process_at_time(time_source.now() + 2_000_000_000); + + jh.join().unwrap(); + }) +} + +#[test] +fn reset_future() { + model(|| { + let finished_early = Arc::new(AtomicBool::new(false)); + + let clock = crate::time::clock::Clock::new(); + let time_source = super::ClockTime::new(clock.clone()); + + let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); + let handle = Handle::new(Arc::new(Mutex::new(inner))); + + let handle_ = handle.clone(); + let finished_early_ = finished_early.clone(); + let start = clock.now(); + + let jh = thread::spawn(move || { + let entry = TimerEntry::new(&handle_, start + Duration::from_secs(1)); + pin!(entry); + + let _ = entry + .as_mut() + .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref())); + + entry.as_mut().reset(start + Duration::from_secs(2)); + + // shouldn't complete before 2s + block_on(futures::future::poll_fn(|cx| { + entry.as_mut().poll_elapsed(cx) + })) + .unwrap(); + + finished_early_.store(true, Ordering::Relaxed); + }); + + thread::yield_now(); + + // This may or may not return a wakeup time. + handle.process_at_time(time_source.instant_to_tick(start + Duration::from_millis(1500))); + + assert!(!finished_early.load(Ordering::Relaxed)); + + handle.process_at_time(time_source.instant_to_tick(start + Duration::from_millis(2500))); + + jh.join().unwrap(); + + assert!(finished_early.load(Ordering::Relaxed)); + }) +} + +#[test] +#[cfg(not(loom))] +fn poll_process_levels() { + let clock = crate::time::clock::Clock::new(); + clock.pause(); + + let time_source = super::ClockTime::new(clock.clone()); + + let inner = super::Inner::new(time_source, MockUnpark::mock()); + let handle = Handle::new(Arc::new(Mutex::new(inner))); + + let mut entries = vec![]; + + for i in 0..1024 { + let mut entry = Box::pin(TimerEntry::new( + &handle, + clock.now() + Duration::from_millis(i), + )); + + let _ = entry + .as_mut() + .poll_elapsed(&mut Context::from_waker(noop_waker_ref())); + + entries.push(entry); + } + + for t in 1..1024 { + handle.process_at_time(t as u64); + for (deadline, future) in entries.iter_mut().enumerate() { + let mut context = Context::from_waker(noop_waker_ref()); + if deadline <= t { + assert!(future.as_mut().poll_elapsed(&mut context).is_ready()); + } else { + assert!(future.as_mut().poll_elapsed(&mut context).is_pending()); + } + } + } +} + +#[test] +#[cfg(not(loom))] +fn poll_process_levels_targeted() { + let mut context = Context::from_waker(noop_waker_ref()); + + let clock = crate::time::clock::Clock::new(); + clock.pause(); + + let time_source = super::ClockTime::new(clock.clone()); + + let inner = super::Inner::new(time_source, MockUnpark::mock()); + let handle = Handle::new(Arc::new(Mutex::new(inner))); + + let e1 = TimerEntry::new(&handle, clock.now() + Duration::from_millis(193)); + pin!(e1); + + handle.process_at_time(62); + assert!(e1.as_mut().poll_elapsed(&mut context).is_pending()); + handle.process_at_time(192); + handle.process_at_time(192); +} +/* #[test] fn balanced_incr_and_decr() { const OPS: usize = 5; @@ -53,3 +284,4 @@ fn balanced_incr_and_decr() { assert_eq!(inner.num(Ordering::SeqCst), 0); }) } +*/ |