summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-09-21 14:29:22 -0700
committerCarl Lerche <me@carllerche.com>2020-09-21 14:35:09 -0700
commit93f8cb8df2bb0dceb7921556165a8ed8efed9151 (patch)
tree964a5ce89407b7ad602ab148927087fbfbbaa86e /tokio/src/sync
parent1ac10fa80aa847c68e376ebf9ef4e2be891e41d3 (diff)
sync: fix missing notification during mpsc close (#2854)
When the mpsc channel receiver closes the channel, receiving should return `None` once all in-progress sends have completed. When a sender reserves capacity, this prevents the receiver from fully shutting down. Previously, when the sender, after reserving capacity, dropped without sending a message, the receiver was not notified. This results in blocking the shutdown process until all sender handles drop. This patch adds a receiver notification when the channel is both closed and all outstanding sends have completed.
Diffstat (limited to 'tokio/src/sync')
-rw-r--r--tokio/src/sync/mpsc/chan.rs22
1 files changed, 17 insertions, 5 deletions
diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs
index 148ee3ad..0a53cda2 100644
--- a/tokio/src/sync/mpsc/chan.rs
+++ b/tokio/src/sync/mpsc/chan.rs
@@ -75,7 +75,11 @@ pub(crate) trait Semaphore {
/// The permit is dropped without a value being sent. In this case, the
/// permit must be returned to the semaphore.
- fn drop_permit(&self, permit: &mut Self::Permit);
+ ///
+ /// # Return
+ ///
+ /// Returns true if the permit was acquired.
+ fn drop_permit(&self, permit: &mut Self::Permit) -> bool;
fn is_idle(&self) -> bool;
@@ -192,7 +196,7 @@ where
pub(crate) fn disarm(&mut self) {
// TODO: should this error if not acquired?
- self.inner.semaphore.drop_permit(&mut self.permit)
+ self.inner.semaphore.drop_permit(&mut self.permit);
}
/// Send a message and notify the receiver.
@@ -234,7 +238,11 @@ where
S: Semaphore,
{
fn drop(&mut self) {
- self.inner.semaphore.drop_permit(&mut self.permit);
+ let notify = self.inner.semaphore.drop_permit(&mut self.permit);
+
+ if notify && self.inner.semaphore.is_idle() {
+ self.inner.rx_waker.wake();
+ }
if self.inner.tx_count.fetch_sub(1, AcqRel) != 1 {
return;
@@ -424,8 +432,10 @@ impl Semaphore for (crate::sync::semaphore_ll::Semaphore, usize) {
Permit::new()
}
- fn drop_permit(&self, permit: &mut Permit) {
+ fn drop_permit(&self, permit: &mut Permit) -> bool {
+ let ret = permit.is_acquired();
permit.release(1, &self.0);
+ ret
}
fn add_permit(&self) {
@@ -477,7 +487,9 @@ impl Semaphore for AtomicUsize {
fn new_permit() {}
- fn drop_permit(&self, _permit: &mut ()) {}
+ fn drop_permit(&self, _permit: &mut ()) -> bool {
+ false
+ }
fn add_permit(&self) {
let prev = self.fetch_sub(2, Release);