diff options
author | Eliza Weisman <eliza@buoyant.io> | 2020-04-29 15:48:08 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-04-29 15:48:08 -0700 |
commit | 45773c56413267cbcf9d5e7877e8dc4afc1e5b07 (patch) | |
tree | 8d1353dac8323fbc6f85558b6250051f25e4d574 /tokio/src/sync/mutex.rs | |
parent | c52b78b7925f883289853dee5748b93a89e29bb1 (diff) |
mutex: add `OwnedMutexGuard` for `Arc<Mutex<T>>`s (#2455)
This PR adds a new `OwnedMutexGuard` type and `lock_owned` and
`try_lock_owned` methods for `Arc<Mutex<T>>`. This is pretty much the
same as the similar APIs added in #2421.
I've also corrected some existing documentation that incorrectly
implied that the existing `lock` method cloned an internal `Arc` — I
think this may be a holdover from `tokio` 0.1's `Lock` type?
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Diffstat (limited to 'tokio/src/sync/mutex.rs')
-rw-r--r-- | tokio/src/sync/mutex.rs | 188 |
1 files changed, 160 insertions, 28 deletions
diff --git a/tokio/src/sync/mutex.rs b/tokio/src/sync/mutex.rs index 69eec678..e0618a5d 100644 --- a/tokio/src/sync/mutex.rs +++ b/tokio/src/sync/mutex.rs @@ -5,6 +5,7 @@ use std::cell::UnsafeCell; use std::error::Error; use std::fmt; use std::ops::{Deref, DerefMut}; +use std::sync::Arc; /// An asynchronous `Mutex`-like type. /// @@ -18,8 +19,8 @@ use std::ops::{Deref, DerefMut}; /// 1. The lock does not need to be held across await points. /// 2. The duration of any single lock is near-instant. /// -/// On the other hand, the Tokio mutex is for the situation where the lock needs -/// to be held for longer periods of time, or across await points. +/// On the other hand, the Tokio mutex is for the situation where the lock +/// needs to be held for longer periods of time, or across await points. /// /// # Examples: /// @@ -71,18 +72,20 @@ use std::ops::{Deref, DerefMut}; /// } /// ``` /// There are a few things of note here to pay attention to in this example. -/// 1. The mutex is wrapped in an [`Arc`] to allow it to be shared across threads. +/// 1. The mutex is wrapped in an [`Arc`] to allow it to be shared across +/// threads. /// 2. Each spawned task obtains a lock and releases it on every iteration. -/// 3. Mutation of the data protected by the Mutex is done by de-referencing the obtained lock -/// as seen on lines 12 and 19. +/// 3. Mutation of the data protected by the Mutex is done by de-referencing +/// the obtained lock as seen on lines 12 and 19. /// -/// Tokio's Mutex works in a simple FIFO (first in, first out) style where all calls -/// to [`lock`] complete in the order they were performed. In that way -/// the Mutex is "fair" and predictable in how it distributes the locks to inner data. This is why -/// the output of the program above is an in-order count to 50. Locks are released and reacquired -/// after every iteration, so basically, each thread goes to the back of the line after it increments -/// the value once. Finally, since there is only a single valid lock at any given time, there is no -/// possibility of a race condition when mutating the inner value. +/// Tokio's Mutex works in a simple FIFO (first in, first out) style where all +/// calls to [`lock`] complete in the order they were performed. In that way the +/// Mutex is "fair" and predictable in how it distributes the locks to inner +/// data. This is why the output of the program above is an in-order count to +/// 50. Locks are released and reacquired after every iteration, so basically, +/// each thread goes to the back of the line after it increments the value once. +/// Finally, since there is only a single valid lock at any given time, there is +/// no possibility of a race condition when mutating the inner value. /// /// Note that in contrast to [`std::sync::Mutex`], this implementation does not /// poison the mutex when a thread holding the [`MutexGuard`] panics. In such a @@ -104,22 +107,42 @@ pub struct Mutex<T> { /// A handle to a held `Mutex`. /// -/// As long as you have this guard, you have exclusive access to the underlying `T`. The guard -/// internally keeps a reference-couned pointer to the original `Mutex`, so even if the lock goes -/// away, the guard remains valid. +/// As long as you have this guard, you have exclusive access to the underlying +/// `T`. The guard internally borrows the `Mutex`, so the mutex will not be +/// dropped while a guard exists. /// -/// The lock is automatically released whenever the guard is dropped, at which point `lock` -/// will succeed yet again. +/// The lock is automatically released whenever the guard is dropped, at which +/// point `lock` will succeed yet again. pub struct MutexGuard<'a, T> { lock: &'a Mutex<T>, } +/// An owned handle to a held `Mutex`. +/// +/// This guard is only available from a `Mutex` that is wrapped in an [`Arc`]. It +/// is identical to `MutexGuard`, except that rather than borrowing the `Mutex`, +/// it clones the `Arc`, incrementing the reference count. This means that +/// unlike `MutexGuard`, it will have the `'static` lifetime. +/// +/// As long as you have this guard, you have exclusive access to the underlying +/// `T`. The guard internally keeps a reference-couned pointer to the original +/// `Mutex`, so even if the lock goes away, the guard remains valid. +/// +/// The lock is automatically released whenever the guard is dropped, at which +/// point `lock` will succeed yet again. +/// +/// [`Arc`]: std::sync::Arc +pub struct OwnedMutexGuard<T> { + lock: Arc<Mutex<T>>, +} + // As long as T: Send, it's fine to send and share Mutex<T> between threads. -// If T was not Send, sending and sharing a Mutex<T> would be bad, since you can access T through -// Mutex<T>. +// If T was not Send, sending and sharing a Mutex<T> would be bad, since you can +// access T through Mutex<T>. unsafe impl<T> Send for Mutex<T> where T: Send {} unsafe impl<T> Sync for Mutex<T> where T: Send {} unsafe impl<'a, T> Sync for MutexGuard<'a, T> where T: Send + Sync {} +unsafe impl<T> Sync for OwnedMutexGuard<T> where T: Send + Sync {} /// Error returned from the [`Mutex::try_lock`] function. /// @@ -145,12 +168,20 @@ fn bounds() { // This has to take a value, since the async fn's return type is unnameable. fn check_send_sync_val<T: Send + Sync>(_t: T) {} fn check_send_sync<T: Send + Sync>() {} + fn check_static<T: 'static>() {} + fn check_static_val<T: 'static>(_t: T) {} + check_send::<MutexGuard<'_, u32>>(); + check_send::<OwnedMutexGuard<u32>>(); check_unpin::<Mutex<u32>>(); check_send_sync::<Mutex<u32>>(); + check_static::<OwnedMutexGuard<u32>>(); let mutex = Mutex::new(1); check_send_sync_val(mutex.lock()); + let arc_mutex = Arc::new(Mutex::new(1)); + check_send_sync_val(arc_mutex.clone().lock_owned()); + check_static_val(arc_mutex.lock_owned()); } impl<T> Mutex<T> { @@ -188,12 +219,47 @@ impl<T> Mutex<T> { /// } /// ``` pub async fn lock(&self) -> MutexGuard<'_, T> { + self.acquire().await; + MutexGuard { lock: self } + } + + /// Locks this mutex, causing the current task to yield until the lock has + /// been acquired. When the lock has been acquired, this returns an + /// [`OwnedMutexGuard`]. + /// + /// This method is identical to [`Mutex::lock`], except that the returned + /// guard references the `Mutex` with an [`Arc`] rather than by borrowing + /// it. Therefore, the `Mutex` must be wrapped in an `Arc` to call this + /// method, and the guard will live for the `'static` lifetime, as it keeps + /// the `Mutex` alive by holding an `Arc`. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::Mutex; + /// use std::sync::Arc; + /// + /// #[tokio::main] + /// async fn main() { + /// let mutex = Arc::new(Mutex::new(1)); + /// + /// let mut n = mutex.clone().lock_owned().await; + /// *n = 2; + /// } + /// ``` + /// + /// [`Arc`]: std::sync::Arc + pub async fn lock_owned(self: Arc<Self>) -> OwnedMutexGuard<T> { + self.acquire().await; + OwnedMutexGuard { lock: self } + } + + async fn acquire(&self) { self.s.acquire(1).cooperate().await.unwrap_or_else(|_| { - // The semaphore was closed. but, we never explicitly close it, and we have a - // handle to it through the Arc, which means that this can never happen. + // The semaphore was closed. but, we never explicitly close it, and + // we own it exclusively, which means that this can never happen. unreachable!() }); - MutexGuard { lock: self } } /// Attempts to acquire the lock, and returns [`TryLockError`] if the @@ -220,6 +286,37 @@ impl<T> Mutex<T> { } } + /// Attempts to acquire the lock, and returns [`TryLockError`] if the lock + /// is currently held somewhere else. + /// + /// This method is identical to [`Mutex::try_lock`], except that the + /// returned guard references the `Mutex` with an [`Arc`] rather than by + /// borrowing it. Therefore, the `Mutex` must be wrapped in an `Arc` to call + /// this method, and the guard will live for the `'static` lifetime, as it + /// keeps the `Mutex` alive by holding an `Arc`. + /// + /// [`TryLockError`]: TryLockError + /// [`Arc`]: std::sync::Arc + /// # Examples + /// + /// ``` + /// use tokio::sync::Mutex; + /// use std::sync::Arc; + /// # async fn dox() -> Result<(), tokio::sync::TryLockError> { + /// + /// let mutex = Arc::new(Mutex::new(1)); + /// + /// let n = mutex.clone().try_lock_owned()?; + /// assert_eq!(*n, 1); + /// # Ok(()) + /// # } + pub fn try_lock_owned(self: Arc<Self>) -> Result<OwnedMutexGuard<T>, TryLockError> { + match self.s.try_acquire(1) { + Ok(_) => Ok(OwnedMutexGuard { lock: self }), + Err(_) => Err(TryLockError(())), + } + } + /// Consumes the mutex, returning the underlying data. /// # Examples /// @@ -239,12 +336,6 @@ impl<T> Mutex<T> { } } -impl<'a, T> Drop for MutexGuard<'a, T> { - fn drop(&mut self) { - self.lock.s.release(1) - } -} - impl<T> From<T> for Mutex<T> { fn from(s: T) -> Self { Self::new(s) @@ -260,6 +351,14 @@ where } } +// === impl MutexGuard === + +impl<'a, T> Drop for MutexGuard<'a, T> { + fn drop(&mut self) { + self.lock.s.release(1) + } +} + impl<'a, T> Deref for MutexGuard<'a, T> { type Target = T; fn deref(&self) -> &Self::Target { @@ -284,3 +383,36 @@ impl<'a, T: fmt::Display> fmt::Display for MutexGuard<'a, T> { fmt::Display::fmt(&**self, f) } } + +// === impl OwnedMutexGuard === + +impl<T> Drop for OwnedMutexGuard<T> { + fn drop(&mut self) { + self.lock.s.release(1) + } +} + +impl<T> Deref for OwnedMutexGuard<T> { + type Target = T; + fn deref(&self) -> &Self::Target { + unsafe { &*self.lock.c.get() } + } +} + +impl<T> DerefMut for OwnedMutexGuard<T> { + fn deref_mut(&mut self) -> &mut Self::Target { + unsafe { &mut *self.lock.c.get() } + } +} + +impl<T: fmt::Debug> fmt::Debug for OwnedMutexGuard<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&**self, f) + } +} + +impl<T: fmt::Display> fmt::Display for OwnedMutexGuard<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&**self, f) + } +} |