diff options
author | Zahari Dichev <zaharidichev@gmail.com> | 2020-09-25 18:38:13 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-25 08:38:13 -0700 |
commit | 444660664b96f758610a0e7201a6a1a31a0f2405 (patch) | |
tree | 0b6829c695a5a0e2cec1157f84ecbe10a0780a3c /tokio/src/sync | |
parent | cf025ba45f68934ae2138bb75ee2a5ee50506d1b (diff) |
chore: handle std `Mutex` poisoning in a shim (#2872)
As tokio does not rely on poisoning, we can
avoid always unwrapping when locking by handling
the `PoisonError` in the Mutex shim.
Signed-off-by: Zahari Dichev <zaharidichev@gmail.com>
Diffstat (limited to 'tokio/src/sync')
-rw-r--r-- | tokio/src/sync/batch_semaphore.rs | 19 | ||||
-rw-r--r-- | tokio/src/sync/broadcast.rs | 12 | ||||
-rw-r--r-- | tokio/src/sync/notify.rs | 10 |
3 files changed, 17 insertions, 24 deletions
diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs index 9f324f9c..6305485b 100644 --- a/tokio/src/sync/batch_semaphore.rs +++ b/tokio/src/sync/batch_semaphore.rs @@ -158,7 +158,7 @@ impl Semaphore { } // Assign permits to the wait queue - self.add_permits_locked(added, self.waiters.lock().unwrap()); + self.add_permits_locked(added, self.waiters.lock()); } /// Closes the semaphore. This prevents the semaphore from issuing new @@ -166,7 +166,7 @@ impl Semaphore { // This will be used once the bounded MPSC is updated to use the new // semaphore implementation. pub(crate) fn close(&self) { - let mut waiters = self.waiters.lock().unwrap(); + let mut waiters = self.waiters.lock(); // If the semaphore's permits counter has enough permits for an // unqueued waiter to acquire all the permits it needs immediately, // it won't touch the wait list. Therefore, we have to set a bit on @@ -231,7 +231,7 @@ impl Semaphore { let mut lock = Some(waiters); let mut is_empty = false; while rem > 0 { - let mut waiters = lock.take().unwrap_or_else(|| self.waiters.lock().unwrap()); + let mut waiters = lock.take().unwrap_or_else(|| self.waiters.lock()); 'inner: for slot in &mut wakers[..] { // Was the waiter assigned enough permits to wake it? match waiters.queue.last() { @@ -324,7 +324,7 @@ impl Semaphore { // counter. Otherwise, if we subtract the permits and then // acquire the lock, we might miss additional permits being // added while waiting for the lock. - lock = Some(self.waiters.lock().unwrap()); + lock = Some(self.waiters.lock()); } match self.permits.compare_exchange(curr, next, AcqRel, Acquire) { @@ -334,7 +334,7 @@ impl Semaphore { if !queued { return Ready(Ok(())); } else if lock.is_none() { - break self.waiters.lock().unwrap(); + break self.waiters.lock(); } } break lock.expect("lock must be acquired before waiting"); @@ -484,14 +484,7 @@ impl Drop for Acquire<'_> { // This is where we ensure safety. The future is being dropped, // which means we must ensure that the waiter entry is no longer stored // in the linked list. - let mut waiters = match self.semaphore.waiters.lock() { - Ok(lock) => lock, - // Removing the node from the linked list is necessary to ensure - // safety. Even if the lock was poisoned, we need to make sure it is - // removed from the linked list before dropping it --- otherwise, - // the list will contain a dangling pointer to this node. - Err(e) => e.into_inner(), - }; + let mut waiters = self.semaphore.waiters.lock(); // remove the entry from the list let node = NonNull::from(&mut self.node); diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 9484e130..fe826290 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -529,7 +529,7 @@ impl<T> Sender<T> { pub fn subscribe(&self) -> Receiver<T> { let shared = self.shared.clone(); - let mut tail = shared.tail.lock().unwrap(); + let mut tail = shared.tail.lock(); if tail.rx_cnt == MAX_RECEIVERS { panic!("max receivers"); @@ -584,12 +584,12 @@ impl<T> Sender<T> { /// } /// ``` pub fn receiver_count(&self) -> usize { - let tail = self.shared.tail.lock().unwrap(); + let tail = self.shared.tail.lock(); tail.rx_cnt } fn send2(&self, value: Option<T>) -> Result<usize, SendError<Option<T>>> { - let mut tail = self.shared.tail.lock().unwrap(); + let mut tail = self.shared.tail.lock(); if tail.rx_cnt == 0 { return Err(SendError(value)); @@ -695,7 +695,7 @@ impl<T> Receiver<T> { // the slot lock. drop(slot); - let mut tail = self.shared.tail.lock().unwrap(); + let mut tail = self.shared.tail.lock(); // Acquire slot lock again slot = self.shared.buffer[idx].read().unwrap(); @@ -979,7 +979,7 @@ where impl<T> Drop for Receiver<T> { fn drop(&mut self) { - let mut tail = self.shared.tail.lock().unwrap(); + let mut tail = self.shared.tail.lock(); if let Some(waiter) = &self.waiter { // safety: tail lock is held @@ -1142,7 +1142,7 @@ where fn drop(&mut self) { // Acquire the tail lock. This is required for safety before accessing // the waiter node. - let mut tail = self.receiver.as_mut().shared.tail.lock().unwrap(); + let mut tail = self.receiver.as_mut().shared.tail.lock(); // safety: tail lock is held let queued = self.waiter.with(|ptr| unsafe { (*ptr).queued }); diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index d319e8aa..17117bfe 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -306,7 +306,7 @@ impl Notify { } // There are waiters, the lock must be acquired to notify. - let mut waiters = self.waiters.lock().unwrap(); + let mut waiters = self.waiters.lock(); // The state must be reloaded while the lock is held. The state may only // transition out of WAITING while the lock is held. @@ -321,7 +321,7 @@ impl Notify { /// Notifies all waiting tasks pub(crate) fn notify_waiters(&self) { // There are waiters, the lock must be acquired to notify. - let mut waiters = self.waiters.lock().unwrap(); + let mut waiters = self.waiters.lock(); // The state must be reloaded while the lock is held. The state may only // transition out of WAITING while the lock is held. @@ -452,7 +452,7 @@ impl Future for Notified<'_> { // Acquire the lock and attempt to transition to the waiting // state. - let mut waiters = notify.waiters.lock().unwrap(); + let mut waiters = notify.waiters.lock(); // Reload the state with the lock held let mut curr = notify.state.load(SeqCst); @@ -516,7 +516,7 @@ impl Future for Notified<'_> { // `notify.waiters`). In order to access the waker fields, // we must hold the lock. - let waiters = notify.waiters.lock().unwrap(); + let waiters = notify.waiters.lock(); // Safety: called while locked let w = unsafe { &mut *waiter.get() }; @@ -564,7 +564,7 @@ impl Drop for Notified<'_> { // longer stored in the linked list. if let Waiting = *state { let mut notify_state = WAITING; - let mut waiters = notify.waiters.lock().unwrap(); + let mut waiters = notify.waiters.lock(); // `Notify.state` may be in any of the three states (Empty, Waiting, // Notified). It doesn't actually matter what the atomic is set to |