summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync/rwlock.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/sync/rwlock.rs')
-rw-r--r--tokio/src/sync/rwlock.rs66
1 files changed, 32 insertions, 34 deletions
diff --git a/tokio/src/sync/rwlock.rs b/tokio/src/sync/rwlock.rs
index 97921b9f..7cce69a5 100644
--- a/tokio/src/sync/rwlock.rs
+++ b/tokio/src/sync/rwlock.rs
@@ -1,8 +1,7 @@
-use crate::future::poll_fn;
-use crate::sync::semaphore_ll::{AcquireError, Permit, Semaphore};
+use crate::coop::CoopFutureExt;
+use crate::sync::batch_semaphore::{AcquireError, Semaphore};
use std::cell::UnsafeCell;
use std::ops;
-use std::task::{Context, Poll};
#[cfg(not(loom))]
const MAX_READS: usize = 32;
@@ -109,29 +108,42 @@ pub struct RwLockWriteGuard<'a, T> {
#[derive(Debug)]
struct ReleasingPermit<'a, T> {
num_permits: u16,
- permit: Permit,
lock: &'a RwLock<T>,
}
impl<'a, T> ReleasingPermit<'a, T> {
- fn poll_acquire(
- &mut self,
- cx: &mut Context<'_>,
- s: &Semaphore,
- ) -> Poll<Result<(), AcquireError>> {
- // Keep track of task budget
- ready!(crate::coop::poll_proceed(cx));
-
- self.permit.poll_acquire(cx, self.num_permits, s)
+ async fn acquire(
+ lock: &'a RwLock<T>,
+ num_permits: u16,
+ ) -> Result<ReleasingPermit<'a, T>, AcquireError> {
+ lock.s.acquire(num_permits).cooperate().await?;
+ Ok(Self { num_permits, lock })
}
}
impl<'a, T> Drop for ReleasingPermit<'a, T> {
fn drop(&mut self) {
- self.permit.release(self.num_permits, &self.lock.s);
+ self.lock.s.release(self.num_permits as usize);
}
}
+#[test]
+#[cfg(not(loom))]
+fn bounds() {
+ fn check_send<T: Send>() {}
+ fn check_sync<T: Sync>() {}
+ fn check_unpin<T: Unpin>() {}
+ check_send::<RwLock<u32>>();
+ check_sync::<RwLock<u32>>();
+ check_unpin::<RwLock<u32>>();
+
+ check_sync::<RwLockReadGuard<'_, u32>>();
+ check_unpin::<RwLockReadGuard<'_, u32>>();
+
+ check_sync::<RwLockWriteGuard<'_, u32>>();
+ check_unpin::<RwLockWriteGuard<'_, u32>>();
+}
+
// As long as T: Send + Sync, it's fine to send and share RwLock<T> between threads.
// If T were not Send, sending and sharing a RwLock<T> would be bad, since you can access T through
// RwLock<T>.
@@ -189,19 +201,11 @@ impl<T> RwLock<T> {
///}
/// ```
pub async fn read(&self) -> RwLockReadGuard<'_, T> {
- let mut permit = ReleasingPermit {
- num_permits: 1,
- permit: Permit::new(),
- lock: self,
- };
-
- poll_fn(|cx| permit.poll_acquire(cx, &self.s))
- .await
- .unwrap_or_else(|_| {
- // The semaphore was closed. but, we never explicitly close it, and we have a
- // handle to it through the Arc, which means that this can never happen.
- unreachable!()
- });
+ let permit = ReleasingPermit::acquire(self, 1).await.unwrap_or_else(|_| {
+ // The semaphore was closed. but, we never explicitly close it, and we have a
+ // handle to it through the Arc, which means that this can never happen.
+ unreachable!()
+ });
RwLockReadGuard { lock: self, permit }
}
@@ -228,13 +232,7 @@ impl<T> RwLock<T> {
///}
/// ```
pub async fn write(&self) -> RwLockWriteGuard<'_, T> {
- let mut permit = ReleasingPermit {
- num_permits: MAX_READS as u16,
- permit: Permit::new(),
- lock: self,
- };
-
- poll_fn(|cx| permit.poll_acquire(cx, &self.s))
+ let permit = ReleasingPermit::acquire(self, MAX_READS as u16)
.await
.unwrap_or_else(|_| {
// The semaphore was closed. but, we never explicitly close it, and we have a