diff options
author | Alice Ryhl <alice@ryhl.io> | 2020-12-10 09:46:01 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-12-10 09:46:01 +0100 |
commit | f60860af7edefef5373d50d77ab605d648d60526 (patch) | |
tree | 44bc86bbaa5393a0dc3a94a2066569dcb1b79df1 /tokio/src | |
parent | 2a30e13f38b864807f9ad92023e91b060a6227a4 (diff) |
watch: fix spurious wakeup (#3234)
Co-authored-by: @tijsvd
Diffstat (limited to 'tokio/src')
-rw-r--r-- | tokio/src/sync/watch.rs | 71 |
1 files changed, 56 insertions, 15 deletions
diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index b377ca7f..eb61fd03 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -53,10 +53,10 @@ use crate::sync::Notify; +use crate::loom::sync::atomic::AtomicUsize; +use crate::loom::sync::atomic::Ordering::{Relaxed, SeqCst}; +use crate::loom::sync::{Arc, RwLock, RwLockReadGuard}; use std::ops; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering::{Relaxed, SeqCst}; -use std::sync::{Arc, RwLock, RwLockReadGuard}; /// Receives values from the associated [`Sender`](struct@Sender). /// @@ -241,19 +241,19 @@ impl<T> Receiver<T> { /// } /// ``` pub async fn changed(&mut self) -> Result<(), error::RecvError> { - // In order to avoid a race condition, we first request a notification, - // **then** check the current value's version. If a new version exists, - // the notification request is dropped. - let notified = self.shared.notify_rx.notified(); - - if let Some(ret) = maybe_changed(&self.shared, &mut self.version) { - return ret; + loop { + // In order to avoid a race condition, we first request a notification, + // **then** check the current value's version. If a new version exists, + // the notification request is dropped. + let notified = self.shared.notify_rx.notified(); + + if let Some(ret) = maybe_changed(&self.shared, &mut self.version) { + return ret; + } + + notified.await; + // loop around again in case the wake-up was spurious } - - notified.await; - - maybe_changed(&self.shared, &mut self.version) - .expect("[bug] failed to observe change after notificaton.") } } @@ -390,3 +390,44 @@ impl<T> ops::Deref for Ref<'_, T> { self.inner.deref() } } + +#[cfg(all(test, loom))] +mod tests { + use futures::future::FutureExt; + use loom::thread; + + // test for https://github.com/tokio-rs/tokio/issues/3168 + #[test] + fn watch_spurious_wakeup() { + loom::model(|| { + let (send, mut recv) = crate::sync::watch::channel(0i32); + + send.send(1).unwrap(); + + let send_thread = thread::spawn(move || { + send.send(2).unwrap(); + send + }); + + recv.changed().now_or_never(); + + let send = send_thread.join().unwrap(); + let recv_thread = thread::spawn(move || { + recv.changed().now_or_never(); + recv.changed().now_or_never(); + recv + }); + + send.send(3).unwrap(); + + let mut recv = recv_thread.join().unwrap(); + let send_thread = thread::spawn(move || { + send.send(2).unwrap(); + }); + + recv.changed().now_or_never(); + + send_thread.join().unwrap(); + }); + } +} |