diff options
author | Zahari Dichev <zaharidichev@gmail.com> | 2020-09-22 18:12:57 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-22 08:12:57 -0700 |
commit | e7091fde786722a5301270e6281fc3c449dcfc14 (patch) | |
tree | f40c752ab5411ef0f4ba8e086da339d857c564b2 /tokio/src/sync/notify.rs | |
parent | 2348f678e6b2a5e37b596c514b4b1a6ccb090d79 (diff) |
sync: Remove readiness assertion in `watch::Receiver::changed() (#2839)
*In `watch::Receiver::changed` `Notified` was polled
for the first time to ensure the waiter is registered while
assuming that the first poll will always return `Pending`.
It is the case however that another instance of `Notified`
is dropped without receiving its notification, this "orphaned"
notification can be used to satisfy another waiter without
even registering it. This commit accounts for that scenario.
Diffstat (limited to 'tokio/src/sync/notify.rs')
-rw-r--r-- | tokio/src/sync/notify.rs | 33 |
1 files changed, 20 insertions, 13 deletions
diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index c69e2b07..56bbc51b 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -106,6 +106,14 @@ pub struct Notify { waiters: Mutex<WaitList>, } +#[derive(Debug, Clone, Copy)] +enum NotificationType { + // Notification triggered by calling `notify_waiters` + AllWaiters, + // Notification triggered by calling `notify_one` + OneWaiter, +} + #[derive(Debug)] struct Waiter { /// Intrusive linked-list pointers @@ -115,7 +123,7 @@ struct Waiter { waker: Option<Waker>, /// `true` if the notification has been assigned to this waiter. - notified: bool, + notified: Option<NotificationType>, /// Should not be `Unpin`. _p: PhantomPinned, @@ -230,7 +238,7 @@ impl Notify { waiter: UnsafeCell::new(Waiter { pointers: linked_list::Pointers::new(), waker: None, - notified: false, + notified: None, _p: PhantomPinned, }), } @@ -327,9 +335,9 @@ impl Notify { // Safety: `waiters` lock is still held. let waiter = unsafe { waiter.as_mut() }; - assert!(!waiter.notified); + assert!(waiter.notified.is_none()); - waiter.notified = true; + waiter.notified = Some(NotificationType::AllWaiters); if let Some(waker) = waiter.waker.take() { waker.wake(); @@ -375,9 +383,9 @@ fn notify_locked(waiters: &mut WaitList, state: &AtomicU8, curr: u8) -> Option<W // Safety: `waiters` lock is still held. let waiter = unsafe { waiter.as_mut() }; - assert!(!waiter.notified); + assert!(waiter.notified.is_none()); - waiter.notified = true; + waiter.notified = Some(NotificationType::OneWaiter); let waker = waiter.waker.take(); if waiters.is_empty() { @@ -506,11 +514,11 @@ impl Future for Notified<'_> { // Safety: called while locked let w = unsafe { &mut *waiter.get() }; - if w.notified { + if w.notified.is_some() { // Our waker has been notified. Reset the fields and // remove it from the list. w.waker = None; - w.notified = false; + w.notified = None; *state = Done; } else { @@ -582,14 +590,13 @@ impl Drop for Notified<'_> { notify.state.store(EMPTY, SeqCst); } - // See if the node was notified but not received. In this case, the - // notification must be sent to another waiter. + // See if the node was notified but not received. In this case, if + // the notification was triggered via `notify_one`, it must be sent + // to the next waiter. // // Safety: with the entry removed from the linked list, there can be // no concurrent access to the entry - let notified = unsafe { (*waiter.get()).notified }; - - if notified { + if let Some(NotificationType::OneWaiter) = unsafe { (*waiter.get()).notified } { if let Some(waker) = notify_locked(&mut waiters, ¬ify.state, notify_state) { drop(waiters); waker.wake(); |