diff options
Diffstat (limited to 'tokio/src/executor/tests')
-rw-r--r-- | tokio/src/executor/tests/backoff.rs | 32 | ||||
-rw-r--r-- | tokio/src/executor/tests/loom_oneshot.rs | 49 | ||||
-rw-r--r-- | tokio/src/executor/tests/loom_schedule.rs | 51 | ||||
-rw-r--r-- | tokio/src/executor/tests/mock_park.rs | 66 | ||||
-rw-r--r-- | tokio/src/executor/tests/mock_schedule.rs | 131 | ||||
-rw-r--r-- | tokio/src/executor/tests/mod.rs | 40 | ||||
-rw-r--r-- | tokio/src/executor/tests/track_drop.rs | 57 |
7 files changed, 426 insertions, 0 deletions
diff --git a/tokio/src/executor/tests/backoff.rs b/tokio/src/executor/tests/backoff.rs new file mode 100644 index 00000000..358ab2da --- /dev/null +++ b/tokio/src/executor/tests/backoff.rs @@ -0,0 +1,32 @@ +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +pub(crate) struct Backoff(usize, bool); + +pub(crate) fn backoff(n: usize) -> impl Future<Output = ()> { + Backoff(n, false) +} + +/// Back off, but clone the waker each time +pub(crate) fn backoff_clone(n: usize) -> impl Future<Output = ()> { + Backoff(n, true) +} + +impl Future for Backoff { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + if self.0 == 0 { + return Poll::Ready(()); + } + + self.0 -= 1; + if self.1 { + cx.waker().clone().wake(); + } else { + cx.waker().wake_by_ref(); + } + Poll::Pending + } +} diff --git a/tokio/src/executor/tests/loom_oneshot.rs b/tokio/src/executor/tests/loom_oneshot.rs new file mode 100644 index 00000000..c126fe47 --- /dev/null +++ b/tokio/src/executor/tests/loom_oneshot.rs @@ -0,0 +1,49 @@ +use loom::sync::Notify; + +use std::sync::{Arc, Mutex}; + +pub(crate) fn channel<T>() -> (Sender<T>, Receiver<T>) { + let inner = Arc::new(Inner { + notify: Notify::new(), + value: Mutex::new(None), + }); + + let tx = Sender { + inner: inner.clone(), + }; + let rx = Receiver { inner }; + + (tx, rx) +} + +pub(crate) struct Sender<T> { + inner: Arc<Inner<T>>, +} + +pub(crate) struct Receiver<T> { + inner: Arc<Inner<T>>, +} + +struct Inner<T> { + notify: Notify, + value: Mutex<Option<T>>, +} + +impl<T> Sender<T> { + pub(crate) fn send(self, value: T) { + *self.inner.value.lock().unwrap() = Some(value); + self.inner.notify.notify(); + } +} + +impl<T> Receiver<T> { + pub(crate) fn recv(self) -> T { + loop { + if let Some(v) = self.inner.value.lock().unwrap().take() { + return v; + } + + self.inner.notify.wait(); + } + } +} diff --git a/tokio/src/executor/tests/loom_schedule.rs b/tokio/src/executor/tests/loom_schedule.rs new file mode 100644 index 00000000..7999dd97 --- /dev/null +++ b/tokio/src/executor/tests/loom_schedule.rs @@ -0,0 +1,51 @@ +use crate::executor::task::{Schedule, Task}; + +use loom::sync::Notify; +use std::collections::VecDeque; +use std::sync::Mutex; + +pub(crate) struct LoomSchedule { + notify: Notify, + pending: Mutex<VecDeque<Option<Task<Self>>>>, +} + +impl LoomSchedule { + pub(crate) fn new() -> LoomSchedule { + LoomSchedule { + notify: Notify::new(), + pending: Mutex::new(VecDeque::new()), + } + } + + pub(crate) fn push_task(&self, task: Task<Self>) { + self.schedule(task); + } + + pub(crate) fn recv(&self) -> Option<Task<Self>> { + loop { + if let Some(task) = self.pending.lock().unwrap().pop_front() { + return task; + } + + self.notify.wait(); + } + } +} + +impl Schedule for LoomSchedule { + fn bind(&self, _task: &Task<Self>) {} + + fn release(&self, task: Task<Self>) { + self.release_local(&task); + } + + fn release_local(&self, _task: &Task<Self>) { + self.pending.lock().unwrap().push_back(None); + self.notify.notify(); + } + + fn schedule(&self, task: Task<Self>) { + self.pending.lock().unwrap().push_back(Some(task)); + self.notify.notify(); + } +} diff --git a/tokio/src/executor/tests/mock_park.rs b/tokio/src/executor/tests/mock_park.rs new file mode 100644 index 00000000..2bc9edf5 --- /dev/null +++ b/tokio/src/executor/tests/mock_park.rs @@ -0,0 +1,66 @@ +#![allow(warnings)] + +use crate::executor::park::{Park, Unpark}; + +use std::collections::HashMap; +use std::sync::atomic::{AtomicBool, Ordering::SeqCst}; +use std::sync::Arc; +use std::time::Duration; + +pub struct MockPark { + parks: HashMap<usize, Arc<Inner>>, +} + +#[derive(Clone)] +struct ParkImpl(Arc<Inner>); + +struct Inner { + unparked: AtomicBool, +} + +impl MockPark { + pub fn new() -> MockPark { + MockPark { + parks: HashMap::new(), + } + } + + pub fn is_unparked(&self, index: usize) -> bool { + self.parks[&index].unparked.load(SeqCst) + } + + pub fn clear(&self, index: usize) { + self.parks[&index].unparked.store(false, SeqCst); + } + + pub fn mk_park(&mut self, index: usize) -> impl Park { + let inner = Arc::new(Inner { + unparked: AtomicBool::new(false), + }); + self.parks.insert(index, inner.clone()); + ParkImpl(inner) + } +} + +impl Park for ParkImpl { + type Unpark = ParkImpl; + type Error = (); + + fn unpark(&self) -> Self::Unpark { + self.clone() + } + + fn park(&mut self) -> Result<(), Self::Error> { + unimplemented!(); + } + + fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + unimplemented!(); + } +} + +impl Unpark for ParkImpl { + fn unpark(&self) { + self.0.unparked.store(true, SeqCst); + } +} diff --git a/tokio/src/executor/tests/mock_schedule.rs b/tokio/src/executor/tests/mock_schedule.rs new file mode 100644 index 00000000..f105d40c --- /dev/null +++ b/tokio/src/executor/tests/mock_schedule.rs @@ -0,0 +1,131 @@ +#![allow(warnings)] + +use crate::executor::task::{Header, Schedule, Task}; + +use std::collections::VecDeque; +use std::sync::Mutex; +use std::thread; + +pub(crate) struct Mock { + inner: Mutex<Inner>, +} + +pub(crate) struct Noop; +pub(crate) static NOOP_SCHEDULE: Noop = Noop; + +struct Inner { + calls: VecDeque<Call>, + pending_run: VecDeque<Task<Mock>>, + pending_drop: VecDeque<Task<Mock>>, +} + +unsafe impl Send for Inner {} +unsafe impl Sync for Inner {} + +#[derive(Debug, Eq, PartialEq)] +enum Call { + Bind(*const Header<Mock>), + Release, + ReleaseLocal, + Schedule, +} + +pub(crate) fn mock() -> Mock { + Mock { + inner: Mutex::new(Inner { + calls: VecDeque::new(), + pending_run: VecDeque::new(), + pending_drop: VecDeque::new(), + }), + } +} + +impl Mock { + pub(crate) fn bind(self, task: &Task<Mock>) -> Self { + self.push(Call::Bind(task.header() as *const _)); + self + } + + pub(crate) fn release(self) -> Self { + self.push(Call::Release); + self + } + + pub(crate) fn release_local(self) -> Self { + self.push(Call::ReleaseLocal); + self + } + + pub(crate) fn schedule(self) -> Self { + self.push(Call::Schedule); + self + } + + pub(crate) fn next_pending_run(&self) -> Option<Task<Self>> { + self.inner.lock().unwrap().pending_run.pop_front() + } + + pub(crate) fn next_pending_drop(&self) -> Option<Task<Self>> { + self.inner.lock().unwrap().pending_drop.pop_front() + } + + fn push(&self, call: Call) { + self.inner.lock().unwrap().calls.push_back(call); + } + + fn next(&self, name: &str) -> Call { + self.inner + .lock() + .unwrap() + .calls + .pop_front() + .expect(&format!("received `{}`, but none expected", name)) + } +} + +impl Schedule for Mock { + fn bind(&self, task: &Task<Self>) { + match self.next("bind") { + Call::Bind(ptr) => { + assert!(ptr.eq(&(task.header() as *const _))); + } + call => panic!("expected `Bind`, was {:?}", call), + } + } + + fn release(&self, task: Task<Self>) { + match self.next("release") { + Call::Release => { + self.inner.lock().unwrap().pending_drop.push_back(task); + } + call => panic!("expected `Release`, was {:?}", call), + } + } + + fn release_local(&self, _task: &Task<Self>) { + assert_eq!(Call::ReleaseLocal, self.next("release_local")); + } + + fn schedule(&self, task: Task<Self>) { + self.inner.lock().unwrap().pending_run.push_back(task); + assert_eq!(Call::Schedule, self.next("schedule")); + } +} + +impl Drop for Mock { + fn drop(&mut self) { + if !thread::panicking() { + assert!(self.inner.lock().unwrap().calls.is_empty()); + } + } +} + +impl Schedule for Noop { + fn bind(&self, _task: &Task<Self>) {} + + fn release(&self, _task: Task<Self>) {} + + fn release_local(&self, _task: &Task<Self>) {} + + fn schedule(&self, _task: Task<Self>) {} +} diff --git a/tokio/src/executor/tests/mod.rs b/tokio/src/executor/tests/mod.rs new file mode 100644 index 00000000..b287bcf2 --- /dev/null +++ b/tokio/src/executor/tests/mod.rs @@ -0,0 +1,40 @@ +//! Testing utilities + +#[cfg(not(loom))] +pub(crate) mod backoff; + +#[cfg(loom)] +pub(crate) mod loom_oneshot; + +#[cfg(loom)] +pub(crate) mod loom_schedule; + +#[cfg(not(loom))] +pub(crate) mod mock_park; + +pub(crate) mod mock_schedule; + +#[cfg(not(loom))] +pub(crate) mod track_drop; + +/// Panic if expression results in `None`. +#[macro_export] +macro_rules! assert_some { + ($e:expr) => {{ + match $e { + Some(v) => v, + _ => panic!("expected some, was none"), + } + }}; +} + +/// Panic if expression results in `Some`. +#[macro_export] +macro_rules! assert_none { + ($e:expr) => {{ + match $e { + Some(v) => panic!("expected none, was {:?}", v), + _ => {} + } + }}; +} diff --git a/tokio/src/executor/tests/track_drop.rs b/tokio/src/executor/tests/track_drop.rs new file mode 100644 index 00000000..c3ded845 --- /dev/null +++ b/tokio/src/executor/tests/track_drop.rs @@ -0,0 +1,57 @@ +use std::future::Future; +use std::pin::Pin; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering::SeqCst; +use std::sync::Arc; +use std::task::{Context, Poll}; + +#[derive(Debug)] +pub(crate) struct TrackDrop<T>(T, Arc<AtomicBool>); + +#[derive(Debug)] +pub(crate) struct DidDrop(Arc<AtomicBool>, Arc<AtomicBool>); + +pub(crate) fn track_drop<T: Future>( + future: T, +) -> (impl Future<Output = TrackDrop<T::Output>>, DidDrop) { + let did_drop_future = Arc::new(AtomicBool::new(false)); + let did_drop_output = Arc::new(AtomicBool::new(false)); + let did_drop = DidDrop(did_drop_future.clone(), did_drop_output.clone()); + + let future = async move { TrackDrop(future.await, did_drop_output) }; + + let future = TrackDrop(future, did_drop_future); + + (future, did_drop) +} + +impl<T> TrackDrop<T> { + pub(crate) fn get_ref(&self) -> &T { + &self.0 + } +} + +impl<T: Future> Future for TrackDrop<T> { + type Output = T::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let me = unsafe { Pin::map_unchecked_mut(self, |x| &mut x.0) }; + me.poll(cx) + } +} + +impl<T> Drop for TrackDrop<T> { + fn drop(&mut self) { + self.1.store(true, SeqCst); + } +} + +impl DidDrop { + pub(crate) fn did_drop_future(&self) -> bool { + self.0.load(SeqCst) + } + + pub(crate) fn did_drop_output(&self) -> bool { + self.1.load(SeqCst) + } +} |