summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync/notify.rs
diff options
context:
space:
mode:
authorZahari Dichev <zaharidichev@gmail.com>2020-09-22 18:12:57 +0300
committerGitHub <noreply@github.com>2020-09-22 08:12:57 -0700
commite7091fde786722a5301270e6281fc3c449dcfc14 (patch)
treef40c752ab5411ef0f4ba8e086da339d857c564b2 /tokio/src/sync/notify.rs
parent2348f678e6b2a5e37b596c514b4b1a6ccb090d79 (diff)
sync: Remove readiness assertion in `watch::Receiver::changed() (#2839)
*In `watch::Receiver::changed` `Notified` was polled for the first time to ensure the waiter is registered while assuming that the first poll will always return `Pending`. It is the case however that another instance of `Notified` is dropped without receiving its notification, this "orphaned" notification can be used to satisfy another waiter without even registering it. This commit accounts for that scenario.
Diffstat (limited to 'tokio/src/sync/notify.rs')
-rw-r--r--tokio/src/sync/notify.rs33
1 files changed, 20 insertions, 13 deletions
diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs
index c69e2b07..56bbc51b 100644
--- a/tokio/src/sync/notify.rs
+++ b/tokio/src/sync/notify.rs
@@ -106,6 +106,14 @@ pub struct Notify {
waiters: Mutex<WaitList>,
}
+#[derive(Debug, Clone, Copy)]
+enum NotificationType {
+ // Notification triggered by calling `notify_waiters`
+ AllWaiters,
+ // Notification triggered by calling `notify_one`
+ OneWaiter,
+}
+
#[derive(Debug)]
struct Waiter {
/// Intrusive linked-list pointers
@@ -115,7 +123,7 @@ struct Waiter {
waker: Option<Waker>,
/// `true` if the notification has been assigned to this waiter.
- notified: bool,
+ notified: Option<NotificationType>,
/// Should not be `Unpin`.
_p: PhantomPinned,
@@ -230,7 +238,7 @@ impl Notify {
waiter: UnsafeCell::new(Waiter {
pointers: linked_list::Pointers::new(),
waker: None,
- notified: false,
+ notified: None,
_p: PhantomPinned,
}),
}
@@ -327,9 +335,9 @@ impl Notify {
// Safety: `waiters` lock is still held.
let waiter = unsafe { waiter.as_mut() };
- assert!(!waiter.notified);
+ assert!(waiter.notified.is_none());
- waiter.notified = true;
+ waiter.notified = Some(NotificationType::AllWaiters);
if let Some(waker) = waiter.waker.take() {
waker.wake();
@@ -375,9 +383,9 @@ fn notify_locked(waiters: &mut WaitList, state: &AtomicU8, curr: u8) -> Option<W
// Safety: `waiters` lock is still held.
let waiter = unsafe { waiter.as_mut() };
- assert!(!waiter.notified);
+ assert!(waiter.notified.is_none());
- waiter.notified = true;
+ waiter.notified = Some(NotificationType::OneWaiter);
let waker = waiter.waker.take();
if waiters.is_empty() {
@@ -506,11 +514,11 @@ impl Future for Notified<'_> {
// Safety: called while locked
let w = unsafe { &mut *waiter.get() };
- if w.notified {
+ if w.notified.is_some() {
// Our waker has been notified. Reset the fields and
// remove it from the list.
w.waker = None;
- w.notified = false;
+ w.notified = None;
*state = Done;
} else {
@@ -582,14 +590,13 @@ impl Drop for Notified<'_> {
notify.state.store(EMPTY, SeqCst);
}
- // See if the node was notified but not received. In this case, the
- // notification must be sent to another waiter.
+ // See if the node was notified but not received. In this case, if
+ // the notification was triggered via `notify_one`, it must be sent
+ // to the next waiter.
//
// Safety: with the entry removed from the linked list, there can be
// no concurrent access to the entry
- let notified = unsafe { (*waiter.get()).notified };
-
- if notified {
+ if let Some(NotificationType::OneWaiter) = unsafe { (*waiter.get()).notified } {
if let Some(waker) = notify_locked(&mut waiters, &notify.state, notify_state) {
drop(waiters);
waker.wake();