author    Carl Lerche <me@carllerche.com>           2020-03-26 12:23:12 -0700
committer GitHub <noreply@github.com>               2020-03-26 12:23:12 -0700
commit    1cb1e291c10adf6b4e530cb1475b95ba10fa615f (patch)
tree      aabaebe663e2647fb72cb609d1486adcde0c4cc4 /tokio
parent    186196b911bb7cbbd67e74b4ef051d3daf2d64c1 (diff)
rt: track loom changes + tweak queue (#2315)
Loom is undergoing a big refresh to improve performance and tighten up the
concurrency model. This diff tracks those changes.

Included in the changes is the removal of `CausalCell` deferred checks,
because the deferred-check pattern is technically undefined behavior in the
C++11 memory model. To address this, the work-stealing queue is updated so
that it no longer needs deferred checks: the queue now permits at most one
concurrent stealer.
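For orientation, the single-stealer guarantee comes from packing two indices
into the queue's head word: the low byte is the "real" head, the high byte
records where an in-progress steal began. The sketch below mirrors the
`pack`/`unpack` helpers added in this diff; `try_claim` and its names are
illustrative only, not code from the diff.

use std::sync::atomic::{AtomicU16, Ordering::{AcqRel, Acquire}};

// Split the packed head into (steal, real). The low byte is the "real"
// head; the high byte marks where an in-progress steal began.
fn unpack(n: u16) -> (u8, u8) {
    ((n >> 8) as u8, (n & 0xFF) as u8)
}

// Join the two indices back into one atomically updatable word.
fn pack(steal: u8, real: u8) -> u16 {
    (real as u16) | ((steal as u16) << 8)
}

// Hypothetical helper: begin a steal of `n` tasks by advancing `real` while
// leaving `steal` pinned at the old head. Returns false if another stealer
// is already active, i.e. the two halves differ.
fn try_claim(head: &AtomicU16, n: u8) -> bool {
    let mut prev = head.load(Acquire);
    loop {
        let (steal, real) = unpack(prev);
        if steal != real {
            return false; // another thread is mid-steal
        }
        let next = pack(steal, real.wrapping_add(n));
        match head.compare_exchange(prev, next, AcqRel, Acquire) {
            Ok(_) => return true,
            Err(actual) => prev = actual,
        }
    }
}

fn main() {
    let head = AtomicU16::new(pack(0, 0));
    assert!(try_claim(&head, 3));
    let (steal, real) = unpack(head.load(Acquire));
    assert_eq!((steal, real), (0, 3)); // steal lags real: a steal is in flight
}

While the two halves differ, a steal is in flight, so any other would-be
stealer backs off and the producer knows those slots are not yet reusable.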
Diffstat (limited to 'tokio')
30 files changed, 521 insertions(+), 257 deletions(-)
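A note on the cell change before the diff itself: `CausalCell` with deferred
checks is replaced by a thin `UnsafeCell` wrapper whose `with`/`with_mut`
closures scope every raw-pointer access. The sketch below mirrors the new
`tokio/src/loom/std/unsafe_cell.rs`; the `main` usage is a hypothetical
example, not code from the diff. Under `cfg(loom)`, tokio swaps in loom's
checked equivalent, which validates each access at the moment the closure
runs rather than deferring the check.

// Mirrors the wrapper added in this diff (visibility widened so the
// sketch compiles standalone).
#[derive(Debug)]
pub struct UnsafeCell<T>(std::cell::UnsafeCell<T>);

impl<T> UnsafeCell<T> {
    pub fn new(data: T) -> UnsafeCell<T> {
        UnsafeCell(std::cell::UnsafeCell::new(data))
    }

    pub fn with<R>(&self, f: impl FnOnce(*const T) -> R) -> R {
        f(self.0.get())
    }

    pub fn with_mut<R>(&self, f: impl FnOnce(*mut T) -> R) -> R {
        f(self.0.get())
    }
}

// Hypothetical single-threaded usage: every raw-pointer access stays inside
// the closure, which is exactly the scope loom can instrument.
fn main() {
    let cell = UnsafeCell::new(0u32);
    cell.with_mut(|ptr| unsafe { *ptr += 1 });
    let v = cell.with(|ptr| unsafe { *ptr });
    assert_eq!(v, 1);
}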
diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml
index 322f5908..1b18e6ad 100644
--- a/tokio/Cargo.toml
+++ b/tokio/Cargo.toml
@@ -129,7 +129,7 @@ tempfile = "3.1.0"
 # loom is currently not compiling on windows.
 # See: https://github.com/Xudong-Huang/generator-rs/issues/19
 [target.'cfg(not(windows))'.dev-dependencies]
-loom = { version = "0.2.13", features = ["futures", "checkpoint"] }
+loom = { version = "0.3.0", features = ["futures", "checkpoint"] }
 
 [package.metadata.docs.rs]
 all-features = true
diff --git a/tokio/src/loom/std/atomic_ptr.rs b/tokio/src/loom/std/atomic_ptr.rs
new file mode 100644
index 00000000..eb8e4755
--- /dev/null
+++ b/tokio/src/loom/std/atomic_ptr.rs
@@ -0,0 +1,32 @@
+use std::fmt;
+use std::ops::Deref;
+
+/// `AtomicPtr` providing an additional `load_unsync` function.
+pub(crate) struct AtomicPtr<T> {
+    inner: std::sync::atomic::AtomicPtr<T>,
+}
+
+impl<T> AtomicPtr<T> {
+    pub(crate) fn new(ptr: *mut T) -> AtomicPtr<T> {
+        let inner = std::sync::atomic::AtomicPtr::new(ptr);
+        AtomicPtr { inner }
+    }
+
+    pub(crate) fn with_mut<R>(&mut self, f: impl FnOnce(&mut *mut T) -> R) -> R {
+        f(self.inner.get_mut())
+    }
+}
+
+impl<T> Deref for AtomicPtr<T> {
+    type Target = std::sync::atomic::AtomicPtr<T>;
+
+    fn deref(&self) -> &Self::Target {
+        &self.inner
+    }
+}
+
+impl<T> fmt::Debug for AtomicPtr<T> {
+    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+        self.deref().fmt(fmt)
+    }
+}
diff --git a/tokio/src/loom/std/atomic_u8.rs b/tokio/src/loom/std/atomic_u8.rs
new file mode 100644
index 00000000..9f394a79
--- /dev/null
+++ b/tokio/src/loom/std/atomic_u8.rs
@@ -0,0 +1,44 @@
+use std::cell::UnsafeCell;
+use std::fmt;
+use std::ops::Deref;
+
+/// `AtomicU8` providing an additional `load_unsync` function.
+pub(crate) struct AtomicU8 {
+    inner: UnsafeCell<std::sync::atomic::AtomicU8>,
+}
+
+unsafe impl Send for AtomicU8 {}
+unsafe impl Sync for AtomicU8 {}
+
+impl AtomicU8 {
+    pub(crate) fn new(val: u8) -> AtomicU8 {
+        let inner = UnsafeCell::new(std::sync::atomic::AtomicU8::new(val));
+        AtomicU8 { inner }
+    }
+
+    /// Performs an unsynchronized load.
+    ///
+    /// # Safety
+    ///
+    /// All mutations must have happened before the unsynchronized load.
+    /// Additionally, there must be no concurrent mutations.
+    pub(crate) unsafe fn unsync_load(&self) -> u8 {
+        *(*self.inner.get()).get_mut()
+    }
+}
+
+impl Deref for AtomicU8 {
+    type Target = std::sync::atomic::AtomicU8;
+
+    fn deref(&self) -> &Self::Target {
+        // safety: it is always safe to access `&self` fns on the inner value as
+        // we never perform unsafe mutations.
+        unsafe { &*self.inner.get() }
+    }
+}
+
+impl fmt::Debug for AtomicU8 {
+    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+        self.deref().fmt(fmt)
+    }
+}
diff --git a/tokio/src/loom/std/atomic_usize.rs b/tokio/src/loom/std/atomic_usize.rs
index 78644b05..0fe998f1 100644
--- a/tokio/src/loom/std/atomic_usize.rs
+++ b/tokio/src/loom/std/atomic_usize.rs
@@ -25,6 +25,11 @@ impl AtomicUsize {
     pub(crate) unsafe fn unsync_load(&self) -> usize {
         *(*self.inner.get()).get_mut()
     }
+
+    pub(crate) fn with_mut<R>(&mut self, f: impl FnOnce(&mut usize) -> R) -> R {
+        // safety: we have mutable access
+        f(unsafe { (*self.inner.get()).get_mut() })
+    }
 }
 
 impl ops::Deref for AtomicUsize {
diff --git a/tokio/src/loom/std/causal_cell.rs b/tokio/src/loom/std/causal_cell.rs
deleted file mode 100644
index 8300437a..00000000
--- a/tokio/src/loom/std/causal_cell.rs
+++ /dev/null
@@ -1,40 +0,0 @@
-use std::cell::UnsafeCell;
-
-#[derive(Debug)]
-pub(crate) struct CausalCell<T>(UnsafeCell<T>);
-
-#[derive(Default)]
-pub(crate) struct CausalCheck(());
-
-impl<T> CausalCell<T> {
-    pub(crate) fn new(data: T) -> CausalCell<T> {
-        CausalCell(UnsafeCell::new(data))
-    }
-
-    pub(crate) fn with<F, R>(&self, f: F) -> R
-    where
-        F: FnOnce(*const T) -> R,
-    {
-        f(self.0.get())
-    }
-
-    pub(crate) fn with_deferred<F, R>(&self, f: F) -> (R, CausalCheck)
-    where
-        F: FnOnce(*const T) -> R,
-    {
-        (f(self.0.get()), CausalCheck::default())
-    }
-
-    pub(crate) fn with_mut<F, R>(&self, f: F) -> R
-    where
-        F: FnOnce(*mut T) -> R,
-    {
-        f(self.0.get())
-    }
-}
-
-impl CausalCheck {
-    pub(crate) fn check(self) {}
-
-    pub(crate) fn join(&mut self, _other: CausalCheck) {}
-}
diff --git a/tokio/src/loom/std/mod.rs b/tokio/src/loom/std/mod.rs
index a56d778a..6d7bcee1 100644
--- a/tokio/src/loom/std/mod.rs
+++ b/tokio/src/loom/std/mod.rs
@@ -1,12 +1,13 @@
 #![cfg_attr(any(not(feature = "full"), loom), allow(unused_imports, dead_code))]
 
-mod atomic_u32;
+mod atomic_ptr;
 mod atomic_u64;
+mod atomic_u8;
 mod atomic_usize;
-mod causal_cell;
+mod unsafe_cell;
 
 pub(crate) mod cell {
-    pub(crate) use super::causal_cell::{CausalCell, CausalCheck};
+    pub(crate) use super::unsafe_cell::UnsafeCell;
 }
 
 #[cfg(any(feature = "sync", feature = "io-driver"))]
@@ -58,12 +59,12 @@ pub(crate) mod sync {
     pub(crate) use std::sync::{Condvar, Mutex, MutexGuard, WaitTimeoutResult};
 
     pub(crate) mod atomic {
-        pub(crate) use crate::loom::std::atomic_u32::AtomicU32;
+        pub(crate) use crate::loom::std::atomic_ptr::AtomicPtr;
         pub(crate) use crate::loom::std::atomic_u64::AtomicU64;
+        pub(crate) use crate::loom::std::atomic_u8::AtomicU8;
         pub(crate) use crate::loom::std::atomic_usize::AtomicUsize;
-        pub(crate) use std::sync::atomic::AtomicU8;
-        pub(crate) use std::sync::atomic::{fence, AtomicPtr};
+        pub(crate) use std::sync::atomic::AtomicU16;
         pub(crate) use std::sync::atomic::{spin_loop_hint, AtomicBool};
     }
 }
diff --git a/tokio/src/loom/std/unsafe_cell.rs b/tokio/src/loom/std/unsafe_cell.rs
new file mode 100644
index 00000000..f2b03d8d
--- /dev/null
+++ b/tokio/src/loom/std/unsafe_cell.rs
@@ -0,0 +1,16 @@
+#[derive(Debug)]
+pub(crate) struct UnsafeCell<T>(std::cell::UnsafeCell<T>);
+
+impl<T> UnsafeCell<T> {
+    pub(crate) fn new(data: T) -> UnsafeCell<T> {
+        UnsafeCell(std::cell::UnsafeCell::new(data))
+    }
+
+    pub(crate) fn with<R>(&self, f: impl FnOnce(*const T) -> R) -> R {
+        f(self.0.get())
+    }
+
+    pub(crate) fn with_mut<R>(&self, f: impl FnOnce(*mut T) -> R) -> R {
+        f(self.0.get())
+    }
+}
diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs
index cd8fbb1c..aedc3280 100644
--- a/tokio/src/runtime/mod.rs
+++ b/tokio/src/runtime/mod.rs
@@ -230,6 +230,8 @@ use self::spawner::Spawner;
 mod time;
 
 cfg_rt_threaded! {
+    mod queue;
+
     pub(crate) mod thread_pool;
     use self::thread_pool::ThreadPool;
 }
diff --git a/tokio/src/runtime/thread_pool/queue.rs b/tokio/src/runtime/queue.rs
index 66cec504..233fe454 100644
--- a/tokio/src/runtime/thread_pool/queue.rs
+++ b/tokio/src/runtime/queue.rs
@@ -1,14 +1,14 @@
 //! Run-queue structures to support a work-stealing scheduler
 
-use crate::loom::cell::{CausalCell, CausalCheck};
-use crate::loom::sync::atomic::{self, AtomicU32, AtomicUsize};
+use crate::loom::cell::UnsafeCell;
+use crate::loom::sync::atomic::{AtomicU16, AtomicU8, AtomicUsize};
 use crate::loom::sync::{Arc, Mutex};
 use crate::runtime::task;
 
 use std::marker::PhantomData;
 use std::mem::MaybeUninit;
 use std::ptr::{self, NonNull};
-use std::sync::atomic::Ordering::{Acquire, Release};
+use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
 
 /// Producer handle. May only be used from a single thread.
 pub(super) struct Local<T: 'static> {
@@ -36,13 +36,21 @@ pub(super) struct Inject<T: 'static> {
 pub(super) struct Inner<T: 'static> {
     /// Concurrently updated by many threads.
-    head: AtomicU32,
+    ///
+    /// Contains two `u8` values. The LSB byte is the "real" head of the queue.
+    /// The `u8` in the MSB is set by a stealer in process of stealing values.
+    /// It represents the first value being stolen in the batch.
+    ///
+    /// When both `u8` values are the same, there is no active stealer.
+    ///
+    /// Tracking an in-progress stealer prevents a wrapping scenario.
+    head: AtomicU16,
 
     /// Only updated by producer thread but read by many threads.
-    tail: AtomicU32,
+    tail: AtomicU8,
 
     /// Elements
-    buffer: Box<[CausalCell<MaybeUninit<task::Notified<T>>>]>,
+    buffer: Box<[UnsafeCell<MaybeUninit<task::Notified<T>>>]>,
 }
 
 struct Pointers {
@@ -68,23 +76,21 @@ const LOCAL_QUEUE_CAPACITY: usize = 256;
 // logic, but allows loom to test more edge cases in a reasonable a mount of
 // time.
 #[cfg(loom)]
-const LOCAL_QUEUE_CAPACITY: usize = 2;
+const LOCAL_QUEUE_CAPACITY: usize = 4;
 
 const MASK: usize = LOCAL_QUEUE_CAPACITY - 1;
 
 /// Create a new local run-queue
 pub(super) fn local<T: 'static>() -> (Steal<T>, Local<T>) {
-    debug_assert!(LOCAL_QUEUE_CAPACITY >= 2 && LOCAL_QUEUE_CAPACITY.is_power_of_two());
-
     let mut buffer = Vec::with_capacity(LOCAL_QUEUE_CAPACITY);
 
     for _ in 0..LOCAL_QUEUE_CAPACITY {
-        buffer.push(CausalCell::new(MaybeUninit::uninit()));
+        buffer.push(UnsafeCell::new(MaybeUninit::uninit()));
     }
 
     let inner = Arc::new(Inner {
-        head: AtomicU32::new(0),
-        tail: AtomicU32::new(0),
+        head: AtomicU16::new(0),
+        tail: AtomicU8::new(0),
         buffer: buffer.into(),
     });
 
@@ -126,44 +132,51 @@ impl<T> Local<T> {
     /// Pushes a task to the back of the local queue, skipping the LIFO slot.
     pub(super) fn push_back(&mut self, mut task: task::Notified<T>, inject: &Inject<T>) {
-        loop {
+        let tail = loop {
             let head = self.inner.head.load(Acquire);
+            let (steal, real) = unpack(head);
 
             // safety: this is the **only** thread that updates this cell.
             let tail = unsafe { self.inner.tail.unsync_load() };
 
-            if tail.wrapping_sub(head) < LOCAL_QUEUE_CAPACITY as u32 {
-                // Map the position to a slot index.
-                let idx = tail as usize & MASK;
-
-                self.inner.buffer[idx].with_mut(|ptr| {
-                    // Write the task to the slot
-                    //
-                    // Safety: There is only one producer and the above `if`
-                    // condition ensures we don't touch a cell if there is a
-                    // value, thus no consumer.
-                    unsafe {
-                        ptr::write((*ptr).as_mut_ptr(), task);
-                    }
-                });
-
-                // Make the task available. Synchronizes with a load in
-                // `steal_into2`.
-                self.inner.tail.store(tail.wrapping_add(1), Release);
-
+            if steal as usize & MASK != tail.wrapping_add(1) as usize & MASK {
+                // There is capacity for the task
+                break tail;
+            } else if steal != real {
+                // Concurrently stealing, this will free up capacity, so
+                // only push the new task onto the inject queue
+                inject.push(task);
                 return;
+            } else {
+                // Push the current task and half of the queue into the
+                // inject queue.
+                match self.push_overflow(task, real, tail, inject) {
+                    Ok(_) => return,
+                    // Lost the race, try again
+                    Err(v) => {
+                        task = v;
+                    }
+                }
             }
-
-            // The local buffer is full. Push a batch of work to the inject
-            // queue.
-            match self.push_overflow(task, head, tail, inject) {
-                Ok(_) => return,
-                // Lost the race, try again
-                Err(v) => task = v,
+        };
+
+        // Map the position to a slot index.
+        let idx = tail as usize & MASK;
+
+        self.inner.buffer[idx].with_mut(|ptr| {
+            // Write the task to the slot
+            //
+            // Safety: There is only one producer and the above `if`
+            // condition ensures we don't touch a cell if there is a
+            // value, thus no consumer.
+            unsafe {
+                ptr::write((*ptr).as_mut_ptr(), task);
             }
+        });
 
-            atomic::spin_loop_hint();
-        }
+        // Make the task available. Synchronizes with a load in
+        // `steal_into2`.
+        self.inner.tail.store(tail.wrapping_add(1), Release);
     }
 
     /// Moves a batch of tasks into the inject queue.
@@ -176,14 +189,22 @@ impl<T> Local<T> {
     fn push_overflow(
         &mut self,
         task: task::Notified<T>,
-        head: u32,
-        tail: u32,
+        head: u8,
+        tail: u8,
         inject: &Inject<T>,
     ) -> Result<(), task::Notified<T>> {
         const BATCH_LEN: usize = LOCAL_QUEUE_CAPACITY / 2 + 1;
 
-        let n = tail.wrapping_sub(head) / 2;
-        debug_assert_eq!(n as usize, LOCAL_QUEUE_CAPACITY / 2, "queue is not full");
+        let n = (LOCAL_QUEUE_CAPACITY / 2) as u8;
+        assert_eq!(
+            tail.wrapping_sub(head) as usize,
+            LOCAL_QUEUE_CAPACITY - 1,
+            "queue is not full; tail = {}; head = {}",
+            tail,
+            head
+        );
+
+        let prev = pack(head, head);
 
         // Claim a bunch of tasks
         //
@@ -195,8 +216,13 @@
         // work. This is because all tasks are pushed into the queue from the
         // current thread (or memory has been acquired if the local queue handle
         // moved).
-        let actual = self.inner.head.compare_and_swap(head, head + n, Release);
-        if actual != head {
+        let actual = self.inner.head.compare_and_swap(
+            prev,
+            pack(head.wrapping_add(n), head.wrapping_add(n)),
+            Release,
+        );
+
+        if actual != prev {
             // We failed to claim the tasks, losing the race. Return out of
             // this function and try the full `push` routine again. The queue
             // may not be full anymore.
@@ -227,7 +253,7 @@ impl<T> Local<T> {
             // tasks and we are the only producer.
             self.inner.buffer[i_idx].with_mut(|ptr| unsafe {
                 let ptr = (*ptr).as_ptr();
-                *(*ptr).header().queue_next.get() = Some(next);
+                (*ptr).header().queue_next.with_mut(|ptr| *ptr = Some(next));
             });
         }
 
@@ -249,42 +275,41 @@ impl<T> Local<T> {
             return Some(task);
         }
 
-        loop {
-            let head = self.inner.head.load(Acquire);
+        let mut head = self.inner.head.load(Acquire);
+
+        let idx = loop {
+            let (steal, real) = unpack(head);
 
             // safety: this is the **only** thread that updates this cell.
             let tail = unsafe { self.inner.tail.unsync_load() };
 
-            if head == tail {
+            if real == tail {
                 // queue is empty
                 return None;
             }
 
-            // Map the head position to a slot index.
-            let idx = head as usize & MASK;
+            let next_real = real.wrapping_add(1);
 
-            let task = self.inner.buffer[idx].with(|ptr| {
-                // Tentatively read the task at the head position. Note that we
-                // have not yet claimed the task.
-                //
-                // safety: reading this as uninitialized memory.
-                unsafe { ptr::read(ptr) }
-            });
+            // Only update `steal` component if it differs from `real`.
+            let next = if steal == real {
+                pack(next_real, next_real)
+            } else {
+                pack(steal, next_real)
+            };
 
-            // Attempt to claim the task read above.
-            let actual = self
+            // Attempt to claim a task.
+            let res = self
                 .inner
                 .head
-                .compare_and_swap(head, head.wrapping_add(1), Release);
+                .compare_exchange(head, next, AcqRel, Acquire);
 
-            if actual == head {
-                // safety: we claimed the task and the data we read is
-                // initialized memory.
-                return Some(unsafe { task.assume_init() });
+            match res {
+                Ok(_) => break real as usize & MASK,
+                Err(actual) => head = actual,
             }
+        };
 
-            atomic::spin_loop_hint();
-        }
+        Some(self.inner.buffer[idx].with(|ptr| unsafe { ptr::read(ptr).assume_init() }))
     }
 }
@@ -324,9 +349,8 @@ impl<T> Steal<T> {
         }
 
         // Synchronize with stealers
-        let dst_head = dst.inner.head.load(Acquire);
-
-        assert!(dst_tail.wrapping_sub(dst_head) + n <= LOCAL_QUEUE_CAPACITY as u32);
+        let (dst_steal, dst_real) = unpack(dst.inner.head.load(Acquire));
+        assert_eq!(dst_steal, dst_real);
 
         // Make the stolen items available to consumers
         dst.inner.tail.store(dst_tail.wrapping_add(n), Release);
@@ -334,73 +358,107 @@
         Some(ret)
     }
 
-    fn steal_into2(&self, dst: &mut Local<T>, dst_tail: u32) -> u32 {
-        loop {
-            let src_head = self.0.head.load(Acquire);
+    // Steal tasks from `self`, placing them into `dst`. Returns the number of
+    // tasks that were stolen.
+    fn steal_into2(&self, dst: &mut Local<T>, dst_tail: u8) -> u8 {
+        let mut prev_packed = self.0.head.load(Acquire);
+        let mut next_packed;
+
+        let n = loop {
+            let (src_head_steal, src_head_real) = unpack(prev_packed);
             let src_tail = self.0.tail.load(Acquire);
 
+            // If these two do not match, another thread is concurrently
+            // stealing from the queue.
+            if src_head_steal != src_head_real {
+                return 0;
+            }
+
             // Number of available tasks to steal
-            let n = src_tail.wrapping_sub(src_head);
+            let n = src_tail.wrapping_sub(src_head_real);
             let n = n - n / 2;
 
             if n == 0 {
+                // No tasks available to steal
                 return 0;
             }
 
-            if n > LOCAL_QUEUE_CAPACITY as u32 / 2 {
-                atomic::spin_loop_hint();
-                // inconsistent, try again
-                continue;
-            }
+            // Update the real head index to acquire the tasks.
+            let steal_to = src_head_real.wrapping_add(n);
+            next_packed = pack(src_head_steal, steal_to);
 
-            // Track CausalCell causality checks. The check is deferred until
-            // the compare_and_swap claims ownership of the tasks.
-            let mut check = CausalCheck::default();
-
-            for i in 0..n {
-                // Compute the positions
-                let src_pos = src_head.wrapping_add(i);
-                let dst_pos = dst_tail.wrapping_add(i);
-
-                // Map to slots
-                let src_idx = src_pos as usize & MASK;
-                let dst_idx = dst_pos as usize & MASK;
-
-                // Read the task
-                //
-                // safety: this is being read as MaybeUninit -- potentially
-                // uninitialized memory (in the case a producer wraps). We don't
-                // assume it is initialized, but will just write the
-                // `MaybeUninit` in our slot below.
-                let (task, ch) = self.0.buffer[src_idx]
-                    .with_deferred(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });
-
-                check.join(ch);
-
-                // Write the task to the new slot
-                //
-                // safety: `dst` queue is empty and we are the only producer to
-                // this queue.
-                dst.inner.buffer[dst_idx]
-                    .with_mut(|ptr| unsafe { ptr::write((*ptr).as_mut_ptr(), task) });
+            // Claim all those tasks. This is done by incrementing the "real"
+            // head but not the steal. By doing this, no other thread is able to
+            // steal from this queue until the current thread completes.
+            let res = self
+                .0
+                .head
+                .compare_exchange(prev_packed, next_packed, AcqRel, Acquire);
+
+            match res {
+                Ok(_) => break n,
+                Err(actual) => prev_packed = actual,
             }
+        };
+
+        let (first, _) = unpack(next_packed);
 
-            // Claim all of those tasks!
-            let actual = self
+        // Take all the tasks
+        for i in 0..n {
+            // Compute the positions
+            let src_pos = first.wrapping_add(i);
+            let dst_pos = dst_tail.wrapping_add(i);
+
+            // Map to slots
+            let src_idx = src_pos as usize & MASK;
+            let dst_idx = dst_pos as usize & MASK;
+
+            // Read the task
+            //
+            // safety: We acquired the task with the atomic exchange above.
+            let task = self.0.buffer[src_idx].with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });
+
+            // Write the task to the new slot
+            //
+            // safety: `dst` queue is empty and we are the only producer to
+            // this queue.
+            dst.inner.buffer[dst_idx]
+                .with_mut(|ptr| unsafe { ptr::write((*ptr).as_mut_ptr(), task) });
+        }
+
+        let mut prev_packed = next_packed;
+
+        // Update `src_head_steal` to match `src_head_real` signalling that the
+        // stealing routine is complete.
+        loop {
+            let head = unpack(prev_packed).1;
+            next_packed = pack(head, head);
+
+            let res = self
                 .0
                 .head
-                .compare_and_swap(src_head, src_head.wrapping_add(n), Release);
+                .compare_exchange(prev_packed, next_packed, AcqRel, Acquire);
 
-            if actual == src_head {
-                check.check();
-                return n;
-            }
+            match res {
+                Ok(_) => return n,
+                Err(actual) => {
+                    let (actual_steal, actual_real) = unpack(actual);
 
-            atomic::spin_loop_hint();
+                    assert_ne!(actual_steal, actual_real);
+
+                    prev_packed = actual;
+                }
+            }
         }
     }
 }
 
+impl<T> Clone for Steal<T> {
+    fn clone(&self) -> Steal<T> {
+        Steal(self.0.clone())
+    }
+}
+
 impl<T> Drop for Local<T> {
     fn drop(&mut self) {
         if !std::thread::panicking() {
@@ -411,7 +469,7 @@
 impl<T> Inner<T> {
     fn is_empty(&self) -> bool {
-        let head = self.head.load(Acquire);
+        let (_, head) = unpack(self.head.load(Acquire));
         let tail = self.tail.load(Acquire);
 
         head == tail
@@ -452,7 +510,7 @@ impl<T: 'static> Inject<T> {
         self.pointers.lock().unwrap().is_closed
     }
 
-    fn len(&self) -> usize {
+    pub(super) fn len(&self) -> usize {
        self.len.load(Acquire)
     }
@@ -558,11 +616,30 @@ impl<T: 'static> Drop for Inject<T> {
 }
 
 fn get_next(header: NonNull<task::Header>) -> Option<NonNull<task::Header>> {
-    unsafe { *header.as_ref().queue_next.get() }
+    unsafe { header.as_ref().queue_next.with(|ptr| *ptr) }
 }
 
 fn set_next(header: NonNull<task::Header>, val: Option<NonNull<task::Header>>) {
     unsafe {
-        *header.as_ref().queue_next.get() = val;
+        header.as_ref().queue_next.with_mut(|ptr| *ptr = val);
     }
 }
+
+/// Split the head value into the real head and the index a stealer is working
+/// on.
+fn unpack(n: u16) -> (u8, u8) {
+    let real = n & u8::max_value() as u16;
+    let steal = n >> 8;
+
+    (steal as u8, real as u8)
+}
+
+/// Join the two head values
+fn pack(steal: u8, real: u8) -> u16 {
+    (real as u16) | ((steal as u16) << 8)
+}
+
+#[test]
+fn test_local_queue_capacity() {
+    assert!(LOCAL_QUEUE_CAPACITY - 1 <= u8::max_value() as usize);
+}
diff --git a/tokio/src/runtime/task/core.rs b/tokio/src/runtime/task/core.rs
index dee55a54..2092c0aa 100644
--- a/tokio/src/runtime/task/core.rs
+++ b/tokio/src/runtime/task/core.rs
@@ -1,11 +1,10 @@
-use crate::loom::cell::CausalCell;
+use crate::loom::cell::UnsafeCell;
 use crate::runtime::task::raw::{self, Vtable};
 use crate::runtime::task::state::State;
 use crate::runtime::task::waker::waker_ref;
 use crate::runtime::task::{Notified, Schedule, Task};
 use crate::util::linked_list;
 
-use std::cell::UnsafeCell;
 use std::future::Future;
 use std::pin::Pin;
 use std::ptr::NonNull;
@@ -32,10 +31,10 @@ pub(super) struct Cell<T: Future, S> {
 /// Holds the future or output, depending on the stage of execution.
 pub(super) struct Core<T: Future, S> {
     /// Scheduler used to drive this future
-    pub(super) scheduler: CausalCell<Option<S>>,
+    pub(super) scheduler: UnsafeCell<Option<S>>,
 
     /// Either the future or the output
-    pub(super) stage: CausalCell<Stage<T>>,
+    pub(super) stage: UnsafeCell<Stage<T>>,
 }
 
 /// Crate public as this is also needed by the pool.
@@ -62,7 +61,7 @@ unsafe impl Sync for Header {}
 /// Cold data is stored after the future.
 pub(super) struct Trailer {
     /// Consumer task waiting on completion of this task.
-    pub(super) waker: CausalCell<Option<Waker>>,
+    pub(super) waker: UnsafeCell<Option<Waker>>,
 }
 
 /// Either the future or the output.
@@ -85,11 +84,11 @@ impl<T: Future, S: Schedule> Cell<T, S> {
                 vtable: raw::vtable::<T, S>(),
             },
             core: Core {
-                scheduler: CausalCell::new(None),
-                stage: CausalCell::new(Stage::Running(future)),
+                scheduler: UnsafeCell::new(None),
+                stage: UnsafeCell::new(Stage::Running(future)),
             },
             trailer: Trailer {
-                waker: CausalCell::new(None),
+                waker: UnsafeCell::new(None),
             },
         })
     }
diff --git a/tokio/src/runtime/task/harness.rs b/tokio/src/runtime/task/harness.rs
index f9cf5e75..29b231ea 100644
--- a/tokio/src/runtime/task/harness.rs
+++ b/tokio/src/runtime/task/harness.rs
@@ -124,6 +124,9 @@ where
                 if snapshot.is_notified() {
                     // Signal yield
                     self.core().yield_now(Notified(self.to_task()));
+                    // The ref-count was incremented as part of
+                    // `transition_to_idle`.
+                    self.drop_reference();
                 }
             }
             Err(_) => self.cancel_task(),
diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs
index 1ea60a9b..17b5157e 100644
--- a/to