diff options
author | Zahari Dichev <zaharidichev@gmail.com> | 2020-09-25 18:38:13 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-25 08:38:13 -0700 |
commit | 444660664b96f758610a0e7201a6a1a31a0f2405 (patch) | |
tree | 0b6829c695a5a0e2cec1157f84ecbe10a0780a3c /tokio/src/runtime | |
parent | cf025ba45f68934ae2138bb75ee2a5ee50506d1b (diff) |
chore: handle std `Mutex` poisoning in a shim (#2872)
As tokio does not rely on poisoning, we can
avoid always unwrapping when locking by handling
the `PoisonError` in the Mutex shim.
Signed-off-by: Zahari Dichev <zaharidichev@gmail.com>
Diffstat (limited to 'tokio/src/runtime')
-rw-r--r-- | tokio/src/runtime/basic_scheduler.rs | 25 | ||||
-rw-r--r-- | tokio/src/runtime/blocking/pool.rs | 12 | ||||
-rw-r--r-- | tokio/src/runtime/park.rs | 4 | ||||
-rw-r--r-- | tokio/src/runtime/queue.rs | 10 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/idle.rs | 8 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/worker.rs | 2 |
6 files changed, 25 insertions, 36 deletions
diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index 0c0e95a6..60fe92c3 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -10,7 +10,7 @@ use std::cell::RefCell; use std::collections::VecDeque; use std::fmt; use std::future::Future; -use std::sync::{Arc, PoisonError}; +use std::sync::Arc; use std::task::Poll::{Pending, Ready}; use std::time::Duration; @@ -170,7 +170,7 @@ impl<P: Park> BasicScheduler<P> { } fn take_inner(&self) -> Option<InnerGuard<'_, P>> { - let inner = self.inner.lock().unwrap().take()?; + let inner = self.inner.lock().take()?; Some(InnerGuard { inner: Some(inner), @@ -280,12 +280,7 @@ impl<P: Park> Drop for BasicScheduler<P> { // Avoid a double panic if we are currently panicking and // the lock may be poisoned. - let mut inner = match self - .inner - .lock() - .unwrap_or_else(PoisonError::into_inner) - .take() - { + let mut inner = match self.inner.lock().take() { Some(inner) => inner, None if std::thread::panicking() => return, None => panic!("Oh no! We never placed the Inner state back, this is a bug!"), @@ -309,7 +304,7 @@ impl<P: Park> Drop for BasicScheduler<P> { } // Drain remote queue - for task in scheduler.spawner.shared.queue.lock().unwrap().drain(..) { + for task in scheduler.spawner.shared.queue.lock().drain(..) { task.shutdown(); } @@ -339,7 +334,7 @@ impl Spawner { } fn pop(&self) -> Option<task::Notified<Arc<Shared>>> { - self.shared.queue.lock().unwrap().pop_front() + self.shared.queue.lock().pop_front() } fn waker_ref(&self) -> WakerRef<'_> { @@ -384,7 +379,7 @@ impl Schedule for Arc<Shared> { cx.tasks.borrow_mut().queue.push_back(task); } _ => { - self.queue.lock().unwrap().push_back(task); + self.queue.lock().push_back(task); self.unpark.unpark(); } }); @@ -423,13 +418,7 @@ impl<P: Park> InnerGuard<'_, P> { impl<P: Park> Drop for InnerGuard<'_, P> { fn drop(&mut self) { if let Some(scheduler) = self.inner.take() { - // We can ignore the poison error here since we are - // just replacing the state. - let mut lock = self - .basic_scheduler - .inner - .lock() - .unwrap_or_else(PoisonError::into_inner); + let mut lock = self.basic_scheduler.inner.lock(); // Replace old scheduler back into the state to allow // other threads to pick it up and drive it. diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index 633021ed..df0175b1 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -129,7 +129,7 @@ impl BlockingPool { } pub(crate) fn shutdown(&mut self, timeout: Option<Duration>) { - let mut shared = self.spawner.inner.shared.lock().unwrap(); + let mut shared = self.spawner.inner.shared.lock(); // The function can be called multiple times. First, by explicitly // calling `shutdown` then by the drop handler calling `shutdown`. This @@ -170,7 +170,7 @@ impl fmt::Debug for BlockingPool { impl Spawner { pub(crate) fn spawn(&self, task: Task, rt: &Handle) -> Result<(), ()> { let shutdown_tx = { - let mut shared = self.inner.shared.lock().unwrap(); + let mut shared = self.inner.shared.lock(); if shared.shutdown { // Shutdown the task @@ -207,7 +207,7 @@ impl Spawner { }; if let Some(shutdown_tx) = shutdown_tx { - let mut shared = self.inner.shared.lock().unwrap(); + let mut shared = self.inner.shared.lock(); let entry = shared.worker_threads.vacant_entry(); let handle = self.spawn_thread(shutdown_tx, rt, entry.key()); @@ -251,7 +251,7 @@ impl Inner { f() } - let mut shared = self.shared.lock().unwrap(); + let mut shared = self.shared.lock(); 'main: loop { // BUSY @@ -259,7 +259,7 @@ impl Inner { drop(shared); task.run(); - shared = self.shared.lock().unwrap(); + shared = self.shared.lock(); } // IDLE @@ -296,7 +296,7 @@ impl Inner { drop(shared); task.shutdown(); - shared = self.shared.lock().unwrap(); + shared = self.shared.lock(); } // Work was produced, and we "took" it (by decrementing num_notify). diff --git a/tokio/src/runtime/park.rs b/tokio/src/runtime/park.rs index c994c935..033b9f20 100644 --- a/tokio/src/runtime/park.rs +++ b/tokio/src/runtime/park.rs @@ -142,7 +142,7 @@ impl Inner { fn park_condvar(&self) { // Otherwise we need to coordinate going to sleep - let mut m = self.mutex.lock().unwrap(); + let mut m = self.mutex.lock(); match self .state @@ -238,7 +238,7 @@ impl Inner { // Releasing `lock` before the call to `notify_one` means that when the // parked thread wakes it doesn't get woken only to have to wait for us // to release `lock`. - drop(self.mutex.lock().unwrap()); + drop(self.mutex.lock()); self.condvar.notify_one() } diff --git a/tokio/src/runtime/queue.rs b/tokio/src/runtime/queue.rs index c654514b..cdf4009c 100644 --- a/tokio/src/runtime/queue.rs +++ b/tokio/src/runtime/queue.rs @@ -481,7 +481,7 @@ impl<T: 'static> Inject<T> { /// Close the injection queue, returns `true` if the queue is open when the /// transition is made. pub(super) fn close(&self) -> bool { - let mut p = self.pointers.lock().unwrap(); + let mut p = self.pointers.lock(); if p.is_closed { return false; @@ -492,7 +492,7 @@ impl<T: 'static> Inject<T> { } pub(super) fn is_closed(&self) -> bool { - self.pointers.lock().unwrap().is_closed + self.pointers.lock().is_closed } pub(super) fn len(&self) -> usize { @@ -502,7 +502,7 @@ impl<T: 'static> Inject<T> { /// Pushes a value into the queue. pub(super) fn push(&self, task: task::Notified<T>) { // Acquire queue lock - let mut p = self.pointers.lock().unwrap(); + let mut p = self.pointers.lock(); if p.is_closed { // Drop the mutex to avoid a potential deadlock when @@ -541,7 +541,7 @@ impl<T: 'static> Inject<T> { debug_assert!(get_next(batch_tail).is_none()); - let mut p = self.pointers.lock().unwrap(); + let mut p = self.pointers.lock(); if let Some(tail) = p.tail { set_next(tail, Some(batch_head)); @@ -566,7 +566,7 @@ impl<T: 'static> Inject<T> { return None; } - let mut p = self.pointers.lock().unwrap(); + let mut p = self.pointers.lock(); // It is possible to hit null here if another thread poped the last // task between us checking `len` and acquiring the lock. diff --git a/tokio/src/runtime/thread_pool/idle.rs b/tokio/src/runtime/thread_pool/idle.rs index ae87ca4b..6e692fd8 100644 --- a/tokio/src/runtime/thread_pool/idle.rs +++ b/tokio/src/runtime/thread_pool/idle.rs @@ -55,7 +55,7 @@ impl Idle { } // Acquire the lock - let mut sleepers = self.sleepers.lock().unwrap(); + let mut sleepers = self.sleepers.lock(); // Check again, now that the lock is acquired if !self.notify_should_wakeup() { @@ -77,7 +77,7 @@ impl Idle { /// work. pub(super) fn transition_worker_to_parked(&self, worker: usize, is_searching: bool) -> bool { // Acquire the lock - let mut sleepers = self.sleepers.lock().unwrap(); + let mut sleepers = self.sleepers.lock(); // Decrement the number of unparked threads let ret = State::dec_num_unparked(&self.state, is_searching); @@ -112,7 +112,7 @@ impl Idle { /// Unpark a specific worker. This happens if tasks are submitted from /// within the worker's park routine. pub(super) fn unpark_worker_by_id(&self, worker_id: usize) { - let mut sleepers = self.sleepers.lock().unwrap(); + let mut sleepers = self.sleepers.lock(); for index in 0..sleepers.len() { if sleepers[index] == worker_id { @@ -128,7 +128,7 @@ impl Idle { /// Returns `true` if `worker_id` is contained in the sleep set pub(super) fn is_parked(&self, worker_id: usize) -> bool { - let sleepers = self.sleepers.lock().unwrap(); + let sleepers = self.sleepers.lock(); sleepers.contains(&worker_id) } diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs index 10e973f6..c88f9954 100644 --- a/tokio/src/runtime/thread_pool/worker.rs +++ b/tokio/src/runtime/thread_pool/worker.rs @@ -783,7 +783,7 @@ impl Shared { /// /// If all workers have reached this point, the final cleanup is performed. fn shutdown(&self, core: Box<Core>, worker: Arc<Worker>) { - let mut workers = self.shutdown_workers.lock().unwrap(); + let mut workers = self.shutdown_workers.lock(); workers.push((core, worker)); if workers.len() != self.remotes.len() { |