summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync
diff options
context:
space:
mode:
authorZahari Dichev <zaharidichev@gmail.com>2020-09-25 18:38:13 +0300
committerGitHub <noreply@github.com>2020-09-25 08:38:13 -0700
commit444660664b96f758610a0e7201a6a1a31a0f2405 (patch)
tree0b6829c695a5a0e2cec1157f84ecbe10a0780a3c /tokio/src/sync
parentcf025ba45f68934ae2138bb75ee2a5ee50506d1b (diff)
chore: handle std `Mutex` poisoning in a shim (#2872)
As tokio does not rely on poisoning, we can avoid always unwrapping when locking by handling the `PoisonError` in the Mutex shim. Signed-off-by: Zahari Dichev <zaharidichev@gmail.com>
Diffstat (limited to 'tokio/src/sync')
-rw-r--r--tokio/src/sync/batch_semaphore.rs19
-rw-r--r--tokio/src/sync/broadcast.rs12
-rw-r--r--tokio/src/sync/notify.rs10
3 files changed, 17 insertions, 24 deletions
diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs
index 9f324f9c..6305485b 100644
--- a/tokio/src/sync/batch_semaphore.rs
+++ b/tokio/src/sync/batch_semaphore.rs
@@ -158,7 +158,7 @@ impl Semaphore {
}
// Assign permits to the wait queue
- self.add_permits_locked(added, self.waiters.lock().unwrap());
+ self.add_permits_locked(added, self.waiters.lock());
}
/// Closes the semaphore. This prevents the semaphore from issuing new
@@ -166,7 +166,7 @@ impl Semaphore {
// This will be used once the bounded MPSC is updated to use the new
// semaphore implementation.
pub(crate) fn close(&self) {
- let mut waiters = self.waiters.lock().unwrap();
+ let mut waiters = self.waiters.lock();
// If the semaphore's permits counter has enough permits for an
// unqueued waiter to acquire all the permits it needs immediately,
// it won't touch the wait list. Therefore, we have to set a bit on
@@ -231,7 +231,7 @@ impl Semaphore {
let mut lock = Some(waiters);
let mut is_empty = false;
while rem > 0 {
- let mut waiters = lock.take().unwrap_or_else(|| self.waiters.lock().unwrap());
+ let mut waiters = lock.take().unwrap_or_else(|| self.waiters.lock());
'inner: for slot in &mut wakers[..] {
// Was the waiter assigned enough permits to wake it?
match waiters.queue.last() {
@@ -324,7 +324,7 @@ impl Semaphore {
// counter. Otherwise, if we subtract the permits and then
// acquire the lock, we might miss additional permits being
// added while waiting for the lock.
- lock = Some(self.waiters.lock().unwrap());
+ lock = Some(self.waiters.lock());
}
match self.permits.compare_exchange(curr, next, AcqRel, Acquire) {
@@ -334,7 +334,7 @@ impl Semaphore {
if !queued {
return Ready(Ok(()));
} else if lock.is_none() {
- break self.waiters.lock().unwrap();
+ break self.waiters.lock();
}
}
break lock.expect("lock must be acquired before waiting");
@@ -484,14 +484,7 @@ impl Drop for Acquire<'_> {
// This is where we ensure safety. The future is being dropped,
// which means we must ensure that the waiter entry is no longer stored
// in the linked list.
- let mut waiters = match self.semaphore.waiters.lock() {
- Ok(lock) => lock,
- // Removing the node from the linked list is necessary to ensure
- // safety. Even if the lock was poisoned, we need to make sure it is
- // removed from the linked list before dropping it --- otherwise,
- // the list will contain a dangling pointer to this node.
- Err(e) => e.into_inner(),
- };
+ let mut waiters = self.semaphore.waiters.lock();
// remove the entry from the list
let node = NonNull::from(&mut self.node);
diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs
index 9484e130..fe826290 100644
--- a/tokio/src/sync/broadcast.rs
+++ b/tokio/src/sync/broadcast.rs
@@ -529,7 +529,7 @@ impl<T> Sender<T> {
pub fn subscribe(&self) -> Receiver<T> {
let shared = self.shared.clone();
- let mut tail = shared.tail.lock().unwrap();
+ let mut tail = shared.tail.lock();
if tail.rx_cnt == MAX_RECEIVERS {
panic!("max receivers");
@@ -584,12 +584,12 @@ impl<T> Sender<T> {
/// }
/// ```
pub fn receiver_count(&self) -> usize {
- let tail = self.shared.tail.lock().unwrap();
+ let tail = self.shared.tail.lock();
tail.rx_cnt
}
fn send2(&self, value: Option<T>) -> Result<usize, SendError<Option<T>>> {
- let mut tail = self.shared.tail.lock().unwrap();
+ let mut tail = self.shared.tail.lock();
if tail.rx_cnt == 0 {
return Err(SendError(value));
@@ -695,7 +695,7 @@ impl<T> Receiver<T> {
// the slot lock.
drop(slot);
- let mut tail = self.shared.tail.lock().unwrap();
+ let mut tail = self.shared.tail.lock();
// Acquire slot lock again
slot = self.shared.buffer[idx].read().unwrap();
@@ -979,7 +979,7 @@ where
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
- let mut tail = self.shared.tail.lock().unwrap();
+ let mut tail = self.shared.tail.lock();
if let Some(waiter) = &self.waiter {
// safety: tail lock is held
@@ -1142,7 +1142,7 @@ where
fn drop(&mut self) {
// Acquire the tail lock. This is required for safety before accessing
// the waiter node.
- let mut tail = self.receiver.as_mut().shared.tail.lock().unwrap();
+ let mut tail = self.receiver.as_mut().shared.tail.lock();
// safety: tail lock is held
let queued = self.waiter.with(|ptr| unsafe { (*ptr).queued });
diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs
index d319e8aa..17117bfe 100644
--- a/tokio/src/sync/notify.rs
+++ b/tokio/src/sync/notify.rs
@@ -306,7 +306,7 @@ impl Notify {
}
// There are waiters, the lock must be acquired to notify.
- let mut waiters = self.waiters.lock().unwrap();
+ let mut waiters = self.waiters.lock();
// The state must be reloaded while the lock is held. The state may only
// transition out of WAITING while the lock is held.
@@ -321,7 +321,7 @@ impl Notify {
/// Notifies all waiting tasks
pub(crate) fn notify_waiters(&self) {
// There are waiters, the lock must be acquired to notify.
- let mut waiters = self.waiters.lock().unwrap();
+ let mut waiters = self.waiters.lock();
// The state must be reloaded while the lock is held. The state may only
// transition out of WAITING while the lock is held.
@@ -452,7 +452,7 @@ impl Future for Notified<'_> {
// Acquire the lock and attempt to transition to the waiting
// state.
- let mut waiters = notify.waiters.lock().unwrap();
+ let mut waiters = notify.waiters.lock();
// Reload the state with the lock held
let mut curr = notify.state.load(SeqCst);
@@ -516,7 +516,7 @@ impl Future for Notified<'_> {
// `notify.waiters`). In order to access the waker fields,
// we must hold the lock.
- let waiters = notify.waiters.lock().unwrap();
+ let waiters = notify.waiters.lock();
// Safety: called while locked
let w = unsafe { &mut *waiter.get() };
@@ -564,7 +564,7 @@ impl Drop for Notified<'_> {
// longer stored in the linked list.
if let Waiting = *state {
let mut notify_state = WAITING;
- let mut waiters = notify.waiters.lock().unwrap();
+ let mut waiters = notify.waiters.lock();
// `Notify.state` may be in any of the three states (Empty, Waiting,
// Notified). It doesn't actually matter what the atomic is set to