author    Zahari Dichev <zaharidichev@gmail.com>  2020-09-25 18:38:13 +0300
committer GitHub <noreply@github.com>             2020-09-25 08:38:13 -0700
commit    444660664b96f758610a0e7201a6a1a31a0f2405
tree      0b6829c695a5a0e2cec1157f84ecbe10a0780a3c
parent    cf025ba45f68934ae2138bb75ee2a5ee50506d1b
chore: handle std `Mutex` poisoning in a shim (#2872)
As tokio does not rely on poisoning, we can avoid unwrapping at every lock call site by handling the `PoisonError` once, inside the `Mutex` shim.

Signed-off-by: Zahari Dichev <zaharidichev@gmail.com>
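The shim itself lives outside `tokio/src/runtime`, so it is not part of this diff. As a rough sketch of the idea (a hypothetical standalone wrapper, not the exact tokio source), it amounts to a thin newtype around the std mutex:

    use std::sync::{Mutex as StdMutex, MutexGuard, PoisonError};

    // Sketch of a Mutex shim: a thin wrapper around `std::sync::Mutex`
    // that deliberately ignores lock poisoning.
    pub(crate) struct Mutex<T>(StdMutex<T>);

    impl<T> Mutex<T> {
        pub(crate) fn new(t: T) -> Mutex<T> {
            Mutex(StdMutex::new(t))
        }

        // Lock the mutex. If a previous holder panicked, recover the
        // guard from the `PoisonError` instead of propagating the poison.
        pub(crate) fn lock(&self) -> MutexGuard<'_, T> {
            self.0.lock().unwrap_or_else(PoisonError::into_inner)
        }
    }

With poisoning handled in one place, every call site in the diff below shrinks from `lock().unwrap()` (or the more verbose `lock().unwrap_or_else(PoisonError::into_inner)`) to a plain `lock()`.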
Diffstat (limited to 'tokio/src/runtime')
-rw-r--r--  tokio/src/runtime/basic_scheduler.rs     | 25
-rw-r--r--  tokio/src/runtime/blocking/pool.rs       | 12
-rw-r--r--  tokio/src/runtime/park.rs                |  4
-rw-r--r--  tokio/src/runtime/queue.rs               | 10
-rw-r--r--  tokio/src/runtime/thread_pool/idle.rs    |  8
-rw-r--r--  tokio/src/runtime/thread_pool/worker.rs  |  2
6 files changed, 25 insertions, 36 deletions
diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs
index 0c0e95a6..60fe92c3 100644
--- a/tokio/src/runtime/basic_scheduler.rs
+++ b/tokio/src/runtime/basic_scheduler.rs
@@ -10,7 +10,7 @@ use std::cell::RefCell;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
-use std::sync::{Arc, PoisonError};
+use std::sync::Arc;
use std::task::Poll::{Pending, Ready};
use std::time::Duration;
@@ -170,7 +170,7 @@ impl<P: Park> BasicScheduler<P> {
}
fn take_inner(&self) -> Option<InnerGuard<'_, P>> {
- let inner = self.inner.lock().unwrap().take()?;
+ let inner = self.inner.lock().take()?;
Some(InnerGuard {
inner: Some(inner),
@@ -280,12 +280,7 @@ impl<P: Park> Drop for BasicScheduler<P> {
// Avoid a double panic if we are currently panicking and
// the lock may be poisoned.
- let mut inner = match self
- .inner
- .lock()
- .unwrap_or_else(PoisonError::into_inner)
- .take()
- {
+ let mut inner = match self.inner.lock().take() {
Some(inner) => inner,
None if std::thread::panicking() => return,
None => panic!("Oh no! We never placed the Inner state back, this is a bug!"),
@@ -309,7 +304,7 @@ impl<P: Park> Drop for BasicScheduler<P> {
}
// Drain remote queue
- for task in scheduler.spawner.shared.queue.lock().unwrap().drain(..) {
+ for task in scheduler.spawner.shared.queue.lock().drain(..) {
task.shutdown();
}
@@ -339,7 +334,7 @@ impl Spawner {
}
fn pop(&self) -> Option<task::Notified<Arc<Shared>>> {
- self.shared.queue.lock().unwrap().pop_front()
+ self.shared.queue.lock().pop_front()
}
fn waker_ref(&self) -> WakerRef<'_> {
@@ -384,7 +379,7 @@ impl Schedule for Arc<Shared> {
cx.tasks.borrow_mut().queue.push_back(task);
}
_ => {
- self.queue.lock().unwrap().push_back(task);
+ self.queue.lock().push_back(task);
self.unpark.unpark();
}
});
@@ -423,13 +418,7 @@ impl<P: Park> InnerGuard<'_, P> {
impl<P: Park> Drop for InnerGuard<'_, P> {
fn drop(&mut self) {
if let Some(scheduler) = self.inner.take() {
- // We can ignore the poison error here since we are
- // just replacing the state.
- let mut lock = self
- .basic_scheduler
- .inner
- .lock()
- .unwrap_or_else(PoisonError::into_inner);
+ let mut lock = self.basic_scheduler.inner.lock();
// Replace old scheduler back into the state to allow
// other threads to pick it up and drive it.
diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs
index 633021ed..df0175b1 100644
--- a/tokio/src/runtime/blocking/pool.rs
+++ b/tokio/src/runtime/blocking/pool.rs
@@ -129,7 +129,7 @@ impl BlockingPool {
}
pub(crate) fn shutdown(&mut self, timeout: Option<Duration>) {
- let mut shared = self.spawner.inner.shared.lock().unwrap();
+ let mut shared = self.spawner.inner.shared.lock();
// The function can be called multiple times. First, by explicitly
// calling `shutdown` then by the drop handler calling `shutdown`. This
@@ -170,7 +170,7 @@ impl fmt::Debug for BlockingPool {
impl Spawner {
pub(crate) fn spawn(&self, task: Task, rt: &Handle) -> Result<(), ()> {
let shutdown_tx = {
- let mut shared = self.inner.shared.lock().unwrap();
+ let mut shared = self.inner.shared.lock();
if shared.shutdown {
// Shutdown the task
@@ -207,7 +207,7 @@ impl Spawner {
};
if let Some(shutdown_tx) = shutdown_tx {
- let mut shared = self.inner.shared.lock().unwrap();
+ let mut shared = self.inner.shared.lock();
let entry = shared.worker_threads.vacant_entry();
let handle = self.spawn_thread(shutdown_tx, rt, entry.key());
@@ -251,7 +251,7 @@ impl Inner {
f()
}
- let mut shared = self.shared.lock().unwrap();
+ let mut shared = self.shared.lock();
'main: loop {
// BUSY
@@ -259,7 +259,7 @@ impl Inner {
drop(shared);
task.run();
- shared = self.shared.lock().unwrap();
+ shared = self.shared.lock();
}
// IDLE
@@ -296,7 +296,7 @@ impl Inner {
drop(shared);
task.shutdown();
- shared = self.shared.lock().unwrap();
+ shared = self.shared.lock();
}
// Work was produced, and we "took" it (by decrementing num_notify).
diff --git a/tokio/src/runtime/park.rs b/tokio/src/runtime/park.rs
index c994c935..033b9f20 100644
--- a/tokio/src/runtime/park.rs
+++ b/tokio/src/runtime/park.rs
@@ -142,7 +142,7 @@ impl Inner {
fn park_condvar(&self) {
// Otherwise we need to coordinate going to sleep
- let mut m = self.mutex.lock().unwrap();
+ let mut m = self.mutex.lock();
match self
.state
@@ -238,7 +238,7 @@ impl Inner {
// Releasing `lock` before the call to `notify_one` means that when the
// parked thread wakes it doesn't get woken only to have to wait for us
// to release `lock`.
- drop(self.mutex.lock().unwrap());
+ drop(self.mutex.lock());
self.condvar.notify_one()
}
diff --git a/tokio/src/runtime/queue.rs b/tokio/src/runtime/queue.rs
index c654514b..cdf4009c 100644
--- a/tokio/src/runtime/queue.rs
+++ b/tokio/src/runtime/queue.rs
@@ -481,7 +481,7 @@ impl<T: 'static> Inject<T> {
/// Closes the injection queue, returning `true` if the queue was open when
/// the transition was made.
pub(super) fn close(&self) -> bool {
- let mut p = self.pointers.lock().unwrap();
+ let mut p = self.pointers.lock();
if p.is_closed {
return false;
@@ -492,7 +492,7 @@ impl<T: 'static> Inject<T> {
}
pub(super) fn is_closed(&self) -> bool {
- self.pointers.lock().unwrap().is_closed
+ self.pointers.lock().is_closed
}
pub(super) fn len(&self) -> usize {
@@ -502,7 +502,7 @@ impl<T: 'static> Inject<T> {
/// Pushes a value into the queue.
pub(super) fn push(&self, task: task::Notified<T>) {
// Acquire queue lock
- let mut p = self.pointers.lock().unwrap();
+ let mut p = self.pointers.lock();
if p.is_closed {
// Drop the mutex to avoid a potential deadlock when
@@ -541,7 +541,7 @@ impl<T: 'static> Inject<T> {
debug_assert!(get_next(batch_tail).is_none());
- let mut p = self.pointers.lock().unwrap();
+ let mut p = self.pointers.lock();
if let Some(tail) = p.tail {
set_next(tail, Some(batch_head));
@@ -566,7 +566,7 @@ impl<T: 'static> Inject<T> {
return None;
}
- let mut p = self.pointers.lock().unwrap();
+ let mut p = self.pointers.lock();
// It is possible to hit null here if another thread popped the last
// task between us checking `len` and acquiring the lock.
diff --git a/tokio/src/runtime/thread_pool/idle.rs b/tokio/src/runtime/thread_pool/idle.rs
index ae87ca4b..6e692fd8 100644
--- a/tokio/src/runtime/thread_pool/idle.rs
+++ b/tokio/src/runtime/thread_pool/idle.rs
@@ -55,7 +55,7 @@ impl Idle {
}
// Acquire the lock
- let mut sleepers = self.sleepers.lock().unwrap();
+ let mut sleepers = self.sleepers.lock();
// Check again, now that the lock is acquired
if !self.notify_should_wakeup() {
@@ -77,7 +77,7 @@ impl Idle {
/// work.
pub(super) fn transition_worker_to_parked(&self, worker: usize, is_searching: bool) -> bool {
// Acquire the lock
- let mut sleepers = self.sleepers.lock().unwrap();
+ let mut sleepers = self.sleepers.lock();
// Decrement the number of unparked threads
let ret = State::dec_num_unparked(&self.state, is_searching);
@@ -112,7 +112,7 @@ impl Idle {
/// Unpark a specific worker. This happens if tasks are submitted from
/// within the worker's park routine.
pub(super) fn unpark_worker_by_id(&self, worker_id: usize) {
- let mut sleepers = self.sleepers.lock().unwrap();
+ let mut sleepers = self.sleepers.lock();
for index in 0..sleepers.len() {
if sleepers[index] == worker_id {
@@ -128,7 +128,7 @@ impl Idle {
/// Returns `true` if `worker_id` is contained in the sleep set
pub(super) fn is_parked(&self, worker_id: usize) -> bool {
- let sleepers = self.sleepers.lock().unwrap();
+ let sleepers = self.sleepers.lock();
sleepers.contains(&worker_id)
}
diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs
index 10e973f6..c88f9954 100644
--- a/tokio/src/runtime/thread_pool/worker.rs
+++ b/tokio/src/runtime/thread_pool/worker.rs
@@ -783,7 +783,7 @@ impl Shared {
///
/// If all workers have reached this point, the final cleanup is performed.
fn shutdown(&self, core: Box<Core>, worker: Arc<Worker>) {
- let mut workers = self.shutdown_workers.lock().unwrap();
+ let mut workers = self.shutdown_workers.lock();
workers.push((core, worker));
if workers.len() != self.remotes.len() {