diff options
-rw-r--r-- | tokio/src/sync/mpsc/bounded.rs | 35 | ||||
-rw-r--r-- | tokio/src/sync/mpsc/chan.rs | 35 | ||||
-rw-r--r-- | tokio/src/sync/mpsc/unbounded.rs | 35 | ||||
-rw-r--r-- | tokio/src/sync/tests/loom_mpsc.rs | 28 |
4 files changed, 133 insertions, 0 deletions
diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 2d2006d5..542eae27 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -320,6 +320,41 @@ impl<T> Sender<T> { } } + /// Completes when the receiver has dropped. + /// + /// This allows the producers to get notified when interest in the produced + /// values is canceled and immediately stop doing work. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (mut tx1, rx) = mpsc::channel::<()>(1); + /// let mut tx2 = tx1.clone(); + /// let mut tx3 = tx1.clone(); + /// let mut tx4 = tx1.clone(); + /// let mut tx5 = tx1.clone(); + /// tokio::spawn(async move { + /// drop(rx); + /// }); + /// + /// futures::join!( + /// tx1.closed(), + /// tx2.closed(), + /// tx3.closed(), + /// tx4.closed(), + /// tx5.closed() + /// ); + //// println!("Receiver dropped"); + /// } + /// ``` + pub async fn closed(&mut self) { + self.chan.closed().await + } + /// Attempts to immediately send a message on this `Sender` /// /// This method differs from [`send`] by returning immediately if the channel's diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index 2d3f0149..e7b951ed 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -4,6 +4,7 @@ use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::Arc; use crate::sync::mpsc::error::TryRecvError; use crate::sync::mpsc::list; +use crate::sync::Notify; use std::fmt; use std::process; @@ -44,6 +45,9 @@ pub(crate) trait Semaphore { } struct Chan<T, S> { + /// Notifies all tasks listening for the receiver being dropped + notify_rx_closed: Notify, + /// Handle to the push half of the lock-free list. tx: list::Tx<T>, @@ -102,6 +106,7 @@ pub(crate) fn channel<T, S: Semaphore>(semaphore: S) -> (Tx<T, S>, Rx<T, S>) { let (tx, rx) = list::channel(); let chan = Arc::new(Chan { + notify_rx_closed: Notify::new(), tx, semaphore, rx_waker: AtomicWaker::new(), @@ -137,6 +142,35 @@ impl<T, S> Tx<T, S> { } } +impl<T, S: Semaphore> Tx<T, S> { + pub(crate) async fn closed(&mut self) { + use std::future::Future; + use std::pin::Pin; + use std::task::Poll; + + // In order to avoid a race condition, we first request a notification, + // **then** check the current value's version. If a new version exists, + // the notification request is dropped. Requesting the notification + // requires polling the future once. + let notified = self.inner.notify_rx_closed.notified(); + pin!(notified); + + // Polling the future once is guaranteed to return `Pending` as `watch` + // only notifies using `notify_waiters`. + crate::future::poll_fn(|cx| { + let res = Pin::new(&mut notified).poll(cx); + assert!(!res.is_ready()); + Poll::Ready(()) + }) + .await; + + if self.inner.semaphore.is_closed() { + return; + } + notified.await; + } +} + impl<T, S> Clone for Tx<T, S> { fn clone(&self) -> Tx<T, S> { // Using a Relaxed ordering here is sufficient as the caller holds a @@ -182,6 +216,7 @@ impl<T, S: Semaphore> Rx<T, S> { }); self.inner.semaphore.close(); + self.inner.notify_rx_closed.notify_waiters(); } /// Receive the next value diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs index 59456375..09f71f21 100644 --- a/tokio/src/sync/mpsc/unbounded.rs +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -210,4 +210,39 @@ impl<T> UnboundedSender<T> { } } } + + /// Completes when the receiver has dropped. + /// + /// This allows the producers to get notified when interest in the produced + /// values is canceled and immediately stop doing work. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (mut tx1, rx) = mpsc::unbounded_channel::<()>(); + /// let mut tx2 = tx1.clone(); + /// let mut tx3 = tx1.clone(); + /// let mut tx4 = tx1.clone(); + /// let mut tx5 = tx1.clone(); + /// tokio::spawn(async move { + /// drop(rx); + /// }); + /// + /// futures::join!( + /// tx1.closed(), + /// tx2.closed(), + /// tx3.closed(), + /// tx4.closed(), + /// tx5.closed() + /// ); + //// println!("Receiver dropped"); + /// } + /// ``` + pub async fn closed(&mut self) { + self.chan.closed().await + } } diff --git a/tokio/src/sync/tests/loom_mpsc.rs b/tokio/src/sync/tests/loom_mpsc.rs index e8db2dea..330e798b 100644 --- a/tokio/src/sync/tests/loom_mpsc.rs +++ b/tokio/src/sync/tests/loom_mpsc.rs @@ -41,6 +41,34 @@ fn closing_unbounded_tx() { } #[test] +fn closing_bounded_rx() { + loom::model(|| { + let (mut tx1, rx) = mpsc::channel::<()>(16); + let mut tx2 = tx1.clone(); + thread::spawn(move || { + drop(rx); + }); + + block_on(tx1.closed()); + block_on(tx2.closed()); + }); +} + +#[test] +fn closing_unbounded_rx() { + loom::model(|| { + let (mut tx1, rx) = mpsc::unbounded_channel::<()>(); + let mut tx2 = tx1.clone(); + thread::spawn(move || { + drop(rx); + }); + + block_on(tx1.closed()); + block_on(tx2.closed()); + }); +} + +#[test] fn dropping_tx() { loom::model(|| { let (tx, mut rx) = mpsc::channel::<()>(16); |