Diffstat (limited to 'tokio')
27 files changed, 2325 insertions, 314 deletions
diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml
index 6bbbf2c6..aa60ae22 100644
--- a/tokio/Cargo.toml
+++ b/tokio/Cargo.toml
@@ -41,7 +41,7 @@ io-util = ["io-traits", "pin-project", "memchr"]
 io = ["io-traits", "io-util"]
 macros = ["tokio-macros"]
 net-full = ["tcp", "udp", "uds"]
-net-driver = ["mio", "tokio-executor/blocking"]
+net-driver = ["mio", "tokio-executor/blocking", "lazy_static"]
 rt-current-thread = [
   "timer",
   "tokio-executor/current-thread",
@@ -113,6 +113,10 @@ version = "0.3.8"
 default-features = false
 optional = true
 
+[target.'cfg(loom)'.dependencies]
+# play nice with loom tests in other crates.
+loom = "0.2.11"
+
 [dev-dependencies]
 tokio-test = { version = "=0.2.0-alpha.6", path = "../tokio-test" }
 tokio-util = { version = "=0.2.0-alpha.6", path = "../tokio-util" }
@@ -130,5 +134,9 @@ serde_json = "1.0"
 tempfile = "3.1.0"
 time = "0.1"
 
+# sharded slab tests
+loom = "0.2.11"
+proptest = "0.9.4"
+
 [package.metadata.docs.rs]
 all-features = true
diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs
index 249bf47a..f5e88b4a 100644
--- a/tokio/src/lib.rs
+++ b/tokio/src/lib.rs
@@ -69,7 +69,6 @@
 //! }
 //! }
 //! ```
-
 macro_rules! if_runtime {
     ($($i:item)*) => ($(
         #[cfg(any(
@@ -97,6 +96,9 @@ pub mod io;
 #[cfg(feature = "net-driver")]
 pub mod net;
 
+#[cfg(feature = "net-driver")]
+mod loom;
+
 pub mod prelude;
 
 #[cfg(feature = "process")]
diff --git a/tokio/src/loom.rs b/tokio/src/loom.rs
new file mode 100644
index 00000000..57ce2df6
--- /dev/null
+++ b/tokio/src/loom.rs
@@ -0,0 +1,45 @@
+//! This module abstracts over `loom` and `std::sync` depending on whether we
+//! are running tests or not.
+pub(crate) use self::inner::*;
+
+#[cfg(all(test, loom))]
+mod inner {
+    pub(crate) use loom::sync::CausalCell;
+    pub(crate) use loom::sync::Mutex;
+    pub(crate) mod atomic {
+        pub(crate) use loom::sync::atomic::*;
+        pub(crate) use std::sync::atomic::Ordering;
+    }
+}
+
+#[cfg(not(all(test, loom)))]
+mod inner {
+    use std::cell::UnsafeCell;
+    pub(crate) use std::sync::atomic;
+    pub(crate) use std::sync::Mutex;
+
+    #[derive(Debug)]
+    pub(crate) struct CausalCell<T>(UnsafeCell<T>);
+
+    impl<T> CausalCell<T> {
+        pub(crate) fn new(data: T) -> CausalCell<T> {
+            CausalCell(UnsafeCell::new(data))
+        }
+
+        #[inline(always)]
+        pub(crate) fn with<F, R>(&self, f: F) -> R
+        where
+            F: FnOnce(*const T) -> R,
+        {
+            f(self.0.get())
+        }
+
+        #[inline(always)]
+        pub(crate) fn with_mut<F, R>(&self, f: F) -> R
+        where
+            F: FnOnce(*mut T) -> R,
+        {
+            f(self.0.get())
+        }
+    }
+}
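The `CausalCell` shim above is the crux of the loom setup: under `cfg(all(test, loom))` it is loom's race-checked cell, while outside of loom tests it compiles down to a plain `UnsafeCell` wrapper with no overhead. A minimal standalone sketch of the closure-based access pattern this imposes on callers (the struct is copied from the non-loom variant above; `main` and the values are illustrative only):

    use std::cell::UnsafeCell;

    struct CausalCell<T>(UnsafeCell<T>);

    impl<T> CausalCell<T> {
        fn new(data: T) -> Self {
            CausalCell(UnsafeCell::new(data))
        }
        fn with<F: FnOnce(*const T) -> R, R>(&self, f: F) -> R {
            f(self.0.get())
        }
        fn with_mut<F: FnOnce(*mut T) -> R, R>(&self, f: F) -> R {
            f(self.0.get())
        }
    }

    fn main() {
        let cell = CausalCell::new(0usize);
        // All reads and writes go through closures, so the loom variant
        // can verify that they do not race when run under the model checker.
        cell.with_mut(|p| unsafe { *p = 42 });
        let v = cell.with(|p| unsafe { *p });
        assert_eq!(v, 42);
    }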
diff --git a/tokio/src/net/driver/mod.rs b/tokio/src/net/driver/mod.rs
index 9079ccc7..b7f33d02 100644
--- a/tokio/src/net/driver/mod.rs
+++ b/tokio/src/net/driver/mod.rs
@@ -124,7 +124,15 @@
 //! [`PollEvented`]: struct.PollEvented.html
 //! [`std::io::Read`]: https://doc.rust-lang.org/std/io/trait.Read.html
 //! [`std::io::Write`]: https://doc.rust-lang.org/std/io/trait.Write.html
+#[cfg(loom)]
+macro_rules! loom_thread_local {
+    ($($tts:tt)+) => { loom::thread_local!{ $($tts)+ } }
+}
+#[cfg(not(loom))]
+macro_rules! loom_thread_local {
+    ($($tts:tt)+) => { std::thread_local!{ $($tts)+ } }
+}
 
 pub(crate) mod platform;
 mod reactor;
 mod registration;
diff --git a/tokio/src/net/driver/reactor/dispatch/iter.rs b/tokio/src/net/driver/reactor/dispatch/iter.rs
new file mode 100644
index 00000000..f785f5d4
--- /dev/null
+++ b/tokio/src/net/driver/reactor/dispatch/iter.rs
@@ -0,0 +1,53 @@
+use super::{
+    page::{self, ScheduledIo},
+    Shard,
+};
+use std::slice;
+
+pub(in crate::net::driver::reactor) struct UniqueIter<'a> {
+    pub(super) shards: slice::IterMut<'a, Shard>,
+    pub(super) pages: slice::Iter<'a, page::Shared>,
+    pub(super) slots: Option<page::Iter<'a>>,
+}
+
+impl<'a> Iterator for UniqueIter<'a> {
+    type Item = &'a ScheduledIo;
+    fn next(&mut self) -> Option<Self::Item> {
+        loop {
+            if let Some(item) = self.slots.as_mut().and_then(|slots| slots.next()) {
+                return Some(item);
+            }
+
+            if let Some(page) = self.pages.next() {
+                self.slots = page.iter();
+            }
+
+            if let Some(shard) = self.shards.next() {
+                self.pages = shard.iter();
+            } else {
+                return None;
+            }
+        }
+    }
+}
+
+pub(in crate::net::driver::reactor) struct ShardIter<'a> {
+    pub(super) pages: slice::IterMut<'a, page::Shared>,
+    pub(super) slots: Option<page::Iter<'a>>,
+}
+
+impl<'a> Iterator for ShardIter<'a> {
+    type Item = &'a ScheduledIo;
+    fn next(&mut self) -> Option<Self::Item> {
+        loop {
+            if let Some(item) = self.slots.as_mut().and_then(|slots| slots.next()) {
+                return Some(item);
+            }
+            if let Some(page) = self.pages.next() {
+                self.slots = page.iter();
+            } else {
+                return None;
+            }
+        }
+    }
+}
diff --git a/tokio/src/net/driver/reactor/dispatch/mod.rs b/tokio/src/net/driver/reactor/dispatch/mod.rs
new file mode 100644
index 00000000..d7262a35
--- /dev/null
+++ b/tokio/src/net/driver/reactor/dispatch/mod.rs
@@ -0,0 +1,36 @@
+//! A lock-free concurrent slab.
+
+#[cfg(all(test, loom))]
+macro_rules! test_println {
+    ($($arg:tt)*) => {
+        println!("{:?} {}", crate::net::driver::reactor::dispatch::Tid::current(), format_args!($($arg)*))
+    }
+}
+
+mod iter;
+mod pack;
+mod page;
+mod sharded_slab;
+mod tid;
+
+#[cfg(all(test, loom))]
+// this is used by sub-modules
+use self::tests::test_util;
+use pack::{Pack, WIDTH};
+use sharded_slab::Shard;
+#[cfg(all(test, loom))]
+pub(crate) use sharded_slab::Slab;
+pub(crate) use sharded_slab::{SingleShard, MAX_SOURCES};
+use tid::Tid;
+
+#[cfg(target_pointer_width = "64")]
+const MAX_THREADS: usize = 4096;
+#[cfg(target_pointer_width = "32")]
+const MAX_THREADS: usize = 2048;
+const INITIAL_PAGE_SIZE: usize = 32;
+const MAX_PAGES: usize = WIDTH / 4;
+// Chosen arbitrarily.
+const RESERVED_BITS: usize = 5;
+
+#[cfg(test)]
+mod tests;
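The constants above fix the slab's geometry: pages start at `INITIAL_PAGE_SIZE` slots and double in size, up to `MAX_PAGES` pages per shard. A standalone arithmetic sketch of the resulting per-shard capacity, assuming a 64-bit target (the `size` function mirrors `page::size` further down; `main` is illustrative only):

    const WIDTH: usize = std::mem::size_of::<usize>() * 8; // 64 on a 64-bit target
    const INITIAL_PAGE_SIZE: usize = 32;
    const MAX_PAGES: usize = WIDTH / 4; // 16

    // Page n holds INITIAL_PAGE_SIZE * 2^n slots, mirroring `page::size`.
    fn size(n: usize) -> usize {
        INITIAL_PAGE_SIZE * 2usize.pow(n as u32)
    }

    fn main() {
        // Pages grow 32, 64, 128, ..., so a shard's total capacity is
        // INITIAL_PAGE_SIZE * (2^MAX_PAGES - 1).
        let total: usize = (0..MAX_PAGES).map(size).sum();
        assert_eq!(total, INITIAL_PAGE_SIZE * ((1 << MAX_PAGES) - 1));
        println!("per-shard capacity: {}", total); // 2097120
    }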
diff --git a/tokio/src/net/driver/reactor/dispatch/pack.rs b/tokio/src/net/driver/reactor/dispatch/pack.rs
new file mode 100644
index 00000000..0be44a48
--- /dev/null
+++ b/tokio/src/net/driver/reactor/dispatch/pack.rs
@@ -0,0 +1,89 @@
+pub(super) const WIDTH: usize = std::mem::size_of::<usize>() * 8;
+
+/// Trait encapsulating the calculations required for bit-packing slab indices.
+///
+/// This allows us to avoid manually repeating some calculations when packing
+/// and unpacking indices.
+pub(crate) trait Pack: Sized {
+    // ====== provided by each implementation =================================
+
+    /// The number of bits occupied by this type when packed into a usize.
+    ///
+    /// This must be provided to determine the number of bits into which to pack
+    /// the type.
+    const LEN: usize;
+
+    /// The type packed on the less significant side of this type.
+    ///
+    /// If this type is packed into the least significant bits of a usize, this
+    /// should be `()`, which occupies no bytes.
+    ///
+    /// This is used to calculate the shift amount for packing this value.
+    type Prev: Pack;
+
+    // ====== calculated automatically ========================================
+
+    /// A number consisting of `Self::LEN` 1 bits, starting at the least
+    /// significant bit.
+    ///
+    /// This is the highest value this type can represent. This number is
+    /// shifted left by `Self::SHIFT` bits to calculate this type's `MASK`.
+    ///
+    /// This is computed automatically based on `Self::LEN`.
+    const BITS: usize = {
+        let shift = 1 << (Self::LEN - 1);
+        shift | (shift - 1)
+    };
+
+    /// The number of bits to shift a number to pack it into a usize with other
+    /// values.
+    ///
+    /// This is calculated automatically based on the `LEN` and `SHIFT`
+    /// constants of the previous value.
+    const SHIFT: usize = Self::Prev::SHIFT + Self::Prev::LEN;
+
+    /// The mask to extract only this type from a packed `usize`.
+    ///
+    /// This is calculated by shifting `Self::BITS` left by `Self::SHIFT`.
+    const MASK: usize = Self::BITS << Self::SHIFT;
+
+    fn as_usize(&self) -> usize;
+    fn from_usize(val: usize) -> Self;
+
+    #[inline(always)]
+    fn pack(&self, to: usize) -> usize {
+        let value = self.as_usize();
+        debug_assert!(value <= Self::BITS);
+
+        (to & !Self::MASK) | (value << Self::SHIFT)
+    }
+
+    #[inline(always)]
+    fn from_packed(from: usize) -> Self {
+        let value = (from & Self::MASK) >> Self::SHIFT;
+        debug_assert!(value <= Self::BITS);
+        Self::from_usize(value)
+    }
+}
+
+impl Pack for () {
+    const BITS: usize = 0;
+    const LEN: usize = 0;
+    const SHIFT: usize = 0;
+    const MASK: usize = 0;
+
+    type Prev = ();
+
+    fn as_usize(&self) -> usize {
+        unreachable!()
+    }
+    fn from_usize(_val: usize) -> Self {
+        unreachable!()
+    }
+
+    fn pack(&self, _to: usize) -> usize {
+        unreachable!()
+    }
+
+    fn from_packed(_from: usize) -> Self {
+        unreachable!()
+    }
+}
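Since `BITS` works out to `(1 << LEN) - 1` (the two-step shift in the trait merely avoids overflowing when `LEN` equals the word width), `pack` and `from_packed` reduce to ordinary mask-and-shift arithmetic. A standalone sketch with made-up field widths rather than the patch's actual layout: a 5-bit field in the least significant bits and a 3-bit field packed above it.

    const ADDR_LEN: usize = 5;
    const ADDR_SHIFT: usize = 0; // Prev = (), so shifting starts at bit 0
    const ADDR_BITS: usize = (1 << ADDR_LEN) - 1; // 0b11111
    const ADDR_MASK: usize = ADDR_BITS << ADDR_SHIFT;

    const TID_LEN: usize = 3;
    const TID_SHIFT: usize = ADDR_SHIFT + ADDR_LEN; // packed above `addr`
    const TID_BITS: usize = (1 << TID_LEN) - 1;
    const TID_MASK: usize = TID_BITS << TID_SHIFT;

    fn main() {
        // pack(): clear the field's bits in the target, then OR in the
        // shifted value.
        let packed = (3 << TID_SHIFT) | (17 & ADDR_MASK);
        // from_packed(): mask the field out and shift it back down.
        assert_eq!((packed & ADDR_MASK) >> ADDR_SHIFT, 17);
        assert_eq!((packed & TID_MASK) >> TID_SHIFT, 3);
    }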
diff --git a/tokio/src/net/driver/reactor/dispatch/page/mod.rs b/tokio/src/net/driver/reactor/dispatch/page/mod.rs
new file mode 100644
index 00000000..0b8d3c4c
--- /dev/null
+++ b/tokio/src/net/driver/reactor/dispatch/page/mod.rs
@@ -0,0 +1,257 @@
+use super::{Pack, INITIAL_PAGE_SIZE, WIDTH};
+use crate::loom::CausalCell;
+
+pub(crate) mod scheduled_io;
+mod stack;
+
+pub(crate) use self::scheduled_io::ScheduledIo;
+use self::stack::TransferStack;
+
+use std::fmt;
+
+/// A page address encodes the location of a slot within a shard (the page
+/// number and offset within that page) as a single linear value.
+#[repr(transparent)]
+#[derive(Copy, Clone, Eq, PartialEq, PartialOrd, Ord)]
+pub(crate) struct Addr {
+    addr: usize,
+}
+
+impl Addr {
+    const NULL: usize = Self::BITS + 1;
+    const INDEX_SHIFT: usize = INITIAL_PAGE_SIZE.trailing_zeros() as usize + 1;
+
+    pub(crate) fn index(self) -> usize {
+        // Since every page is twice as large as the previous page, and all
+        // page sizes are powers of two, we can determine the page index that
+        // contains a given address by shifting the address down by the
+        // smallest page size and looking at how many twos places are necessary
+        // to represent that number, telling us what power of two page size it
+        // fits inside of. We can determine the number of twos places by
+        // counting the number of leading zeros (unused twos places) in the
+        // number's binary representation, and subtracting that count from the
+        // total number of bits in a word.
+        WIDTH - ((self.addr + INITIAL_PAGE_SIZE) >> Self::INDEX_SHIFT).leading_zeros() as usize
+    }
+
+    pub(crate) fn offset(self) -> usize {
+        self.addr
+    }
+}
+
+pub(super) fn size(n: usize) -> usize {
+    INITIAL_PAGE_SIZE * 2usize.pow(n as _)
+}
+
+impl Pack for Addr {
+    const LEN: usize = super::MAX_PAGES + Self::INDEX_SHIFT;
+
+    type Prev = ();
+
+    fn as_usize(&self) -> usize {
+        self.addr
+    }
+
+    fn from_usize(addr: usize) -> Self {
+        debug_assert!(addr <= Self::BITS);
+        Self { addr }
+    }
+}
+
+pub(in crate::net::driver) type Iter<'a> = std::slice::Iter<'a, ScheduledIo>;
+
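The shift-and-count-leading-zeros trick in `Addr::index` can be cross-checked against a naive page walk. A standalone sketch, assuming the constants above on a 64-bit target (`naive_index` and `main` are illustrative only, not part of the patch):

    const WIDTH: usize = 64;
    const INITIAL_PAGE_SIZE: usize = 32;
    const INDEX_SHIFT: usize = INITIAL_PAGE_SIZE.trailing_zeros() as usize + 1; // 6

    // The patch's formula: bias the address by the smallest page size,
    // then count how many doublings are needed to reach it.
    fn index(addr: usize) -> usize {
        WIDTH - ((addr + INITIAL_PAGE_SIZE) >> INDEX_SHIFT).leading_zeros() as usize
    }

    // Reference implementation: walk pages of size 32, 64, 128, ...
    // until the address falls inside one.
    fn naive_index(addr: usize) -> usize {
        let (mut page, mut start) = (0, 0);
        let mut size = INITIAL_PAGE_SIZE;
        while addr >= start + size {
            start += size;
            size *= 2;
            page += 1;
        }
        page
    }

    fn main() {
        for addr in 0..4096 {
            assert_eq!(index(addr), naive_index(addr), "addr={}", addr);
        }
    }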
+pub(crate) struct Local {
+    head: CausalCell<usize>,
+}
+
+pub(crate) struct Shared {
+    remote: TransferStack,
+    size: usize,
+    prev_sz: usize,
+    slab: CausalCell<Option<Box<[ScheduledIo]>>>,
+}
+
+impl Local {
+    pub(crate) fn new() -> Self {
+        Self {
+            head: CausalCell::new(0),
+        }
+    }
+
+    #[inline(always)]
+    fn head(&self) -> usize {
+        self.head.with(|head| unsafe { *head })
+    }
+
+    #[inline(always)]
+    fn set_head(&self, new_head: usize) {
+        self.head.with_mut(|head| unsafe {
+            *head = new_head;
+        })
+    }
+}
+
+impl Shared {
+    const NULL: usize = Addr::NULL;
+
+    pub(crate) fn new(size: usize, prev_sz: usize) -> Self {
+        Self {
+            prev_sz,
+            size,
+            remote: TransferStack::new(),
+            slab: CausalCell::new(None),
+        }
+    }
+
+    /// Allocates storage for this page if it does not already exist.
+    ///
+    /// This requires unique access to the page (e.g. it is called from the
+    /// thread that owns the page, or, in the case of `SingleShard`, while the
+    /// lock is held). In order to indicate this, a reference to the page's
+    /// `Local` data is taken by this function; the `Local` argument is not
+    /// actually used, but requiring it ensures that this is only called when
+    /// local access is held.
+    #[cold]
+    fn alloc_page(&self, _: &Local) {
+        debug_assert!(self.slab.with(|s| unsafe { (*s).is_none() }));
+
+        let mut slab = Vec::with_capacity(self.size);
+        slab.extend((1..self.size).map(ScheduledIo::new));
+        slab.push(ScheduledIo::new(Self::NULL));
+        self.slab.with_mut(|s| {
+            // this mut access is safe: it only occurs to initially
+            // allocate the page, which only happens on this thread; if the
+            // page has not yet been allocated, other threads will not try
+            // to access it yet.
+            unsafe {
+                *s = Some(slab.into_boxed_slice());
+            }
+        });
+    }
+
+    #[inline]
+    pub(crate) fn alloc(&self, local: &Local) -> Option<usize> {
+        let head = local.head();
+
+        // are there any items on the local free list? (fast path)
+        let head = if head < self.size {
+            head
+        } else {
+            // if the local free list is empty, pop all the items on the remote
+            // free list onto the local free list.
+            self.remote.pop_all()?
+        };
+
+        // if the head is still null, both the local and remote free lists are
+        // empty --- we can't fit any more items on this page.
+        if head == Self::NULL {
+            return None;
+        }
+
+        // do we need to allocate storage for this page?
+        let page_needs_alloc = self.slab.with(|s| unsafe { (*s).is_none() });
+        if page_needs_alloc {
+            self.alloc_page(local);
+        }
+
+        let gen = self.slab.with(|slab| {
+            let slab = unsafe { &*(slab) }
+                .as_ref()
+                .expect("page must have been allocated to alloc!");
+            let slot = &slab[head];
+            local.set_head(slot.next());
+            slot.alloc()
+        });
+
+        let index = head + self.prev_sz;
+        Some(gen.pack(index))
+    }
+
+    #[inline]
+    pub(in crate::net::driver) fn get(&self, addr: Addr) -> Option<&ScheduledIo> {
+        let page_offset = addr.offset() - self.prev_sz;
+        self.slab
+            .with(|slab| unsafe { &*slab }.as_ref()?.get(page_offset))
+    }
+
+    pub(crate) fn remove_local(&self, local: &Local, addr: Addr, idx: usize) {
+        let offset = addr.offset() - self.prev_sz;
+        self.slab.with(|slab| {
+            let slab = unsafe { &*slab }.as_ref();
+            let slot = if let Some(slot) = slab.and_then(|slab| slab.get(offset)) {
+                slot
+            } else {
+                return;
+            };
+            if slot.reset(scheduled_io::Generation::from_packed(idx)) {
+                slot.set_next(local.head());
+                local.set_head(offset);
+            }
+        })
+    }
+
+    pub(crate) fn remove_remote(&self, addr: Addr, idx: usize) {
+        let offset = addr.offset() - self.prev_sz;
+        self.slab.with(|slab| {
+            let slab = unsafe { &*slab }.as_ref();
+            let slot = if let Some(slot) = slab.and_then(|slab| slab.get(offset)) {
+                slot
+            } else {
+                return;
+            };
+            if !slot.reset(scheduled_io::Generation::from_packed(idx)) {
+                return;
+            }
+            self.remote.push(offset, |next| slot.set_next(next));
+        })
+    }
+
+    pub(in crate::net::driver) fn iter(&self) -> Option<Iter<'_>> {
+        let slab = self.slab.with(|slab| unsafe { (&*slab).as_ref() });
+        slab.map(|slab| slab.iter())
+    }
+}
+
+impl fmt::Debug for Local {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        self.head.with(|head| {
+            let head = unsafe { *head };
+            f.debug_struct("Local")
+                .field("head", &format_args!("{:#0x}", head))
+                .finish()
+        })
+    }
+}
+
+impl fmt::Debug for Shared {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("Shared")
+            .field("remote", &self.remote)
+            .field("prev_sz", &self.prev_sz)
+            .field("size", &self.size)
+            // .field("slab", &self.slab)
+            .finish()
+    }
+}
+
+impl fmt::Debug for Addr {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("Addr")
+            .field("addr", &format_args!("{:#0x}", &self.addr))
+            .field("index", &self.index())
+            .field("offset", &self.offset())
+            .finish()
+    }
+}
+
+#[cfg(all(test, not(loom)))]
+mod test {
+    use super::*;
+    use proptest::prelude::*;
+
+    proptest! {
+        #[test]
+        fn addr_roundtrips(pidx in 0usize..Addr::BITS) {
+            let addr = Addr::from_usize(pidx);
+            let packed = addr.pack(0);
+            assert_eq!(addr, Addr::from_packed(packed));
+        }
+    }
+}
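Within a page, the slots themselves form the free list: `alloc_page` links slot `i` to `i + 1` (with the last slot pointing at `NULL`), `alloc` pops from the head, and `remove_local` pushes freed slots back. A miniature single-threaded model of that list, not the patch's code (the `Page` struct here is hypothetical):

    const NULL: usize = usize::MAX;

    struct Page {
        head: usize,
        next: Vec<usize>, // next[i] = next free slot after i
    }

    impl Page {
        fn new(size: usize) -> Self {
            // Mirrors alloc_page: slot i links to i + 1, the last to NULL.
            let mut next: Vec<usize> = (1..size).collect();
            next.push(NULL);
            Page { head: 0, next }
        }

        fn alloc(&mut self) -> Option<usize> {
            if self.head == NULL {
                return None;
            }
            let slot = self.head;
            self.head = self.next[slot];
            Some(slot)
        }

        fn free(&mut self, slot: usize) {
            // Mirrors remove_local: push the slot back onto the list.
            self.next[slot] = self.head;
            self.head = slot;
        }
    }

    fn main() {
        let mut page = Page::new(4);
        assert_eq!(page.alloc(), Some(0));
        assert_eq!(page.alloc(), Some(1));
        page.free(0);
        assert_eq!(page.alloc(), Some(0)); // freed slots are reused LIFO
    }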
diff --git a/tokio/src/net/driver/reactor/dispatch/page/scheduled_io.rs b/tokio/src/net/driver/reactor/dispatch/page/scheduled_io.rs
new file mode 100644
index 00000000..34a07ea8
--- /dev/null
+++ b/tokio/src/net/driver/reactor/dispatch/page/scheduled_io.rs
@@ -0,0 +1,171 @@
+use super::super::{Pack, Tid, RESERVED_BITS, WIDTH};
+use crate::loom::{
+    atomic::{AtomicUsize, Ordering},
+    CausalCell,
+};
+
+use tokio_sync::AtomicWaker;
+
+#[derive(Debug)]
+pub(crate) struct ScheduledIo {
+    /// The offset of the next item on the free list.
+    next: CausalCell<usize>,
+    readiness: AtomicUsize,
+    pub(in crate::net::driver) reader: AtomicWaker,
+    pub(in crate::net::driver) writer: AtomicWaker,
+}
+
+#[repr(transparent)]
+#[derive(Copy, Clone, Debug, PartialEq, Eq, Ord, PartialOrd)]
+pub(crate) struct Generation {
+    value: usize,
+}
+
+impl Pack for Generation {
+    /// Use all the remaining bits in the word for the generation counter,
+    /// minus any bits reserved by the user.
+    const LEN: usize = (WIDTH - RESERVED_BITS) - Self::SHIFT;
+
+    type Prev = Tid;
+
+    #[inline(always)]
+    fn from_usize(u: usize) -> Self {
+        debug_assert!(u <= Self::BITS);
+        Self::new(u)
+    }
+
+    #[inline(always)]
+    fn as_usize(&self) -> usize {
+        self.value
+    }
+}
+
+impl Generation {
+    const ONE: usize = 1 << Self::SHIFT;
+
+    fn new(value: usize) -> Self {
+        Self { value }
+    }
+
+    fn next(self) -> Self {
+        Self::from_usize((self.value + 1) % Self::BITS)
+    }
+}
+
+impl ScheduledIo {
+    pub(super) fn new(next: usize) -> Self {
+        Self {
+            next: CausalCell::new(next),
+            readiness: AtomicUsize::new(0),
+            reader: AtomicWaker::new(),
+            writer: AtomicWaker::new(),
+        }
+    }
+
+    #[inline]
+    pub(super) fn alloc(&self) -> Generation {
+        Generation::from_packed(self.readiness.load(Ordering::SeqCst))
+    }
+
+    #[inline(always)]
+    pub(super) fn next(&self) -> usize {
+        self.next.with(|next| unsafe { *next })
+    }
+
+    #[inline]
+    pub(super) fn reset(&self, gen: Generation) -> bool {
+        let mut current = self.readiness.load(Ordering::Acquire);
+        loop {
+            if Generation::from_packed(current) != gen {
+                return false;
+            }
+            let next_gen = gen.next().pack(0);
+            match self.readiness.compare_exchange(
+                current,
+                next_gen,
+                Ordering::AcqRel,
+                Ordering::Acquire,
+            ) {
+                Ok(_) => break,
+                Err(actual) => current = actual,
+            }
+        }
+        drop(self.reader.take_waker());
+        drop(self.writer.take_waker());
+        true
+    }
+
+    #[inline(always)]
+    pub(super) fn set_next(&self, next: usize) {
+        self.next.with_mut(|n| unsafe {
+            (*n) = next;
+        })
+    }
+
+    /// Returns the current readiness value of this `ScheduledIo`, if the
+    /// provided `token` is still a valid access.
+    ///
+    /// # Returns
+    ///
+    /// If the given token's generation no longer matches the `ScheduledIo`'s
+    /// generation, then the corresponding IO resource has been removed and
+    /// replaced with a new resource. In that case, this method returns `None`.
+    /// Otherwise, this returns the current readiness.
+    pub(in crate::net::driver) fn get_readiness(&self, token: usize) -> Option<usize> {
+        let gen = token & Generation::MASK;
+        let ready = self.readiness.load(Ordering::Acquire);
+        if ready & Generation::MASK != gen {
+            return None;
+        }
+        Some(ready & (!Generation::MASK))
+    }
+
+    /// Sets the readiness on this `ScheduledIo` by invoking the given closure
+    /// on the current value, returning the previous readiness value.
+    ///
+    /// # Arguments
+    /// - `token`: the token for this `ScheduledIo`.
+    /// - `f`: a closure returning a new readiness value given the previous
+    ///   readiness.
+    ///
+    /// # Returns
+    ///
+    /// If the given token's generation no longer matches the `ScheduledIo`'s
+    /// generation, then the corresponding IO resource has been removed and
+    /// replaced with a new resource. In that case, this method returns `Err`.
+    /// Otherwise, this returns the previous readiness.
+    pub(in crate::net::driver) fn set_readiness(
+        &self,
+        token: usize,
+        f: impl Fn(usize) -> usize,
+    ) -> Result<usize, ()> {
+        let gen = token & Generation::MASK;
+        let mut current = self.readiness.load(Ordering::Acquire);
+        loop {
+            // Check that the generation for this access is still the current
+            // one.
+            if current & Generation::MASK != gen {
+                return Err(());
+            }
+            // Mask out the generation bits so that the modifying function
+            // doesn't see them.
+            let current_readiness = current & mio::Ready::all().as_usize();
+            let new = f(current_readiness);
+            debug_assert!(
+                new < Generation::ONE,
+                "new readiness value would overwrite generation bits!"
+            );
+
+            match self.readiness.compare_exchange(
+                current,
+                new | gen,
+                Ordering::AcqRel,
+                Ordering::Acquire,
+            ) {
+                Ok(_) => return Ok(current),
+                // we lost the race, retry!
+                Err(actual) => current = actual,
+            }
+        }
+    }
+}
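The readiness word therefore does double duty: the low bits carry `mio` readiness while the high bits carry the slot's generation, so a token minted for a slot that has since been freed and reused is rejected rather than misread. A simplified standalone model of the check in `get_readiness`, with made-up bit positions rather than the patch's `Generation::MASK`:

    const GEN_SHIFT: usize = 4;
    const GEN_MASK: usize = !0 << GEN_SHIFT;
    const READY_MASK: usize = !GEN_MASK;

    fn get_readiness(word: usize, token: usize) -> Option<usize> {
        if word & GEN_MASK != token & GEN_MASK {
            return None; // slot was freed and reused; the token is stale
        }
        Some(word & READY_MASK)
    }

    fn main() {
        let gen1 = 1 << GEN_SHIFT;
        let gen2 = 2 << GEN_SHIFT;
        let word = gen1 | 0b0011; // generation 1, readiness bits 0b0011
        assert_eq!(get_readiness(word, gen1), Some(0b0011));
        assert_eq!(get_readiness(word, gen2), None); // stale token
    }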
diff --git a/tokio/src/net/driver/reactor/dispatch/page/stack.rs b/tokio/src/net/driver/reactor/dispatch/page/stack.rs
new file mode 100644
index 00000000..26597dc2
--- /dev/null
+++ b/tokio/src/net/driver/reactor/dispatch/page/stack.rs
@@ -0,0 +1,149 @@
+use crate::loom::atomic::{AtomicUsize, Ordering};
+use std::fmt;
+
+pub(super) struct TransferStack {
+    head: AtomicUsize,
+}
+
+impl TransferStack {
+    pub(super) fn new() -> Self {
+        Self {
+            head: AtomicUsize::new(super::Addr::NULL),
+        }
+    }
+
+    pub(super) fn pop_all(&self) -> Option<usize> {
+        let val = self.head.swap(super::Addr::NULL, Ordering::Acquire);
+        if val == super::Addr::NULL {
+            None
+        } else {
+            Some(val)
+        }
+    }
+
+    pub(super) fn push(&self, value: usize, before: impl Fn(usize)) {
+        let mut next = self.head.load(Ordering::Relaxed);
+        loop {
+            before(next);
+
+            match self
+                .head
+                .compare_exchange(next, value, Ordering::AcqRel, Ordering::Acquire)
+            {
+                // lost the race!
+                Err(actual) => next = actual,
+                Ok(_) => return,
+            }
+        }
+    }
+}
+
+impl fmt::Debug for TransferStack {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        // Loom likes to dump all its internal state in `fmt::Debug` impls, so
+        // we override this to just print the current value in tests.
+        f.debug_struct("TransferStack")
+            .field(
+                "head",
+                &format_args!("{:#x}", self.head.load(Ordering::Relaxed)),
+            )
+            .finish()
+    }
+}
+
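`TransferStack` stores no nodes of its own: `head` is just a slot index, each pushed slot records the previous head in its own `next` field (via the `before` closure) ahead of the CAS, and `pop_all` takes the entire list with a single `swap`. A single-threaded sketch of that interplay, not the patch's code (the vector of next pointers stands in for the slots):

    use std::sync::atomic::{AtomicUsize, Ordering};

    const NULL: usize = usize::MAX;

    fn main() {
        let head = AtomicUsize::new(NULL);
        let mut next = vec![NULL; 3]; // per-slot next pointers

        // push(1): record the old head in slot 1's next field, then swing
        // the head to point at slot 1 with a CAS.
        let prev = head.load(Ordering::Relaxed);
        next[1] = prev;
        head.compare_exchange(prev, 1, Ordering::AcqRel, Ordering::Acquire)
            .unwrap();

        // pop_all(): take the whole list in one swap.
        let popped = head.swap(NULL, Ordering::Acquire);
        assert_eq!(popped, 1);
        assert_eq!(next[popped], NULL); // slot 1 was the only entry
    }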
+#[cfg(all(test, loom))]
+mod test {
+    use super::super::super::test_util;
+    use super::*;
+    use crate::loom::CausalCell;
+    use loom::thread;
+    use std::sync::Arc;
+
+    #[test]
+    fn transfer_stack() {
+        test_util::run_model("transfer_stack", || {
+            let causalities = [CausalCell::new(None), CausalCell::new(None)];
+            let shared = Arc::new((causalities, TransferStack::new()));
+            let shared1 = shared.clone();
+            let shared2 = shared.clone();
+
+            // Spawn two threads that both try to push to the stack.
+            let t1 = thread::spawn(move || {
+                let (causalities, stack) = &*shared1;
+                stack.push(0, |prev| {
+                    causalities[0].with_mut(|c| unsafe {
+                        *c = Some(prev);
+                    });
+                    test_println!("prev={:#x}", prev)
+                });
+            });
+
+            let t2 = thread::spawn(move || {
+                let (causalities, stack) = &*shared2;
+                stack.push(1, |prev| {
+                    causalities[1].with_mut(|c| unsafe {
+                        *c = Some(prev);
+                    });
+                    test_println!("prev={:#x}", prev)
+                });
+            });
+
+            let (causalities, stack) = &*shared;
+
+            // Try to pop from the stack...
+            let mut idx = stack.pop_all();
+            while idx == None {
+                idx = stack.pop_all();
+                thread::yield_now();
+            }
+            let idx = idx.unwrap();
+            test_println!("popped {:#x}", idx);
+
+            let saw_both = causalities[idx].with(|val| {
+                let val = unsafe { *val };
+                assert!(
+                    val.is_some(),
+                    "CausalCell write must happen-before index is pushed to the stack!",
+                );
+                // were there two entries in the stack? if so, check that