summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync/mutex.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-10-29 15:11:31 -0700
committerGitHub <noreply@github.com>2019-10-29 15:11:31 -0700
commit2b909d6805990abf0bc2a5dea9e7267ff87df704 (patch)
treede255969c720c294af754b3840efabff3e6d69a0 /tokio/src/sync/mutex.rs
parentc62ef2d232dea1535a8e22484fa2ca083f03e903 (diff)
sync: move into `tokio` crate (#1705)
A step towards collapsing Tokio sub crates into a single `tokio` crate (#1318). The sync implementation is now provided by the main `tokio` crate. Functionality can be opted out of by using the various net related feature flags.
Diffstat (limited to 'tokio/src/sync/mutex.rs')
-rw-r--r--tokio/src/sync/mutex.rs149
1 files changed, 149 insertions, 0 deletions
diff --git a/tokio/src/sync/mutex.rs b/tokio/src/sync/mutex.rs
new file mode 100644
index 00000000..ae45c666
--- /dev/null
+++ b/tokio/src/sync/mutex.rs
@@ -0,0 +1,149 @@
+//! An asynchronous `Mutex`-like type.
+//!
+//! This module provides [`Mutex`], a type that acts similarly to an asynchronous `Mutex`, with one
+//! major difference: the [`MutexGuard`] returned by `lock` is not tied to the lifetime of the
+//! `Mutex`. This enables you to acquire a lock, and then pass that guard into a future, and then
+//! release it at some later point in time.
+//!
+//! This allows you to do something along the lines of:
+//!
+//! ```rust,no_run
+//! use tokio::sync::Mutex;
+//! use std::sync::Arc;
+//!
+//! #[tokio::main]
+//! async fn main() {
+//! let data1 = Arc::new(Mutex::new(0));
+//! let data2 = Arc::clone(&data1);
+//!
+//! tokio::spawn(async move {
+//! let mut lock = data2.lock().await;
+//! *lock += 1;
+//! });
+//!
+//! let mut lock = data1.lock().await;
+//! *lock += 1;
+//! }
+//! ```
+//!
+//! [`Mutex`]: struct.Mutex.html
+//! [`MutexGuard`]: struct.MutexGuard.html
+
+use crate::sync::semaphore;
+
+use futures_util::future::poll_fn;
+use std::cell::UnsafeCell;
+use std::fmt;
+use std::ops::{Deref, DerefMut};
+
+/// An asynchronous mutual exclusion primitive useful for protecting shared data
+///
+/// Each mutex has a type parameter (`T`) which represents the data that it is protecting. The data
+/// can only be accessed through the RAII guards returned from `lock`, which
+/// guarantees that the data is only ever accessed when the mutex is locked.
+#[derive(Debug)]
+pub struct Mutex<T> {
+ c: UnsafeCell<T>,
+ s: semaphore::Semaphore,
+}
+
+/// A handle to a held `Mutex`.
+///
+/// As long as you have this guard, you have exclusive access to the underlying `T`. The guard
+/// internally keeps a reference-couned pointer to the original `Mutex`, so even if the lock goes
+/// away, the guard remains valid.
+///
+/// The lock is automatically released whenever the guard is dropped, at which point `lock`
+/// will succeed yet again.
+#[derive(Debug)]
+pub struct MutexGuard<'a, T> {
+ lock: &'a Mutex<T>,
+ permit: semaphore::Permit,
+}
+
+// As long as T: Send, it's fine to send and share Mutex<T> between threads.
+// If T was not Send, sending and sharing a Mutex<T> would be bad, since you can access T through
+// Mutex<T>.
+unsafe impl<T> Send for Mutex<T> where T: Send {}
+unsafe impl<T> Sync for Mutex<T> where T: Send {}
+unsafe impl<'a, T> Sync for MutexGuard<'a, T> where T: Send + Sync {}
+
+#[test]
+#[cfg(not(loom))]
+fn bounds() {
+ fn check<T: Send>() {}
+ check::<MutexGuard<'_, u32>>();
+}
+
+impl<T> Mutex<T> {
+ /// Creates a new lock in an unlocked state ready for use.
+ pub fn new(t: T) -> Self {
+ Self {
+ c: UnsafeCell::new(t),
+ s: semaphore::Semaphore::new(1),
+ }
+ }
+
+ /// A future that resolves on acquiring the lock and returns the `MutexGuard`.
+ pub async fn lock(&self) -> MutexGuard<'_, T> {
+ let mut permit = semaphore::Permit::new();
+ poll_fn(|cx| permit.poll_acquire(cx, &self.s))
+ .await
+ .unwrap_or_else(|_| {
+ // The semaphore was closed. but, we never explicitly close it, and we have a
+ // handle to it through the Arc, which means that this can never happen.
+ unreachable!()
+ });
+
+ MutexGuard { lock: self, permit }
+ }
+}
+
+impl<'a, T> Drop for MutexGuard<'a, T> {
+ fn drop(&mut self) {
+ if self.permit.is_acquired() {
+ self.permit.release(&self.lock.s);
+ } else if ::std::thread::panicking() {
+ // A guard _should_ always hold its permit, but if the thread is already panicking,
+ // we don't want to generate a panic-while-panicing, since that's just unhelpful!
+ } else {
+ unreachable!("Permit not held when MutexGuard was dropped")
+ }
+ }
+}
+
+impl<T> From<T> for Mutex<T> {
+ fn from(s: T) -> Self {
+ Self::new(s)
+ }
+}
+
+impl<T> Default for Mutex<T>
+where
+ T: Default,
+{
+ fn default() -> Self {
+ Self::new(T::default())
+ }
+}
+
+impl<'a, T> Deref for MutexGuard<'a, T> {
+ type Target = T;
+ fn deref(&self) -> &Self::Target {
+ assert!(self.permit.is_acquired());
+ unsafe { &*self.lock.c.get() }
+ }
+}
+
+impl<'a, T> DerefMut for MutexGuard<'a, T> {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ assert!(self.permit.is_acquired());
+ unsafe { &mut *self.lock.c.get() }
+ }
+}
+
+impl<'a, T: fmt::Display> fmt::Display for MutexGuard<'a, T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt::Display::fmt(&**self, f)
+ }
+}