summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlice Ryhl <alice@ryhl.io>2020-12-10 09:46:01 +0100
committerGitHub <noreply@github.com>2020-12-10 09:46:01 +0100
commitf60860af7edefef5373d50d77ab605d648d60526 (patch)
tree44bc86bbaa5393a0dc3a94a2066569dcb1b79df1
parent2a30e13f38b864807f9ad92023e91b060a6227a4 (diff)
downloadtokio-f60860af7edefef5373d50d77ab605d648d60526.tar.gz
tokio-f60860af7edefef5373d50d77ab605d648d60526.tar.xz
watch: fix spurious wakeup (#3234)
Co-authored-by: @tijsvd
-rw-r--r--tokio/src/sync/watch.rs71
1 files changed, 56 insertions, 15 deletions
diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs
index b377ca7f..eb61fd03 100644
--- a/tokio/src/sync/watch.rs
+++ b/tokio/src/sync/watch.rs
@@ -53,10 +53,10 @@
use crate::sync::Notify;
+use crate::loom::sync::atomic::AtomicUsize;
+use crate::loom::sync::atomic::Ordering::{Relaxed, SeqCst};
+use crate::loom::sync::{Arc, RwLock, RwLockReadGuard};
use std::ops;
-use std::sync::atomic::AtomicUsize;
-use std::sync::atomic::Ordering::{Relaxed, SeqCst};
-use std::sync::{Arc, RwLock, RwLockReadGuard};
/// Receives values from the associated [`Sender`](struct@Sender).
///
@@ -241,19 +241,19 @@ impl<T> Receiver<T> {
/// }
/// ```
pub async fn changed(&mut self) -> Result<(), error::RecvError> {
- // In order to avoid a race condition, we first request a notification,
- // **then** check the current value's version. If a new version exists,
- // the notification request is dropped.
- let notified = self.shared.notify_rx.notified();
-
- if let Some(ret) = maybe_changed(&self.shared, &mut self.version) {
- return ret;
+ loop {
+ // In order to avoid a race condition, we first request a notification,
+ // **then** check the current value's version. If a new version exists,
+ // the notification request is dropped.
+ let notified = self.shared.notify_rx.notified();
+
+ if let Some(ret) = maybe_changed(&self.shared, &mut self.version) {
+ return ret;
+ }
+
+ notified.await;
+ // loop around again in case the wake-up was spurious
}
-
- notified.await;
-
- maybe_changed(&self.shared, &mut self.version)
- .expect("[bug] failed to observe change after notificaton.")
}
}
@@ -390,3 +390,44 @@ impl<T> ops::Deref for Ref<'_, T> {
self.inner.deref()
}
}
+
+#[cfg(all(test, loom))]
+mod tests {
+ use futures::future::FutureExt;
+ use loom::thread;
+
+ // test for https://github.com/tokio-rs/tokio/issues/3168
+ #[test]
+ fn watch_spurious_wakeup() {
+ loom::model(|| {
+ let (send, mut recv) = crate::sync::watch::channel(0i32);
+
+ send.send(1).unwrap();
+
+ let send_thread = thread::spawn(move || {
+ send.send(2).unwrap();
+ send
+ });
+
+ recv.changed().now_or_never();
+
+ let send = send_thread.join().unwrap();
+ let recv_thread = thread::spawn(move || {
+ recv.changed().now_or_never();
+ recv.changed().now_or_never();
+ recv
+ });
+
+ send.send(3).unwrap();
+
+ let mut recv = recv_thread.join().unwrap();
+ let send_thread = thread::spawn(move || {
+ send.send(2).unwrap();
+ });
+
+ recv.changed().now_or_never();
+
+ send_thread.join().unwrap();
+ });
+ }
+}