From e7091fde786722a5301270e6281fc3c449dcfc14 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Tue, 22 Sep 2020 18:12:57 +0300 Subject: sync: Remove readiness assertion in `watch::Receiver::changed() (#2839) *In `watch::Receiver::changed` `Notified` was polled for the first time to ensure the waiter is registered while assuming that the first poll will always return `Pending`. It is the case however that another instance of `Notified` is dropped without receiving its notification, this "orphaned" notification can be used to satisfy another waiter without even registering it. This commit accounts for that scenario. --- tokio/src/sync/notify.rs | 33 ++++++++++++++++++++------------- 1 file changed, 20 insertions(+), 13 deletions(-) (limited to 'tokio/src/sync/notify.rs') diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index c69e2b07..56bbc51b 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -106,6 +106,14 @@ pub struct Notify { waiters: Mutex, } +#[derive(Debug, Clone, Copy)] +enum NotificationType { + // Notification triggered by calling `notify_waiters` + AllWaiters, + // Notification triggered by calling `notify_one` + OneWaiter, +} + #[derive(Debug)] struct Waiter { /// Intrusive linked-list pointers @@ -115,7 +123,7 @@ struct Waiter { waker: Option, /// `true` if the notification has been assigned to this waiter. - notified: bool, + notified: Option, /// Should not be `Unpin`. _p: PhantomPinned, @@ -230,7 +238,7 @@ impl Notify { waiter: UnsafeCell::new(Waiter { pointers: linked_list::Pointers::new(), waker: None, - notified: false, + notified: None, _p: PhantomPinned, }), } @@ -327,9 +335,9 @@ impl Notify { // Safety: `waiters` lock is still held. let waiter = unsafe { waiter.as_mut() }; - assert!(!waiter.notified); + assert!(waiter.notified.is_none()); - waiter.notified = true; + waiter.notified = Some(NotificationType::AllWaiters); if let Some(waker) = waiter.waker.take() { waker.wake(); @@ -375,9 +383,9 @@ fn notify_locked(waiters: &mut WaitList, state: &AtomicU8, curr: u8) -> Option { // Safety: called while locked let w = unsafe { &mut *waiter.get() }; - if w.notified { + if w.notified.is_some() { // Our waker has been notified. Reset the fields and // remove it from the list. w.waker = None; - w.notified = false; + w.notified = None; *state = Done; } else { @@ -582,14 +590,13 @@ impl Drop for Notified<'_> { notify.state.store(EMPTY, SeqCst); } - // See if the node was notified but not received. In this case, the - // notification must be sent to another waiter. + // See if the node was notified but not received. In this case, if + // the notification was triggered via `notify_one`, it must be sent + // to the next waiter. // // Safety: with the entry removed from the linked list, there can be // no concurrent access to the entry - let notified = unsafe { (*waiter.get()).notified }; - - if notified { + if let Some(NotificationType::OneWaiter) = unsafe { (*waiter.get()).notified } { if let Some(waker) = notify_locked(&mut waiters, ¬ify.state, notify_state) { drop(waiters); waker.wake(); -- cgit v1.2.3