summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync
diff options
context:
space:
mode:
authorZahari Dichev <zaharidichev@gmail.com>2020-09-22 18:12:57 +0300
committerGitHub <noreply@github.com>2020-09-22 08:12:57 -0700
commite7091fde786722a5301270e6281fc3c449dcfc14 (patch)
treef40c752ab5411ef0f4ba8e086da339d857c564b2 /tokio/src/sync
parent2348f678e6b2a5e37b596c514b4b1a6ccb090d79 (diff)
sync: Remove readiness assertion in `watch::Receiver::changed() (#2839)
*In `watch::Receiver::changed` `Notified` was polled for the first time to ensure the waiter is registered while assuming that the first poll will always return `Pending`. It is the case however that another instance of `Notified` is dropped without receiving its notification, this "orphaned" notification can be used to satisfy another waiter without even registering it. This commit accounts for that scenario.
Diffstat (limited to 'tokio/src/sync')
-rw-r--r--tokio/src/sync/notify.rs33
-rw-r--r--tokio/src/sync/tests/loom_watch.rs22
2 files changed, 39 insertions, 16 deletions
diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs
index c69e2b07..56bbc51b 100644
--- a/tokio/src/sync/notify.rs
+++ b/tokio/src/sync/notify.rs
@@ -106,6 +106,14 @@ pub struct Notify {
waiters: Mutex<WaitList>,
}
+#[derive(Debug, Clone, Copy)]
+enum NotificationType {
+ // Notification triggered by calling `notify_waiters`
+ AllWaiters,
+ // Notification triggered by calling `notify_one`
+ OneWaiter,
+}
+
#[derive(Debug)]
struct Waiter {
/// Intrusive linked-list pointers
@@ -115,7 +123,7 @@ struct Waiter {
waker: Option<Waker>,
/// `true` if the notification has been assigned to this waiter.
- notified: bool,
+ notified: Option<NotificationType>,
/// Should not be `Unpin`.
_p: PhantomPinned,
@@ -230,7 +238,7 @@ impl Notify {
waiter: UnsafeCell::new(Waiter {
pointers: linked_list::Pointers::new(),
waker: None,
- notified: false,
+ notified: None,
_p: PhantomPinned,
}),
}
@@ -327,9 +335,9 @@ impl Notify {
// Safety: `waiters` lock is still held.
let waiter = unsafe { waiter.as_mut() };
- assert!(!waiter.notified);
+ assert!(waiter.notified.is_none());
- waiter.notified = true;
+ waiter.notified = Some(NotificationType::AllWaiters);
if let Some(waker) = waiter.waker.take() {
waker.wake();
@@ -375,9 +383,9 @@ fn notify_locked(waiters: &mut WaitList, state: &AtomicU8, curr: u8) -> Option<W
// Safety: `waiters` lock is still held.
let waiter = unsafe { waiter.as_mut() };
- assert!(!waiter.notified);
+ assert!(waiter.notified.is_none());
- waiter.notified = true;
+ waiter.notified = Some(NotificationType::OneWaiter);
let waker = waiter.waker.take();
if waiters.is_empty() {
@@ -506,11 +514,11 @@ impl Future for Notified<'_> {
// Safety: called while locked
let w = unsafe { &mut *waiter.get() };
- if w.notified {
+ if w.notified.is_some() {
// Our waker has been notified. Reset the fields and
// remove it from the list.
w.waker = None;
- w.notified = false;
+ w.notified = None;
*state = Done;
} else {
@@ -582,14 +590,13 @@ impl Drop for Notified<'_> {
notify.state.store(EMPTY, SeqCst);
}
- // See if the node was notified but not received. In this case, the
- // notification must be sent to another waiter.
+ // See if the node was notified but not received. In this case, if
+ // the notification was triggered via `notify_one`, it must be sent
+ // to the next waiter.
//
// Safety: with the entry removed from the linked list, there can be
// no concurrent access to the entry
- let notified = unsafe { (*waiter.get()).notified };
-
- if notified {
+ if let Some(NotificationType::OneWaiter) = unsafe { (*waiter.get()).notified } {
if let Some(waker) = notify_locked(&mut waiters, &notify.state, notify_state) {
drop(waiters);
waker.wake();
diff --git a/tokio/src/sync/tests/loom_watch.rs b/tokio/src/sync/tests/loom_watch.rs
index 7944cab8..c575b5b6 100644
--- a/tokio/src/sync/tests/loom_watch.rs
+++ b/tokio/src/sync/tests/loom_watch.rs
@@ -6,14 +6,30 @@ use loom::thread;
#[test]
fn smoke() {
loom::model(|| {
- let (tx, mut rx) = watch::channel(1);
+ let (tx, mut rx1) = watch::channel(1);
+ let mut rx2 = rx1.clone();
+ let mut rx3 = rx1.clone();
+ let mut rx4 = rx1.clone();
+ let mut rx5 = rx1.clone();
let th = thread::spawn(move || {
tx.send(2).unwrap();
});
- block_on(rx.changed()).unwrap();
- assert_eq!(*rx.borrow(), 2);
+ block_on(rx1.changed()).unwrap();
+ assert_eq!(*rx1.borrow(), 2);
+
+ block_on(rx2.changed()).unwrap();
+ assert_eq!(*rx2.borrow(), 2);
+
+ block_on(rx3.changed()).unwrap();
+ assert_eq!(*rx3.borrow(), 2);
+
+ block_on(rx4.changed()).unwrap();
+ assert_eq!(*rx4.borrow(), 2);
+
+ block_on(rx5.changed()).unwrap();
+ assert_eq!(*rx5.borrow(), 2);
th.join().unwrap();
})