diff options
author | Carl Lerche <me@carllerche.com> | 2019-10-29 15:11:31 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-10-29 15:11:31 -0700 |
commit | 2b909d6805990abf0bc2a5dea9e7267ff87df704 (patch) | |
tree | de255969c720c294af754b3840efabff3e6d69a0 /tokio/src/sync/tests/loom_oneshot.rs | |
parent | c62ef2d232dea1535a8e22484fa2ca083f03e903 (diff) |
sync: move into `tokio` crate (#1705)
A step towards collapsing Tokio sub crates into a single `tokio`
crate (#1318).
The sync implementation is now provided by the main `tokio` crate.
Functionality can be opted out of by using the various net related
feature flags.
Diffstat (limited to 'tokio/src/sync/tests/loom_oneshot.rs')
-rw-r--r-- | tokio/src/sync/tests/loom_oneshot.rs | 109 |
1 files changed, 109 insertions, 0 deletions
diff --git a/tokio/src/sync/tests/loom_oneshot.rs b/tokio/src/sync/tests/loom_oneshot.rs new file mode 100644 index 00000000..52104736 --- /dev/null +++ b/tokio/src/sync/tests/loom_oneshot.rs @@ -0,0 +1,109 @@ +use crate::sync::oneshot; + +use futures_util::future::poll_fn; +use loom::future::block_on; +use loom::thread; +use std::task::Poll::{Pending, Ready}; + +#[test] +fn smoke() { + loom::model(|| { + let (tx, rx) = oneshot::channel(); + + thread::spawn(move || { + tx.send(1).unwrap(); + }); + + let value = block_on(rx).unwrap(); + assert_eq!(1, value); + }); +} + +#[test] +fn changing_rx_task() { + loom::model(|| { + let (tx, mut rx) = oneshot::channel(); + + thread::spawn(move || { + tx.send(1).unwrap(); + }); + + let rx = thread::spawn(move || { + let ready = block_on(poll_fn(|cx| match Pin::new(&mut rx).poll(cx) { + Ready(Ok(value)) => { + assert_eq!(1, value); + Ready(true) + } + Ready(Err(_)) => unimplemented!(), + Pending => Ready(false), + })); + + if ready { + None + } else { + Some(rx) + } + }) + .join() + .unwrap(); + + if let Some(rx) = rx { + // Previous task parked, use a new task... + let value = block_on(rx).unwrap(); + assert_eq!(1, value); + } + }); +} + +// TODO: Move this into `oneshot` proper. + +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +struct OnClose<'a> { + tx: &'a mut oneshot::Sender<i32>, +} + +impl<'a> OnClose<'a> { + fn new(tx: &'a mut oneshot::Sender<i32>) -> Self { + OnClose { tx } + } +} + +impl Future for OnClose<'_> { + type Output = bool; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<bool> { + let res = self.get_mut().tx.poll_closed(cx); + Ready(res.is_ready()) + } +} + +#[test] +fn changing_tx_task() { + loom::model(|| { + let (mut tx, rx) = oneshot::channel::<i32>(); + + thread::spawn(move || { + drop(rx); + }); + + let tx = thread::spawn(move || { + let t1 = block_on(OnClose::new(&mut tx)); + + if t1 { + None + } else { + Some(tx) + } + }) + .join() + .unwrap(); + + if let Some(mut tx) = tx { + // Previous task parked, use a new task... + block_on(OnClose::new(&mut tx)); + } + }); +} |