summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync/notify.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/sync/notify.rs')
-rw-r--r--tokio/src/sync/notify.rs52
1 files changed, 49 insertions, 3 deletions
diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs
index 4321c974..74c69e50 100644
--- a/tokio/src/sync/notify.rs
+++ b/tokio/src/sync/notify.rs
@@ -123,7 +123,7 @@ struct Waiter {
/// Future returned from `notified()`
#[derive(Debug)]
-struct Notified<'a> {
+pub struct Notified<'a> {
/// The `Notify` being received on.
notify: &'a Notify,
@@ -172,6 +172,12 @@ impl Notify {
/// Wait for a notification.
///
+ /// Equivalent to:
+ ///
+ /// ```ignore
+ /// async fn notified(&self);
+ /// ```
+ ///
/// Each `Notify` value holds a single permit. If a permit is available from
/// an earlier call to [`notify_one()`], then `notified().await` will complete
/// immediately, consuming that permit. Otherwise, `notified().await` waits
@@ -199,7 +205,7 @@ impl Notify {
/// notify.notify_one();
/// }
/// ```
- pub async fn notified(&self) {
+ pub fn notified(&self) -> Notified<'_> {
Notified {
notify: self,
state: State::Init,
@@ -210,7 +216,6 @@ impl Notify {
_p: PhantomPinned,
}),
}
- .await
}
/// Notifies a waiting task
@@ -279,6 +284,45 @@ impl Notify {
waker.wake();
}
}
+
+ /// Notifies all waiting tasks
+ pub(crate) fn notify_waiters(&self) {
+ // There are waiters, the lock must be acquired to notify.
+ let mut waiters = self.waiters.lock().unwrap();
+
+ // The state must be reloaded while the lock is held. The state may only
+ // transition out of WAITING while the lock is held.
+ let curr = self.state.load(SeqCst);
+
+ if let EMPTY | NOTIFIED = curr {
+ // There are no waiting tasks. In this case, no synchronization is
+ // established between `notify` and `notified().await`.
+ return;
+ }
+
+ // At this point, it is guaranteed that the state will not
+ // concurrently change, as holding the lock is required to
+ // transition **out** of `WAITING`.
+ //
+ // Get pending waiters
+ while let Some(mut waiter) = waiters.pop_back() {
+ // Safety: `waiters` lock is still held.
+ let waiter = unsafe { waiter.as_mut() };
+
+ assert!(!waiter.notified);
+
+ waiter.notified = true;
+
+ if let Some(waker) = waiter.waker.take() {
+ waker.wake();
+ }
+ }
+
+ // All waiters have been notified, the state must be transitioned to
+ // `EMPTY`. As transitioning **from** `WAITING` requires the lock to be
+ // held, a `store` is sufficient.
+ self.state.store(EMPTY, SeqCst);
+ }
}
impl Default for Notify {
@@ -430,6 +474,8 @@ impl Future for Notified<'_> {
waiters.push_front(unsafe { NonNull::new_unchecked(waiter.get()) });
*state = Waiting;
+
+ return Poll::Pending;
}
Waiting => {
// Currently in the "Waiting" state, implying the caller has