summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync/batch_semaphore.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-09-24 17:26:38 -0700
committerGitHub <noreply@github.com>2020-09-24 17:26:38 -0700
commitcf025ba45f68934ae2138bb75ee2a5ee50506d1b (patch)
tree39fa03f4b063402e84da4435ebca39bd21266ad2 /tokio/src/sync/batch_semaphore.rs
parent4186b0aa38abbec7670d53882d5cdfd4b12add5c (diff)
sync: support mpsc send with `&self` (#2861)
Updates the mpsc channel to use the intrusive waker based sempahore. This enables using `Sender` with `&self`. Instead of using `Sender::poll_ready` to ensure capacity and updating the `Sender` state, `async fn Sender::reserve()` is added. This function returns a `Permit` value representing the reserved capacity. Fixes: #2637 Refs: #2718 (intrusive waiters)
Diffstat (limited to 'tokio/src/sync/batch_semaphore.rs')
-rw-r--r--tokio/src/sync/batch_semaphore.rs10
1 files changed, 7 insertions, 3 deletions
diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs
index a1048ca3..9f324f9c 100644
--- a/tokio/src/sync/batch_semaphore.rs
+++ b/tokio/src/sync/batch_semaphore.rs
@@ -165,7 +165,6 @@ impl Semaphore {
/// permits and notifies all pending waiters.
// This will be used once the bounded MPSC is updated to use the new
// semaphore implementation.
- #[allow(dead_code)]
pub(crate) fn close(&self) {
let mut waiters = self.waiters.lock().unwrap();
// If the semaphore's permits counter has enough permits for an
@@ -185,6 +184,11 @@ impl Semaphore {
}
}
+ /// Returns true if the semaphore is closed
+ pub(crate) fn is_closed(&self) -> bool {
+ self.permits.load(Acquire) & Self::CLOSED == Self::CLOSED
+ }
+
pub(crate) fn try_acquire(&self, num_permits: u32) -> Result<(), TryAcquireError> {
assert!(
num_permits as usize <= Self::MAX_PERMITS,
@@ -194,8 +198,8 @@ impl Semaphore {
let num_permits = (num_permits as usize) << Self::PERMIT_SHIFT;
let mut curr = self.permits.load(Acquire);
loop {
- // Has the semaphore closed?git
- if curr & Self::CLOSED > 0 {
+ // Has the semaphore closed?
+ if curr & Self::CLOSED == Self::CLOSED {
return Err(TryAcquireError::Closed);
}