summaryrefslogtreecommitdiffstats
path: root/tokio/src/runtime/queue.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-04-09 11:35:16 -0700
committerGitHub <noreply@github.com>2020-04-09 11:35:16 -0700
commit58ba45a38cc101e695895cc39d8ee4ce74176397 (patch)
tree18dd0eef6611562bd517f49f2c298714c53f5d2d /tokio/src/runtime/queue.rs
parentde8326a5a490aafdf12848820d1496b758f1ec57 (diff)
rt: fix bug in work-stealing queue (#2387)
Fixes a couple of bugs in the work-stealing queue introduced as part of #2315. First, the cursor needs to be able to represent more values than the size of the buffer. This is required in order to tell whether `tail` is ahead of `head` or whether they are identical. This bug resulted in the "overflow" path being taken before the buffer was full. The second bug can happen when a queue is being stolen from concurrently with being stolen into. In this case, it is possible for buffer slots to be overwritten before they are released by the stealer. This is less likely to happen in practice because the first bug prevented the queue from filling up 100%, but it could still happen. It triggered an assertion in `steal_into`. This bug slipped through due to a bug in loom not correctly catching the case. The loom bug is fixed as part of tokio-rs/loom#119. Fixes: #2382
Diffstat (limited to 'tokio/src/runtime/queue.rs')
-rw-r--r--tokio/src/runtime/queue.rs62
1 files changed, 40 insertions, 22 deletions
diff --git a/tokio/src/runtime/queue.rs b/tokio/src/runtime/queue.rs
index dc78dbd0..c654514b 100644
--- a/tokio/src/runtime/queue.rs
+++ b/tokio/src/runtime/queue.rs
@@ -1,7 +1,7 @@
//! Run-queue structures to support a work-stealing scheduler
use crate::loom::cell::UnsafeCell;
-use crate::loom::sync::atomic::{AtomicU16, AtomicU8, AtomicUsize};
+use crate::loom::sync::atomic::{AtomicU16, AtomicU32, AtomicUsize};
use crate::loom::sync::{Arc, Mutex};
use crate::runtime::task;
@@ -34,17 +34,19 @@ pub(super) struct Inject<T: 'static> {
pub(super) struct Inner<T: 'static> {
/// Concurrently updated by many threads.
///
- /// Contains two `u8` values. The LSB byte is the "real" head of the queue.
- /// The `u8` in the MSB is set by a stealer in process of stealing values.
- /// It represents the first value being stolen in the batch.
+ /// Contains two `u16` values. The low 16 bits are the "real" head of the queue.
+ /// The `u16` in the high 16 bits is set by a stealer in the process of stealing values.
+ /// It represents the first value being stolen in the batch. `u16` is used
+ /// in order to distinguish between `head == tail` and `head == tail -
+ /// capacity`.
///
- /// When both `u8` values are the same, there is no active stealer.
+ /// When both `u16` values are the same, there is no active stealer.
///
/// Tracking an in-progress stealer prevents a wrapping scenario.
- head: AtomicU16,
+ head: AtomicU32,
/// Only updated by producer thread but read by many threads.
- tail: AtomicU8,
+ tail: AtomicU16,
/// Elements
buffer: Box<[UnsafeCell<MaybeUninit<task::Notified<T>>>]>,
@@ -86,8 +88,8 @@ pub(super) fn local<T: 'static>() -> (Steal<T>, Local<T>) {
}
let inner = Arc::new(Inner {
- head: AtomicU16::new(0),
- tail: AtomicU8::new(0),
+ head: AtomicU32::new(0),
+ tail: AtomicU16::new(0),
buffer: buffer.into(),
});
@@ -115,7 +117,7 @@ impl<T> Local<T> {
// safety: this is the **only** thread that updates this cell.
let tail = unsafe { self.inner.tail.unsync_load() };
- if steal as usize & MASK != tail.wrapping_add(1) as usize & MASK {
+ if tail.wrapping_sub(steal) < LOCAL_QUEUE_CAPACITY as u16 {
// There is capacity for the task
break tail;
} else if steal != real {
@@ -165,16 +167,16 @@ impl<T> Local<T> {
fn push_overflow(
&mut self,
task: task::Notified<T>,
- head: u8,
- tail: u8,
+ head: u16,
+ tail: u16,
inject: &Inject<T>,
) -> Result<(), task::Notified<T>> {
const BATCH_LEN: usize = LOCAL_QUEUE_CAPACITY / 2 + 1;
- let n = (LOCAL_QUEUE_CAPACITY / 2) as u8;
+ let n = (LOCAL_QUEUE_CAPACITY / 2) as u16;
assert_eq!(
tail.wrapping_sub(head) as usize,
- LOCAL_QUEUE_CAPACITY - 1,
+ LOCAL_QUEUE_CAPACITY,
"queue is not full; tail = {}; head = {}",
tail,
head
@@ -261,10 +263,12 @@ impl<T> Local<T> {
let next_real = real.wrapping_add(1);
- // Only update `steal` component if it differs from `real`.
+ // If `steal == real` there are no concurrent stealers. Both `steal`
+ // and `real` are updated.
let next = if steal == real {
pack(next_real, next_real)
} else {
+ assert_ne!(steal, next_real);
pack(steal, next_real)
};
@@ -295,6 +299,17 @@ impl<T> Steal<T> {
// holds a mutable reference.
let dst_tail = unsafe { dst.inner.tail.unsync_load() };
+ // To the caller, `dst` may **look** empty but still have values
+ // contained in the buffer. If another thread is concurrently stealing
+ // from `dst` there may not be enough capacity to steal.
+ let (steal, _) = unpack(dst.inner.head.load(Acquire));
+
+ if dst_tail.wrapping_sub(steal) > LOCAL_QUEUE_CAPACITY as u16 / 2 {
+ // we *could* try to steal less here, but for simplicity, we're just
+ // going to abort.
+ return None;
+ }
+
// Steal the tasks into `dst`'s buffer. This does not yet expose the
// tasks in `dst`.
let mut n = self.steal_into2(dst, dst_tail);
@@ -327,7 +342,7 @@ impl<T> Steal<T> {
// Steal tasks from `self`, placing them into `dst`. Returns the number of
// tasks that were stolen.
- fn steal_into2(&self, dst: &mut Local<T>, dst_tail: u8) -> u8 {
+ fn steal_into2(&self, dst: &mut Local<T>, dst_tail: u16) -> u16 {
let mut prev_packed = self.0.head.load(Acquire);
let mut next_packed;
@@ -352,6 +367,7 @@ impl<T> Steal<T> {
// Update the real head index to acquire the tasks.
let steal_to = src_head_real.wrapping_add(n);
+ assert_ne!(src_head_steal, steal_to);
next_packed = pack(src_head_steal, steal_to);
// Claim all those tasks. This is done by incrementing the "real"
@@ -368,6 +384,8 @@ impl<T> Steal<T> {
}
};
+ assert!(n <= LOCAL_QUEUE_CAPACITY as u16 / 2, "actual = {}", n);
+
let (first, _) = unpack(next_packed);
// Take all the tasks
@@ -594,16 +612,16 @@ fn set_next(header: NonNull<task::Header>, val: Option<NonNull<task::Header>>) {
/// Split the head value into the real head and the index a stealer is working
/// on.
-fn unpack(n: u16) -> (u8, u8) {
- let real = n & u8::max_value() as u16;
- let steal = n >> 8;
+fn unpack(n: u32) -> (u16, u16) {
+ let real = n & u16::max_value() as u32;
+ let steal = n >> 16;
- (steal as u8, real as u8)
+ (steal as u16, real as u16)
}
/// Join the two head values
-fn pack(steal: u8, real: u8) -> u16 {
- (real as u16) | ((steal as u16) << 8)
+fn pack(steal: u16, real: u16) -> u32 {
+ (real as u32) | ((steal as u32) << 16)
}
#[test]