diff options
author | Carl Lerche <me@carllerche.com> | 2020-03-26 12:23:12 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-03-26 12:23:12 -0700 |
commit | 1cb1e291c10adf6b4e530cb1475b95ba10fa615f (patch) | |
tree | aabaebe663e2647fb72cb609d1486adcde0c4cc4 | |
parent | 186196b911bb7cbbd67e74b4ef051d3daf2d64c1 (diff) |
rt: track loom changes + tweak queue (#2315)
Loom is having a big refresh to improve performance and tighten up the
concurrency model. This diff tracks those changes.
Included in the changes is the removal of `CausalCell` deferred checks.
This is due to it technically being undefined behavior in the C++11
memory model. To address this, the work-stealing queue is updated to
avoid needing this behavior. This is done by limiting the queue to have
one concurrent stealer.
31 files changed, 523 insertions, 259 deletions
diff --git a/ci/azure-test-stable.yml b/ci/azure-test-stable.yml index bc93febd..ce22c942 100644 --- a/ci/azure-test-stable.yml +++ b/ci/azure-test-stable.yml @@ -25,7 +25,7 @@ jobs: # Run with all crate features - script: cargo test --all-features env: - LOOM_MAX_PREEMPTIONS: 2 + RUST_BACKTRACE: 1 CI: 'True' displayName: ${{ crate }} - cargo test --all-features workingDirectory: $(Build.SourcesDirectory)/${{ crate }} @@ -41,7 +41,7 @@ jobs: # Run with all crate features - script: cargo test --all-features env: - LOOM_MAX_PREEMPTIONS: 2 + RUST_BACKTRACE: 1 CI: 'True' displayName: ${{ crate }} - cargo test --all-features workingDirectory: $(Build.SourcesDirectory)/${{ crate }} diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 322f5908..1b18e6ad 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -129,7 +129,7 @@ tempfile = "3.1.0" # loom is currently not compiling on windows. # See: https://github.com/Xudong-Huang/generator-rs/issues/19 [target.'cfg(not(windows))'.dev-dependencies] -loom = { version = "0.2.13", features = ["futures", "checkpoint"] } +loom = { version = "0.3.0", features = ["futures", "checkpoint"] } [package.metadata.docs.rs] all-features = true diff --git a/tokio/src/loom/std/atomic_ptr.rs b/tokio/src/loom/std/atomic_ptr.rs new file mode 100644 index 00000000..eb8e4755 --- /dev/null +++ b/tokio/src/loom/std/atomic_ptr.rs @@ -0,0 +1,32 @@ +use std::fmt; +use std::ops::Deref; + +/// `AtomicPtr` providing an additional `load_unsync` function. +pub(crate) struct AtomicPtr<T> { + inner: std::sync::atomic::AtomicPtr<T>, +} + +impl<T> AtomicPtr<T> { + pub(crate) fn new(ptr: *mut T) -> AtomicPtr<T> { + let inner = std::sync::atomic::AtomicPtr::new(ptr); + AtomicPtr { inner } + } + + pub(crate) fn with_mut<R>(&mut self, f: impl FnOnce(&mut *mut T) -> R) -> R { + f(self.inner.get_mut()) + } +} + +impl<T> Deref for AtomicPtr<T> { + type Target = std::sync::atomic::AtomicPtr<T>; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl<T> fmt::Debug for AtomicPtr<T> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + self.deref().fmt(fmt) + } +} diff --git a/tokio/src/loom/std/atomic_u8.rs b/tokio/src/loom/std/atomic_u8.rs new file mode 100644 index 00000000..9f394a79 --- /dev/null +++ b/tokio/src/loom/std/atomic_u8.rs @@ -0,0 +1,44 @@ +use std::cell::UnsafeCell; +use std::fmt; +use std::ops::Deref; + +/// `AtomicU8` providing an additional `load_unsync` function. +pub(crate) struct AtomicU8 { + inner: UnsafeCell<std::sync::atomic::AtomicU8>, +} + +unsafe impl Send for AtomicU8 {} +unsafe impl Sync for AtomicU8 {} + +impl AtomicU8 { + pub(crate) fn new(val: u8) -> AtomicU8 { + let inner = UnsafeCell::new(std::sync::atomic::AtomicU8::new(val)); + AtomicU8 { inner } + } + + /// Performs an unsynchronized load. + /// + /// # Safety + /// + /// All mutations must have happened before the unsynchronized load. + /// Additionally, there must be no concurrent mutations. + pub(crate) unsafe fn unsync_load(&self) -> u8 { + *(*self.inner.get()).get_mut() + } +} + +impl Deref for AtomicU8 { + type Target = std::sync::atomic::AtomicU8; + + fn deref(&self) -> &Self::Target { + // safety: it is always safe to access `&self` fns on the inner value as + // we never perform unsafe mutations. + unsafe { &*self.inner.get() } + } +} + +impl fmt::Debug for AtomicU8 { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + self.deref().fmt(fmt) + } +} diff --git a/tokio/src/loom/std/atomic_usize.rs b/tokio/src/loom/std/atomic_usize.rs index 78644b05..0fe998f1 100644 --- a/tokio/src/loom/std/atomic_usize.rs +++ b/tokio/src/loom/std/atomic_usize.rs @@ -25,6 +25,11 @@ impl AtomicUsize { pub(crate) unsafe fn unsync_load(&self) -> usize { *(*self.inner.get()).get_mut() } + + pub(crate) fn with_mut<R>(&mut self, f: impl FnOnce(&mut usize) -> R) -> R { + // safety: we have mutable access + f(unsafe { (*self.inner.get()).get_mut() }) + } } impl ops::Deref for AtomicUsize { diff --git a/tokio/src/loom/std/causal_cell.rs b/tokio/src/loom/std/causal_cell.rs deleted file mode 100644 index 8300437a..00000000 --- a/tokio/src/loom/std/causal_cell.rs +++ /dev/null @@ -1,40 +0,0 @@ -use std::cell::UnsafeCell; - -#[derive(Debug)] -pub(crate) struct CausalCell<T>(UnsafeCell<T>); - -#[derive(Default)] -pub(crate) struct CausalCheck(()); - -impl<T> CausalCell<T> { - pub(crate) fn new(data: T) -> CausalCell<T> { - CausalCell(UnsafeCell::new(data)) - } - - pub(crate) fn with<F, R>(&self, f: F) -> R - where - F: FnOnce(*const T) -> R, - { - f(self.0.get()) - } - - pub(crate) fn with_deferred<F, R>(&self, f: F) -> (R, CausalCheck) - where - F: FnOnce(*const T) -> R, - { - (f(self.0.get()), CausalCheck::default()) - } - - pub(crate) fn with_mut<F, R>(&self, f: F) -> R - where - F: FnOnce(*mut T) -> R, - { - f(self.0.get()) - } -} - -impl CausalCheck { - pub(crate) fn check(self) {} - - pub(crate) fn join(&mut self, _other: CausalCheck) {} -} diff --git a/tokio/src/loom/std/mod.rs b/tokio/src/loom/std/mod.rs index a56d778a..6d7bcee1 100644 --- a/tokio/src/loom/std/mod.rs +++ b/tokio/src/loom/std/mod.rs @@ -1,12 +1,13 @@ #![cfg_attr(any(not(feature = "full"), loom), allow(unused_imports, dead_code))] -mod atomic_u32; +mod atomic_ptr; mod atomic_u64; +mod atomic_u8; mod atomic_usize; -mod causal_cell; +mod unsafe_cell; pub(crate) mod cell { - pub(crate) use super::causal_cell::{CausalCell, CausalCheck}; + pub(crate) use super::unsafe_cell::UnsafeCell; } #[cfg(any(feature = "sync", feature = "io-driver"))] @@ -58,12 +59,12 @@ pub(crate) mod sync { pub(crate) use std::sync::{Condvar, Mutex, MutexGuard, WaitTimeoutResult}; pub(crate) mod atomic { - pub(crate) use crate::loom::std::atomic_u32::AtomicU32; + pub(crate) use crate::loom::std::atomic_ptr::AtomicPtr; pub(crate) use crate::loom::std::atomic_u64::AtomicU64; + pub(crate) use crate::loom::std::atomic_u8::AtomicU8; pub(crate) use crate::loom::std::atomic_usize::AtomicUsize; - pub(crate) use std::sync::atomic::AtomicU8; - pub(crate) use std::sync::atomic::{fence, AtomicPtr}; + pub(crate) use std::sync::atomic::AtomicU16; pub(crate) use std::sync::atomic::{spin_loop_hint, AtomicBool}; } } diff --git a/tokio/src/loom/std/unsafe_cell.rs b/tokio/src/loom/std/unsafe_cell.rs new file mode 100644 index 00000000..f2b03d8d --- /dev/null +++ b/tokio/src/loom/std/unsafe_cell.rs @@ -0,0 +1,16 @@ +#[derive(Debug)] +pub(crate) struct UnsafeCell<T>(std::cell::UnsafeCell<T>); + +impl<T> UnsafeCell<T> { + pub(crate) fn new(data: T) -> UnsafeCell<T> { + UnsafeCell(std::cell::UnsafeCell::new(data)) + } + + pub(crate) fn with<R>(&self, f: impl FnOnce(*const T) -> R) -> R { + f(self.0.get()) + } + + pub(crate) fn with_mut<R>(&self, f: impl FnOnce(*mut T) -> R) -> R { + f(self.0.get()) + } +} diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index cd8fbb1c..aedc3280 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -230,6 +230,8 @@ use self::spawner::Spawner; mod time; cfg_rt_threaded! { + mod queue; + pub(crate) mod thread_pool; use self::thread_pool::ThreadPool; } diff --git a/tokio/src/runtime/thread_pool/queue.rs b/tokio/src/runtime/queue.rs index 66cec504..233fe454 100644 --- a/tokio/src/runtime/thread_pool/queue.rs +++ b/tokio/src/runtime/queue.rs @@ -1,14 +1,14 @@ //! Run-queue structures to support a work-stealing scheduler -use crate::loom::cell::{CausalCell, CausalCheck}; -use crate::loom::sync::atomic::{self, AtomicU32, AtomicUsize}; +use crate::loom::cell::UnsafeCell; +use crate::loom::sync::atomic::{AtomicU16, AtomicU8, AtomicUsize}; use crate::loom::sync::{Arc, Mutex}; use crate::runtime::task; use std::marker::PhantomData; use std::mem::MaybeUninit; use std::ptr::{self, NonNull}; -use std::sync::atomic::Ordering::{Acquire, Release}; +use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; /// Producer handle. May only be used from a single thread. pub(super) struct Local<T: 'static> { @@ -36,13 +36,21 @@ pub(super) struct Inject<T: 'static> { pub(super) struct Inner<T: 'static> { /// Concurrently updated by many threads. - head: AtomicU32, + /// + /// Contains two `u8` values. The LSB byte is the "real" head of the queue. + /// The `u8` in the MSB is set by a stealer in process of stealing values. + /// It represents the first value being stolen in the batch. + /// + /// When both `u8` values are the same, there is no active stealer. + /// + /// Tracking an in-progress stealer prevents a wrapping scenario. + head: AtomicU16, /// Only updated by producer thread but read by many threads. - tail: AtomicU32, + tail: AtomicU8, /// Elements - buffer: Box<[CausalCell<MaybeUninit<task::Notified<T>>>]>, + buffer: Box<[UnsafeCell<MaybeUninit<task::Notified<T>>>]>, } struct Pointers { @@ -68,23 +76,21 @@ const LOCAL_QUEUE_CAPACITY: usize = 256; // logic, but allows loom to test more edge cases in a reasonable a mount of // time. #[cfg(loom)] -const LOCAL_QUEUE_CAPACITY: usize = 2; +const LOCAL_QUEUE_CAPACITY: usize = 4; const MASK: usize = LOCAL_QUEUE_CAPACITY - 1; /// Create a new local run-queue pub(super) fn local<T: 'static>() -> (Steal<T>, Local<T>) { - debug_assert!(LOCAL_QUEUE_CAPACITY >= 2 && LOCAL_QUEUE_CAPACITY.is_power_of_two()); - let mut buffer = Vec::with_capacity(LOCAL_QUEUE_CAPACITY); for _ in 0..LOCAL_QUEUE_CAPACITY { - buffer.push(CausalCell::new(MaybeUninit::uninit())); + buffer.push(UnsafeCell::new(MaybeUninit::uninit())); } let inner = Arc::new(Inner { - head: AtomicU32::new(0), - tail: AtomicU32::new(0), + head: AtomicU16::new(0), + tail: AtomicU8::new(0), buffer: buffer.into(), }); @@ -126,44 +132,51 @@ impl<T> Local<T> { /// Pushes a task to the back of the local queue, skipping the LIFO slot. pub(super) fn push_back(&mut self, mut task: task::Notified<T>, inject: &Inject<T>) { - loop { + let tail = loop { let head = self.inner.head.load(Acquire); + let (steal, real) = unpack(head); // safety: this is the **only** thread that updates this cell. let tail = unsafe { self.inner.tail.unsync_load() }; - if tail.wrapping_sub(head) < LOCAL_QUEUE_CAPACITY as u32 { - // Map the position to a slot index. - let idx = tail as usize & MASK; - - self.inner.buffer[idx].with_mut(|ptr| { - // Write the task to the slot - // - // Safety: There is only one producer and the above `if` - // condition ensures we don't touch a cell if there is a - // value, thus no consumer. - unsafe { - ptr::write((*ptr).as_mut_ptr(), task); - } - }); - - // Make the task available. Synchronizes with a load in - // `steal_into2`. - self.inner.tail.store(tail.wrapping_add(1), Release); - + if steal as usize & MASK != tail.wrapping_add(1) as usize & MASK { + // There is capacity for the task + break tail; + } else if steal != real { + // Concurrently stealing, this will free up capacity, so + // only push the new task onto the inject queue + inject.push(task); return; + } else { + // Push the current task and half of the queue into the + // inject queue. + match self.push_overflow(task, real, tail, inject) { + Ok(_) => return, + // Lost the race, try again + Err(v) => { + task = v; + } + } } - - // The local buffer is full. Push a batch of work to the inject - // queue. - match self.push_overflow(task, head, tail, inject) { - Ok(_) => return, - // Lost the race, try again - Err(v) => task = v, + }; + + // Map the position to a slot index. + let idx = tail as usize & MASK; + + self.inner.buffer[idx].with_mut(|ptr| { + // Write the task to the slot + // + // Safety: There is only one producer and the above `if` + // condition ensures we don't touch a cell if there is a + // value, thus no consumer. + unsafe { + ptr::write((*ptr).as_mut_ptr(), task); } + }); - atomic::spin_loop_hint(); - } + // Make the task available. Synchronizes with a load in + // `steal_into2`. + self.inner.tail.store(tail.wrapping_add(1), Release); } /// Moves a batch of tasks into the inject queue. @@ -176,14 +189,22 @@ impl<T> Local<T> { fn push_overflow( &mut self, task: task::Notified<T>, - head: u32, - tail: u32, + head: u8, + tail: u8, inject: &Inject<T>, ) -> Result<(), task::Notified<T>> { const BATCH_LEN: usize = LOCAL_QUEUE_CAPACITY / 2 + 1; - let n = tail.wrapping_sub(head) / 2; - debug_assert_eq!(n as usize, LOCAL_QUEUE_CAPACITY / 2, "queue is not full"); + let n = (LOCAL_QUEUE_CAPACITY / 2) as u8; + assert_eq!( + tail.wrapping_sub(head) as usize, + LOCAL_QUEUE_CAPACITY - 1, + "queue is not full; tail = {}; head = {}", + tail, + head + ); + + let prev = pack(head, head); // Claim a bunch of tasks // @@ -195,8 +216,13 @@ impl<T> Local<T> { // work. This is because all tasks are pushed into the queue from the // current thread (or memory has been acquired if the local queue handle // moved). - let actual = self.inner.head.compare_and_swap(head, head + n, Release); - if actual != head { + let actual = self.inner.head.compare_and_swap( + prev, + pack(head.wrapping_add(n), head.wrapping_add(n)), + Release, + ); + + if actual != prev { // We failed to claim the tasks, losing the race. Return out of // this function and try the full `push` routine again. The queue // may not be full anymore. @@ -227,7 +253,7 @@ impl<T> Local<T> { // tasks and we are the only producer. self.inner.buffer[i_idx].with_mut(|ptr| unsafe { let ptr = (*ptr).as_ptr(); - *(*ptr).header().queue_next.get() = Some(next); + (*ptr).header().queue_next.with_mut(|ptr| *ptr = Some(next)); }); } @@ -249,42 +275,41 @@ impl<T> Local<T> { return Some(task); } - loop { - let head = self.inner.head.load(Acquire); + let mut head = self.inner.head.load(Acquire); + + let idx = loop { + let (steal, real) = unpack(head); // safety: this is the **only** thread that updates this cell. let tail = unsafe { self.inner.tail.unsync_load() }; - if head == tail { + if real == tail { // queue is empty return None; } - // Map the head position to a slot index. - let idx = head as usize & MASK; + let next_real = real.wrapping_add(1); - let task = self.inner.buffer[idx].with(|ptr| { - // Tentatively read the task at the head position. Note that we - // have not yet claimed the task. - // - // safety: reading this as uninitialized memory. - unsafe { ptr::read(ptr) } - }); + // Only update `steal` component if it differs from `real`. + let next = if steal == real { + pack(next_real, next_real) + } else { + pack(steal, next_real) + }; - // Attempt to claim the task read above. - let actual = self + // Attempt to claim a task. + let res = self .inner .head - .compare_and_swap(head, head.wrapping_add(1), Release); + .compare_exchange(head, next, AcqRel, Acquire); - if actual == head { - // safety: we claimed the task and the data we read is - // initialized memory. - return Some(unsafe { task.assume_init() }); + match res { + Ok(_) => break real as usize & MASK, + Err(actual) => head = actual, } + }; - atomic::spin_loop_hint(); - } + Some(self.inner.buffer[idx].with(|ptr| unsafe { ptr::read(ptr).assume_init() })) } } @@ -324,9 +349,8 @@ impl<T> Steal<T> { } // Synchronize with stealers - let dst_head = dst.inner.head.load(Acquire); - - assert!(dst_tail.wrapping_sub(dst_head) + n <= LOCAL_QUEUE_CAPACITY as u32); + let (dst_steal, dst_real) = unpack(dst.inner.head.load(Acquire)); + assert_eq!(dst_steal, dst_real); // Make the stolen items available to consumers dst.inner.tail.store(dst_tail.wrapping_add(n), Release); @@ -334,73 +358,107 @@ impl<T> Steal<T> { Some(ret) } - fn steal_into2(&self, dst: &mut Local<T>, dst_tail: u32) -> u32 { - loop { - let src_head = self.0.head.load(Acquire); + // Steal tasks from `self`, placing them into `dst`. Returns the number of + // tasks that were stolen. + fn steal_into2(&self, dst: &mut Local<T>, dst_tail: u8) -> u8 { + let mut prev_packed = self.0.head.load(Acquire); + let mut next_packed; + + let n = loop { + let (src_head_steal, src_head_real) = unpack(prev_packed); let src_tail = self.0.tail.load(Acquire); + // If these two do not match, another thread is concurrently + // stealing from the queue. + if src_head_steal != src_head_real { + return 0; + } + // Number of available tasks to steal - let n = src_tail.wrapping_sub(src_head); + let n = src_tail.wrapping_sub(src_head_real); let n = n - n / 2; if n == 0 { + // No tasks available to steal return 0; } - if n > LOCAL_QUEUE_CAPACITY as u32 / 2 { - atomic::spin_loop_hint(); - // inconsistent, try again - continue; - } + // Update the real head index to acquire the tasks. + let steal_to = src_head_real.wrapping_add(n); + next_packed = pack(src_head_steal, steal_to); - // Track CausalCell causality checks. The check is deferred until - // the compare_and_swap claims ownership of the tasks. - let mut check = CausalCheck::default(); - - for i in 0..n { - // Compute the positions - let src_pos = src_head.wrapping_add(i); - let dst_pos = dst_tail.wrapping_add(i); - - // Map to slots - let src_idx = src_pos as usize & MASK; - let dst_idx = dst_pos as usize & MASK; - - // Read the task - // - // safety: this is being read as MaybeUninit -- potentially - // uninitialized memory (in the case a producer wraps). We don't - // assume it is initialized, but will just write the - // `MaybeUninit` in our slot below. - let (task, ch) = self.0.buffer[src_idx] - .with_deferred(|ptr| unsafe { ptr::read((*ptr).as_ptr()) }); - - check.join(ch); - - // Write the task to the new slot - // - // safety: `dst` queue is empty and we are the only producer to - // this queue. - dst.inner.buffer[dst_idx] - .with_mut(|ptr| unsafe { ptr::write((*ptr).as_mut_ptr(), task) }); + // Claim all those tasks. This is done by incrementing the "real" + // head but not the steal. By doing this, no other thread is able to + // steal from this queue until the current thread completes. + let res = self + .0 + .head + .compare_exchange(prev_packed, next_packed, AcqRel, Acquire); + + match res { + Ok(_) => break n, + Err(actual) => prev_packed = actual, } + }; + + let (first, _) = unpack(next_packed); - // Claim all of those tasks! - let actual = self + // Take all the tasks + for i in 0..n { + // Compute the positions + let src_pos = first.wrapping_add(i); + let dst_pos = dst_tail.wrapping_add(i); + + // Map to slots + let src_idx = src_pos as usize & MASK; + let dst_idx = dst_pos as usize & MASK; + + // Read the task + // + // safety: We acquired the task with the atomic exchange above. + let task = self.0.buffer[src_idx].with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) }); + + // Write the task to the new slot + // + // safety: `dst` queue is empty and we are the only producer to + // this queue. + dst.inner.buffer[dst_idx] + .with_mut(|ptr| unsafe { ptr::write((*ptr).as_mut_ptr(), task) }); + } + + let mut prev_packed = next_packed; + + // Update `src_head_steal` to match `src_head_real` signalling that the + // stealing routine is complete. + loop { + let head = unpack(prev_packed).1; + next_packed = pack(head, head); + + let res = self .0 .head - .compare_and_swap(src_head, src_head.wrapping_add(n), Release); + .compare_exchange(prev_packed, next_packed, AcqRel, Acquire); - if actual == src_head { - check.check(); - return n; - } + match res { + Ok(_) => return n, + Err(actual) => { + let (actual_steal, actual_real) = unpack(actual); - atomic::spin_loop_hint(); + assert_ne!(actual_steal, actual_real); + + prev_packed = actual; + } + } } } } +impl<T> Clone for Steal<T> { + fn clone(&self) -> Steal<T> { + Steal(self.0.clone()) + } +} + impl<T> Drop for Local<T> { fn drop(&mut self) { if !std::thread::panicking() { @@ -411,7 +469,7 @@ impl<T> Drop for Local<T> { impl<T> Inner<T> { fn is_empty(&self) -> bool { - let head = self.head.load(Acquire); + let (_, head) = unpack(self.head.load(Acquire)); let tail = self.tail.load(Acquire); head == tail @@ -452,7 +510,7 @@ impl<T: 'static> Inject<T> { self.pointers.lock().unwrap().is_closed } - fn len(&self) -> usize { + pub(super) fn len(&self) -> usize { self.len.load(Acquire) } @@ -558,11 +616,30 @@ impl<T: 'static> Drop for Inject<T> { } fn get_next(header: NonNull<task::Header>) -> Option<NonNull<task::Header>> { - unsafe { *header.as_ref().queue_next.get() } + unsafe { header.as_ref().queue_next.with(|ptr| *ptr) } } fn set_next(header: NonNull<task::Header>, val: Option<NonNull<task::Header>>) { unsafe { - *header.as_ref().queue_next.get() = val; + header.as_ref().queue_next.with_mut(|ptr| *ptr = val); } } + +/// Split the head value into the real head and the index a stealer is working +/// on. +fn unpack(n: u16) -> (u8, u8) { + let real = n & u8::max_value() as u16; + let steal = n >> 8; + + (steal as u8, real as u8) +} + +/// Join the two head values +fn pack(steal: u8, real: u8) -> u16 { + (real as u16) | ((steal as u16) << 8) +} + +#[test] +fn test_local_queue_capacity() { + assert!(LOCAL_QUEUE_CAPACITY - 1 <= u8::max_value() as usize); +} diff --git a/tokio/src/runtime/task/core.rs b/tokio/src/runtime/task/core.rs index dee55a54..2092c0aa 100644 --- a/tokio/src/runtime/task/core.rs +++ b/tokio/src/runtime/task/core.rs @@ -1,11 +1,10 @@ -use crate::loom::cell::CausalCell; +use crate::loom::cell::UnsafeCell; use crate::runtime::task::raw::{self, Vtable}; use crate::runtime::task::state::State; use crate::runtime::task::waker::waker_ref; use crate::runtime::task::{Notified, Schedule, Task}; use crate::util::linked_list; -use std::cell::UnsafeCell; use std::future::Future; use std::pin::Pin; use std::ptr::NonNull; @@ -32,10 +31,10 @@ pub(super) struct Cell<T: Future, S> { /// Holds the future or output, depending on the stage of execution. pub(super) struct Core<T: Future, S> { /// Scheduler used to drive this future - pub(super) scheduler: CausalCell<Option<S>>, + pub(super) scheduler: UnsafeCell<Option<S>>, /// Either the future or the output - pub(super) stage: CausalCell<Stage<T>>, + pub(super) stage: UnsafeCell<Stage<T>>, } /// Crate public as this is also needed by the pool. @@ -62,7 +61,7 @@ unsafe impl Sync for Header {} /// Cold data is stored after the future. pub(super) struct Trailer { /// Consumer task waiting on completion of this task. - pub(super) waker: CausalCell<Option<Waker>>, + pub(super) waker: UnsafeCell<Option<Waker>>, } /// Either the future or the output. @@ -85,11 +84,11 @@ impl<T: Future, S: Schedule> Cell<T, S> { vtable: raw::vtable::<T, S>(), }, core: Core { - scheduler: CausalCell::new(None), - stage: CausalCell::new(Stage::Running(future)), + scheduler: UnsafeCell::new(None), + stage: UnsafeCell::new(Stage::Running(future)), }, trailer: Trailer { - waker: CausalCell::new(None), + waker: UnsafeCell::new(None), |