summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync/mutex.rs
diff options
context:
space:
mode:
authorEliza Weisman <eliza@buoyant.io>2020-04-29 15:48:08 -0700
committerGitHub <noreply@github.com>2020-04-29 15:48:08 -0700
commit45773c56413267cbcf9d5e7877e8dc4afc1e5b07 (patch)
tree8d1353dac8323fbc6f85558b6250051f25e4d574 /tokio/src/sync/mutex.rs
parentc52b78b7925f883289853dee5748b93a89e29bb1 (diff)
mutex: add `OwnedMutexGuard` for `Arc<Mutex<T>>`s (#2455)
This PR adds a new `OwnedMutexGuard` type and `lock_owned` and `try_lock_owned` methods for `Arc<Mutex<T>>`. This is pretty much the same as the similar APIs added in #2421. I've also corrected some existing documentation that incorrectly implied that the existing `lock` method cloned an internal `Arc` — I think this may be a holdover from `tokio` 0.1's `Lock` type? Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Diffstat (limited to 'tokio/src/sync/mutex.rs')
-rw-r--r--tokio/src/sync/mutex.rs188
1 files changed, 160 insertions, 28 deletions
diff --git a/tokio/src/sync/mutex.rs b/tokio/src/sync/mutex.rs
index 69eec678..e0618a5d 100644
--- a/tokio/src/sync/mutex.rs
+++ b/tokio/src/sync/mutex.rs
@@ -5,6 +5,7 @@ use std::cell::UnsafeCell;
use std::error::Error;
use std::fmt;
use std::ops::{Deref, DerefMut};
+use std::sync::Arc;
/// An asynchronous `Mutex`-like type.
///
@@ -18,8 +19,8 @@ use std::ops::{Deref, DerefMut};
/// 1. The lock does not need to be held across await points.
/// 2. The duration of any single lock is near-instant.
///
-/// On the other hand, the Tokio mutex is for the situation where the lock needs
-/// to be held for longer periods of time, or across await points.
+/// On the other hand, the Tokio mutex is for the situation where the lock
+/// needs to be held for longer periods of time, or across await points.
///
/// # Examples:
///
@@ -71,18 +72,20 @@ use std::ops::{Deref, DerefMut};
/// }
/// ```
/// There are a few things of note here to pay attention to in this example.
-/// 1. The mutex is wrapped in an [`Arc`] to allow it to be shared across threads.
+/// 1. The mutex is wrapped in an [`Arc`] to allow it to be shared across
+/// threads.
/// 2. Each spawned task obtains a lock and releases it on every iteration.
-/// 3. Mutation of the data protected by the Mutex is done by de-referencing the obtained lock
-/// as seen on lines 12 and 19.
+/// 3. Mutation of the data protected by the Mutex is done by de-referencing
+/// the obtained lock as seen on lines 12 and 19.
///
-/// Tokio's Mutex works in a simple FIFO (first in, first out) style where all calls
-/// to [`lock`] complete in the order they were performed. In that way
-/// the Mutex is "fair" and predictable in how it distributes the locks to inner data. This is why
-/// the output of the program above is an in-order count to 50. Locks are released and reacquired
-/// after every iteration, so basically, each thread goes to the back of the line after it increments
-/// the value once. Finally, since there is only a single valid lock at any given time, there is no
-/// possibility of a race condition when mutating the inner value.
+/// Tokio's Mutex works in a simple FIFO (first in, first out) style where all
+/// calls to [`lock`] complete in the order they were performed. In that way the
+/// Mutex is "fair" and predictable in how it distributes the locks to inner
+/// data. This is why the output of the program above is an in-order count to
+/// 50. Locks are released and reacquired after every iteration, so basically,
+/// each thread goes to the back of the line after it increments the value once.
+/// Finally, since there is only a single valid lock at any given time, there is
+/// no possibility of a race condition when mutating the inner value.
///
/// Note that in contrast to [`std::sync::Mutex`], this implementation does not
/// poison the mutex when a thread holding the [`MutexGuard`] panics. In such a
@@ -104,22 +107,42 @@ pub struct Mutex<T> {
/// A handle to a held `Mutex`.
///
-/// As long as you have this guard, you have exclusive access to the underlying `T`. The guard
-/// internally keeps a reference-couned pointer to the original `Mutex`, so even if the lock goes
-/// away, the guard remains valid.
+/// As long as you have this guard, you have exclusive access to the underlying
+/// `T`. The guard internally borrows the `Mutex`, so the mutex will not be
+/// dropped while a guard exists.
///
-/// The lock is automatically released whenever the guard is dropped, at which point `lock`
-/// will succeed yet again.
+/// The lock is automatically released whenever the guard is dropped, at which
+/// point `lock` will succeed yet again.
pub struct MutexGuard<'a, T> {
lock: &'a Mutex<T>,
}
+/// An owned handle to a held `Mutex`.
+///
+/// This guard is only available from a `Mutex` that is wrapped in an [`Arc`]. It
+/// is identical to `MutexGuard`, except that rather than borrowing the `Mutex`,
+/// it clones the `Arc`, incrementing the reference count. This means that
+/// unlike `MutexGuard`, it will have the `'static` lifetime.
+///
+/// As long as you have this guard, you have exclusive access to the underlying
+/// `T`. The guard internally keeps a reference-couned pointer to the original
+/// `Mutex`, so even if the lock goes away, the guard remains valid.
+///
+/// The lock is automatically released whenever the guard is dropped, at which
+/// point `lock` will succeed yet again.
+///
+/// [`Arc`]: std::sync::Arc
+pub struct OwnedMutexGuard<T> {
+ lock: Arc<Mutex<T>>,
+}
+
// As long as T: Send, it's fine to send and share Mutex<T> between threads.
-// If T was not Send, sending and sharing a Mutex<T> would be bad, since you can access T through
-// Mutex<T>.
+// If T was not Send, sending and sharing a Mutex<T> would be bad, since you can
+// access T through Mutex<T>.
unsafe impl<T> Send for Mutex<T> where T: Send {}
unsafe impl<T> Sync for Mutex<T> where T: Send {}
unsafe impl<'a, T> Sync for MutexGuard<'a, T> where T: Send + Sync {}
+unsafe impl<T> Sync for OwnedMutexGuard<T> where T: Send + Sync {}
/// Error returned from the [`Mutex::try_lock`] function.
///
@@ -145,12 +168,20 @@ fn bounds() {
// This has to take a value, since the async fn's return type is unnameable.
fn check_send_sync_val<T: Send + Sync>(_t: T) {}
fn check_send_sync<T: Send + Sync>() {}
+ fn check_static<T: 'static>() {}
+ fn check_static_val<T: 'static>(_t: T) {}
+
check_send::<MutexGuard<'_, u32>>();
+ check_send::<OwnedMutexGuard<u32>>();
check_unpin::<Mutex<u32>>();
check_send_sync::<Mutex<u32>>();
+ check_static::<OwnedMutexGuard<u32>>();
let mutex = Mutex::new(1);
check_send_sync_val(mutex.lock());
+ let arc_mutex = Arc::new(Mutex::new(1));
+ check_send_sync_val(arc_mutex.clone().lock_owned());
+ check_static_val(arc_mutex.lock_owned());
}
impl<T> Mutex<T> {
@@ -188,12 +219,47 @@ impl<T> Mutex<T> {
/// }
/// ```
pub async fn lock(&self) -> MutexGuard<'_, T> {
+ self.acquire().await;
+ MutexGuard { lock: self }
+ }
+
+ /// Locks this mutex, causing the current task to yield until the lock has
+ /// been acquired. When the lock has been acquired, this returns an
+ /// [`OwnedMutexGuard`].
+ ///
+ /// This method is identical to [`Mutex::lock`], except that the returned
+ /// guard references the `Mutex` with an [`Arc`] rather than by borrowing
+ /// it. Therefore, the `Mutex` must be wrapped in an `Arc` to call this
+ /// method, and the guard will live for the `'static` lifetime, as it keeps
+ /// the `Mutex` alive by holding an `Arc`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::Mutex;
+ /// use std::sync::Arc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let mutex = Arc::new(Mutex::new(1));
+ ///
+ /// let mut n = mutex.clone().lock_owned().await;
+ /// *n = 2;
+ /// }
+ /// ```
+ ///
+ /// [`Arc`]: std::sync::Arc
+ pub async fn lock_owned(self: Arc<Self>) -> OwnedMutexGuard<T> {
+ self.acquire().await;
+ OwnedMutexGuard { lock: self }
+ }
+
+ async fn acquire(&self) {
self.s.acquire(1).cooperate().await.unwrap_or_else(|_| {
- // The semaphore was closed. but, we never explicitly close it, and we have a
- // handle to it through the Arc, which means that this can never happen.
+ // The semaphore was closed. but, we never explicitly close it, and
+ // we own it exclusively, which means that this can never happen.
unreachable!()
});
- MutexGuard { lock: self }
}
/// Attempts to acquire the lock, and returns [`TryLockError`] if the
@@ -220,6 +286,37 @@ impl<T> Mutex<T> {
}
}
+ /// Attempts to acquire the lock, and returns [`TryLockError`] if the lock
+ /// is currently held somewhere else.
+ ///
+ /// This method is identical to [`Mutex::try_lock`], except that the
+ /// returned guard references the `Mutex` with an [`Arc`] rather than by
+ /// borrowing it. Therefore, the `Mutex` must be wrapped in an `Arc` to call
+ /// this method, and the guard will live for the `'static` lifetime, as it
+ /// keeps the `Mutex` alive by holding an `Arc`.
+ ///
+ /// [`TryLockError`]: TryLockError
+ /// [`Arc`]: std::sync::Arc
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::Mutex;
+ /// use std::sync::Arc;
+ /// # async fn dox() -> Result<(), tokio::sync::TryLockError> {
+ ///
+ /// let mutex = Arc::new(Mutex::new(1));
+ ///
+ /// let n = mutex.clone().try_lock_owned()?;
+ /// assert_eq!(*n, 1);
+ /// # Ok(())
+ /// # }
+ pub fn try_lock_owned(self: Arc<Self>) -> Result<OwnedMutexGuard<T>, TryLockError> {
+ match self.s.try_acquire(1) {
+ Ok(_) => Ok(OwnedMutexGuard { lock: self }),
+ Err(_) => Err(TryLockError(())),
+ }
+ }
+
/// Consumes the mutex, returning the underlying data.
/// # Examples
///
@@ -239,12 +336,6 @@ impl<T> Mutex<T> {
}
}
-impl<'a, T> Drop for MutexGuard<'a, T> {
- fn drop(&mut self) {
- self.lock.s.release(1)
- }
-}
-
impl<T> From<T> for Mutex<T> {
fn from(s: T) -> Self {
Self::new(s)
@@ -260,6 +351,14 @@ where
}
}
+// === impl MutexGuard ===
+
+impl<'a, T> Drop for MutexGuard<'a, T> {
+ fn drop(&mut self) {
+ self.lock.s.release(1)
+ }
+}
+
impl<'a, T> Deref for MutexGuard<'a, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
@@ -284,3 +383,36 @@ impl<'a, T: fmt::Display> fmt::Display for MutexGuard<'a, T> {
fmt::Display::fmt(&**self, f)
}
}
+
+// === impl OwnedMutexGuard ===
+
+impl<T> Drop for OwnedMutexGuard<T> {
+ fn drop(&mut self) {
+ self.lock.s.release(1)
+ }
+}
+
+impl<T> Deref for OwnedMutexGuard<T> {
+ type Target = T;
+ fn deref(&self) -> &Self::Target {
+ unsafe { &*self.lock.c.get() }
+ }
+}
+
+impl<T> DerefMut for OwnedMutexGuard<T> {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ unsafe { &mut *self.lock.c.get() }
+ }
+}
+
+impl<T: fmt::Debug> fmt::Debug for OwnedMutexGuard<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt::Debug::fmt(&**self, f)
+ }
+}
+
+impl<T: fmt::Display> fmt::Display for OwnedMutexGuard<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt::Display::fmt(&**self, f)
+ }
+}