diff options
Diffstat (limited to 'tokio/src/sync/rwlock.rs')
-rw-r--r-- | tokio/src/sync/rwlock.rs | 66 |
1 files changed, 32 insertions, 34 deletions
diff --git a/tokio/src/sync/rwlock.rs b/tokio/src/sync/rwlock.rs index 97921b9f..7cce69a5 100644 --- a/tokio/src/sync/rwlock.rs +++ b/tokio/src/sync/rwlock.rs @@ -1,8 +1,7 @@ -use crate::future::poll_fn; -use crate::sync::semaphore_ll::{AcquireError, Permit, Semaphore}; +use crate::coop::CoopFutureExt; +use crate::sync::batch_semaphore::{AcquireError, Semaphore}; use std::cell::UnsafeCell; use std::ops; -use std::task::{Context, Poll}; #[cfg(not(loom))] const MAX_READS: usize = 32; @@ -109,29 +108,42 @@ pub struct RwLockWriteGuard<'a, T> { #[derive(Debug)] struct ReleasingPermit<'a, T> { num_permits: u16, - permit: Permit, lock: &'a RwLock<T>, } impl<'a, T> ReleasingPermit<'a, T> { - fn poll_acquire( - &mut self, - cx: &mut Context<'_>, - s: &Semaphore, - ) -> Poll<Result<(), AcquireError>> { - // Keep track of task budget - ready!(crate::coop::poll_proceed(cx)); - - self.permit.poll_acquire(cx, self.num_permits, s) + async fn acquire( + lock: &'a RwLock<T>, + num_permits: u16, + ) -> Result<ReleasingPermit<'a, T>, AcquireError> { + lock.s.acquire(num_permits).cooperate().await?; + Ok(Self { num_permits, lock }) } } impl<'a, T> Drop for ReleasingPermit<'a, T> { fn drop(&mut self) { - self.permit.release(self.num_permits, &self.lock.s); + self.lock.s.release(self.num_permits as usize); } } +#[test] +#[cfg(not(loom))] +fn bounds() { + fn check_send<T: Send>() {} + fn check_sync<T: Sync>() {} + fn check_unpin<T: Unpin>() {} + check_send::<RwLock<u32>>(); + check_sync::<RwLock<u32>>(); + check_unpin::<RwLock<u32>>(); + + check_sync::<RwLockReadGuard<'_, u32>>(); + check_unpin::<RwLockReadGuard<'_, u32>>(); + + check_sync::<RwLockWriteGuard<'_, u32>>(); + check_unpin::<RwLockWriteGuard<'_, u32>>(); +} + // As long as T: Send + Sync, it's fine to send and share RwLock<T> between threads. // If T were not Send, sending and sharing a RwLock<T> would be bad, since you can access T through // RwLock<T>. @@ -189,19 +201,11 @@ impl<T> RwLock<T> { ///} /// ``` pub async fn read(&self) -> RwLockReadGuard<'_, T> { - let mut permit = ReleasingPermit { - num_permits: 1, - permit: Permit::new(), - lock: self, - }; - - poll_fn(|cx| permit.poll_acquire(cx, &self.s)) - .await - .unwrap_or_else(|_| { - // The semaphore was closed. but, we never explicitly close it, and we have a - // handle to it through the Arc, which means that this can never happen. - unreachable!() - }); + let permit = ReleasingPermit::acquire(self, 1).await.unwrap_or_else(|_| { + // The semaphore was closed. but, we never explicitly close it, and we have a + // handle to it through the Arc, which means that this can never happen. + unreachable!() + }); RwLockReadGuard { lock: self, permit } } @@ -228,13 +232,7 @@ impl<T> RwLock<T> { ///} /// ``` pub async fn write(&self) -> RwLockWriteGuard<'_, T> { - let mut permit = ReleasingPermit { - num_permits: MAX_READS as u16, - permit: Permit::new(), - lock: self, - }; - - poll_fn(|cx| permit.poll_acquire(cx, &self.s)) + let permit = ReleasingPermit::acquire(self, MAX_READS as u16) .await .unwrap_or_else(|_| { // The semaphore was closed. but, we never explicitly close it, and we have a |