From c344aac9252c34fcce196200a99529734b5cb9e8 Mon Sep 17 00:00:00 2001 From: Kornel Date: Wed, 22 Jul 2020 00:51:42 +0100 Subject: sync: support larger number of semaphore permits (#2607) --- tokio/src/sync/batch_semaphore.rs | 25 ++++++++++++++++--------- tokio/src/sync/rwlock.rs | 2 +- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs index 8cd1cdd9..070cd201 100644 --- a/tokio/src/sync/batch_semaphore.rs +++ b/tokio/src/sync/batch_semaphore.rs @@ -53,7 +53,7 @@ pub(crate) struct AcquireError(()); pub(crate) struct Acquire<'a> { node: Waiter, semaphore: &'a Semaphore, - num_permits: u16, + num_permits: u32, queued: bool, } @@ -103,6 +103,8 @@ impl Semaphore { const PERMIT_SHIFT: usize = 1; /// Creates a new semaphore with the initial number of permits + /// + /// Maximum number of permits on 32-bit platforms is `1<<29`. pub(crate) fn new(permits: usize) -> Self { assert!( permits <= Self::MAX_PERMITS, @@ -159,9 +161,14 @@ impl Semaphore { } } - pub(crate) fn try_acquire(&self, num_permits: u16) -> Result<(), TryAcquireError> { - let mut curr = self.permits.load(Acquire); + pub(crate) fn try_acquire(&self, num_permits: u32) -> Result<(), TryAcquireError> { + assert!( + num_permits as usize <= Self::MAX_PERMITS, + "a semaphore may not have more than MAX_PERMITS permits ({})", + Self::MAX_PERMITS + ); let num_permits = (num_permits as usize) << Self::PERMIT_SHIFT; + let mut curr = self.permits.load(Acquire); loop { // Has the semaphore closed?git if curr & Self::CLOSED > 0 { @@ -182,7 +189,7 @@ impl Semaphore { } } - pub(crate) fn acquire(&self, num_permits: u16) -> Acquire<'_> { + pub(crate) fn acquire(&self, num_permits: u32) -> Acquire<'_> { Acquire::new(self, num_permits) } @@ -247,7 +254,7 @@ impl Semaphore { fn poll_acquire( &self, cx: &mut Context<'_>, - num_permits: u16, + num_permits: u32, node: Pin<&mut Waiter>, queued: bool, ) -> Poll> { @@ -356,7 +363,7 @@ impl fmt::Debug for Semaphore { } impl Waiter { - fn new(num_permits: u16) -> Self { + fn new(num_permits: u32) -> Self { Waiter { waker: UnsafeCell::new(None), state: AtomicUsize::new(num_permits as usize), @@ -409,7 +416,7 @@ impl Future for Acquire<'_> { } impl<'a> Acquire<'a> { - fn new(semaphore: &'a Semaphore, num_permits: u16) -> Self { + fn new(semaphore: &'a Semaphore, num_permits: u32) -> Self { Self { node: Waiter::new(num_permits), semaphore, @@ -418,14 +425,14 @@ impl<'a> Acquire<'a> { } } - fn project(self: Pin<&mut Self>) -> (Pin<&mut Waiter>, &Semaphore, u16, &mut bool) { + fn project(self: Pin<&mut Self>) -> (Pin<&mut Waiter>, &Semaphore, u32, &mut bool) { fn is_unpin() {} unsafe { // Safety: all fields other than `node` are `Unpin` is_unpin::<&Semaphore>(); is_unpin::<&mut bool>(); - is_unpin::(); + is_unpin::(); let this = self.get_unchecked_mut(); ( diff --git a/tokio/src/sync/rwlock.rs b/tokio/src/sync/rwlock.rs index f6cbd2a0..3d2a2f7a 100644 --- a/tokio/src/sync/rwlock.rs +++ b/tokio/src/sync/rwlock.rs @@ -115,7 +115,7 @@ impl<'a, T: ?Sized> ReleasingPermit<'a, T> { lock: &'a RwLock, num_permits: u16, ) -> Result, AcquireError> { - lock.s.acquire(num_permits).await?; + lock.s.acquire(num_permits.into()).await?; Ok(Self { num_permits, lock }) } } -- cgit v1.2.3