summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync
diff options
context:
space:
mode:
authorZahari Dichev <zaharidichev@gmail.com>2020-09-25 18:40:31 +0300
committerGitHub <noreply@github.com>2020-09-25 08:40:31 -0700
commit55d932a21fd4c5fa298ca3cfdcb1388dbbf43dd0 (patch)
tree500d0d68a991277b466eabb5b87379bb1037d093 /tokio/src/sync
parent444660664b96f758610a0e7201a6a1a31a0f2405 (diff)
sync: add `mpsc::Sender::closed` future (#2840)
Adding closed future, makes it possible to select over closed and some other work, so that the task is woken when the channel is closed and can proactively cancel itself. Added a mpsc::Sender::closed future that will become ready when the receiver is closed.
Diffstat (limited to 'tokio/src/sync')
-rw-r--r--tokio/src/sync/mpsc/bounded.rs35
-rw-r--r--tokio/src/sync/mpsc/chan.rs35
-rw-r--r--tokio/src/sync/mpsc/unbounded.rs35
-rw-r--r--tokio/src/sync/tests/loom_mpsc.rs28
4 files changed, 133 insertions, 0 deletions
diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs
index 2d2006d5..542eae27 100644
--- a/tokio/src/sync/mpsc/bounded.rs
+++ b/tokio/src/sync/mpsc/bounded.rs
@@ -320,6 +320,41 @@ impl<T> Sender<T> {
}
}
+ /// Completes when the receiver has dropped.
+ ///
+ /// This allows the producers to get notified when interest in the produced
+ /// values is canceled and immediately stop doing work.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (mut tx1, rx) = mpsc::channel::<()>(1);
+ /// let mut tx2 = tx1.clone();
+ /// let mut tx3 = tx1.clone();
+ /// let mut tx4 = tx1.clone();
+ /// let mut tx5 = tx1.clone();
+ /// tokio::spawn(async move {
+ /// drop(rx);
+ /// });
+ ///
+ /// futures::join!(
+ /// tx1.closed(),
+ /// tx2.closed(),
+ /// tx3.closed(),
+ /// tx4.closed(),
+ /// tx5.closed()
+ /// );
+ //// println!("Receiver dropped");
+ /// }
+ /// ```
+ pub async fn closed(&mut self) {
+ self.chan.closed().await
+ }
+
/// Attempts to immediately send a message on this `Sender`
///
/// This method differs from [`send`] by returning immediately if the channel's
diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs
index 2d3f0149..e7b951ed 100644
--- a/tokio/src/sync/mpsc/chan.rs
+++ b/tokio/src/sync/mpsc/chan.rs
@@ -4,6 +4,7 @@ use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Arc;
use crate::sync::mpsc::error::TryRecvError;
use crate::sync::mpsc::list;
+use crate::sync::Notify;
use std::fmt;
use std::process;
@@ -44,6 +45,9 @@ pub(crate) trait Semaphore {
}
struct Chan<T, S> {
+ /// Notifies all tasks listening for the receiver being dropped
+ notify_rx_closed: Notify,
+
/// Handle to the push half of the lock-free list.
tx: list::Tx<T>,
@@ -102,6 +106,7 @@ pub(crate) fn channel<T, S: Semaphore>(semaphore: S) -> (Tx<T, S>, Rx<T, S>) {
let (tx, rx) = list::channel();
let chan = Arc::new(Chan {
+ notify_rx_closed: Notify::new(),
tx,
semaphore,
rx_waker: AtomicWaker::new(),
@@ -137,6 +142,35 @@ impl<T, S> Tx<T, S> {
}
}
+impl<T, S: Semaphore> Tx<T, S> {
+ pub(crate) async fn closed(&mut self) {
+ use std::future::Future;
+ use std::pin::Pin;
+ use std::task::Poll;
+
+ // In order to avoid a race condition, we first request a notification,
+ // **then** check the current value's version. If a new version exists,
+ // the notification request is dropped. Requesting the notification
+ // requires polling the future once.
+ let notified = self.inner.notify_rx_closed.notified();
+ pin!(notified);
+
+ // Polling the future once is guaranteed to return `Pending` as `watch`
+ // only notifies using `notify_waiters`.
+ crate::future::poll_fn(|cx| {
+ let res = Pin::new(&mut notified).poll(cx);
+ assert!(!res.is_ready());
+ Poll::Ready(())
+ })
+ .await;
+
+ if self.inner.semaphore.is_closed() {
+ return;
+ }
+ notified.await;
+ }
+}
+
impl<T, S> Clone for Tx<T, S> {
fn clone(&self) -> Tx<T, S> {
// Using a Relaxed ordering here is sufficient as the caller holds a
@@ -182,6 +216,7 @@ impl<T, S: Semaphore> Rx<T, S> {
});
self.inner.semaphore.close();
+ self.inner.notify_rx_closed.notify_waiters();
}
/// Receive the next value
diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs
index 59456375..09f71f21 100644
--- a/tokio/src/sync/mpsc/unbounded.rs
+++ b/tokio/src/sync/mpsc/unbounded.rs
@@ -210,4 +210,39 @@ impl<T> UnboundedSender<T> {
}
}
}
+
+ /// Completes when the receiver has dropped.
+ ///
+ /// This allows the producers to get notified when interest in the produced
+ /// values is canceled and immediately stop doing work.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (mut tx1, rx) = mpsc::unbounded_channel::<()>();
+ /// let mut tx2 = tx1.clone();
+ /// let mut tx3 = tx1.clone();
+ /// let mut tx4 = tx1.clone();
+ /// let mut tx5 = tx1.clone();
+ /// tokio::spawn(async move {
+ /// drop(rx);
+ /// });
+ ///
+ /// futures::join!(
+ /// tx1.closed(),
+ /// tx2.closed(),
+ /// tx3.closed(),
+ /// tx4.closed(),
+ /// tx5.closed()
+ /// );
+ //// println!("Receiver dropped");
+ /// }
+ /// ```
+ pub async fn closed(&mut self) {
+ self.chan.closed().await
+ }
}
diff --git a/tokio/src/sync/tests/loom_mpsc.rs b/tokio/src/sync/tests/loom_mpsc.rs
index e8db2dea..330e798b 100644
--- a/tokio/src/sync/tests/loom_mpsc.rs
+++ b/tokio/src/sync/tests/loom_mpsc.rs
@@ -41,6 +41,34 @@ fn closing_unbounded_tx() {
}
#[test]
+fn closing_bounded_rx() {
+ loom::model(|| {
+ let (mut tx1, rx) = mpsc::channel::<()>(16);
+ let mut tx2 = tx1.clone();
+ thread::spawn(move || {
+ drop(rx);
+ });
+
+ block_on(tx1.closed());
+ block_on(tx2.closed());
+ });
+}
+
+#[test]
+fn closing_unbounded_rx() {
+ loom::model(|| {
+ let (mut tx1, rx) = mpsc::unbounded_channel::<()>();
+ let mut tx2 = tx1.clone();
+ thread::spawn(move || {
+ drop(rx);
+ });
+
+ block_on(tx1.closed());
+ block_on(tx2.closed());
+ });
+}
+
+#[test]
fn dropping_tx() {
loom::model(|| {
let (tx, mut rx) = mpsc::channel::<()>(16);