summaryrefslogtreecommitdiffstats
path: root/tokio
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-11-19 08:01:46 -0800
committerGitHub <noreply@github.com>2019-11-19 08:01:46 -0800
commit7c8b8877d440629ab9a27a2c9dcef859835d3536 (patch)
tree0a31ff2e02c38713e3bd49dbc28e8916c980bef7 /tokio
parent0d38936b35779b604770120da2e98560bbb6241f (diff)
runtime: fix lost wakeup bug in scheduler (#1788)
When checking if a worker needs to be unparked, the SeqCst load does not provide the necessary synchronization to ensure the scheduled task is visible to the searching worker. The `load` is switched to `fetch_add(0)` which does establish the necessary synchronization. Adding unit tests catching this bug will require a fix to loom and will be done at a later time. The bug fix has been validated with manual testing. Fixes #1768
Diffstat (limited to 'tokio')
-rw-r--r--tokio/src/runtime/thread_pool/idle.rs15
-rw-r--r--tokio/src/runtime/thread_pool/queue/global.rs11
2 files changed, 14 insertions, 12 deletions
diff --git a/tokio/src/runtime/thread_pool/idle.rs b/tokio/src/runtime/thread_pool/idle.rs
index acf80df8..ae87ca4b 100644
--- a/tokio/src/runtime/thread_pool/idle.rs
+++ b/tokio/src/runtime/thread_pool/idle.rs
@@ -4,7 +4,7 @@ use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use std::fmt;
-use std::sync::atomic::Ordering::{self, AcqRel, Relaxed, SeqCst};
+use std::sync::atomic::Ordering::{self, SeqCst};
pub(super) struct Idle {
/// Tracks both the number of searching workers and the number of unparked
@@ -89,12 +89,7 @@ impl Idle {
}
pub(super) fn transition_worker_to_searching(&self) -> bool {
- // Using `Relaxed` ordering is acceptable here as it is just an
- // optimization. This load has does not need to synchronize with
- // anything, and the algorithm is correct no matter what the load
- // returns (as in, it could return absolutely any `usize` value and the
- // pool would be correct.
- let state = State::load(&self.state, Relaxed);
+ let state = State::load(&self.state, SeqCst);
if 2 * state.num_searching() >= self.num_workers {
return false;
}
@@ -102,9 +97,7 @@ impl Idle {
// It is possible for this routine to allow more than 50% of the workers
// to search. That is OK. Limiting searchers is only an optimization to
// prevent too much contention.
- //
- // At this point, we do not need a hard synchronization with `notify_work`, so `AcqRel` is sufficient.
- State::inc_num_searching(&self.state, AcqRel);
+ State::inc_num_searching(&self.state, SeqCst);
true
}
@@ -140,7 +133,7 @@ impl Idle {
}
fn notify_should_wakeup(&self) -> bool {
- let state = State::load(&self.state, SeqCst);
+ let state = State(self.state.fetch_add(0, SeqCst));
state.num_searching() == 0 && state.num_unparked() < self.num_workers
}
}
diff --git a/tokio/src/runtime/thread_pool/queue/global.rs b/tokio/src/runtime/thread_pool/queue/global.rs
index 931b76a6..a6f49c01 100644
--- a/tokio/src/runtime/thread_pool/queue/global.rs
+++ b/tokio/src/runtime/thread_pool/queue/global.rs
@@ -51,7 +51,7 @@ impl<T: 'static> Queue<T> {
/// Close the worker queue
pub(super) fn close(&self) -> bool {
// Acquire the lock
- let _p = self.pointers.lock().unwrap();
+ let p = self.pointers.lock().unwrap();
let len = unsafe {
// Set the queue as closed. Because all mutations are synchronized by
@@ -63,6 +63,8 @@ impl<T: 'static> Queue<T> {
self.len.store(len | CLOSED, Release);
+ drop(p);
+
ret
}
@@ -119,7 +121,10 @@ impl<T: 'static> Queue<T> {
}
self.len.store(len + 2, Release);
+
f(Ok(()));
+
+ drop(p);
}
}
@@ -153,6 +158,8 @@ impl<T: 'static> Queue<T> {
}
self.len.store(len + (num << 1), Release);
+
+ drop(p);
}
}
@@ -185,6 +192,8 @@ impl<T: 'static> Queue<T> {
// Decrement by 2 to avoid touching the shutdown flag
self.len.store(self.len.unsync_load() - 2, Release);
+ drop(p);
+
Some(Task::from_raw(task))
}
}