summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync/mpsc/chan.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/sync/mpsc/chan.rs')
-rw-r--r--tokio/src/sync/mpsc/chan.rs35
1 files changed, 35 insertions, 0 deletions
diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs
index 2d3f0149..e7b951ed 100644
--- a/tokio/src/sync/mpsc/chan.rs
+++ b/tokio/src/sync/mpsc/chan.rs
@@ -4,6 +4,7 @@ use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Arc;
use crate::sync::mpsc::error::TryRecvError;
use crate::sync::mpsc::list;
+use crate::sync::Notify;
use std::fmt;
use std::process;
@@ -44,6 +45,9 @@ pub(crate) trait Semaphore {
}
struct Chan<T, S> {
+ /// Notifies all tasks listening for the receiver being dropped
+ notify_rx_closed: Notify,
+
/// Handle to the push half of the lock-free list.
tx: list::Tx<T>,
@@ -102,6 +106,7 @@ pub(crate) fn channel<T, S: Semaphore>(semaphore: S) -> (Tx<T, S>, Rx<T, S>) {
let (tx, rx) = list::channel();
let chan = Arc::new(Chan {
+ notify_rx_closed: Notify::new(),
tx,
semaphore,
rx_waker: AtomicWaker::new(),
@@ -137,6 +142,35 @@ impl<T, S> Tx<T, S> {
}
}
+impl<T, S: Semaphore> Tx<T, S> {
+ pub(crate) async fn closed(&mut self) {
+ use std::future::Future;
+ use std::pin::Pin;
+ use std::task::Poll;
+
+ // In order to avoid a race condition, we first request a notification,
+ // **then** check the current value's version. If a new version exists,
+ // the notification request is dropped. Requesting the notification
+ // requires polling the future once.
+ let notified = self.inner.notify_rx_closed.notified();
+ pin!(notified);
+
+ // Polling the future once is guaranteed to return `Pending` as `watch`
+ // only notifies using `notify_waiters`.
+ crate::future::poll_fn(|cx| {
+ let res = Pin::new(&mut notified).poll(cx);
+ assert!(!res.is_ready());
+ Poll::Ready(())
+ })
+ .await;
+
+ if self.inner.semaphore.is_closed() {
+ return;
+ }
+ notified.await;
+ }
+}
+
impl<T, S> Clone for Tx<T, S> {
fn clone(&self) -> Tx<T, S> {
// Using a Relaxed ordering here is sufficient as the caller holds a
@@ -182,6 +216,7 @@ impl<T, S: Semaphore> Rx<T, S> {
});
self.inner.semaphore.close();
+ self.inner.notify_rx_closed.notify_waiters();
}
/// Receive the next value