From cf025ba45f68934ae2138bb75ee2a5ee50506d1b Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 24 Sep 2020 17:26:38 -0700 Subject: sync: support mpsc send with `&self` (#2861) Updates the mpsc channel to use the intrusive waker based sempahore. This enables using `Sender` with `&self`. Instead of using `Sender::poll_ready` to ensure capacity and updating the `Sender` state, `async fn Sender::reserve()` is added. This function returns a `Permit` value representing the reserved capacity. Fixes: #2637 Refs: #2718 (intrusive waiters) --- tokio/src/sync/batch_semaphore.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) (limited to 'tokio/src/sync/batch_semaphore.rs') diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs index a1048ca3..9f324f9c 100644 --- a/tokio/src/sync/batch_semaphore.rs +++ b/tokio/src/sync/batch_semaphore.rs @@ -165,7 +165,6 @@ impl Semaphore { /// permits and notifies all pending waiters. // This will be used once the bounded MPSC is updated to use the new // semaphore implementation. - #[allow(dead_code)] pub(crate) fn close(&self) { let mut waiters = self.waiters.lock().unwrap(); // If the semaphore's permits counter has enough permits for an @@ -185,6 +184,11 @@ impl Semaphore { } } + /// Returns true if the semaphore is closed + pub(crate) fn is_closed(&self) -> bool { + self.permits.load(Acquire) & Self::CLOSED == Self::CLOSED + } + pub(crate) fn try_acquire(&self, num_permits: u32) -> Result<(), TryAcquireError> { assert!( num_permits as usize <= Self::MAX_PERMITS, @@ -194,8 +198,8 @@ impl Semaphore { let num_permits = (num_permits as usize) << Self::PERMIT_SHIFT; let mut curr = self.permits.load(Acquire); loop { - // Has the semaphore closed?git - if curr & Self::CLOSED > 0 { + // Has the semaphore closed? + if curr & Self::CLOSED == Self::CLOSED { return Err(TryAcquireError::Closed); } -- cgit v1.2.3