summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync/tests
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-10-29 15:11:31 -0700
committerGitHub <noreply@github.com>2019-10-29 15:11:31 -0700
commit2b909d6805990abf0bc2a5dea9e7267ff87df704 (patch)
treede255969c720c294af754b3840efabff3e6d69a0 /tokio/src/sync/tests
parentc62ef2d232dea1535a8e22484fa2ca083f03e903 (diff)
sync: move into `tokio` crate (#1705)
A step towards collapsing Tokio sub crates into a single `tokio` crate (#1318). The sync implementation is now provided by the main `tokio` crate. Functionality can be opted out of by using the various net related feature flags.
Diffstat (limited to 'tokio/src/sync/tests')
-rw-r--r--tokio/src/sync/tests/loom_atomic_waker.rs45
-rw-r--r--tokio/src/sync/tests/loom_list.rs52
-rw-r--r--tokio/src/sync/tests/loom_mpsc.rs23
-rw-r--r--tokio/src/sync/tests/loom_oneshot.rs109
-rw-r--r--tokio/src/sync/tests/loom_semaphore.rs151
-rw-r--r--tokio/src/sync/tests/mod.rs7
6 files changed, 387 insertions, 0 deletions
diff --git a/tokio/src/sync/tests/loom_atomic_waker.rs b/tokio/src/sync/tests/loom_atomic_waker.rs
new file mode 100644
index 00000000..81e200ff
--- /dev/null
+++ b/tokio/src/sync/tests/loom_atomic_waker.rs
@@ -0,0 +1,45 @@
+use crate::sync::task::AtomicWaker;
+
+use futures_util::future::poll_fn;
+use loom::future::block_on;
+use loom::sync::atomic::AtomicUsize;
+use loom::thread;
+use std::sync::atomic::Ordering::Relaxed;
+use std::sync::Arc;
+use std::task::Poll::{Pending, Ready};
+
+struct Chan {
+ num: AtomicUsize,
+ task: AtomicWaker,
+}
+
+#[test]
+fn basic_notification() {
+ const NUM_NOTIFY: usize = 2;
+
+ loom::model(|| {
+ let chan = Arc::new(Chan {
+ num: AtomicUsize::new(0),
+ task: AtomicWaker::new(),
+ });
+
+ for _ in 0..NUM_NOTIFY {
+ let chan = chan.clone();
+
+ thread::spawn(move || {
+ chan.num.fetch_add(1, Relaxed);
+ chan.task.wake();
+ });
+ }
+
+ block_on(poll_fn(move |cx| {
+ chan.task.register_by_ref(cx.waker());
+
+ if NUM_NOTIFY == chan.num.load(Relaxed) {
+ return Ready(());
+ }
+
+ Pending
+ }));
+ });
+}
diff --git a/tokio/src/sync/tests/loom_list.rs b/tokio/src/sync/tests/loom_list.rs
new file mode 100644
index 00000000..4f7746d5
--- /dev/null
+++ b/tokio/src/sync/tests/loom_list.rs
@@ -0,0 +1,52 @@
+use crate::sync::mpsc::list;
+
+use loom::thread;
+use std::sync::Arc;
+
+#[test]
+fn smoke() {
+ use crate::sync::mpsc::block::Read::*;
+
+ const NUM_TX: usize = 2;
+ const NUM_MSG: usize = 2;
+
+ loom::model(|| {
+ let (tx, mut rx) = list::channel();
+ let tx = Arc::new(tx);
+
+ for th in 0..NUM_TX {
+ let tx = tx.clone();
+
+ thread::spawn(move || {
+ for i in 0..NUM_MSG {
+ tx.push((th, i));
+ }
+ debug!(" + tx thread done");
+ });
+ }
+
+ let mut next = vec![0; NUM_TX];
+
+ loop {
+ debug!(" + rx.pop()");
+ match rx.pop(&tx) {
+ Some(Value((th, v))) => {
+ debug!(" + pop() -> Some(Value({}))", v);
+ assert_eq!(v, next[th]);
+ next[th] += 1;
+
+ if next.iter().all(|&i| i == NUM_MSG) {
+ break;
+ }
+ }
+ Some(Closed) => {
+ panic!();
+ }
+ None => {
+ debug!(" + pop() -> None");
+ thread::yield_now();
+ }
+ }
+ }
+ });
+}
diff --git a/tokio/src/sync/tests/loom_mpsc.rs b/tokio/src/sync/tests/loom_mpsc.rs
new file mode 100644
index 00000000..748ae9e1
--- /dev/null
+++ b/tokio/src/sync/tests/loom_mpsc.rs
@@ -0,0 +1,23 @@
+use crate::sync::mpsc;
+
+use futures_util::future::poll_fn;
+use loom::future::block_on;
+use loom::thread;
+
+#[test]
+fn closing_tx() {
+ loom::model(|| {
+ let (mut tx, mut rx) = mpsc::channel(16);
+
+ thread::spawn(move || {
+ tx.try_send(()).unwrap();
+ drop(tx);
+ });
+
+ let v = block_on(poll_fn(|cx| rx.poll_recv(cx)));
+ assert!(v.is_some());
+
+ let v = block_on(poll_fn(|cx| rx.poll_recv(cx)));
+ assert!(v.is_none());
+ });
+}
diff --git a/tokio/src/sync/tests/loom_oneshot.rs b/tokio/src/sync/tests/loom_oneshot.rs
new file mode 100644
index 00000000..52104736
--- /dev/null
+++ b/tokio/src/sync/tests/loom_oneshot.rs
@@ -0,0 +1,109 @@
+use crate::sync::oneshot;
+
+use futures_util::future::poll_fn;
+use loom::future::block_on;
+use loom::thread;
+use std::task::Poll::{Pending, Ready};
+
+#[test]
+fn smoke() {
+ loom::model(|| {
+ let (tx, rx) = oneshot::channel();
+
+ thread::spawn(move || {
+ tx.send(1).unwrap();
+ });
+
+ let value = block_on(rx).unwrap();
+ assert_eq!(1, value);
+ });
+}
+
+#[test]
+fn changing_rx_task() {
+ loom::model(|| {
+ let (tx, mut rx) = oneshot::channel();
+
+ thread::spawn(move || {
+ tx.send(1).unwrap();
+ });
+
+ let rx = thread::spawn(move || {
+ let ready = block_on(poll_fn(|cx| match Pin::new(&mut rx).poll(cx) {
+ Ready(Ok(value)) => {
+ assert_eq!(1, value);
+ Ready(true)
+ }
+ Ready(Err(_)) => unimplemented!(),
+ Pending => Ready(false),
+ }));
+
+ if ready {
+ None
+ } else {
+ Some(rx)
+ }
+ })
+ .join()
+ .unwrap();
+
+ if let Some(rx) = rx {
+ // Previous task parked, use a new task...
+ let value = block_on(rx).unwrap();
+ assert_eq!(1, value);
+ }
+ });
+}
+
+// TODO: Move this into `oneshot` proper.
+
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+struct OnClose<'a> {
+ tx: &'a mut oneshot::Sender<i32>,
+}
+
+impl<'a> OnClose<'a> {
+ fn new(tx: &'a mut oneshot::Sender<i32>) -> Self {
+ OnClose { tx }
+ }
+}
+
+impl Future for OnClose<'_> {
+ type Output = bool;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<bool> {
+ let res = self.get_mut().tx.poll_closed(cx);
+ Ready(res.is_ready())
+ }
+}
+
+#[test]
+fn changing_tx_task() {
+ loom::model(|| {
+ let (mut tx, rx) = oneshot::channel::<i32>();
+
+ thread::spawn(move || {
+ drop(rx);
+ });
+
+ let tx = thread::spawn(move || {
+ let t1 = block_on(OnClose::new(&mut tx));
+
+ if t1 {
+ None
+ } else {
+ Some(tx)
+ }
+ })
+ .join()
+ .unwrap();
+
+ if let Some(mut tx) = tx {
+ // Previous task parked, use a new task...
+ block_on(OnClose::new(&mut tx));
+ }
+ });
+}
diff --git a/tokio/src/sync/tests/loom_semaphore.rs b/tokio/src/sync/tests/loom_semaphore.rs
new file mode 100644
index 00000000..d14c7668
--- /dev/null
+++ b/tokio/src/sync/tests/loom_semaphore.rs
@@ -0,0 +1,151 @@
+use crate::sync::semaphore::*;
+
+use futures_core::ready;
+use futures_util::future::poll_fn;
+use loom::future::block_on;
+use loom::thread;
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::SeqCst;
+use std::sync::Arc;
+use std::task::Poll::Ready;
+use std::task::{Context, Poll};
+
+#[test]
+fn basic_usage() {
+ const NUM: usize = 2;
+
+ struct Actor {
+ waiter: Permit,
+ shared: Arc<Shared>,
+ }
+
+ struct Shared {
+ semaphore: Semaphore,
+ active: AtomicUsize,
+ }
+
+ impl Future for Actor {
+ type Output = ();
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
+ let me = &mut *self;
+
+ ready!(me.waiter.poll_acquire(cx, &me.shared.semaphore)).unwrap();
+
+ let actual = me.shared.active.fetch_add(1, SeqCst);
+ assert!(actual <= NUM - 1);
+
+ let actual = me.shared.active.fetch_sub(1, SeqCst);
+ assert!(actual <= NUM);
+
+ me.waiter.release(&me.shared.semaphore);
+
+ Ready(())
+ }
+ }
+
+ loom::model(|| {
+ let shared = Arc::new(Shared {
+ semaphore: Semaphore::new(NUM),
+ active: AtomicUsize::new(0),
+ });
+
+ for _ in 0..NUM {
+ let shared = shared.clone();
+
+ thread::spawn(move || {
+ block_on(Actor {
+ waiter: Permit::new(),
+ shared,
+ });
+ });
+ }
+
+ block_on(Actor {
+ waiter: Permit::new(),
+ shared,
+ });
+ });
+}
+
+#[test]
+fn release() {
+ loom::model(|| {
+ let semaphore = Arc::new(Semaphore::new(1));
+
+ {
+ let semaphore = semaphore.clone();
+ thread::spawn(move || {
+ let mut permit = Permit::new();
+
+ block_on(poll_fn(|cx| permit.poll_acquire(cx, &semaphore))).unwrap();
+
+ permit.release(&semaphore);
+ });
+ }
+
+ let mut permit = Permit::new();
+
+ block_on(poll_fn(|cx| permit.poll_acquire(cx, &semaphore))).unwrap();
+
+ permit.release(&semaphore);
+ });
+}
+
+#[test]
+fn basic_closing() {
+ const NUM: usize = 2;
+
+ loom::model(|| {
+ let semaphore = Arc::new(Semaphore::new(1));
+
+ for _ in 0..NUM {
+ let semaphore = semaphore.clone();
+
+ thread::spawn(move || {
+ let mut permit = Permit::new();
+
+ for _ in 0..2 {
+ block_on(poll_fn(|cx| {
+ permit.poll_acquire(cx, &semaphore).map_err(|_| ())
+ }))?;
+
+ permit.release(&semaphore);
+ }
+
+ Ok::<(), ()>(())
+ });
+ }
+
+ semaphore.close();
+ });
+}
+
+#[test]
+fn concurrent_close() {
+ const NUM: usize = 3;
+
+ loom::model(|| {
+ let semaphore = Arc::new(Semaphore::new(1));
+
+ for _ in 0..NUM {
+ let semaphore = semaphore.clone();
+
+ thread::spawn(move || {
+ let mut permit = Permit::new();
+
+ block_on(poll_fn(|cx| {
+ permit.poll_acquire(cx, &semaphore).map_err(|_| ())
+ }))?;
+
+ permit.release(&semaphore);
+
+ semaphore.close();
+
+ Ok::<(), ()>(())
+ });
+ }
+ });
+}
diff --git a/tokio/src/sync/tests/mod.rs b/tokio/src/sync/tests/mod.rs
new file mode 100644
index 00000000..8e627cb8
--- /dev/null
+++ b/tokio/src/sync/tests/mod.rs
@@ -0,0 +1,7 @@
+#![cfg(loom)]
+
+mod loom_atomic_waker;
+mod loom_list;
+mod loom_mpsc;
+mod loom_oneshot;
+mod loom_semaphore;