diff options
author | Carl Lerche <me@carllerche.com> | 2019-10-29 15:11:31 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-10-29 15:11:31 -0700 |
commit | 2b909d6805990abf0bc2a5dea9e7267ff87df704 (patch) | |
tree | de255969c720c294af754b3840efabff3e6d69a0 /tokio/src/sync/tests | |
parent | c62ef2d232dea1535a8e22484fa2ca083f03e903 (diff) |
sync: move into `tokio` crate (#1705)
A step towards collapsing Tokio sub crates into a single `tokio`
crate (#1318).
The sync implementation is now provided by the main `tokio` crate.
Functionality can be opted out of by using the various net related
feature flags.
Diffstat (limited to 'tokio/src/sync/tests')
-rw-r--r-- | tokio/src/sync/tests/loom_atomic_waker.rs | 45 | ||||
-rw-r--r-- | tokio/src/sync/tests/loom_list.rs | 52 | ||||
-rw-r--r-- | tokio/src/sync/tests/loom_mpsc.rs | 23 | ||||
-rw-r--r-- | tokio/src/sync/tests/loom_oneshot.rs | 109 | ||||
-rw-r--r-- | tokio/src/sync/tests/loom_semaphore.rs | 151 | ||||
-rw-r--r-- | tokio/src/sync/tests/mod.rs | 7 |
6 files changed, 387 insertions, 0 deletions
diff --git a/tokio/src/sync/tests/loom_atomic_waker.rs b/tokio/src/sync/tests/loom_atomic_waker.rs new file mode 100644 index 00000000..81e200ff --- /dev/null +++ b/tokio/src/sync/tests/loom_atomic_waker.rs @@ -0,0 +1,45 @@ +use crate::sync::task::AtomicWaker; + +use futures_util::future::poll_fn; +use loom::future::block_on; +use loom::sync::atomic::AtomicUsize; +use loom::thread; +use std::sync::atomic::Ordering::Relaxed; +use std::sync::Arc; +use std::task::Poll::{Pending, Ready}; + +struct Chan { + num: AtomicUsize, + task: AtomicWaker, +} + +#[test] +fn basic_notification() { + const NUM_NOTIFY: usize = 2; + + loom::model(|| { + let chan = Arc::new(Chan { + num: AtomicUsize::new(0), + task: AtomicWaker::new(), + }); + + for _ in 0..NUM_NOTIFY { + let chan = chan.clone(); + + thread::spawn(move || { + chan.num.fetch_add(1, Relaxed); + chan.task.wake(); + }); + } + + block_on(poll_fn(move |cx| { + chan.task.register_by_ref(cx.waker()); + + if NUM_NOTIFY == chan.num.load(Relaxed) { + return Ready(()); + } + + Pending + })); + }); +} diff --git a/tokio/src/sync/tests/loom_list.rs b/tokio/src/sync/tests/loom_list.rs new file mode 100644 index 00000000..4f7746d5 --- /dev/null +++ b/tokio/src/sync/tests/loom_list.rs @@ -0,0 +1,52 @@ +use crate::sync::mpsc::list; + +use loom::thread; +use std::sync::Arc; + +#[test] +fn smoke() { + use crate::sync::mpsc::block::Read::*; + + const NUM_TX: usize = 2; + const NUM_MSG: usize = 2; + + loom::model(|| { + let (tx, mut rx) = list::channel(); + let tx = Arc::new(tx); + + for th in 0..NUM_TX { + let tx = tx.clone(); + + thread::spawn(move || { + for i in 0..NUM_MSG { + tx.push((th, i)); + } + debug!(" + tx thread done"); + }); + } + + let mut next = vec![0; NUM_TX]; + + loop { + debug!(" + rx.pop()"); + match rx.pop(&tx) { + Some(Value((th, v))) => { + debug!(" + pop() -> Some(Value({}))", v); + assert_eq!(v, next[th]); + next[th] += 1; + + if next.iter().all(|&i| i == NUM_MSG) { + break; + } + } + Some(Closed) => { + panic!(); + } + None => { + debug!(" + pop() -> None"); + thread::yield_now(); + } + } + } + }); +} diff --git a/tokio/src/sync/tests/loom_mpsc.rs b/tokio/src/sync/tests/loom_mpsc.rs new file mode 100644 index 00000000..748ae9e1 --- /dev/null +++ b/tokio/src/sync/tests/loom_mpsc.rs @@ -0,0 +1,23 @@ +use crate::sync::mpsc; + +use futures_util::future::poll_fn; +use loom::future::block_on; +use loom::thread; + +#[test] +fn closing_tx() { + loom::model(|| { + let (mut tx, mut rx) = mpsc::channel(16); + + thread::spawn(move || { + tx.try_send(()).unwrap(); + drop(tx); + }); + + let v = block_on(poll_fn(|cx| rx.poll_recv(cx))); + assert!(v.is_some()); + + let v = block_on(poll_fn(|cx| rx.poll_recv(cx))); + assert!(v.is_none()); + }); +} diff --git a/tokio/src/sync/tests/loom_oneshot.rs b/tokio/src/sync/tests/loom_oneshot.rs new file mode 100644 index 00000000..52104736 --- /dev/null +++ b/tokio/src/sync/tests/loom_oneshot.rs @@ -0,0 +1,109 @@ +use crate::sync::oneshot; + +use futures_util::future::poll_fn; +use loom::future::block_on; +use loom::thread; +use std::task::Poll::{Pending, Ready}; + +#[test] +fn smoke() { + loom::model(|| { + let (tx, rx) = oneshot::channel(); + + thread::spawn(move || { + tx.send(1).unwrap(); + }); + + let value = block_on(rx).unwrap(); + assert_eq!(1, value); + }); +} + +#[test] +fn changing_rx_task() { + loom::model(|| { + let (tx, mut rx) = oneshot::channel(); + + thread::spawn(move || { + tx.send(1).unwrap(); + }); + + let rx = thread::spawn(move || { + let ready = block_on(poll_fn(|cx| match Pin::new(&mut rx).poll(cx) { + Ready(Ok(value)) => { + assert_eq!(1, value); + Ready(true) + } + Ready(Err(_)) => unimplemented!(), + Pending => Ready(false), + })); + + if ready { + None + } else { + Some(rx) + } + }) + .join() + .unwrap(); + + if let Some(rx) = rx { + // Previous task parked, use a new task... + let value = block_on(rx).unwrap(); + assert_eq!(1, value); + } + }); +} + +// TODO: Move this into `oneshot` proper. + +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +struct OnClose<'a> { + tx: &'a mut oneshot::Sender<i32>, +} + +impl<'a> OnClose<'a> { + fn new(tx: &'a mut oneshot::Sender<i32>) -> Self { + OnClose { tx } + } +} + +impl Future for OnClose<'_> { + type Output = bool; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<bool> { + let res = self.get_mut().tx.poll_closed(cx); + Ready(res.is_ready()) + } +} + +#[test] +fn changing_tx_task() { + loom::model(|| { + let (mut tx, rx) = oneshot::channel::<i32>(); + + thread::spawn(move || { + drop(rx); + }); + + let tx = thread::spawn(move || { + let t1 = block_on(OnClose::new(&mut tx)); + + if t1 { + None + } else { + Some(tx) + } + }) + .join() + .unwrap(); + + if let Some(mut tx) = tx { + // Previous task parked, use a new task... + block_on(OnClose::new(&mut tx)); + } + }); +} diff --git a/tokio/src/sync/tests/loom_semaphore.rs b/tokio/src/sync/tests/loom_semaphore.rs new file mode 100644 index 00000000..d14c7668 --- /dev/null +++ b/tokio/src/sync/tests/loom_semaphore.rs @@ -0,0 +1,151 @@ +use crate::sync::semaphore::*; + +use futures_core::ready; +use futures_util::future::poll_fn; +use loom::future::block_on; +use loom::thread; +use std::future::Future; +use std::pin::Pin; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::SeqCst; +use std::sync::Arc; +use std::task::Poll::Ready; +use std::task::{Context, Poll}; + +#[test] +fn basic_usage() { + const NUM: usize = 2; + + struct Actor { + waiter: Permit, + shared: Arc<Shared>, + } + + struct Shared { + semaphore: Semaphore, + active: AtomicUsize, + } + + impl Future for Actor { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + let me = &mut *self; + + ready!(me.waiter.poll_acquire(cx, &me.shared.semaphore)).unwrap(); + + let actual = me.shared.active.fetch_add(1, SeqCst); + assert!(actual <= NUM - 1); + + let actual = me.shared.active.fetch_sub(1, SeqCst); + assert!(actual <= NUM); + + me.waiter.release(&me.shared.semaphore); + + Ready(()) + } + } + + loom::model(|| { + let shared = Arc::new(Shared { + semaphore: Semaphore::new(NUM), + active: AtomicUsize::new(0), + }); + + for _ in 0..NUM { + let shared = shared.clone(); + + thread::spawn(move || { + block_on(Actor { + waiter: Permit::new(), + shared, + }); + }); + } + + block_on(Actor { + waiter: Permit::new(), + shared, + }); + }); +} + +#[test] +fn release() { + loom::model(|| { + let semaphore = Arc::new(Semaphore::new(1)); + + { + let semaphore = semaphore.clone(); + thread::spawn(move || { + let mut permit = Permit::new(); + + block_on(poll_fn(|cx| permit.poll_acquire(cx, &semaphore))).unwrap(); + + permit.release(&semaphore); + }); + } + + let mut permit = Permit::new(); + + block_on(poll_fn(|cx| permit.poll_acquire(cx, &semaphore))).unwrap(); + + permit.release(&semaphore); + }); +} + +#[test] +fn basic_closing() { + const NUM: usize = 2; + + loom::model(|| { + let semaphore = Arc::new(Semaphore::new(1)); + + for _ in 0..NUM { + let semaphore = semaphore.clone(); + + thread::spawn(move || { + let mut permit = Permit::new(); + + for _ in 0..2 { + block_on(poll_fn(|cx| { + permit.poll_acquire(cx, &semaphore).map_err(|_| ()) + }))?; + + permit.release(&semaphore); + } + + Ok::<(), ()>(()) + }); + } + + semaphore.close(); + }); +} + +#[test] +fn concurrent_close() { + const NUM: usize = 3; + + loom::model(|| { + let semaphore = Arc::new(Semaphore::new(1)); + + for _ in 0..NUM { + let semaphore = semaphore.clone(); + + thread::spawn(move || { + let mut permit = Permit::new(); + + block_on(poll_fn(|cx| { + permit.poll_acquire(cx, &semaphore).map_err(|_| ()) + }))?; + + permit.release(&semaphore); + + semaphore.close(); + + Ok::<(), ()>(()) + }); + } + }); +} diff --git a/tokio/src/sync/tests/mod.rs b/tokio/src/sync/tests/mod.rs new file mode 100644 index 00000000..8e627cb8 --- /dev/null +++ b/tokio/src/sync/tests/mod.rs @@ -0,0 +1,7 @@ +#![cfg(loom)] + +mod loom_atomic_waker; +mod loom_list; +mod loom_mpsc; +mod loom_oneshot; +mod loom_semaphore; |