diff options
Diffstat (limited to 'tokio/src/sync/notify.rs')
-rw-r--r-- | tokio/src/sync/notify.rs | 52 |
1 files changed, 49 insertions, 3 deletions
diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index 4321c974..74c69e50 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -123,7 +123,7 @@ struct Waiter { /// Future returned from `notified()` #[derive(Debug)] -struct Notified<'a> { +pub struct Notified<'a> { /// The `Notify` being received on. notify: &'a Notify, @@ -172,6 +172,12 @@ impl Notify { /// Wait for a notification. /// + /// Equivalent to: + /// + /// ```ignore + /// async fn notified(&self); + /// ``` + /// /// Each `Notify` value holds a single permit. If a permit is available from /// an earlier call to [`notify_one()`], then `notified().await` will complete /// immediately, consuming that permit. Otherwise, `notified().await` waits @@ -199,7 +205,7 @@ impl Notify { /// notify.notify_one(); /// } /// ``` - pub async fn notified(&self) { + pub fn notified(&self) -> Notified<'_> { Notified { notify: self, state: State::Init, @@ -210,7 +216,6 @@ impl Notify { _p: PhantomPinned, }), } - .await } /// Notifies a waiting task @@ -279,6 +284,45 @@ impl Notify { waker.wake(); } } + + /// Notifies all waiting tasks + pub(crate) fn notify_waiters(&self) { + // There are waiters, the lock must be acquired to notify. + let mut waiters = self.waiters.lock().unwrap(); + + // The state must be reloaded while the lock is held. The state may only + // transition out of WAITING while the lock is held. + let curr = self.state.load(SeqCst); + + if let EMPTY | NOTIFIED = curr { + // There are no waiting tasks. In this case, no synchronization is + // established between `notify` and `notified().await`. + return; + } + + // At this point, it is guaranteed that the state will not + // concurrently change, as holding the lock is required to + // transition **out** of `WAITING`. + // + // Get pending waiters + while let Some(mut waiter) = waiters.pop_back() { + // Safety: `waiters` lock is still held. + let waiter = unsafe { waiter.as_mut() }; + + assert!(!waiter.notified); + + waiter.notified = true; + + if let Some(waker) = waiter.waker.take() { + waker.wake(); + } + } + + // All waiters have been notified, the state must be transitioned to + // `EMPTY`. As transitioning **from** `WAITING` requires the lock to be + // held, a `store` is sufficient. + self.state.store(EMPTY, SeqCst); + } } impl Default for Notify { @@ -430,6 +474,8 @@ impl Future for Notified<'_> { waiters.push_front(unsafe { NonNull::new_unchecked(waiter.get()) }); *state = Waiting; + + return Poll::Pending; } Waiting => { // Currently in the "Waiting" state, implying the caller has |