diff options
author | João Oliveira <hello@jxs.pt> | 2020-01-04 05:03:26 +0000 |
---|---|---|
committer | Carl Lerche <me@carllerche.com> | 2020-01-03 21:03:26 -0800 |
commit | 32e15b3a24ac177c92a78eb04e233534583eae17 (patch) | |
tree | d64a660fc2e497288880994f61c778c84e2e8edf /tokio | |
parent | efcbf9613f2d5048550f9c828e3be422644f1391 (diff) |
sync: add RwLock (#1699)
Provides a `RwLock` based on a semaphore. The semaphore is initialized
with 32 permits. A read acquires a single permit and a write acquires all 32
permits. This ensures that reads (up to 32) may happen concurrently and
writes happen exclusively.
Diffstat (limited to 'tokio')
-rw-r--r-- | tokio/src/sync/mod.rs | 3 | ||||
-rw-r--r-- | tokio/src/sync/rwlock.rs | 256 | ||||
-rw-r--r-- | tokio/src/sync/tests/loom_rwlock.rs | 78 | ||||
-rw-r--r-- | tokio/tests/sync_rwlock.rs | 231 |
4 files changed, 568 insertions, 0 deletions
diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs index 6ed91607..beb96920 100644 --- a/tokio/src/sync/mod.rs +++ b/tokio/src/sync/mod.rs @@ -32,6 +32,9 @@ cfg_sync! { mod semaphore; pub use semaphore::{Semaphore, SemaphorePermit}; + mod rwlock; + pub use rwlock::{RwLock, RwLockReadGuard, RwLockWriteGuard}; + mod task; pub(crate) use task::AtomicWaker; diff --git a/tokio/src/sync/rwlock.rs b/tokio/src/sync/rwlock.rs new file mode 100644 index 00000000..ad27dbff --- /dev/null +++ b/tokio/src/sync/rwlock.rs @@ -0,0 +1,256 @@ +use crate::future::poll_fn; +use crate::sync::semaphore_ll::{AcquireError, Permit, Semaphore}; +use std::cell::UnsafeCell; +use std::ops; +use std::task::{Context, Poll}; + +#[cfg(not(loom))] +const MAX_READS: usize = 32; + +#[cfg(loom)] +const MAX_READS: usize = 10; + +/// An asynchronous reader-writer lock +/// +/// This type of lock allows a number of readers or at most one writer at any +/// point in time. The write portion of this lock typically allows modification +/// of the underlying data (exclusive access) and the read portion of this lock +/// typically allows for read-only access (shared access). +/// +/// In comparison, a [`Mutex`] does not distinguish between readers or writers +/// that acquire the lock, therefore blocking any tasks waiting for the lock to +/// become available. An `RwLock` will allow any number of readers to acquire the +/// lock as long as a writer is not holding the lock. +/// +/// The priority policy of the lock is dependent on the underlying operating +/// system's implementation, and this type does not guarantee that any +/// particular policy will be used. +/// +/// The type parameter `T` represents the data that this lock protects. It is +/// required that `T` satisfies [`Send`] to be shared across threads. 
The RAII guards +/// returned from the locking methods implement [`Deref`](https://doc.rust-lang.org/std/ops/trait.Deref.html) +/// (and [`DerefMut`](https://doc.rust-lang.org/std/ops/trait.DerefMut.html) +/// for the `write` methods) to allow access to the content of the lock. +/// +/// # Examples +/// +/// ``` +/// use tokio::sync::RwLock; +/// +/// #[tokio::main] +/// async fn main() { +/// let lock = RwLock::new(5); +/// +/// // many reader locks can be held at once +/// { +/// let r1 = lock.read().await; +/// let r2 = lock.read().await; +/// assert_eq!(*r1, 5); +/// assert_eq!(*r2, 5); +/// } // read locks are dropped at this point +/// +/// // only one write lock may be held, however +/// { +/// let mut w = lock.write().await; +/// *w += 1; +/// assert_eq!(*w, 6); +/// } // write lock is dropped here +/// } +/// ``` +/// +/// [`Mutex`]: struct.Mutex.html +/// [`RwLock`]: struct.RwLock.html +/// [`RwLockReadGuard`]: struct.RwLockReadGuard.html +/// [`RwLockWriteGuard`]: struct.RwLockWriteGuard.html +/// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html +#[derive(Debug)] +pub struct RwLock<T> { + //semaphore to coordinate read and write access to T + s: Semaphore, + + //inner data T + c: UnsafeCell<T>, +} + +/// RAII structure used to release the shared read access of a lock when +/// dropped. +/// +/// This structure is created by the [`read`] method on +/// [`RwLock`]. +/// +/// [`read`]: struct.RwLock.html#method.read +#[derive(Debug)] +pub struct RwLockReadGuard<'a, T> { + permit: ReleasingPermit<'a, T>, + lock: &'a RwLock<T>, +} + +/// RAII structure used to release the exclusive write access of a lock when +/// dropped. +/// +/// This structure is created by the [`write`] method +/// on [`RwLock`]. 
+/// +/// [`write`]: struct.RwLock.html#method.write +/// [`RwLock`]: struct.RwLock.html +#[derive(Debug)] +pub struct RwLockWriteGuard<'a, T> { + permit: ReleasingPermit<'a, T>, + lock: &'a RwLock<T>, +} + +// Wrapper around Permit that releases on Drop +#[derive(Debug)] +struct ReleasingPermit<'a, T> { + num_permits: u16, + permit: Permit, + lock: &'a RwLock<T>, +} + +impl<'a, T> ReleasingPermit<'a, T> { + fn poll_acquire( + &mut self, + cx: &mut Context<'_>, + s: &Semaphore, + ) -> Poll<Result<(), AcquireError>> { + self.permit.poll_acquire(cx, self.num_permits, s) + } +} + +impl<'a, T> Drop for ReleasingPermit<'a, T> { + fn drop(&mut self) { + self.permit.release(self.num_permits, &self.lock.s); + } +} + +// As long as T: Send + Sync, it's fine to send and share RwLock<T> between threads. +// If T were not Send, sending and sharing a RwLock<T> would be bad, since you can access T through +// RwLock<T>. +unsafe impl<T> Send for RwLock<T> where T: Send {} +unsafe impl<T> Sync for RwLock<T> where T: Send + Sync {} +unsafe impl<'a, T> Sync for RwLockReadGuard<'a, T> where T: Send + Sync {} +unsafe impl<'a, T> Sync for RwLockWriteGuard<'a, T> where T: Send + Sync {} + +impl<T> RwLock<T> { + /// Creates a new instance of an `RwLock<T>` which is unlocked. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::RwLock; + /// + /// let lock = RwLock::new(5); + /// ``` + pub fn new(value: T) -> RwLock<T> { + RwLock { + c: UnsafeCell::new(value), + s: Semaphore::new(MAX_READS), + } + } + + /// Locks this rwlock with shared read access, blocking the current task + /// until it can be acquired. + /// + /// The calling task will be blocked until there are no more writers which + /// hold the lock. There may be other readers currently inside the lock when + /// this method returns. 
+ /// + /// # Examples + /// + /// ``` + /// use std::sync::Arc; + /// use tokio::sync::RwLock; + /// + /// #[tokio::main] + /// async fn main() { + /// let lock = Arc::new(RwLock::new(1)); + /// let c_lock = lock.clone(); + /// + /// let n = lock.read().await; + /// assert_eq!(*n, 1); + /// + /// tokio::spawn(async move { + /// let r = c_lock.read().await; + /// assert_eq!(*r, 1); + /// }); + ///} + /// ``` + pub async fn read(&self) -> RwLockReadGuard<'_, T> { + let mut permit = ReleasingPermit { + num_permits: 1, + permit: Permit::new(), + lock: self, + }; + + poll_fn(|cx| permit.poll_acquire(cx, &self.s)) + .await + .unwrap_or_else(|_| { + // The semaphore was closed. but, we never explicitly close it, and we have a + // handle to it through the Arc, which means that this can never happen. + unreachable!() + }); + RwLockReadGuard { lock: self, permit } + } + + /// Locks this rwlock with exclusive write access, blocking the current + /// task until it can be acquired. + /// + /// This function will not return while other writers or other readers + /// currently have access to the lock. + /// + /// Returns an RAII guard which will drop the write access of this rwlock + /// when dropped. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::RwLock; + /// + /// #[tokio::main] + /// async fn main() { + /// let lock = RwLock::new(1); + /// + /// let mut n = lock.write().await; + /// *n = 2; + ///} + /// ``` + pub async fn write(&self) -> RwLockWriteGuard<'_, T> { + let mut permit = ReleasingPermit { + num_permits: MAX_READS as u16, + permit: Permit::new(), + lock: self, + }; + + poll_fn(|cx| permit.poll_acquire(cx, &self.s)) + .await + .unwrap_or_else(|_| { + // The semaphore was closed. but, we never explicitly close it, and we have a + // handle to it through the Arc, which means that this can never happen. 
+ unreachable!() + }); + + RwLockWriteGuard { lock: self, permit } + } +} + +impl<T> ops::Deref for RwLockReadGuard<'_, T> { + type Target = T; + + fn deref(&self) -> &T { + unsafe { &*self.lock.c.get() } + } +} + +impl<T> ops::Deref for RwLockWriteGuard<'_, T> { + type Target = T; + + fn deref(&self) -> &T { + unsafe { &*self.lock.c.get() } + } +} + +impl<T> ops::DerefMut for RwLockWriteGuard<'_, T> { + fn deref_mut(&mut self) -> &mut T { + unsafe { &mut *self.lock.c.get() } + } +} diff --git a/tokio/src/sync/tests/loom_rwlock.rs b/tokio/src/sync/tests/loom_rwlock.rs new file mode 100644 index 00000000..48d06e1d --- /dev/null +++ b/tokio/src/sync/tests/loom_rwlock.rs @@ -0,0 +1,78 @@ +use crate::sync::rwlock::*; + +use loom::future::block_on; +use loom::thread; +use std::sync::Arc; + +#[test] +fn concurrent_write() { + let mut b = loom::model::Builder::new(); + + b.check(|| { + let rwlock = Arc::new(RwLock::<u32>::new(0)); + + let rwclone = rwlock.clone(); + let t1 = thread::spawn(move || { + block_on(async { + let mut guard = rwclone.write().await; + *guard += 5; + }); + }); + + let rwclone = rwlock.clone(); + let t2 = thread::spawn(move || { + block_on(async { + let mut guard = rwclone.write().await; + *guard += 5; + }); + }); + + t1.join().expect("thread 1 write should not panic"); + t2.join().expect("thread 2 write should not panic"); + //when all threads have finished the value on the lock should be 10 + let guard = block_on(rwlock.read()); + assert_eq!(10, *guard); + }); +} + +#[test] +fn concurrent_read_write() { + let mut b = loom::model::Builder::new(); + + b.check(|| { + let rwlock = Arc::new(RwLock::<u32>::new(0)); + + let rwclone = rwlock.clone(); + let t1 = thread::spawn(move || { + block_on(async { + let mut guard = rwclone.write().await; + *guard += 5; + }); + }); + + let rwclone = rwlock.clone(); + let t2 = thread::spawn(move || { + block_on(async { + let mut guard = rwclone.write().await; + *guard += 5; + }); + }); + + let rwclone = 
rwlock.clone(); + let t3 = thread::spawn(move || { + block_on(async { + let guard = rwclone.read().await; + //at this state the value on the lock may either be 0, 5, or 10 + assert!(*guard == 0 || *guard == 5 || *guard == 10); + }); + }); + + t1.join().expect("thread 1 write should not panic"); + t2.join().expect("thread 2 write should not panic"); + t3.join().expect("thread 3 read should not panic"); + + let guard = block_on(rwlock.read()); + //when all threads have finished the value on the lock should be 10 + assert_eq!(10, *guard); + }); +} diff --git a/tokio/tests/sync_rwlock.rs b/tokio/tests/sync_rwlock.rs new file mode 100644 index 00000000..b6bbf060 --- /dev/null +++ b/tokio/tests/sync_rwlock.rs @@ -0,0 +1,231 @@ +#![warn(rust_2018_idioms)] + +use std::sync::Arc; +use std::task::Poll; + +use futures::future::FutureExt; +use futures::stream; +use futures::stream::StreamExt; + +use tokio::sync::{Barrier, RwLock}; +use tokio_test::task::spawn; +use tokio_test::{assert_pending, assert_ready}; + +// multiple reads should be Ready +#[test] +fn read_shared() { + let rwlock = RwLock::new(100); + + let mut t1 = spawn(rwlock.read()); + let _g1 = assert_ready!(t1.poll()); + let mut t2 = spawn(rwlock.read()); + assert_ready!(t2.poll()); +} + +// When there is an active shared owner, exclusive access should not be possible +#[test] +fn write_shared_pending() { + let rwlock = RwLock::new(100); + let mut t1 = spawn(rwlock.read()); + + let _g1 = assert_ready!(t1.poll()); + let mut t2 = spawn(rwlock.write()); + assert_pending!(t2.poll()); +} + +// When there is an active exclusive owner, subsequent shared access should not be possible +#[test] +fn read_exclusive_pending() { + let rwlock = RwLock::new(100); + let mut t1 = spawn(rwlock.write()); + + let _g1 = assert_ready!(t1.poll()); + let mut t2 = spawn(rwlock.read()); + assert_pending!(t2.poll()); +} + +// If the max shared access is reached and subsequent shared access is pending +// should be made available when one of 
the shared acesses is dropped +#[test] +fn exhaust_reading() { + let rwlock = RwLock::new(100); + let mut reads = Vec::new(); + loop { + let mut t = spawn(rwlock.read()); + match t.poll() { + Poll::Ready(guard) => reads.push(guard), + Poll::Pending => break, + } + } + + let mut t1 = spawn(rwlock.read()); + assert_pending!(t1.poll()); + let g2 = reads.pop().unwrap(); + drop(g2); + assert!(t1.is_woken()); + assert_ready!(t1.poll()); +} + +// When there is an active exclusive owner, subsequent exclusive access should not be possible +#[test] +fn write_exclusive_pending() { + let rwlock = RwLock::new(100); + let mut t1 = spawn(rwlock.write()); + + let _g1 = assert_ready!(t1.poll()); + let mut t2 = spawn(rwlock.write()); + assert_pending!(t2.poll()); +} + +// When there is an active shared owner, exclusive access should be possible after shared is dropped +#[test] +fn write_shared_drop() { + let rwlock = RwLock::new(100); + let mut t1 = spawn(rwlock.read()); + + let g1 = assert_ready!(t1.poll()); + let mut t2 = spawn(rwlock.write()); + assert_pending!(t2.poll()); + drop(g1); + assert!(t2.is_woken()); + assert_ready!(t2.poll()); +} + +// when there is an active shared owner, and exclusive access is triggered, +// subsequent shared access should not be possible as write gathers all the available semaphore permits +#[test] +fn write_read_shared_pending() { + let rwlock = RwLock::new(100); + let mut t1 = spawn(rwlock.read()); + let _g1 = assert_ready!(t1.poll()); + + let mut t2 = spawn(rwlock.read()); + assert_ready!(t2.poll()); + + let mut t3 = spawn(rwlock.write()); + let mut _g2 = assert_pending!(t3.poll()); + + let mut t4 = spawn(rwlock.read()); + assert_pending!(t4.poll()); +} + +// when there is an active shared owner, and exclusive access is triggered, +// reading should be possible after pending exclusive access is dropped +#[test] +fn write_read_shared_drop_pending() { + let rwlock = RwLock::new(100); + let mut t1 = spawn(rwlock.read()); + let _g1 = 
assert_ready!(t1.poll()); + + let mut t2 = spawn(rwlock.write()); + assert_pending!(t2.poll()); + + let mut t3 = spawn(rwlock.read()); + assert_pending!(t3.poll()); + drop(t2); + + assert!(t3.is_woken()); + assert_ready!(t3.poll()); +} + +// Acquire an RwLock nonexclusively by a single task +#[tokio::test] +async fn read_uncontested() { + let rwlock = RwLock::new(100); + let result = *rwlock.read().await; + + assert_eq!(result, 100); +} + +// Acquire an uncontested RwLock in exclusive mode +#[tokio::test] +async fn write_uncontested() { + let rwlock = RwLock::new(100); + let mut result = rwlock.write().await; + *result += 50; + assert_eq!(*result, 150); +} + +// RwLocks should be acquired in the order that their Futures are waited upon. +#[tokio::test] +async fn write_order() { + let rwlock = RwLock::<Vec<u32>>::new(vec![]); + let fut2 = rwlock.write().map(|mut guard| guard.push(2)); + let fut1 = rwlock.write().map(|mut guard| guard.push(1)); + fut1.await; + fut2.await; + + let g = rwlock.read().await; + assert_eq!(*g, vec![1, 2]); +} + +// A single RwLock is contested by tasks in multiple threads +#[tokio::test(threaded_scheduler)] +async fn multithreaded() { + let barrier = Arc::new(Barrier::new(5)); + let rwlock = Arc::new(RwLock::<u32>::new(0)); + let rwclone1 = rwlock.clone(); + let rwclone2 = rwlock.clone(); + let rwclone3 = rwlock.clone(); + let rwclone4 = rwlock.clone(); + + let b1 = barrier.clone(); + tokio::spawn(async move { + stream::iter(0..1000) + .for_each(move |_| { + let rwlock = rwclone1.clone(); + async move { + let mut guard = rwlock.write().await; + *guard += 2; + } + }) + .await; + b1.wait().await; + }); + + let b2 = barrier.clone(); + tokio::spawn(async move { + stream::iter(0..1000) + .for_each(move |_| { + let rwlock = rwclone2.clone(); + async move { + let mut guard = rwlock.write().await; + *guard += 3; + } + }) + .await; + b2.wait().await; + }); + + let b3 = barrier.clone(); + tokio::spawn(async move { + stream::iter(0..1000) + 
.for_each(move |_| { + let rwlock = rwclone3.clone(); + async move { + let mut guard = rwlock.write().await; + *guard += 5; + } + }) + .await; + b3.wait().await; + }); + + let b4 = barrier.clone(); + tokio::spawn(async move { + stream::iter(0..1000) + .for_each(move |_| { + let rwlock = rwclone4.clone(); + async move { + let mut guard = rwlock.write().await; + *guard += 7; + } + }) + .await; + b4.wait().await; + }); + + barrier.wait().await; + let g = rwlock.read().await; + assert_eq!(*g, 17_000); +} |