summaryrefslogtreecommitdiffstats
path: root/tokio
diff options
context:
space:
mode:
authorJoão Oliveira <hello@jxs.pt>2020-01-04 05:03:26 +0000
committerCarl Lerche <me@carllerche.com>2020-01-03 21:03:26 -0800
commit32e15b3a24ac177c92a78eb04e233534583eae17 (patch)
treed64a660fc2e497288880994f61c778c84e2e8edf /tokio
parentefcbf9613f2d5048550f9c828e3be422644f1391 (diff)
sync: add RwLock (#1699)
Provides a `RwLock` based on a semaphore. The semaphore is initialized with 32 permits. A read acquires a single permit and a write acquires all 32 permits. This ensures that reads (up to 32) may happen concurrently and writes happen exclusively.
Diffstat (limited to 'tokio')
-rw-r--r--tokio/src/sync/mod.rs3
-rw-r--r--tokio/src/sync/rwlock.rs256
-rw-r--r--tokio/src/sync/tests/loom_rwlock.rs78
-rw-r--r--tokio/tests/sync_rwlock.rs231
4 files changed, 568 insertions, 0 deletions
diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs
index 6ed91607..beb96920 100644
--- a/tokio/src/sync/mod.rs
+++ b/tokio/src/sync/mod.rs
@@ -32,6 +32,9 @@ cfg_sync! {
mod semaphore;
pub use semaphore::{Semaphore, SemaphorePermit};
+ mod rwlock;
+ pub use rwlock::{RwLock, RwLockReadGuard, RwLockWriteGuard};
+
mod task;
pub(crate) use task::AtomicWaker;
diff --git a/tokio/src/sync/rwlock.rs b/tokio/src/sync/rwlock.rs
new file mode 100644
index 00000000..ad27dbff
--- /dev/null
+++ b/tokio/src/sync/rwlock.rs
@@ -0,0 +1,256 @@
+use crate::future::poll_fn;
+use crate::sync::semaphore_ll::{AcquireError, Permit, Semaphore};
+use std::cell::UnsafeCell;
+use std::ops;
+use std::task::{Context, Poll};
+
+#[cfg(not(loom))]
+const MAX_READS: usize = 32;
+
+#[cfg(loom)]
+const MAX_READS: usize = 10;
+
+/// An asynchronous reader-writer lock
+///
+/// This type of lock allows a number of readers or at most one writer at any
+/// point in time. The write portion of this lock typically allows modification
+/// of the underlying data (exclusive access) and the read portion of this lock
+/// typically allows for read-only access (shared access).
+///
+/// In comparison, a [`Mutex`] does not distinguish between readers or writers
+/// that acquire the lock, therefore blocking any tasks waiting for the lock to
+/// become available. An `RwLock` will allow any number of readers to acquire the
+/// lock as long as a writer is not holding the lock.
+///
+/// The priority policy of the lock is dependent on the underlying operating
+/// system's implementation, and this type does not guarantee that any
+/// particular policy will be used.
+///
+/// The type parameter `T` represents the data that this lock protects. It is
+/// required that `T` satisfies [`Send`] to be shared across threads. The RAII guards
+/// returned from the locking methods implement [`Deref`](https://doc.rust-lang.org/std/ops/trait.Deref.html)
+/// (and [`DerefMut`](https://doc.rust-lang.org/std/ops/trait.DerefMut.html)
+/// for the `write` methods) to allow access to the content of the lock.
+///
+/// # Examples
+///
+/// ```
+/// use tokio::sync::RwLock;
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let lock = RwLock::new(5);
+///
+/// // many reader locks can be held at once
+/// {
+/// let r1 = lock.read().await;
+/// let r2 = lock.read().await;
+/// assert_eq!(*r1, 5);
+/// assert_eq!(*r2, 5);
+/// } // read locks are dropped at this point
+///
+/// // only one write lock may be held, however
+/// {
+/// let mut w = lock.write().await;
+/// *w += 1;
+/// assert_eq!(*w, 6);
+/// } // write lock is dropped here
+/// }
+/// ```
+///
+/// [`Mutex`]: struct.Mutex.html
+/// [`RwLock`]: struct.RwLock.html
+/// [`RwLockReadGuard`]: struct.RwLockReadGuard.html
+/// [`RwLockWriteGuard`]: struct.RwLockWriteGuard.html
+/// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html
+#[derive(Debug)]
+pub struct RwLock<T> {
+ //semaphore to coordinate read and write access to T
+ s: Semaphore,
+
+ //inner data T
+ c: UnsafeCell<T>,
+}
+
+/// RAII structure used to release the shared read access of a lock when
+/// dropped.
+///
+/// This structure is created by the [`read`] method on
+/// [`RwLock`].
+///
+/// [`read`]: struct.RwLock.html#method.read
+#[derive(Debug)]
+pub struct RwLockReadGuard<'a, T> {
+ permit: ReleasingPermit<'a, T>,
+ lock: &'a RwLock<T>,
+}
+
+/// RAII structure used to release the exclusive write access of a lock when
+/// dropped.
+///
+/// This structure is created by the [`write`] and method
+/// on [`RwLock`].
+///
+/// [`write`]: struct.RwLock.html#method.write
+/// [`RwLock`]: struct.RwLock.html
+#[derive(Debug)]
+pub struct RwLockWriteGuard<'a, T> {
+ permit: ReleasingPermit<'a, T>,
+ lock: &'a RwLock<T>,
+}
+
+// Wrapper arround Permit that releases on Drop
+#[derive(Debug)]
+struct ReleasingPermit<'a, T> {
+ num_permits: u16,
+ permit: Permit,
+ lock: &'a RwLock<T>,
+}
+
+impl<'a, T> ReleasingPermit<'a, T> {
+ fn poll_acquire(
+ &mut self,
+ cx: &mut Context<'_>,
+ s: &Semaphore,
+ ) -> Poll<Result<(), AcquireError>> {
+ self.permit.poll_acquire(cx, self.num_permits, s)
+ }
+}
+
+impl<'a, T> Drop for ReleasingPermit<'a, T> {
+ fn drop(&mut self) {
+ self.permit.release(self.num_permits, &self.lock.s);
+ }
+}
+
+// As long as T: Send + Sync, it's fine to send and share RwLock<T> between threads.
+// If T were not Send, sending and sharing a RwLock<T> would be bad, since you can access T through
+// RwLock<T>.
+unsafe impl<T> Send for RwLock<T> where T: Send {}
+unsafe impl<T> Sync for RwLock<T> where T: Send + Sync {}
+unsafe impl<'a, T> Sync for RwLockReadGuard<'a, T> where T: Send + Sync {}
+unsafe impl<'a, T> Sync for RwLockWriteGuard<'a, T> where T: Send + Sync {}
+
+impl<T> RwLock<T> {
+ /// Creates a new instance of an `RwLock<T>` which is unlocked.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::RwLock;
+ ///
+ /// let lock = RwLock::new(5);
+ /// ```
+ pub fn new(value: T) -> RwLock<T> {
+ RwLock {
+ c: UnsafeCell::new(value),
+ s: Semaphore::new(MAX_READS),
+ }
+ }
+
+ /// Locks this rwlock with shared read access, blocking the current task
+ /// until it can be acquired.
+ ///
+ /// The calling task will be blocked until there are no more writers which
+ /// hold the lock. There may be other readers currently inside the lock when
+ /// this method returns.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::sync::Arc;
+ /// use tokio::sync::RwLock;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let lock = Arc::new(RwLock::new(1));
+ /// let c_lock = lock.clone();
+ ///
+ /// let n = lock.read().await;
+ /// assert_eq!(*n, 1);
+ ///
+ /// tokio::spawn(async move {
+ /// let r = c_lock.read().await;
+ /// assert_eq!(*r, 1);
+ /// });
+ ///}
+ /// ```
+ pub async fn read(&self) -> RwLockReadGuard<'_, T> {
+ let mut permit = ReleasingPermit {
+ num_permits: 1,
+ permit: Permit::new(),
+ lock: self,
+ };
+
+ poll_fn(|cx| permit.poll_acquire(cx, &self.s))
+ .await
+ .unwrap_or_else(|_| {
+ // The semaphore was closed. but, we never explicitly close it, and we have a
+ // handle to it through the Arc, which means that this can never happen.
+ unreachable!()
+ });
+ RwLockReadGuard { lock: self, permit }
+ }
+
+ /// Locks this rwlock with exclusive write access, blocking the current
+ /// task until it can be acquired.
+ ///
+ /// This function will not return while other writers or other readers
+ /// currently have access to the lock.
+ ///
+ /// Returns an RAII guard which will drop the write access of this rwlock
+ /// when dropped.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::RwLock;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let lock = RwLock::new(1);
+ ///
+ /// let mut n = lock.write().await;
+ /// *n = 2;
+ ///}
+ /// ```
+ pub async fn write(&self) -> RwLockWriteGuard<'_, T> {
+ let mut permit = ReleasingPermit {
+ num_permits: MAX_READS as u16,
+ permit: Permit::new(),
+ lock: self,
+ };
+
+ poll_fn(|cx| permit.poll_acquire(cx, &self.s))
+ .await
+ .unwrap_or_else(|_| {
+ // The semaphore was closed. but, we never explicitly close it, and we have a
+ // handle to it through the Arc, which means that this can never happen.
+ unreachable!()
+ });
+
+ RwLockWriteGuard { lock: self, permit }
+ }
+}
+
+impl<T> ops::Deref for RwLockReadGuard<'_, T> {
+ type Target = T;
+
+ fn deref(&self) -> &T {
+ unsafe { &*self.lock.c.get() }
+ }
+}
+
+impl<T> ops::Deref for RwLockWriteGuard<'_, T> {
+ type Target = T;
+
+ fn deref(&self) -> &T {
+ unsafe { &*self.lock.c.get() }
+ }
+}
+
+impl<T> ops::DerefMut for RwLockWriteGuard<'_, T> {
+ fn deref_mut(&mut self) -> &mut T {
+ unsafe { &mut *self.lock.c.get() }
+ }
+}
diff --git a/tokio/src/sync/tests/loom_rwlock.rs b/tokio/src/sync/tests/loom_rwlock.rs
new file mode 100644
index 00000000..48d06e1d
--- /dev/null
+++ b/tokio/src/sync/tests/loom_rwlock.rs
@@ -0,0 +1,78 @@
+use crate::sync::rwlock::*;
+
+use loom::future::block_on;
+use loom::thread;
+use std::sync::Arc;
+
+#[test]
+fn concurrent_write() {
+ let mut b = loom::model::Builder::new();
+
+ b.check(|| {
+ let rwlock = Arc::new(RwLock::<u32>::new(0));
+
+ let rwclone = rwlock.clone();
+ let t1 = thread::spawn(move || {
+ block_on(async {
+ let mut guard = rwclone.write().await;
+ *guard += 5;
+ });
+ });
+
+ let rwclone = rwlock.clone();
+ let t2 = thread::spawn(move || {
+ block_on(async {
+ let mut guard = rwclone.write().await;
+ *guard += 5;
+ });
+ });
+
+ t1.join().expect("thread 1 write should not panic");
+ t2.join().expect("thread 2 write should not panic");
+ //when all threads have finished the value on the lock should be 10
+ let guard = block_on(rwlock.read());
+ assert_eq!(10, *guard);
+ });
+}
+
+#[test]
+fn concurrent_read_write() {
+ let mut b = loom::model::Builder::new();
+
+ b.check(|| {
+ let rwlock = Arc::new(RwLock::<u32>::new(0));
+
+ let rwclone = rwlock.clone();
+ let t1 = thread::spawn(move || {
+ block_on(async {
+ let mut guard = rwclone.write().await;
+ *guard += 5;
+ });
+ });
+
+ let rwclone = rwlock.clone();
+ let t2 = thread::spawn(move || {
+ block_on(async {
+ let mut guard = rwclone.write().await;
+ *guard += 5;
+ });
+ });
+
+ let rwclone = rwlock.clone();
+ let t3 = thread::spawn(move || {
+ block_on(async {
+ let guard = rwclone.read().await;
+ //at this state the value on the lock may either be 0, 5, or 10
+ assert!(*guard == 0 || *guard == 5 || *guard == 10);
+ });
+ });
+
+ t1.join().expect("thread 1 write should not panic");
+ t2.join().expect("thread 2 write should not panic");
+ t3.join().expect("thread 3 read should not panic");
+
+ let guard = block_on(rwlock.read());
+ //when all threads have finished the value on the lock should be 10
+ assert_eq!(10, *guard);
+ });
+}
diff --git a/tokio/tests/sync_rwlock.rs b/tokio/tests/sync_rwlock.rs
new file mode 100644
index 00000000..b6bbf060
--- /dev/null
+++ b/tokio/tests/sync_rwlock.rs
@@ -0,0 +1,231 @@
+#![warn(rust_2018_idioms)]
+
+use std::sync::Arc;
+use std::task::Poll;
+
+use futures::future::FutureExt;
+use futures::stream;
+use futures::stream::StreamExt;
+
+use tokio::sync::{Barrier, RwLock};
+use tokio_test::task::spawn;
+use tokio_test::{assert_pending, assert_ready};
+
+// multiple reads should be Ready
+#[test]
+fn read_shared() {
+ let rwlock = RwLock::new(100);
+
+ let mut t1 = spawn(rwlock.read());
+ let _g1 = assert_ready!(t1.poll());
+ let mut t2 = spawn(rwlock.read());
+ assert_ready!(t2.poll());
+}
+
+// When there is an active shared owner, exclusive access should not be possible
+#[test]
+fn write_shared_pending() {
+ let rwlock = RwLock::new(100);
+ let mut t1 = spawn(rwlock.read());
+
+ let _g1 = assert_ready!(t1.poll());
+ let mut t2 = spawn(rwlock.write());
+ assert_pending!(t2.poll());
+}
+
+// When there is an active exclusive owner, subsequent exclusive access should not be possible
+#[test]
+fn read_exclusive_pending() {
+ let rwlock = RwLock::new(100);
+ let mut t1 = spawn(rwlock.write());
+
+ let _g1 = assert_ready!(t1.poll());
+ let mut t2 = spawn(rwlock.read());
+ assert_pending!(t2.poll());
+}
+
+// If the max shared access is reached and subsquent shared access is pending
+// should be made available when one of the shared acesses is dropped
+#[test]
+fn exhaust_reading() {
+ let rwlock = RwLock::new(100);
+ let mut reads = Vec::new();
+ loop {
+ let mut t = spawn(rwlock.read());
+ match t.poll() {
+ Poll::Ready(guard) => reads.push(guard),
+ Poll::Pending => break,
+ }
+ }
+
+ let mut t1 = spawn(rwlock.read());
+ assert_pending!(t1.poll());
+ let g2 = reads.pop().unwrap();
+ drop(g2);
+ assert!(t1.is_woken());
+ assert_ready!(t1.poll());
+}
+
+// When there is an active exclusive owner, subsequent exclusive access should not be possible
+#[test]
+fn write_exclusive_pending() {
+ let rwlock = RwLock::new(100);
+ let mut t1 = spawn(rwlock.write());
+
+ let _g1 = assert_ready!(t1.poll());
+ let mut t2 = spawn(rwlock.write());
+ assert_pending!(t2.poll());
+}
+
+// When there is an active shared owner, exclusive access should be possible after shared is dropped
+#[test]
+fn write_shared_drop() {
+ let rwlock = RwLock::new(100);
+ let mut t1 = spawn(rwlock.read());
+
+ let g1 = assert_ready!(t1.poll());
+ let mut t2 = spawn(rwlock.write());
+ assert_pending!(t2.poll());
+ drop(g1);
+ assert!(t2.is_woken());
+ assert_ready!(t2.poll());
+}
+
+// when there is an active shared owner, and exclusive access is triggered,
+// subsequent shared access should not be possible as write gathers all the available semaphore permits
+#[test]
+fn write_read_shared_pending() {
+ let rwlock = RwLock::new(100);
+ let mut t1 = spawn(rwlock.read());
+ let _g1 = assert_ready!(t1.poll());
+
+ let mut t2 = spawn(rwlock.read());
+ assert_ready!(t2.poll());
+
+ let mut t3 = spawn(rwlock.write());
+ let mut _g2 = assert_pending!(t3.poll());
+
+ let mut t4 = spawn(rwlock.read());
+ assert_pending!(t4.poll());
+}
+
+// when there is an active shared owner, and exclusive access is triggered,
+// reading should be possible after pending exclusive access is dropped
+#[test]
+fn write_read_shared_drop_pending() {
+ let rwlock = RwLock::new(100);
+ let mut t1 = spawn(rwlock.read());
+ let _g1 = assert_ready!(t1.poll());
+
+ let mut t2 = spawn(rwlock.write());
+ assert_pending!(t2.poll());
+
+ let mut t3 = spawn(rwlock.read());
+ assert_pending!(t3.poll());
+ drop(t2);
+
+ assert!(t3.is_woken());
+ assert_ready!(t3.poll());
+}
+
+// Acquire an RwLock nonexclusively by a single task
+#[tokio::test]
+async fn read_uncontested() {
+ let rwlock = RwLock::new(100);
+ let result = *rwlock.read().await;
+
+ assert_eq!(result, 100);
+}
+
+// Acquire an uncontested RwLock in exclusive mode
+#[tokio::test]
+async fn write_uncontested() {
+ let rwlock = RwLock::new(100);
+ let mut result = rwlock.write().await;
+ *result += 50;
+ assert_eq!(*result, 150);
+}
+
+// RwLocks should be acquired in the order that their Futures are waited upon.
+#[tokio::test]
+async fn write_order() {
+ let rwlock = RwLock::<Vec<u32>>::new(vec![]);
+ let fut2 = rwlock.write().map(|mut guard| guard.push(2));
+ let fut1 = rwlock.write().map(|mut guard| guard.push(1));
+ fut1.await;
+ fut2.await;
+
+ let g = rwlock.read().await;
+ assert_eq!(*g, vec![1, 2]);
+}
+
+// A single RwLock is contested by tasks in multiple threads
+#[tokio::test(threaded_scheduler)]
+async fn multithreaded() {
+ let barrier = Arc::new(Barrier::new(5));
+ let rwlock = Arc::new(RwLock::<u32>::new(0));
+ let rwclone1 = rwlock.clone();
+ let rwclone2 = rwlock.clone();
+ let rwclone3 = rwlock.clone();
+ let rwclone4 = rwlock.clone();
+
+ let b1 = barrier.clone();
+ tokio::spawn(async move {
+ stream::iter(0..1000)
+ .for_each(move |_| {
+ let rwlock = rwclone1.clone();
+ async move {
+ let mut guard = rwlock.write().await;
+ *guard += 2;
+ }
+ })
+ .await;
+ b1.wait().await;
+ });
+
+ let b2 = barrier.clone();
+ tokio::spawn(async move {
+ stream::iter(0..1000)
+ .for_each(move |_| {
+ let rwlock = rwclone2.clone();
+ async move {
+ let mut guard = rwlock.write().await;
+ *guard += 3;
+ }
+ })
+ .await;
+ b2.wait().await;
+ });
+
+ let b3 = barrier.clone();
+ tokio::spawn(async move {
+ stream::iter(0..1000)
+ .for_each(move |_| {
+ let rwlock = rwclone3.clone();
+ async move {
+ let mut guard = rwlock.write().await;
+ *guard += 5;
+ }
+ })
+ .await;
+ b3.wait().await;
+ });
+
+ let b4 = barrier.clone();
+ tokio::spawn(async move {
+ stream::iter(0..1000)
+ .for_each(move |_| {
+ let rwlock = rwclone4.clone();
+ async move {
+ let mut guard = rwlock.write().await;
+ *guard += 7;
+ }
+ })
+ .await;
+ b4.wait().await;
+ });
+
+ barrier.wait().await;
+ let g = rwlock.read().await;
+ assert_eq!(*g, 17_000);
+}