diff options
author | Zahari Dichev <zaharidichev@gmail.com> | 2020-11-16 22:49:35 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-11-16 12:49:35 -0800 |
commit | d0ebb4154748166a4ba07baa4b424a1c45efd219 (patch) | |
tree | 5ea4d611256290f62baea1a9ffa3333b254181df /tokio/src/sync/mpsc | |
parent | f5cb4c20422a35b51bfba3391744f8bcb54f7581 (diff) |
sync: add `Notify::notify_waiters` (#3098)
This PR makes `Notify::notify_waiters` public. The method
already exists, but it changes the way `notify_waiters`,
is used. Previously in order for the consumer to
register interest, in a notification triggered by
`notify_waiters`, the `Notified` future had to be
polled. This introduced friction when using the api
as the future had to be pinned before polled.
This change introduces a counter that tracks how many
times `notified_waiters` has been called. Upon creation of
the future the number of times is loaded. When first
polled the future compares this number with the count
state of the `Notify` type. This avoids the need for
registering the waiter upfront.
Fixes: #3066
Diffstat (limited to 'tokio/src/sync/mpsc')
-rw-r--r-- | tokio/src/sync/mpsc/chan.rs | 19 |
1 files changed, 2 insertions, 17 deletions
diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index c78fb501..a40f5c3d 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -148,25 +148,10 @@ impl<T, S: Semaphore> Tx<T, S> { } pub(crate) async fn closed(&self) { - use std::future::Future; - use std::pin::Pin; - use std::task::Poll; - // In order to avoid a race condition, we first request a notification, - // **then** check the current value's version. If a new version exists, - // the notification request is dropped. Requesting the notification - // requires polling the future once. + // **then** check whether the semaphore is closed. If the semaphore is + // closed the notification request is dropped. let notified = self.inner.notify_rx_closed.notified(); - pin!(notified); - - // Polling the future once is guaranteed to return `Pending` as `watch` - // only notifies using `notify_waiters`. - crate::future::poll_fn(|cx| { - let res = Pin::new(&mut notified).poll(cx); - assert!(!res.is_ready()); - Poll::Ready(()) - }) - .await; if self.inner.semaphore.is_closed() { return; |