summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--tokio/src/sync/mpsc/bounded.rs35
-rw-r--r--tokio/src/sync/mpsc/chan.rs35
-rw-r--r--tokio/src/sync/mpsc/unbounded.rs35
-rw-r--r--tokio/src/sync/tests/loom_mpsc.rs28
4 files changed, 133 insertions, 0 deletions
diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs
index 2d2006d5..542eae27 100644
--- a/tokio/src/sync/mpsc/bounded.rs
+++ b/tokio/src/sync/mpsc/bounded.rs
@@ -320,6 +320,41 @@ impl<T> Sender<T> {
}
}
+ /// Completes when the receiver has dropped.
+ ///
+ /// This allows the producers to get notified when interest in the produced
+ /// values is canceled and immediately stop doing work.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (mut tx1, rx) = mpsc::channel::<()>(1);
+ /// let mut tx2 = tx1.clone();
+ /// let mut tx3 = tx1.clone();
+ /// let mut tx4 = tx1.clone();
+ /// let mut tx5 = tx1.clone();
+ /// tokio::spawn(async move {
+ /// drop(rx);
+ /// });
+ ///
+ /// futures::join!(
+ /// tx1.closed(),
+ /// tx2.closed(),
+ /// tx3.closed(),
+ /// tx4.closed(),
+ /// tx5.closed()
+ /// );
+ //// println!("Receiver dropped");
+ /// }
+ /// ```
+ pub async fn closed(&mut self) {
+ self.chan.closed().await
+ }
+
/// Attempts to immediately send a message on this `Sender`
///
/// This method differs from [`send`] by returning immediately if the channel's
diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs
index 2d3f0149..e7b951ed 100644
--- a/tokio/src/sync/mpsc/chan.rs
+++ b/tokio/src/sync/mpsc/chan.rs
@@ -4,6 +4,7 @@ use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Arc;
use crate::sync::mpsc::error::TryRecvError;
use crate::sync::mpsc::list;
+use crate::sync::Notify;
use std::fmt;
use std::process;
@@ -44,6 +45,9 @@ pub(crate) trait Semaphore {
}
struct Chan<T, S> {
+ /// Notifies all tasks listening for the receiver being dropped
+ notify_rx_closed: Notify,
+
/// Handle to the push half of the lock-free list.
tx: list::Tx<T>,
@@ -102,6 +106,7 @@ pub(crate) fn channel<T, S: Semaphore>(semaphore: S) -> (Tx<T, S>, Rx<T, S>) {
let (tx, rx) = list::channel();
let chan = Arc::new(Chan {
+ notify_rx_closed: Notify::new(),
tx,
semaphore,
rx_waker: AtomicWaker::new(),
@@ -137,6 +142,35 @@ impl<T, S> Tx<T, S> {
}
}
+impl<T, S: Semaphore> Tx<T, S> {
+ pub(crate) async fn closed(&mut self) {
+ use std::future::Future;
+ use std::pin::Pin;
+ use std::task::Poll;
+
+ // In order to avoid a race condition, we first request a notification,
+ // **then** check the current value's version. If a new version exists,
+ // the notification request is dropped. Requesting the notification
+ // requires polling the future once.
+ let notified = self.inner.notify_rx_closed.notified();
+ pin!(notified);
+
+ // Polling the future once is guaranteed to return `Pending` as `watch`
+ // only notifies using `notify_waiters`.
+ crate::future::poll_fn(|cx| {
+ let res = Pin::new(&mut notified).poll(cx);
+ assert!(!res.is_ready());
+ Poll::Ready(())
+ })
+ .await;
+
+ if self.inner.semaphore.is_closed() {
+ return;
+ }
+ notified.await;
+ }
+}
+
impl<T, S> Clone for Tx<T, S> {
fn clone(&self) -> Tx<T, S> {
// Using a Relaxed ordering here is sufficient as the caller holds a
@@ -182,6 +216,7 @@ impl<T, S: Semaphore> Rx<T, S> {
});
self.inner.semaphore.close();
+ self.inner.notify_rx_closed.notify_waiters();
}
/// Receive the next value
diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs
index 59456375..09f71f21 100644
--- a/tokio/src/sync/mpsc/unbounded.rs
+++ b/tokio/src/sync/mpsc/unbounded.rs
@@ -210,4 +210,39 @@ impl<T> UnboundedSender<T> {
}
}
}
+
+ /// Completes when the receiver has dropped.
+ ///
+ /// This allows the producers to get notified when interest in the produced
+ /// values is canceled and immediately stop doing work.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (mut tx1, rx) = mpsc::unbounded_channel::<()>();
+ /// let mut tx2 = tx1.clone();
+ /// let mut tx3 = tx1.clone();
+ /// let mut tx4 = tx1.clone();
+ /// let mut tx5 = tx1.clone();
+ /// tokio::spawn(async move {
+ /// drop(rx);
+ /// });
+ ///
+ /// futures::join!(
+ /// tx1.closed(),
+ /// tx2.closed(),
+ /// tx3.closed(),
+ /// tx4.closed(),
+ /// tx5.closed()
+ /// );
+ //// println!("Receiver dropped");
+ /// }
+ /// ```
+ pub async fn closed(&mut self) {
+ self.chan.closed().await
+ }
}
diff --git a/tokio/src/sync/tests/loom_mpsc.rs b/tokio/src/sync/tests/loom_mpsc.rs
index e8db2dea..330e798b 100644
--- a/tokio/src/sync/tests/loom_mpsc.rs
+++ b/tokio/src/sync/tests/loom_mpsc.rs
@@ -41,6 +41,34 @@ fn closing_unbounded_tx() {
}
#[test]
+fn closing_bounded_rx() {
+ loom::model(|| {
+ let (mut tx1, rx) = mpsc::channel::<()>(16);
+ let mut tx2 = tx1.clone();
+ thread::spawn(move || {
+ drop(rx);
+ });
+
+ block_on(tx1.closed());
+ block_on(tx2.closed());
+ });
+}
+
+#[test]
+fn closing_unbounded_rx() {
+ loom::model(|| {
+ let (mut tx1, rx) = mpsc::unbounded_channel::<()>();
+ let mut tx2 = tx1.clone();
+ thread::spawn(move || {
+ drop(rx);
+ });
+
+ block_on(tx1.closed());
+ block_on(tx2.closed());
+ });
+}
+
+#[test]
fn dropping_tx() {
loom::model(|| {
let (tx, mut rx) = mpsc::channel::<()>(16);