summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync/batch_semaphore.rs
diff options
context:
space:
mode:
authorKornel <kornel@geekhood.net>2020-07-22 00:51:42 +0100
committerGitHub <noreply@github.com>2020-07-21 16:51:42 -0700
commitc344aac9252c34fcce196200a99529734b5cb9e8 (patch)
tree79647faf1bbd2e586411b9ba1a2cb5cedc98cf14 /tokio/src/sync/batch_semaphore.rs
parentcbb4abc8aeee1f7304ce6c0d6b160ce99dd2c8cf (diff)
sync: support larger number of semaphore permits (#2607)
Diffstat (limited to 'tokio/src/sync/batch_semaphore.rs')
-rw-r--r--tokio/src/sync/batch_semaphore.rs25
1 files changed, 16 insertions, 9 deletions
diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs
index 8cd1cdd9..070cd201 100644
--- a/tokio/src/sync/batch_semaphore.rs
+++ b/tokio/src/sync/batch_semaphore.rs
@@ -53,7 +53,7 @@ pub(crate) struct AcquireError(());
pub(crate) struct Acquire<'a> {
node: Waiter,
semaphore: &'a Semaphore,
- num_permits: u16,
+ num_permits: u32,
queued: bool,
}
@@ -103,6 +103,8 @@ impl Semaphore {
const PERMIT_SHIFT: usize = 1;
/// Creates a new semaphore with the initial number of permits
+ ///
+ /// Maximum number of permits on 32-bit platforms is `1<<29`.
pub(crate) fn new(permits: usize) -> Self {
assert!(
permits <= Self::MAX_PERMITS,
@@ -159,9 +161,14 @@ impl Semaphore {
}
}
- pub(crate) fn try_acquire(&self, num_permits: u16) -> Result<(), TryAcquireError> {
- let mut curr = self.permits.load(Acquire);
+ pub(crate) fn try_acquire(&self, num_permits: u32) -> Result<(), TryAcquireError> {
+ assert!(
+ num_permits as usize <= Self::MAX_PERMITS,
+ "a semaphore may not have more than MAX_PERMITS permits ({})",
+ Self::MAX_PERMITS
+ );
let num_permits = (num_permits as usize) << Self::PERMIT_SHIFT;
+ let mut curr = self.permits.load(Acquire);
loop {
// Has the semaphore closed?git
if curr & Self::CLOSED > 0 {
@@ -182,7 +189,7 @@ impl Semaphore {
}
}
- pub(crate) fn acquire(&self, num_permits: u16) -> Acquire<'_> {
+ pub(crate) fn acquire(&self, num_permits: u32) -> Acquire<'_> {
Acquire::new(self, num_permits)
}
@@ -247,7 +254,7 @@ impl Semaphore {
fn poll_acquire(
&self,
cx: &mut Context<'_>,
- num_permits: u16,
+ num_permits: u32,
node: Pin<&mut Waiter>,
queued: bool,
) -> Poll<Result<(), AcquireError>> {
@@ -356,7 +363,7 @@ impl fmt::Debug for Semaphore {
}
impl Waiter {
- fn new(num_permits: u16) -> Self {
+ fn new(num_permits: u32) -> Self {
Waiter {
waker: UnsafeCell::new(None),
state: AtomicUsize::new(num_permits as usize),
@@ -409,7 +416,7 @@ impl Future for Acquire<'_> {
}
impl<'a> Acquire<'a> {
- fn new(semaphore: &'a Semaphore, num_permits: u16) -> Self {
+ fn new(semaphore: &'a Semaphore, num_permits: u32) -> Self {
Self {
node: Waiter::new(num_permits),
semaphore,
@@ -418,14 +425,14 @@ impl<'a> Acquire<'a> {
}
}
- fn project(self: Pin<&mut Self>) -> (Pin<&mut Waiter>, &Semaphore, u16, &mut bool) {
+ fn project(self: Pin<&mut Self>) -> (Pin<&mut Waiter>, &Semaphore, u32, &mut bool) {
fn is_unpin<T: Unpin>() {}
unsafe {
// Safety: all fields other than `node` are `Unpin`
is_unpin::<&Semaphore>();
is_unpin::<&mut bool>();
- is_unpin::<u16>();
+ is_unpin::<u32>();
let this = self.get_unchecked_mut();
(