summaryrefslogtreecommitdiffstats
path: root/tokio/src/executor/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/executor/tests')
-rw-r--r--tokio/src/executor/tests/backoff.rs32
-rw-r--r--tokio/src/executor/tests/loom_oneshot.rs49
-rw-r--r--tokio/src/executor/tests/loom_schedule.rs51
-rw-r--r--tokio/src/executor/tests/mock_park.rs66
-rw-r--r--tokio/src/executor/tests/mock_schedule.rs131
-rw-r--r--tokio/src/executor/tests/mod.rs40
-rw-r--r--tokio/src/executor/tests/track_drop.rs57
7 files changed, 426 insertions, 0 deletions
diff --git a/tokio/src/executor/tests/backoff.rs b/tokio/src/executor/tests/backoff.rs
new file mode 100644
index 00000000..358ab2da
--- /dev/null
+++ b/tokio/src/executor/tests/backoff.rs
@@ -0,0 +1,32 @@
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+pub(crate) struct Backoff(usize, bool);
+
+pub(crate) fn backoff(n: usize) -> impl Future<Output = ()> {
+ Backoff(n, false)
+}
+
+/// Back off, but clone the waker each time
+pub(crate) fn backoff_clone(n: usize) -> impl Future<Output = ()> {
+ Backoff(n, true)
+}
+
+impl Future for Backoff {
+ type Output = ();
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ if self.0 == 0 {
+ return Poll::Ready(());
+ }
+
+ self.0 -= 1;
+ if self.1 {
+ cx.waker().clone().wake();
+ } else {
+ cx.waker().wake_by_ref();
+ }
+ Poll::Pending
+ }
+}
diff --git a/tokio/src/executor/tests/loom_oneshot.rs b/tokio/src/executor/tests/loom_oneshot.rs
new file mode 100644
index 00000000..c126fe47
--- /dev/null
+++ b/tokio/src/executor/tests/loom_oneshot.rs
@@ -0,0 +1,49 @@
+use loom::sync::Notify;
+
+use std::sync::{Arc, Mutex};
+
+pub(crate) fn channel<T>() -> (Sender<T>, Receiver<T>) {
+ let inner = Arc::new(Inner {
+ notify: Notify::new(),
+ value: Mutex::new(None),
+ });
+
+ let tx = Sender {
+ inner: inner.clone(),
+ };
+ let rx = Receiver { inner };
+
+ (tx, rx)
+}
+
+pub(crate) struct Sender<T> {
+ inner: Arc<Inner<T>>,
+}
+
+pub(crate) struct Receiver<T> {
+ inner: Arc<Inner<T>>,
+}
+
+struct Inner<T> {
+ notify: Notify,
+ value: Mutex<Option<T>>,
+}
+
+impl<T> Sender<T> {
+ pub(crate) fn send(self, value: T) {
+ *self.inner.value.lock().unwrap() = Some(value);
+ self.inner.notify.notify();
+ }
+}
+
+impl<T> Receiver<T> {
+ pub(crate) fn recv(self) -> T {
+ loop {
+ if let Some(v) = self.inner.value.lock().unwrap().take() {
+ return v;
+ }
+
+ self.inner.notify.wait();
+ }
+ }
+}
diff --git a/tokio/src/executor/tests/loom_schedule.rs b/tokio/src/executor/tests/loom_schedule.rs
new file mode 100644
index 00000000..7999dd97
--- /dev/null
+++ b/tokio/src/executor/tests/loom_schedule.rs
@@ -0,0 +1,51 @@
+use crate::executor::task::{Schedule, Task};
+
+use loom::sync::Notify;
+use std::collections::VecDeque;
+use std::sync::Mutex;
+
+pub(crate) struct LoomSchedule {
+ notify: Notify,
+ pending: Mutex<VecDeque<Option<Task<Self>>>>,
+}
+
+impl LoomSchedule {
+ pub(crate) fn new() -> LoomSchedule {
+ LoomSchedule {
+ notify: Notify::new(),
+ pending: Mutex::new(VecDeque::new()),
+ }
+ }
+
+ pub(crate) fn push_task(&self, task: Task<Self>) {
+ self.schedule(task);
+ }
+
+ pub(crate) fn recv(&self) -> Option<Task<Self>> {
+ loop {
+ if let Some(task) = self.pending.lock().unwrap().pop_front() {
+ return task;
+ }
+
+ self.notify.wait();
+ }
+ }
+}
+
+impl Schedule for LoomSchedule {
+ fn bind(&self, _task: &Task<Self>) {}
+
+ fn release(&self, task: Task<Self>) {
+ self.release_local(&task);
+ }
+
+ fn release_local(&self, _task: &Task<Self>) {
+ self.pending.lock().unwrap().push_back(None);
+ self.notify.notify();
+ }
+
+ fn schedule(&self, task: Task<Self>) {
+ self.pending.lock().unwrap().push_back(Some(task));
+ self.notify.notify();
+ }
+}
diff --git a/tokio/src/executor/tests/mock_park.rs b/tokio/src/executor/tests/mock_park.rs
new file mode 100644
index 00000000..2bc9edf5
--- /dev/null
+++ b/tokio/src/executor/tests/mock_park.rs
@@ -0,0 +1,66 @@
+#![allow(warnings)]
+
+use crate::executor::park::{Park, Unpark};
+
+use std::collections::HashMap;
+use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
+use std::sync::Arc;
+use std::time::Duration;
+
+pub struct MockPark {
+ parks: HashMap<usize, Arc<Inner>>,
+}
+
+#[derive(Clone)]
+struct ParkImpl(Arc<Inner>);
+
+struct Inner {
+ unparked: AtomicBool,
+}
+
+impl MockPark {
+ pub fn new() -> MockPark {
+ MockPark {
+ parks: HashMap::new(),
+ }
+ }
+
+ pub fn is_unparked(&self, index: usize) -> bool {
+ self.parks[&index].unparked.load(SeqCst)
+ }
+
+ pub fn clear(&self, index: usize) {
+ self.parks[&index].unparked.store(false, SeqCst);
+ }
+
+ pub fn mk_park(&mut self, index: usize) -> impl Park {
+ let inner = Arc::new(Inner {
+ unparked: AtomicBool::new(false),
+ });
+ self.parks.insert(index, inner.clone());
+ ParkImpl(inner)
+ }
+}
+
+impl Park for ParkImpl {
+ type Unpark = ParkImpl;
+ type Error = ();
+
+ fn unpark(&self) -> Self::Unpark {
+ self.clone()
+ }
+
+ fn park(&mut self) -> Result<(), Self::Error> {
+ unimplemented!();
+ }
+
+ fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
+ unimplemented!();
+ }
+}
+
+impl Unpark for ParkImpl {
+ fn unpark(&self) {
+ self.0.unparked.store(true, SeqCst);
+ }
+}
diff --git a/tokio/src/executor/tests/mock_schedule.rs b/tokio/src/executor/tests/mock_schedule.rs
new file mode 100644
index 00000000..f105d40c
--- /dev/null
+++ b/tokio/src/executor/tests/mock_schedule.rs
@@ -0,0 +1,131 @@
+#![allow(warnings)]
+
+use crate::executor::task::{Header, Schedule, Task};
+
+use std::collections::VecDeque;
+use std::sync::Mutex;
+use std::thread;
+
+pub(crate) struct Mock {
+ inner: Mutex<Inner>,
+}
+
+pub(crate) struct Noop;
+pub(crate) static NOOP_SCHEDULE: Noop = Noop;
+
+struct Inner {
+ calls: VecDeque<Call>,
+ pending_run: VecDeque<Task<Mock>>,
+ pending_drop: VecDeque<Task<Mock>>,
+}
+
+unsafe impl Send for Inner {}
+unsafe impl Sync for Inner {}
+
+#[derive(Debug, Eq, PartialEq)]
+enum Call {
+ Bind(*const Header<Mock>),
+ Release,
+ ReleaseLocal,
+ Schedule,
+}
+
+pub(crate) fn mock() -> Mock {
+ Mock {
+ inner: Mutex::new(Inner {
+ calls: VecDeque::new(),
+ pending_run: VecDeque::new(),
+ pending_drop: VecDeque::new(),
+ }),
+ }
+}
+
+impl Mock {
+ pub(crate) fn bind(self, task: &Task<Mock>) -> Self {
+ self.push(Call::Bind(task.header() as *const _));
+ self
+ }
+
+ pub(crate) fn release(self) -> Self {
+ self.push(Call::Release);
+ self
+ }
+
+ pub(crate) fn release_local(self) -> Self {
+ self.push(Call::ReleaseLocal);
+ self
+ }
+
+ pub(crate) fn schedule(self) -> Self {
+ self.push(Call::Schedule);
+ self
+ }
+
+ pub(crate) fn next_pending_run(&self) -> Option<Task<Self>> {
+ self.inner.lock().unwrap().pending_run.pop_front()
+ }
+
+ pub(crate) fn next_pending_drop(&self) -> Option<Task<Self>> {
+ self.inner.lock().unwrap().pending_drop.pop_front()
+ }
+
+ fn push(&self, call: Call) {
+ self.inner.lock().unwrap().calls.push_back(call);
+ }
+
+ fn next(&self, name: &str) -> Call {
+ self.inner
+ .lock()
+ .unwrap()
+ .calls
+ .pop_front()
+ .expect(&format!("received `{}`, but none expected", name))
+ }
+}
+
+impl Schedule for Mock {
+ fn bind(&self, task: &Task<Self>) {
+ match self.next("bind") {
+ Call::Bind(ptr) => {
+ assert!(ptr.eq(&(task.header() as *const _)));
+ }
+ call => panic!("expected `Bind`, was {:?}", call),
+ }
+ }
+
+ fn release(&self, task: Task<Self>) {
+ match self.next("release") {
+ Call::Release => {
+ self.inner.lock().unwrap().pending_drop.push_back(task);
+ }
+ call => panic!("expected `Release`, was {:?}", call),
+ }
+ }
+
+ fn release_local(&self, _task: &Task<Self>) {
+ assert_eq!(Call::ReleaseLocal, self.next("release_local"));
+ }
+
+ fn schedule(&self, task: Task<Self>) {
+ self.inner.lock().unwrap().pending_run.push_back(task);
+ assert_eq!(Call::Schedule, self.next("schedule"));
+ }
+}
+
+impl Drop for Mock {
+ fn drop(&mut self) {
+ if !thread::panicking() {
+ assert!(self.inner.lock().unwrap().calls.is_empty());
+ }
+ }
+}
+
+impl Schedule for Noop {
+ fn bind(&self, _task: &Task<Self>) {}
+
+ fn release(&self, _task: Task<Self>) {}
+
+ fn release_local(&self, _task: &Task<Self>) {}
+
+ fn schedule(&self, _task: Task<Self>) {}
+}
diff --git a/tokio/src/executor/tests/mod.rs b/tokio/src/executor/tests/mod.rs
new file mode 100644
index 00000000..b287bcf2
--- /dev/null
+++ b/tokio/src/executor/tests/mod.rs
@@ -0,0 +1,40 @@
+//! Testing utilities
+
+#[cfg(not(loom))]
+pub(crate) mod backoff;
+
+#[cfg(loom)]
+pub(crate) mod loom_oneshot;
+
+#[cfg(loom)]
+pub(crate) mod loom_schedule;
+
+#[cfg(not(loom))]
+pub(crate) mod mock_park;
+
+pub(crate) mod mock_schedule;
+
+#[cfg(not(loom))]
+pub(crate) mod track_drop;
+
+/// Panic if expression results in `None`.
+#[macro_export]
+macro_rules! assert_some {
+ ($e:expr) => {{
+ match $e {
+ Some(v) => v,
+ _ => panic!("expected some, was none"),
+ }
+ }};
+}
+
+/// Panic if expression results in `Some`.
+#[macro_export]
+macro_rules! assert_none {
+ ($e:expr) => {{
+ match $e {
+ Some(v) => panic!("expected none, was {:?}", v),
+ _ => {}
+ }
+ }};
+}
diff --git a/tokio/src/executor/tests/track_drop.rs b/tokio/src/executor/tests/track_drop.rs
new file mode 100644
index 00000000..c3ded845
--- /dev/null
+++ b/tokio/src/executor/tests/track_drop.rs
@@ -0,0 +1,57 @@
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::atomic::AtomicBool;
+use std::sync::atomic::Ordering::SeqCst;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+#[derive(Debug)]
+pub(crate) struct TrackDrop<T>(T, Arc<AtomicBool>);
+
+#[derive(Debug)]
+pub(crate) struct DidDrop(Arc<AtomicBool>, Arc<AtomicBool>);
+
+pub(crate) fn track_drop<T: Future>(
+ future: T,
+) -> (impl Future<Output = TrackDrop<T::Output>>, DidDrop) {
+ let did_drop_future = Arc::new(AtomicBool::new(false));
+ let did_drop_output = Arc::new(AtomicBool::new(false));
+ let did_drop = DidDrop(did_drop_future.clone(), did_drop_output.clone());
+
+ let future = async move { TrackDrop(future.await, did_drop_output) };
+
+ let future = TrackDrop(future, did_drop_future);
+
+ (future, did_drop)
+}
+
+impl<T> TrackDrop<T> {
+ pub(crate) fn get_ref(&self) -> &T {
+ &self.0
+ }
+}
+
+impl<T: Future> Future for TrackDrop<T> {
+ type Output = T::Output;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let me = unsafe { Pin::map_unchecked_mut(self, |x| &mut x.0) };
+ me.poll(cx)
+ }
+}
+
+impl<T> Drop for TrackDrop<T> {
+ fn drop(&mut self) {
+ self.1.store(true, SeqCst);
+ }
+}
+
+impl DidDrop {
+ pub(crate) fn did_drop_future(&self) -> bool {
+ self.0.load(SeqCst)
+ }
+
+ pub(crate) fn did_drop_output(&self) -> bool {
+ self.1.load(SeqCst)
+ }
+}