summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync/mpsc
diff options
context:
space:
mode:
authorZahari Dichev <zaharidichev@gmail.com>2020-11-16 22:49:35 +0200
committerGitHub <noreply@github.com>2020-11-16 12:49:35 -0800
commitd0ebb4154748166a4ba07baa4b424a1c45efd219 (patch)
tree5ea4d611256290f62baea1a9ffa3333b254181df /tokio/src/sync/mpsc
parentf5cb4c20422a35b51bfba3391744f8bcb54f7581 (diff)
sync: add `Notify::notify_waiters` (#3098)
This PR makes `Notify::notify_waiters` public. The method already exists, but it changes the way `notify_waiters`, is used. Previously in order for the consumer to register interest, in a notification triggered by `notify_waiters`, the `Notified` future had to be polled. This introduced friction when using the api as the future had to be pinned before polled. This change introduces a counter that tracks how many times `notified_waiters` has been called. Upon creation of the future the number of times is loaded. When first polled the future compares this number with the count state of the `Notify` type. This avoids the need for registering the waiter upfront. Fixes: #3066
Diffstat (limited to 'tokio/src/sync/mpsc')
-rw-r--r--tokio/src/sync/mpsc/chan.rs19
1 files changed, 2 insertions, 17 deletions
diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs
index c78fb501..a40f5c3d 100644
--- a/tokio/src/sync/mpsc/chan.rs
+++ b/tokio/src/sync/mpsc/chan.rs
@@ -148,25 +148,10 @@ impl<T, S: Semaphore> Tx<T, S> {
}
pub(crate) async fn closed(&self) {
- use std::future::Future;
- use std::pin::Pin;
- use std::task::Poll;
-
// In order to avoid a race condition, we first request a notification,
- // **then** check the current value's version. If a new version exists,
- // the notification request is dropped. Requesting the notification
- // requires polling the future once.
+ // **then** check whether the semaphore is closed. If the semaphore is
+ // closed the notification request is dropped.
let notified = self.inner.notify_rx_closed.notified();
- pin!(notified);
-
- // Polling the future once is guaranteed to return `Pending` as `watch`
- // only notifies using `notify_waiters`.
- crate::future::poll_fn(|cx| {
- let res = Pin::new(&mut notified).poll(cx);
- assert!(!res.is_ready());
- Poll::Ready(())
- })
- .await;
if self.inner.semaphore.is_closed() {
return;