diff options
author | Carl Lerche <me@carllerche.com> | 2020-03-26 12:23:12 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-03-26 12:23:12 -0700 |
commit | 1cb1e291c10adf6b4e530cb1475b95ba10fa615f (patch) | |
tree | aabaebe663e2647fb72cb609d1486adcde0c4cc4 /tokio/src/sync/mpsc | |
parent | 186196b911bb7cbbd67e74b4ef051d3daf2d64c1 (diff) |
rt: track loom changes + tweak queue (#2315)
Loom is having a big refresh to improve performance and tighten up the
concurrency model. This diff tracks those changes.
Included in the changes is the removal of `CausalCell` deferred checks.
This is due to it technically being undefined behavior in the C++11
memory model. To address this, the work-stealing queue is updated to
avoid needing this behavior. This is done by limiting the queue to have
one concurrent stealer.
Diffstat (limited to 'tokio/src/sync/mpsc')
-rw-r--r-- | tokio/src/sync/mpsc/block.rs | 16 | ||||
-rw-r--r-- | tokio/src/sync/mpsc/chan.rs | 6 |
2 files changed, 11 insertions, 11 deletions
diff --git a/tokio/src/sync/mpsc/block.rs b/tokio/src/sync/mpsc/block.rs index 4af990bf..7bf16196 100644 --- a/tokio/src/sync/mpsc/block.rs +++ b/tokio/src/sync/mpsc/block.rs @@ -1,5 +1,5 @@ use crate::loom::{ - cell::CausalCell, + cell::UnsafeCell, sync::atomic::{AtomicPtr, AtomicUsize}, thread, }; @@ -26,7 +26,7 @@ pub(crate) struct Block<T> { /// The observed `tail_position` value *after* the block has been passed by /// `block_tail`. - observed_tail_position: CausalCell<usize>, + observed_tail_position: UnsafeCell<usize>, /// Array containing values pushed into the block. Values are stored in a /// continuous array in order to improve cache line behavior when reading. @@ -39,7 +39,7 @@ pub(crate) enum Read<T> { Closed, } -struct Values<T>([CausalCell<MaybeUninit<T>>; BLOCK_CAP]); +struct Values<T>([UnsafeCell<MaybeUninit<T>>; BLOCK_CAP]); use super::BLOCK_CAP; @@ -85,7 +85,7 @@ impl<T> Block<T> { ready_slots: AtomicUsize::new(0), - observed_tail_position: CausalCell::new(0), + observed_tail_position: UnsafeCell::new(0), // Value storage values: unsafe { Values::uninitialized() }, @@ -365,12 +365,12 @@ impl<T> Values<T> { unsafe fn uninitialized() -> Values<T> { let mut vals = MaybeUninit::uninit(); - // When fuzzing, `CausalCell` needs to be initialized. + // When fuzzing, `UnsafeCell` needs to be initialized. if_loom! { - let p = vals.as_mut_ptr() as *mut CausalCell<MaybeUninit<T>>; + let p = vals.as_mut_ptr() as *mut UnsafeCell<MaybeUninit<T>>; for i in 0..BLOCK_CAP { p.add(i) - .write(CausalCell::new(MaybeUninit::uninit())); + .write(UnsafeCell::new(MaybeUninit::uninit())); } } @@ -379,7 +379,7 @@ impl<T> Values<T> { } impl<T> ops::Index<usize> for Values<T> { - type Output = CausalCell<MaybeUninit<T>>; + type Output = UnsafeCell<MaybeUninit<T>>; fn index(&self, index: usize) -> &Self::Output { self.0.index(index) diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index dc02dae2..32628661 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -1,4 +1,4 @@ -use crate::loom::cell::CausalCell; +use crate::loom::cell::UnsafeCell; use crate::loom::future::AtomicWaker; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::Arc; @@ -114,7 +114,7 @@ struct Chan<T, S> { tx_count: AtomicUsize, /// Only accessed by `Rx` handle. - rx_fields: CausalCell<RxFields<T>>, + rx_fields: UnsafeCell<RxFields<T>>, } impl<T, S> fmt::Debug for Chan<T, S> @@ -164,7 +164,7 @@ where semaphore, rx_waker: AtomicWaker::new(), tx_count: AtomicUsize::new(1), - rx_fields: CausalCell::new(RxFields { + rx_fields: UnsafeCell::new(RxFields { list: rx, rx_closed: false, }), |