author     Carl Lerche <me@carllerche.com>    2020-03-26 12:23:12 -0700
committer  GitHub <noreply@github.com>        2020-03-26 12:23:12 -0700
commit     1cb1e291c10adf6b4e530cb1475b95ba10fa615f (patch)
tree       aabaebe663e2647fb72cb609d1486adcde0c4cc4
parent     186196b911bb7cbbd67e74b4ef051d3daf2d64c1 (diff)
rt: track loom changes + tweak queue (#2315)
Loom is undergoing a big refresh to improve performance and tighten up its concurrency model. This diff tracks those changes. Included in the changes is the removal of the `CausalCell` deferred checks, which technically rely on behavior that is undefined under the C++11 memory model. To address this, the work-stealing queue is updated so that it no longer needs deferred checks: the queue now allows at most one concurrent stealer.
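To support the single-stealer rule, the queue head now packs two `u8` indices into a single `AtomicU16`: the low byte is the "real" head of the queue, while the high byte records where an in-progress stealer started; when the two bytes are equal, no steal is in flight. The standalone sketch below is illustration only, mirroring the `pack`/`unpack` helpers added to `tokio/src/runtime/queue.rs` in the diff:

    /// Join a steal index and a real head index into one packed value.
    fn pack(steal: u8, real: u8) -> u16 {
        (real as u16) | ((steal as u16) << 8)
    }

    /// Split a packed head into (steal, real).
    fn unpack(n: u16) -> (u8, u8) {
        ((n >> 8) as u8, (n & u8::max_value() as u16) as u8)
    }

    fn main() {
        // Idle queue: steal == real, so a stealer may start a batch.
        let head = pack(7, 7);
        assert_eq!(unpack(head), (7, 7));

        // A stealer claims three tasks by advancing only the real head;
        // the unchanged steal byte marks the batch as still being copied.
        let claimed = pack(7, 10);
        let (steal, real) = unpack(claimed);
        assert_ne!(steal, real);                 // other stealers back off here
        assert_eq!(real.wrapping_sub(steal), 3); // three slots are in flight
    }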
-rw-r--r--  ci/azure-test-stable.yml | 4
-rw-r--r--  tokio/Cargo.toml | 2
-rw-r--r--  tokio/src/loom/std/atomic_ptr.rs | 32
-rw-r--r--  tokio/src/loom/std/atomic_u8.rs | 44
-rw-r--r--  tokio/src/loom/std/atomic_usize.rs | 5
-rw-r--r--  tokio/src/loom/std/causal_cell.rs | 40
-rw-r--r--  tokio/src/loom/std/mod.rs | 13
-rw-r--r--  tokio/src/loom/std/unsafe_cell.rs | 16
-rw-r--r--  tokio/src/runtime/mod.rs | 2
-rw-r--r--  tokio/src/runtime/queue.rs (renamed from tokio/src/runtime/thread_pool/queue.rs) | 323
-rw-r--r--  tokio/src/runtime/task/core.rs | 15
-rw-r--r--  tokio/src/runtime/task/harness.rs | 3
-rw-r--r--  tokio/src/runtime/task/mod.rs | 3
-rw-r--r--  tokio/src/runtime/task/stack.rs | 8
-rw-r--r--  tokio/src/runtime/task/state.rs | 19
-rw-r--r--  tokio/src/runtime/tests/loom_pool.rs | 9
-rw-r--r--  tokio/src/runtime/tests/loom_queue.rs | 75
-rw-r--r--  tokio/src/runtime/tests/mod.rs | 9
-rw-r--r--  tokio/src/runtime/tests/queue.rs | 45
-rw-r--r--  tokio/src/runtime/thread_pool/mod.rs | 2
-rw-r--r--  tokio/src/runtime/thread_pool/worker.rs | 4
-rw-r--r--  tokio/src/sync/batch_semaphore.rs | 15
-rw-r--r--  tokio/src/sync/broadcast.rs | 18
-rw-r--r--  tokio/src/sync/mpsc/block.rs | 16
-rw-r--r--  tokio/src/sync/mpsc/chan.rs | 6
-rw-r--r--  tokio/src/sync/oneshot.rs | 16
-rw-r--r--  tokio/src/sync/semaphore_ll.rs | 6
-rw-r--r--  tokio/src/sync/task/atomic_waker.rs | 6
-rw-r--r--  tokio/src/util/slab/page.rs | 10
-rw-r--r--  tokio/src/util/slab/slot.rs | 6
-rw-r--r--  tokio/src/util/slab/tests/loom_stack.rs | 10
31 files changed, 523 insertions, 259 deletions
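The rewritten `steal_into2` in `tokio/src/runtime/queue.rs` (in the diff below) completes a steal in two phases: a `compare_exchange` first advances only the real head to claim a batch, and once the claimed slots have been copied out, a second `compare_exchange` sets the steal byte back equal to the real byte. The sketch below is a simplified illustration of that protocol, not the actual tokio code; the `claim` and `finish` helper names are invented here, while `pack`/`unpack` mirror the helpers in the diff:

    use std::sync::atomic::{AtomicU16, Ordering::{AcqRel, Acquire}};

    fn pack(steal: u8, real: u8) -> u16 { (real as u16) | ((steal as u16) << 8) }
    fn unpack(n: u16) -> (u8, u8) { ((n >> 8) as u8, (n & 0xff) as u8) }

    /// Phase one: claim `n` tasks by advancing the real head while leaving the
    /// steal byte untouched. Returns the first claimed index, or `None` if
    /// another stealer is already active (at most one stealer at a time).
    fn claim(head: &AtomicU16, n: u8) -> Option<u8> {
        let mut prev = head.load(Acquire);
        loop {
            let (steal, real) = unpack(prev);
            if steal != real {
                return None; // someone else is mid-steal
            }
            let next = pack(steal, real.wrapping_add(n));
            match head.compare_exchange(prev, next, AcqRel, Acquire) {
                Ok(_) => return Some(real),
                Err(actual) => prev = actual,
            }
        }
    }

    /// Phase two: once the claimed slots have been copied out, publish the new
    /// head by setting the steal byte equal to the real byte again.
    fn finish(head: &AtomicU16) {
        let mut prev = head.load(Acquire);
        loop {
            let (_, real) = unpack(prev);
            match head.compare_exchange(prev, pack(real, real), AcqRel, Acquire) {
                Ok(_) => return,
                Err(actual) => prev = actual,
            }
        }
    }

    fn main() {
        let head = AtomicU16::new(pack(0, 0));
        let start = claim(&head, 2).expect("no other stealer");
        assert_eq!(start, 0);
        assert_eq!(unpack(head.load(Acquire)), (0, 2)); // steal != real while copying
        finish(&head);
        assert_eq!(unpack(head.load(Acquire)), (2, 2)); // steal caught up
    }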
diff --git a/ci/azure-test-stable.yml b/ci/azure-test-stable.yml
index bc93febd..ce22c942 100644
--- a/ci/azure-test-stable.yml
+++ b/ci/azure-test-stable.yml
@@ -25,7 +25,7 @@ jobs:
# Run with all crate features
- script: cargo test --all-features
env:
- LOOM_MAX_PREEMPTIONS: 2
+ RUST_BACKTRACE: 1
CI: 'True'
displayName: ${{ crate }} - cargo test --all-features
workingDirectory: $(Build.SourcesDirectory)/${{ crate }}
@@ -41,7 +41,7 @@ jobs:
# Run with all crate features
- script: cargo test --all-features
env:
- LOOM_MAX_PREEMPTIONS: 2
+ RUST_BACKTRACE: 1
CI: 'True'
displayName: ${{ crate }} - cargo test --all-features
workingDirectory: $(Build.SourcesDirectory)/${{ crate }}
diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml
index 322f5908..1b18e6ad 100644
--- a/tokio/Cargo.toml
+++ b/tokio/Cargo.toml
@@ -129,7 +129,7 @@ tempfile = "3.1.0"
# loom is currently not compiling on windows.
# See: https://github.com/Xudong-Huang/generator-rs/issues/19
[target.'cfg(not(windows))'.dev-dependencies]
-loom = { version = "0.2.13", features = ["futures", "checkpoint"] }
+loom = { version = "0.3.0", features = ["futures", "checkpoint"] }
[package.metadata.docs.rs]
all-features = true
diff --git a/tokio/src/loom/std/atomic_ptr.rs b/tokio/src/loom/std/atomic_ptr.rs
new file mode 100644
index 00000000..eb8e4755
--- /dev/null
+++ b/tokio/src/loom/std/atomic_ptr.rs
@@ -0,0 +1,32 @@
+use std::fmt;
+use std::ops::Deref;
+
+/// `AtomicPtr` providing an additional `load_unsync` function.
+pub(crate) struct AtomicPtr<T> {
+ inner: std::sync::atomic::AtomicPtr<T>,
+}
+
+impl<T> AtomicPtr<T> {
+ pub(crate) fn new(ptr: *mut T) -> AtomicPtr<T> {
+ let inner = std::sync::atomic::AtomicPtr::new(ptr);
+ AtomicPtr { inner }
+ }
+
+ pub(crate) fn with_mut<R>(&mut self, f: impl FnOnce(&mut *mut T) -> R) -> R {
+ f(self.inner.get_mut())
+ }
+}
+
+impl<T> Deref for AtomicPtr<T> {
+ type Target = std::sync::atomic::AtomicPtr<T>;
+
+ fn deref(&self) -> &Self::Target {
+ &self.inner
+ }
+}
+
+impl<T> fmt::Debug for AtomicPtr<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ self.deref().fmt(fmt)
+ }
+}
diff --git a/tokio/src/loom/std/atomic_u8.rs b/tokio/src/loom/std/atomic_u8.rs
new file mode 100644
index 00000000..9f394a79
--- /dev/null
+++ b/tokio/src/loom/std/atomic_u8.rs
@@ -0,0 +1,44 @@
+use std::cell::UnsafeCell;
+use std::fmt;
+use std::ops::Deref;
+
+/// `AtomicU8` providing an additional `load_unsync` function.
+pub(crate) struct AtomicU8 {
+ inner: UnsafeCell<std::sync::atomic::AtomicU8>,
+}
+
+unsafe impl Send for AtomicU8 {}
+unsafe impl Sync for AtomicU8 {}
+
+impl AtomicU8 {
+ pub(crate) fn new(val: u8) -> AtomicU8 {
+ let inner = UnsafeCell::new(std::sync::atomic::AtomicU8::new(val));
+ AtomicU8 { inner }
+ }
+
+ /// Performs an unsynchronized load.
+ ///
+ /// # Safety
+ ///
+ /// All mutations must have happened before the unsynchronized load.
+ /// Additionally, there must be no concurrent mutations.
+ pub(crate) unsafe fn unsync_load(&self) -> u8 {
+ *(*self.inner.get()).get_mut()
+ }
+}
+
+impl Deref for AtomicU8 {
+ type Target = std::sync::atomic::AtomicU8;
+
+ fn deref(&self) -> &Self::Target {
+ // safety: it is always safe to access `&self` fns on the inner value as
+ // we never perform unsafe mutations.
+ unsafe { &*self.inner.get() }
+ }
+}
+
+impl fmt::Debug for AtomicU8 {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ self.deref().fmt(fmt)
+ }
+}
diff --git a/tokio/src/loom/std/atomic_usize.rs b/tokio/src/loom/std/atomic_usize.rs
index 78644b05..0fe998f1 100644
--- a/tokio/src/loom/std/atomic_usize.rs
+++ b/tokio/src/loom/std/atomic_usize.rs
@@ -25,6 +25,11 @@ impl AtomicUsize {
pub(crate) unsafe fn unsync_load(&self) -> usize {
*(*self.inner.get()).get_mut()
}
+
+ pub(crate) fn with_mut<R>(&mut self, f: impl FnOnce(&mut usize) -> R) -> R {
+ // safety: we have mutable access
+ f(unsafe { (*self.inner.get()).get_mut() })
+ }
}
impl ops::Deref for AtomicUsize {
diff --git a/tokio/src/loom/std/causal_cell.rs b/tokio/src/loom/std/causal_cell.rs
deleted file mode 100644
index 8300437a..00000000
--- a/tokio/src/loom/std/causal_cell.rs
+++ /dev/null
@@ -1,40 +0,0 @@
-use std::cell::UnsafeCell;
-
-#[derive(Debug)]
-pub(crate) struct CausalCell<T>(UnsafeCell<T>);
-
-#[derive(Default)]
-pub(crate) struct CausalCheck(());
-
-impl<T> CausalCell<T> {
- pub(crate) fn new(data: T) -> CausalCell<T> {
- CausalCell(UnsafeCell::new(data))
- }
-
- pub(crate) fn with<F, R>(&self, f: F) -> R
- where
- F: FnOnce(*const T) -> R,
- {
- f(self.0.get())
- }
-
- pub(crate) fn with_deferred<F, R>(&self, f: F) -> (R, CausalCheck)
- where
- F: FnOnce(*const T) -> R,
- {
- (f(self.0.get()), CausalCheck::default())
- }
-
- pub(crate) fn with_mut<F, R>(&self, f: F) -> R
- where
- F: FnOnce(*mut T) -> R,
- {
- f(self.0.get())
- }
-}
-
-impl CausalCheck {
- pub(crate) fn check(self) {}
-
- pub(crate) fn join(&mut self, _other: CausalCheck) {}
-}
diff --git a/tokio/src/loom/std/mod.rs b/tokio/src/loom/std/mod.rs
index a56d778a..6d7bcee1 100644
--- a/tokio/src/loom/std/mod.rs
+++ b/tokio/src/loom/std/mod.rs
@@ -1,12 +1,13 @@
#![cfg_attr(any(not(feature = "full"), loom), allow(unused_imports, dead_code))]
-mod atomic_u32;
+mod atomic_ptr;
mod atomic_u64;
+mod atomic_u8;
mod atomic_usize;
-mod causal_cell;
+mod unsafe_cell;
pub(crate) mod cell {
- pub(crate) use super::causal_cell::{CausalCell, CausalCheck};
+ pub(crate) use super::unsafe_cell::UnsafeCell;
}
#[cfg(any(feature = "sync", feature = "io-driver"))]
@@ -58,12 +59,12 @@ pub(crate) mod sync {
pub(crate) use std::sync::{Condvar, Mutex, MutexGuard, WaitTimeoutResult};
pub(crate) mod atomic {
- pub(crate) use crate::loom::std::atomic_u32::AtomicU32;
+ pub(crate) use crate::loom::std::atomic_ptr::AtomicPtr;
pub(crate) use crate::loom::std::atomic_u64::AtomicU64;
+ pub(crate) use crate::loom::std::atomic_u8::AtomicU8;
pub(crate) use crate::loom::std::atomic_usize::AtomicUsize;
- pub(crate) use std::sync::atomic::AtomicU8;
- pub(crate) use std::sync::atomic::{fence, AtomicPtr};
+ pub(crate) use std::sync::atomic::AtomicU16;
pub(crate) use std::sync::atomic::{spin_loop_hint, AtomicBool};
}
}
diff --git a/tokio/src/loom/std/unsafe_cell.rs b/tokio/src/loom/std/unsafe_cell.rs
new file mode 100644
index 00000000..f2b03d8d
--- /dev/null
+++ b/tokio/src/loom/std/unsafe_cell.rs
@@ -0,0 +1,16 @@
+#[derive(Debug)]
+pub(crate) struct UnsafeCell<T>(std::cell::UnsafeCell<T>);
+
+impl<T> UnsafeCell<T> {
+ pub(crate) fn new(data: T) -> UnsafeCell<T> {
+ UnsafeCell(std::cell::UnsafeCell::new(data))
+ }
+
+ pub(crate) fn with<R>(&self, f: impl FnOnce(*const T) -> R) -> R {
+ f(self.0.get())
+ }
+
+ pub(crate) fn with_mut<R>(&self, f: impl FnOnce(*mut T) -> R) -> R {
+ f(self.0.get())
+ }
+}
diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs
index cd8fbb1c..aedc3280 100644
--- a/tokio/src/runtime/mod.rs
+++ b/tokio/src/runtime/mod.rs
@@ -230,6 +230,8 @@ use self::spawner::Spawner;
mod time;
cfg_rt_threaded! {
+ mod queue;
+
pub(crate) mod thread_pool;
use self::thread_pool::ThreadPool;
}
diff --git a/tokio/src/runtime/thread_pool/queue.rs b/tokio/src/runtime/queue.rs
index 66cec504..233fe454 100644
--- a/tokio/src/runtime/thread_pool/queue.rs
+++ b/tokio/src/runtime/queue.rs
@@ -1,14 +1,14 @@
//! Run-queue structures to support a work-stealing scheduler
-use crate::loom::cell::{CausalCell, CausalCheck};
-use crate::loom::sync::atomic::{self, AtomicU32, AtomicUsize};
+use crate::loom::cell::UnsafeCell;
+use crate::loom::sync::atomic::{AtomicU16, AtomicU8, AtomicUsize};
use crate::loom::sync::{Arc, Mutex};
use crate::runtime::task;
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::ptr::{self, NonNull};
-use std::sync::atomic::Ordering::{Acquire, Release};
+use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
/// Producer handle. May only be used from a single thread.
pub(super) struct Local<T: 'static> {
@@ -36,13 +36,21 @@ pub(super) struct Inject<T: 'static> {
pub(super) struct Inner<T: 'static> {
/// Concurrently updated by many threads.
- head: AtomicU32,
+ ///
+ /// Contains two `u8` values. The LSB byte is the "real" head of the queue.
+ /// The `u8` in the MSB is set by a stealer in process of stealing values.
+ /// It represents the first value being stolen in the batch.
+ ///
+ /// When both `u8` values are the same, there is no active stealer.
+ ///
+ /// Tracking an in-progress stealer prevents a wrapping scenario.
+ head: AtomicU16,
/// Only updated by producer thread but read by many threads.
- tail: AtomicU32,
+ tail: AtomicU8,
/// Elements
- buffer: Box<[CausalCell<MaybeUninit<task::Notified<T>>>]>,
+ buffer: Box<[UnsafeCell<MaybeUninit<task::Notified<T>>>]>,
}
struct Pointers {
@@ -68,23 +76,21 @@ const LOCAL_QUEUE_CAPACITY: usize = 256;
// logic, but allows loom to test more edge cases in a reasonable a mount of
// time.
#[cfg(loom)]
-const LOCAL_QUEUE_CAPACITY: usize = 2;
+const LOCAL_QUEUE_CAPACITY: usize = 4;
const MASK: usize = LOCAL_QUEUE_CAPACITY - 1;
/// Create a new local run-queue
pub(super) fn local<T: 'static>() -> (Steal<T>, Local<T>) {
- debug_assert!(LOCAL_QUEUE_CAPACITY >= 2 && LOCAL_QUEUE_CAPACITY.is_power_of_two());
-
let mut buffer = Vec::with_capacity(LOCAL_QUEUE_CAPACITY);
for _ in 0..LOCAL_QUEUE_CAPACITY {
- buffer.push(CausalCell::new(MaybeUninit::uninit()));
+ buffer.push(UnsafeCell::new(MaybeUninit::uninit()));
}
let inner = Arc::new(Inner {
- head: AtomicU32::new(0),
- tail: AtomicU32::new(0),
+ head: AtomicU16::new(0),
+ tail: AtomicU8::new(0),
buffer: buffer.into(),
});
@@ -126,44 +132,51 @@ impl<T> Local<T> {
/// Pushes a task to the back of the local queue, skipping the LIFO slot.
pub(super) fn push_back(&mut self, mut task: task::Notified<T>, inject: &Inject<T>) {
- loop {
+ let tail = loop {
let head = self.inner.head.load(Acquire);
+ let (steal, real) = unpack(head);
// safety: this is the **only** thread that updates this cell.
let tail = unsafe { self.inner.tail.unsync_load() };
- if tail.wrapping_sub(head) < LOCAL_QUEUE_CAPACITY as u32 {
- // Map the position to a slot index.
- let idx = tail as usize & MASK;
-
- self.inner.buffer[idx].with_mut(|ptr| {
- // Write the task to the slot
- //
- // Safety: There is only one producer and the above `if`
- // condition ensures we don't touch a cell if there is a
- // value, thus no consumer.
- unsafe {
- ptr::write((*ptr).as_mut_ptr(), task);
- }
- });
-
- // Make the task available. Synchronizes with a load in
- // `steal_into2`.
- self.inner.tail.store(tail.wrapping_add(1), Release);
-
+ if steal as usize & MASK != tail.wrapping_add(1) as usize & MASK {
+ // There is capacity for the task
+ break tail;
+ } else if steal != real {
+ // Concurrently stealing, this will free up capacity, so
+ // only push the new task onto the inject queue
+ inject.push(task);
return;
+ } else {
+ // Push the current task and half of the queue into the
+ // inject queue.
+ match self.push_overflow(task, real, tail, inject) {
+ Ok(_) => return,
+ // Lost the race, try again
+ Err(v) => {
+ task = v;
+ }
+ }
}
-
- // The local buffer is full. Push a batch of work to the inject
- // queue.
- match self.push_overflow(task, head, tail, inject) {
- Ok(_) => return,
- // Lost the race, try again
- Err(v) => task = v,
+ };
+
+ // Map the position to a slot index.
+ let idx = tail as usize & MASK;
+
+ self.inner.buffer[idx].with_mut(|ptr| {
+ // Write the task to the slot
+ //
+ // Safety: There is only one producer and the above `if`
+ // condition ensures we don't touch a cell if there is a
+ // value, thus no consumer.
+ unsafe {
+ ptr::write((*ptr).as_mut_ptr(), task);
}
+ });
- atomic::spin_loop_hint();
- }
+ // Make the task available. Synchronizes with a load in
+ // `steal_into2`.
+ self.inner.tail.store(tail.wrapping_add(1), Release);
}
/// Moves a batch of tasks into the inject queue.
@@ -176,14 +189,22 @@ impl<T> Local<T> {
fn push_overflow(
&mut self,
task: task::Notified<T>,
- head: u32,
- tail: u32,
+ head: u8,
+ tail: u8,
inject: &Inject<T>,
) -> Result<(), task::Notified<T>> {
const BATCH_LEN: usize = LOCAL_QUEUE_CAPACITY / 2 + 1;
- let n = tail.wrapping_sub(head) / 2;
- debug_assert_eq!(n as usize, LOCAL_QUEUE_CAPACITY / 2, "queue is not full");
+ let n = (LOCAL_QUEUE_CAPACITY / 2) as u8;
+ assert_eq!(
+ tail.wrapping_sub(head) as usize,
+ LOCAL_QUEUE_CAPACITY - 1,
+ "queue is not full; tail = {}; head = {}",
+ tail,
+ head
+ );
+
+ let prev = pack(head, head);
// Claim a bunch of tasks
//
@@ -195,8 +216,13 @@ impl<T> Local<T> {
// work. This is because all tasks are pushed into the queue from the
// current thread (or memory has been acquired if the local queue handle
// moved).
- let actual = self.inner.head.compare_and_swap(head, head + n, Release);
- if actual != head {
+ let actual = self.inner.head.compare_and_swap(
+ prev,
+ pack(head.wrapping_add(n), head.wrapping_add(n)),
+ Release,
+ );
+
+ if actual != prev {
// We failed to claim the tasks, losing the race. Return out of
// this function and try the full `push` routine again. The queue
// may not be full anymore.
@@ -227,7 +253,7 @@ impl<T> Local<T> {
// tasks and we are the only producer.
self.inner.buffer[i_idx].with_mut(|ptr| unsafe {
let ptr = (*ptr).as_ptr();
- *(*ptr).header().queue_next.get() = Some(next);
+ (*ptr).header().queue_next.with_mut(|ptr| *ptr = Some(next));
});
}
@@ -249,42 +275,41 @@ impl<T> Local<T> {
return Some(task);
}
- loop {
- let head = self.inner.head.load(Acquire);
+ let mut head = self.inner.head.load(Acquire);
+
+ let idx = loop {
+ let (steal, real) = unpack(head);
// safety: this is the **only** thread that updates this cell.
let tail = unsafe { self.inner.tail.unsync_load() };
- if head == tail {
+ if real == tail {
// queue is empty
return None;
}
- // Map the head position to a slot index.
- let idx = head as usize & MASK;
+ let next_real = real.wrapping_add(1);
- let task = self.inner.buffer[idx].with(|ptr| {
- // Tentatively read the task at the head position. Note that we
- // have not yet claimed the task.
- //
- // safety: reading this as uninitialized memory.
- unsafe { ptr::read(ptr) }
- });
+ // Only update `steal` component if it differs from `real`.
+ let next = if steal == real {
+ pack(next_real, next_real)
+ } else {
+ pack(steal, next_real)
+ };
- // Attempt to claim the task read above.
- let actual = self
+ // Attempt to claim a task.
+ let res = self
.inner
.head
- .compare_and_swap(head, head.wrapping_add(1), Release);
+ .compare_exchange(head, next, AcqRel, Acquire);
- if actual == head {
- // safety: we claimed the task and the data we read is
- // initialized memory.
- return Some(unsafe { task.assume_init() });
+ match res {
+ Ok(_) => break real as usize & MASK,
+ Err(actual) => head = actual,
}
+ };
- atomic::spin_loop_hint();
- }
+ Some(self.inner.buffer[idx].with(|ptr| unsafe { ptr::read(ptr).assume_init() }))
}
}
@@ -324,9 +349,8 @@ impl<T> Steal<T> {
}
// Synchronize with stealers
- let dst_head = dst.inner.head.load(Acquire);
-
- assert!(dst_tail.wrapping_sub(dst_head) + n <= LOCAL_QUEUE_CAPACITY as u32);
+ let (dst_steal, dst_real) = unpack(dst.inner.head.load(Acquire));
+ assert_eq!(dst_steal, dst_real);
// Make the stolen items available to consumers
dst.inner.tail.store(dst_tail.wrapping_add(n), Release);
@@ -334,73 +358,107 @@ impl<T> Steal<T> {
Some(ret)
}
- fn steal_into2(&self, dst: &mut Local<T>, dst_tail: u32) -> u32 {
- loop {
- let src_head = self.0.head.load(Acquire);
+ // Steal tasks from `self`, placing them into `dst`. Returns the number of
+ // tasks that were stolen.
+ fn steal_into2(&self, dst: &mut Local<T>, dst_tail: u8) -> u8 {
+ let mut prev_packed = self.0.head.load(Acquire);
+ let mut next_packed;
+
+ let n = loop {
+ let (src_head_steal, src_head_real) = unpack(prev_packed);
let src_tail = self.0.tail.load(Acquire);
+ // If these two do not match, another thread is concurrently
+ // stealing from the queue.
+ if src_head_steal != src_head_real {
+ return 0;
+ }
+
// Number of available tasks to steal
- let n = src_tail.wrapping_sub(src_head);
+ let n = src_tail.wrapping_sub(src_head_real);
let n = n - n / 2;
if n == 0 {
+ // No tasks available to steal
return 0;
}
- if n > LOCAL_QUEUE_CAPACITY as u32 / 2 {
- atomic::spin_loop_hint();
- // inconsistent, try again
- continue;
- }
+ // Update the real head index to acquire the tasks.
+ let steal_to = src_head_real.wrapping_add(n);
+ next_packed = pack(src_head_steal, steal_to);
- // Track CausalCell causality checks. The check is deferred until
- // the compare_and_swap claims ownership of the tasks.
- let mut check = CausalCheck::default();
-
- for i in 0..n {
- // Compute the positions
- let src_pos = src_head.wrapping_add(i);
- let dst_pos = dst_tail.wrapping_add(i);
-
- // Map to slots
- let src_idx = src_pos as usize & MASK;
- let dst_idx = dst_pos as usize & MASK;
-
- // Read the task
- //
- // safety: this is being read as MaybeUninit -- potentially
- // uninitialized memory (in the case a producer wraps). We don't
- // assume it is initialized, but will just write the
- // `MaybeUninit` in our slot below.
- let (task, ch) = self.0.buffer[src_idx]
- .with_deferred(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });
-
- check.join(ch);
-
- // Write the task to the new slot
- //
- // safety: `dst` queue is empty and we are the only producer to
- // this queue.
- dst.inner.buffer[dst_idx]
- .with_mut(|ptr| unsafe { ptr::write((*ptr).as_mut_ptr(), task) });
+ // Claim all those tasks. This is done by incrementing the "real"
+ // head but not the steal. By doing this, no other thread is able to
+ // steal from this queue until the current thread completes.
+ let res = self
+ .0
+ .head
+ .compare_exchange(prev_packed, next_packed, AcqRel, Acquire);
+
+ match res {
+ Ok(_) => break n,
+ Err(actual) => prev_packed = actual,
}
+ };
+
+ let (first, _) = unpack(next_packed);
- // Claim all of those tasks!
- let actual = self
+ // Take all the tasks
+ for i in 0..n {
+ // Compute the positions
+ let src_pos = first.wrapping_add(i);
+ let dst_pos = dst_tail.wrapping_add(i);
+
+ // Map to slots
+ let src_idx = src_pos as usize & MASK;
+ let dst_idx = dst_pos as usize & MASK;
+
+ // Read the task
+ //
+ // safety: We acquired the task with the atomic exchange above.
+ let task = self.0.buffer[src_idx].with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });
+
+ // Write the task to the new slot
+ //
+ // safety: `dst` queue is empty and we are the only producer to
+ // this queue.
+ dst.inner.buffer[dst_idx]
+ .with_mut(|ptr| unsafe { ptr::write((*ptr).as_mut_ptr(), task) });
+ }
+
+ let mut prev_packed = next_packed;
+
+ // Update `src_head_steal` to match `src_head_real` signalling that the
+ // stealing routine is complete.
+ loop {
+ let head = unpack(prev_packed).1;
+ next_packed = pack(head, head);
+
+ let res = self
.0
.head
- .compare_and_swap(src_head, src_head.wrapping_add(n), Release);
+ .compare_exchange(prev_packed, next_packed, AcqRel, Acquire);
- if actual == src_head {
- check.check();
- return n;
- }
+ match res {
+ Ok(_) => return n,
+ Err(actual) => {
+ let (actual_steal, actual_real) = unpack(actual);
- atomic::spin_loop_hint();
+ assert_ne!(actual_steal, actual_real);
+
+ prev_packed = actual;
+ }
+ }
}
}
}
+impl<T> Clone for Steal<T> {
+ fn clone(&self) -> Steal<T> {
+ Steal(self.0.clone())
+ }
+}
+
impl<T> Drop for Local<T> {
fn drop(&mut self) {
if !std::thread::panicking() {
@@ -411,7 +469,7 @@ impl<T> Drop for Local<T> {
impl<T> Inner<T> {
fn is_empty(&self) -> bool {
- let head = self.head.load(Acquire);
+ let (_, head) = unpack(self.head.load(Acquire));
let tail = self.tail.load(Acquire);
head == tail
@@ -452,7 +510,7 @@ impl<T: 'static> Inject<T> {
self.pointers.lock().unwrap().is_closed
}
- fn len(&self) -> usize {
+ pub(super) fn len(&self) -> usize {
self.len.load(Acquire)
}
@@ -558,11 +616,30 @@ impl<T: 'static> Drop for Inject<T> {
}
fn get_next(header: NonNull<task::Header>) -> Option<NonNull<task::Header>> {
- unsafe { *header.as_ref().queue_next.get() }
+ unsafe { header.as_ref().queue_next.with(|ptr| *ptr) }
}
fn set_next(header: NonNull<task::Header>, val: Option<NonNull<task::Header>>) {
unsafe {
- *header.as_ref().queue_next.get() = val;
+ header.as_ref().queue_next.with_mut(|ptr| *ptr = val);
}
}
+
+/// Split the head value into the real head and the index a stealer is working
+/// on.
+fn unpack(n: u16) -> (u8, u8) {
+ let real = n & u8::max_value() as u16;
+ let steal = n >> 8;
+
+ (steal as u8, real as u8)
+}
+
+/// Join the two head values
+fn pack(steal: u8, real: u8) -> u16 {
+ (real as u16) | ((steal as u16) << 8)
+}
+
+#[test]
+fn test_local_queue_capacity() {
+ assert!(LOCAL_QUEUE_CAPACITY - 1 <= u8::max_value() as usize);
+}
diff --git a/tokio/src/runtime/task/core.rs b/tokio/src/runtime/task/core.rs
index dee55a54..2092c0aa 100644
--- a/tokio/src/runtime/task/core.rs
+++ b/tokio/src/runtime/task/core.rs
@@ -1,11 +1,10 @@
-use crate::loom::cell::CausalCell;
+use crate::loom::cell::UnsafeCell;
use crate::runtime::task::raw::{self, Vtable};
use crate::runtime::task::state::State;
use crate::runtime::task::waker::waker_ref;
use crate::runtime::task::{Notified, Schedule, Task};
use crate::util::linked_list;
-use std::cell::UnsafeCell;
use std::future::Future;
use std::pin::Pin;
use std::ptr::NonNull;
@@ -32,10 +31,10 @@ pub(super) struct Cell<T: Future, S> {
/// Holds the future or output, depending on the stage of execution.
pub(super) struct Core<T: Future, S> {
/// Scheduler used to drive this future
- pub(super) scheduler: CausalCell<Option<S>>,
+ pub(super) scheduler: UnsafeCell<Option<S>>,
/// Either the future or the output
- pub(super) stage: CausalCell<Stage<T>>,
+ pub(super) stage: UnsafeCell<Stage<T>>,
}
/// Crate public as this is also needed by the pool.
@@ -62,7 +61,7 @@ unsafe impl Sync for Header {}
/// Cold data is stored after the future.
pub(super) struct Trailer {
/// Consumer task waiting on completion of this task.
- pub(super) waker: CausalCell<Option<Waker>>,
+ pub(super) waker: UnsafeCell<Option<Waker>>,
}
/// Either the future or the output.
@@ -85,11 +84,11 @@ impl<T: Future, S: Schedule> Cell<T, S> {
vtable: raw::vtable::<T, S>(),
},
core: Core {
- scheduler: CausalCell::new(None),
- stage: CausalCell::new(Stage::Running(future)),
+ scheduler: UnsafeCell::new(None),
+ stage: UnsafeCell::new(Stage::Running(future)),
},
trailer: Trailer {
- waker: CausalCell::new(None),
+ waker: UnsafeCell::new(None),