summary | refs | log | tree | commit | diff | stats
path: root/tokio/src/sync/mpsc
diff options
context:
space:
mode:
author    Carl Lerche <me@carllerche.com>  2020-03-26 12:23:12 -0700
committer GitHub <noreply@github.com>  2020-03-26 12:23:12 -0700
commit    1cb1e291c10adf6b4e530cb1475b95ba10fa615f (patch)
tree      aabaebe663e2647fb72cb609d1486adcde0c4cc4 /tokio/src/sync/mpsc
parent    186196b911bb7cbbd67e74b4ef051d3daf2d64c1 (diff)
rt: track loom changes + tweak queue (#2315)
Loom is having a big refresh to improve performance and tighten up the concurrency model. This diff tracks those changes. Included in the changes is the removal of `CausalCell` deferred checks. This is due to it technically being undefined behavior in the C++11 memory model. To address this, the work-stealing queue is updated to avoid needing this behavior. This is done by limiting the queue to have one concurrent stealer.
Diffstat (limited to 'tokio/src/sync/mpsc')
-rw-r--r--  tokio/src/sync/mpsc/block.rs | 16
-rw-r--r--  tokio/src/sync/mpsc/chan.rs  |  6
2 files changed, 11 insertions(+), 11 deletions(-)
diff --git a/tokio/src/sync/mpsc/block.rs b/tokio/src/sync/mpsc/block.rs
index 4af990bf..7bf16196 100644
--- a/tokio/src/sync/mpsc/block.rs
+++ b/tokio/src/sync/mpsc/block.rs
@@ -1,5 +1,5 @@
use crate::loom::{
- cell::CausalCell,
+ cell::UnsafeCell,
sync::atomic::{AtomicPtr, AtomicUsize},
thread,
};
@@ -26,7 +26,7 @@ pub(crate) struct Block<T> {
/// The observed `tail_position` value *after* the block has been passed by
/// `block_tail`.
- observed_tail_position: CausalCell<usize>,
+ observed_tail_position: UnsafeCell<usize>,
/// Array containing values pushed into the block. Values are stored in a
/// continuous array in order to improve cache line behavior when reading.
@@ -39,7 +39,7 @@ pub(crate) enum Read<T> {
Closed,
}
-struct Values<T>([CausalCell<MaybeUninit<T>>; BLOCK_CAP]);
+struct Values<T>([UnsafeCell<MaybeUninit<T>>; BLOCK_CAP]);
use super::BLOCK_CAP;
@@ -85,7 +85,7 @@ impl<T> Block<T> {
ready_slots: AtomicUsize::new(0),
- observed_tail_position: CausalCell::new(0),
+ observed_tail_position: UnsafeCell::new(0),
// Value storage
values: unsafe { Values::uninitialized() },
@@ -365,12 +365,12 @@ impl<T> Values<T> {
unsafe fn uninitialized() -> Values<T> {
let mut vals = MaybeUninit::uninit();
- // When fuzzing, `CausalCell` needs to be initialized.
+ // When fuzzing, `UnsafeCell` needs to be initialized.
if_loom! {
- let p = vals.as_mut_ptr() as *mut CausalCell<MaybeUninit<T>>;
+ let p = vals.as_mut_ptr() as *mut UnsafeCell<MaybeUninit<T>>;
for i in 0..BLOCK_CAP {
p.add(i)
- .write(CausalCell::new(MaybeUninit::uninit()));
+ .write(UnsafeCell::new(MaybeUninit::uninit()));
}
}
@@ -379,7 +379,7 @@ impl<T> Values<T> {
}
impl<T> ops::Index<usize> for Values<T> {
- type Output = CausalCell<MaybeUninit<T>>;
+ type Output = UnsafeCell<MaybeUninit<T>>;
fn index(&self, index: usize) -> &Self::Output {
self.0.index(index)
diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs
index dc02dae2..32628661 100644
--- a/tokio/src/sync/mpsc/chan.rs
+++ b/tokio/src/sync/mpsc/chan.rs
@@ -1,4 +1,4 @@
-use crate::loom::cell::CausalCell;
+use crate::loom::cell::UnsafeCell;
use crate::loom::future::AtomicWaker;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Arc;
@@ -114,7 +114,7 @@ struct Chan<T, S> {
tx_count: AtomicUsize,
/// Only accessed by `Rx` handle.
- rx_fields: CausalCell<RxFields<T>>,
+ rx_fields: UnsafeCell<RxFields<T>>,
}
impl<T, S> fmt::Debug for Chan<T, S>
@@ -164,7 +164,7 @@ where
semaphore,
rx_waker: AtomicWaker::new(),
tx_count: AtomicUsize::new(1),
- rx_fields: CausalCell::new(RxFields {
+ rx_fields: UnsafeCell::new(RxFields {
list: rx,
rx_closed: false,
}),