diff options
author | Carl Lerche <me@carllerche.com> | 2020-09-21 14:29:22 -0700 |
---|---|---|
committer | Carl Lerche <me@carllerche.com> | 2020-09-21 14:35:09 -0700 |
commit | 93f8cb8df2bb0dceb7921556165a8ed8efed9151 (patch) | |
tree | 964a5ce89407b7ad602ab148927087fbfbbaa86e /tokio/src/sync | |
parent | 1ac10fa80aa847c68e376ebf9ef4e2be891e41d3 (diff) |
sync: fix missing notification during mpsc close (#2854)
When the mpsc channel receiver closes the channel, receiving should
return `None` once all in-progress sends have completed. When a sender
reserves capacity, this prevents the receiver from fully shutting down.
Previously, when the sender, after reserving capacity, dropped without
sending a message, the receiver was not notified. This results in
blocking the shutdown process until all sender handles drop.
This patch adds a receiver notification when the channel is both closed
and all outstanding sends have completed.
Diffstat (limited to 'tokio/src/sync')
-rw-r--r-- | tokio/src/sync/mpsc/chan.rs | 22 |
1 files changed, 17 insertions, 5 deletions
diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index 148ee3ad..0a53cda2 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -75,7 +75,11 @@ pub(crate) trait Semaphore { /// The permit is dropped without a value being sent. In this case, the /// permit must be returned to the semaphore. - fn drop_permit(&self, permit: &mut Self::Permit); + /// + /// # Return + /// + /// Returns true if the permit was acquired. + fn drop_permit(&self, permit: &mut Self::Permit) -> bool; fn is_idle(&self) -> bool; @@ -192,7 +196,7 @@ where pub(crate) fn disarm(&mut self) { // TODO: should this error if not acquired? - self.inner.semaphore.drop_permit(&mut self.permit) + self.inner.semaphore.drop_permit(&mut self.permit); } /// Send a message and notify the receiver. @@ -234,7 +238,11 @@ where S: Semaphore, { fn drop(&mut self) { - self.inner.semaphore.drop_permit(&mut self.permit); + let notify = self.inner.semaphore.drop_permit(&mut self.permit); + + if notify && self.inner.semaphore.is_idle() { + self.inner.rx_waker.wake(); + } if self.inner.tx_count.fetch_sub(1, AcqRel) != 1 { return; @@ -424,8 +432,10 @@ impl Semaphore for (crate::sync::semaphore_ll::Semaphore, usize) { Permit::new() } - fn drop_permit(&self, permit: &mut Permit) { + fn drop_permit(&self, permit: &mut Permit) -> bool { + let ret = permit.is_acquired(); permit.release(1, &self.0); + ret } fn add_permit(&self) { @@ -477,7 +487,9 @@ impl Semaphore for AtomicUsize { fn new_permit() {} - fn drop_permit(&self, _permit: &mut ()) {} + fn drop_permit(&self, _permit: &mut ()) -> bool { + false + } fn add_permit(&self) { let prev = self.fetch_sub(2, Release); |