From 69975fb9601bbb21659db283d888470733bae660 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 20 Nov 2019 00:05:14 -0800 Subject: Refactor the I/O driver, extracting slab to `tokio::util`. (#1792) The I/O driver is made private and moved to `tokio::io::driver`. `Registration` is moved to `tokio::io::Registration` and `PollEvented` is moved to `tokio::io::PollEvented`. Additionally, the concurrent slab used by the I/O driver is cleaned up and extracted to `tokio::util::slab`, allowing it to eventually be used by other types. --- tokio/src/util/bit.rs | 86 +++++++++ tokio/src/util/mod.rs | 15 +- tokio/src/util/slab/addr.rs | 155 +++++++++++++++ tokio/src/util/slab/entry.rs | 7 + tokio/src/util/slab/generation.rs | 32 ++++ tokio/src/util/slab/mod.rs | 109 +++++++++++ tokio/src/util/slab/page.rs | 187 ++++++++++++++++++ tokio/src/util/slab/shard.rs | 108 +++++++++++ tokio/src/util/slab/slot.rs | 42 ++++ tokio/src/util/slab/stack.rs | 58 ++++++ tokio/src/util/slab/tests/loom_slab.rs | 327 ++++++++++++++++++++++++++++++++ tokio/src/util/slab/tests/loom_stack.rs | 88 +++++++++ tokio/src/util/slab/tests/mod.rs | 2 + 13 files changed, 1212 insertions(+), 4 deletions(-) create mode 100644 tokio/src/util/bit.rs create mode 100644 tokio/src/util/slab/addr.rs create mode 100644 tokio/src/util/slab/entry.rs create mode 100644 tokio/src/util/slab/generation.rs create mode 100644 tokio/src/util/slab/mod.rs create mode 100644 tokio/src/util/slab/page.rs create mode 100644 tokio/src/util/slab/shard.rs create mode 100644 tokio/src/util/slab/slot.rs create mode 100644 tokio/src/util/slab/stack.rs create mode 100644 tokio/src/util/slab/tests/loom_slab.rs create mode 100644 tokio/src/util/slab/tests/loom_stack.rs create mode 100644 tokio/src/util/slab/tests/mod.rs (limited to 'tokio/src/util') diff --git a/tokio/src/util/bit.rs b/tokio/src/util/bit.rs new file mode 100644 index 00000000..03fb0cfb --- /dev/null +++ b/tokio/src/util/bit.rs @@ -0,0 +1,86 @@ +use std::fmt; + +#[derive(Clone, Copy)] +pub(crate) struct Pack { + mask: usize, + shift: u32, +} + +impl Pack { + /// Value is packed in the `width` most-significant bits. + pub(crate) const fn most_significant(width: u32) -> Pack { + let mask = mask_for(width).reverse_bits(); + + Pack { + mask, + shift: mask.trailing_zeros(), + } + } + + /// Value is packed in the `width` least-significant bits. + pub(crate) const fn least_significant(width: u32) -> Pack { + let mask = mask_for(width); + + Pack { + mask, + shift: 0, + } + } + + /// Value is packed in the `width` more-significant bits. + pub(crate) const fn then(&self, width: u32) -> Pack { + let shift = pointer_width() - self.mask.leading_zeros(); + let mask = mask_for(width) << shift; + + Pack { + mask, + shift, + } + } + + /// Mask used to unpack value + pub(crate) const fn mask(&self) -> usize { + self.mask + } + + /// Width, in bits, dedicated to storing the value. 
+ pub(crate) const fn width(&self) -> u32 { + pointer_width() - (self.mask >> self.shift).leading_zeros() + } + + /// Max representable value + pub(crate) const fn max_value(&self) -> usize { + (1 << self.width()) - 1 + } + + pub(crate) fn pack(&self, value: usize, base: usize) -> usize { + assert!(value <= self.max_value()); + (base & !self.mask) | (value << self.shift) + } + + pub(crate) fn unpack(&self, src: usize) -> usize { + unpack(src, self.mask, self.shift) + } +} + +impl fmt::Debug for Pack { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "Pack {{ mask: {:b}, shift: {} }}", self.mask, self.shift) + } +} + +/// Returns the width of a pointer in bits +pub(crate) const fn pointer_width() -> u32 { + std::mem::size_of::() as u32 * 8 +} + +/// Returns a `usize` with the right-most `n` bits set. +pub(crate) const fn mask_for(n: u32) -> usize { + let shift = 1usize.wrapping_shl(n - 1); + shift | (shift - 1) +} + +/// Unpack a value using a mask & shift +pub(crate) const fn unpack(src: usize, mask: usize, shift: u32) -> usize { + (src & mask) >> shift +} diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index 44377fcf..a50b33ac 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -1,5 +1,12 @@ -mod pad; -pub(crate) use self::pad::CachePadded; +cfg_io_driver! { + pub(crate) mod bit; + pub(crate) mod slab; +} -mod rand; -pub(crate) use self::rand::FastRand; +cfg_rt_threaded! { + mod pad; + pub(crate) use pad::CachePadded; + + mod rand; + pub(crate) use rand::FastRand; +} diff --git a/tokio/src/util/slab/addr.rs b/tokio/src/util/slab/addr.rs new file mode 100644 index 00000000..2efe93ef --- /dev/null +++ b/tokio/src/util/slab/addr.rs @@ -0,0 +1,155 @@ +//! Tracks the location of an entry in a slab. +//! +//! # Index packing +//! +//! A slab index consists of multiple indices packed into a single `usize` value +//! that correspond to different parts of the slab. +//! +//! The least significant `MAX_PAGES + INITIAL_PAGE_SIZE.trailing_zeros() + 1` +//! bits store the address within a shard, starting at 0 for the first slot on +//! the first page. To index a slot within a shard, we first find the index of +//! the page that the address falls on, and then the offset of the slot within +//! that page. +//! +//! Since every page is twice as large as the previous page, and all page sizes +//! are powers of two, we can determine the page index that contains a given +//! address by shifting the address down by the smallest page size and looking +//! at how many twos places necessary to represent that number, telling us what +//! power of two page size it fits inside of. We can determine the number of +//! twos places by counting the number of leading zeros (unused twos places) in +//! the number's binary representation, and subtracting that count from the +//! total number of bits in a word. +//! +//! Once we know what page contains an address, we can subtract the size of all +//! previous pages from the address to determine the offset within the page. +//! +//! After the page address, the next `MAX_THREADS.trailing_zeros() + 1` least +//! significant bits are the thread ID. These are used to index the array of +//! shards to find which shard a slot belongs to. If an entry is being removed +//! and the thread ID of its index matches that of the current thread, we can +//! use the `remove_local` fast path; otherwise, we have to use the synchronized +//! `remove_remote` path. +//! +//! Finally, a generation value is packed into the index. 
The `RESERVED_BITS` +//! most significant bits are left unused, and the remaining bits between the +//! last bit of the thread ID and the first reserved bit are used to store the +//! generation. The generation is used as part of an atomic read-modify-write +//! loop every time a `ScheduledIo`'s readiness is modified, or when the +//! resource is removed, to guard against the ABA problem. +//! +//! Visualized: +//! +//! ```text +//! ┌──────────┬───────────────┬──────────────────┬──────────────────────────┐ +//! │ reserved │ generation │ thread ID │ address │ +//! └▲─────────┴▲──────────────┴▲─────────────────┴▲────────────────────────▲┘ +//! │ │ │ │ │ +//! bits(usize) │ bits(MAX_THREADS) │ 0 +//! │ │ +//! bits(usize) - RESERVED MAX_PAGES + bits(INITIAL_PAGE_SIZE) +//! ``` + +use crate::util::bit; +use crate::util::slab::{Generation, MAX_PAGES, MAX_THREADS, INITIAL_PAGE_SIZE}; + +use std::usize; + +/// References the location at which an entry is stored in a slab. +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +pub(crate) struct Address(usize); + +const PAGE_INDEX_SHIFT: u32 = INITIAL_PAGE_SIZE.trailing_zeros() + 1; + +/// Address in the shard +const SLOT: bit::Pack = bit::Pack::least_significant( + MAX_PAGES as u32 + PAGE_INDEX_SHIFT); + +/// Masks the thread identifier +const THREAD: bit::Pack = SLOT.then(MAX_THREADS.trailing_zeros() + 1); + +/// Masks the generation +const GENERATION: bit::Pack = THREAD.then( + bit::pointer_width().wrapping_sub(RESERVED.width() + THREAD.width() + SLOT.width())); + +// Chosen arbitrarily +const RESERVED: bit::Pack = bit::Pack::most_significant(5); + +impl Address { + /// Represents no entry, picked to avoid collision with Mio's internals. + /// This value should not be passed to mio. + pub(crate) const NULL: usize = usize::MAX >> 1; + + /// Re-exported by `Generation`. + pub(super) const GENERATION_WIDTH: u32 = GENERATION.width(); + + pub(super) fn new(shard_index: usize, generation: Generation) -> Address { + let mut repr = 0; + + repr = SLOT.pack(shard_index, repr); + repr = GENERATION.pack(generation.to_usize(), repr); + + Address(repr) + } + + /// Convert from a `usize` representation. + pub(crate) fn from_usize(src: usize) -> Address { + assert_ne!(src, Self::NULL); + + Address(src) + } + + /// Convert to a `usize` representation + pub(crate) fn to_usize(self) -> usize { + self.0 + } + + pub(crate) fn generation(self) -> Generation { + Generation::new(GENERATION.unpack(self.0)) + } + + /// Returns the page index + pub(super) fn page(self) -> usize { + // Since every page is twice as large as the previous page, and all page + // sizes are powers of two, we can determine the page index that + // contains a given address by shifting the address down by the smallest + // page size and looking at how many twos places necessary to represent + // that number, telling us what power of two page size it fits inside + // of. We can determine the number of twos places by counting the number + // of leading zeros (unused twos places) in the number's binary + // representation, and subtracting that count from the total number of + // bits in a word. + let slot_shifted = (self.slot() + INITIAL_PAGE_SIZE) >> PAGE_INDEX_SHIFT; + (bit::pointer_width() - slot_shifted.leading_zeros()) as usize + } + + /// Returns the slot index + pub(super) fn slot(self) -> usize { + SLOT.unpack(self.0) + } +} + +#[cfg(test)] +cfg_not_loom! 
{ + use proptest::proptest; + + #[test] + fn test_pack_format() { + assert_eq!(5, RESERVED.width()); + assert_eq!(0b11111, RESERVED.max_value()); + } + + proptest! { + #[test] + fn address_roundtrips( + slot in 0usize..SLOT.max_value(), + generation in 0usize..Generation::MAX, + ) { + let address = Address::new(slot, Generation::new(generation)); + // Round trip + let address = Address::from_usize(address.to_usize()); + + assert_eq!(address.slot(), slot); + assert_eq!(address.generation().to_usize(), generation); + } + } +} diff --git a/tokio/src/util/slab/entry.rs b/tokio/src/util/slab/entry.rs new file mode 100644 index 00000000..2e0b10b0 --- /dev/null +++ b/tokio/src/util/slab/entry.rs @@ -0,0 +1,7 @@ +use crate::util::slab::Generation; + +pub(crate) trait Entry: Default { + fn generation(&self) -> Generation; + + fn reset(&self, generation: Generation) -> bool; +} diff --git a/tokio/src/util/slab/generation.rs b/tokio/src/util/slab/generation.rs new file mode 100644 index 00000000..4b16b2ca --- /dev/null +++ b/tokio/src/util/slab/generation.rs @@ -0,0 +1,32 @@ +use crate::util::bit; +use crate::util::slab::Address; + +/// An mutation identifier for a slot in the slab. The generation helps prevent +/// accessing an entry with an outdated token. +#[derive(Copy, Clone, Debug, PartialEq, Eq, Ord, PartialOrd)] +pub(crate) struct Generation(usize); + +impl Generation { + pub(crate) const WIDTH: u32 = Address::GENERATION_WIDTH; + + pub(super) const MAX: usize = bit::mask_for(Address::GENERATION_WIDTH); + + /// Create a new generation + /// + /// # Panics + /// + /// Panics if `value` is greater than max generation. + pub(crate) fn new(value: usize) -> Generation { + assert!(value <= Self::MAX); + Generation(value) + } + + /// Returns the next generation value + pub(crate) fn next(self) -> Generation { + Generation((self.0 + 1) & Self::MAX) + } + + pub(crate) fn to_usize(self) -> usize { + self.0 + } +} diff --git a/tokio/src/util/slab/mod.rs b/tokio/src/util/slab/mod.rs new file mode 100644 index 00000000..e9b53c53 --- /dev/null +++ b/tokio/src/util/slab/mod.rs @@ -0,0 +1,109 @@ +//! A lock-free concurrent slab. + +mod addr; +pub(crate) use addr::Address; + +mod entry; +pub(crate) use entry::Entry; + +mod generation; +pub(crate) use generation::Generation; + +mod page; + +mod shard; +use shard::Shard; + +mod slot; +use slot::Slot; + +mod stack; +use stack::TransferStack; + +#[cfg(all(loom, test))] +mod tests; + +use crate::loom::sync::Mutex; +use crate::util::bit; + +use std::fmt; + +#[cfg(target_pointer_width = "64")] +const MAX_THREADS: usize = 4096; + +#[cfg(target_pointer_width = "32")] +const MAX_THREADS: usize = 2048; + +/// Max number of pages per slab +const MAX_PAGES: usize = bit::pointer_width() as usize / 4; + +cfg_not_loom! { + /// Size of first page + const INITIAL_PAGE_SIZE: usize = 32; +} + +cfg_loom! { + const INITIAL_PAGE_SIZE: usize = 2; +} + +/// A sharded slab. +pub(crate) struct Slab { + // Signal shard for now. Eventually there will be more. + shard: Shard, + local: Mutex<()>, +} + +unsafe impl Send for Slab {} +unsafe impl Sync for Slab {} + +impl Slab { + /// Returns a new slab with the default configuration parameters. + pub(crate) fn new() -> Slab { + Slab { + shard: Shard::new(), + local: Mutex::new(()), + } + } + + /// allocs a value into the slab, returning a key that can be used to + /// access it. 
+    ///
+    /// If this function returns `None`, then the shard for the current thread
+    /// is full and no items can be added until some are removed, or the maximum
+    /// number of shards has been reached.
+    pub(crate) fn alloc(&self) -> Option<Address> {
+        // we must lock the slab to alloc an item.
+        let _local = self.local.lock().unwrap();
+        self.shard.alloc()
+    }
+
+    /// Removes the value associated with the given key from the slab.
+    pub(crate) fn remove(&self, idx: Address) {
+        // try to lock the slab so that we can use `remove_local`.
+        let lock = self.local.try_lock();
+
+        // if we were able to lock the slab, we are "local" and can use the fast
+        // path; otherwise, we will use `remove_remote`.
+        if lock.is_ok() {
+            self.shard.remove_local(idx)
+        } else {
+            self.shard.remove_remote(idx)
+        }
+    }
+
+    /// Return a reference to the value associated with the given key.
+    ///
+    /// If the slab does not contain a value for the given key, `None` is
+    /// returned instead.
+    pub(crate) fn get(&self, token: Address) -> Option<&T> {
+        self.shard.get(token)
+    }
+}
+
+impl<T> fmt::Debug for Slab<T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("Slab")
+            .field("shard", &self.shard)
+            .finish()
+    }
+}
diff --git a/tokio/src/util/slab/page.rs b/tokio/src/util/slab/page.rs
new file mode 100644
index 00000000..6b1c64c0
--- /dev/null
+++ b/tokio/src/util/slab/page.rs
@@ -0,0 +1,187 @@
+use crate::loom::cell::CausalCell;
+use crate::util::slab::{Address, Entry, Slot, TransferStack, INITIAL_PAGE_SIZE};
+
+use std::fmt;
+
+/// Data accessed only by the thread that owns the shard.
+pub(crate) struct Local {
+    head: CausalCell<usize>,
+}
+
+/// Data accessed by any thread.
+pub(crate) struct Shared<T> {
+    remote: TransferStack,
+    size: usize,
+    prev_sz: usize,
+    slab: CausalCell<Option<Box<[Slot<T>]>>>,
+}
+
+/// Returns the size of the page at index `n`
+pub(super) fn size(n: usize) -> usize {
+    INITIAL_PAGE_SIZE << n
+}
+
+impl Local {
+    pub(crate) fn new() -> Self {
+        Self {
+            head: CausalCell::new(0),
+        }
+    }
+
+    fn head(&self) -> usize {
+        self.head.with(|head| unsafe { *head })
+    }
+
+    fn set_head(&self, new_head: usize) {
+        self.head.with_mut(|head| unsafe {
+            *head = new_head;
+        })
+    }
+}
+
+impl<T: Entry> Shared<T> {
+    pub(crate) fn new(size: usize, prev_sz: usize) -> Shared<T> {
+        Self {
+            prev_sz,
+            size,
+            remote: TransferStack::new(),
+            slab: CausalCell::new(None),
+        }
+    }
+
+    /// Allocates storage for this page if it does not already exist.
+    ///
+    /// This requires unique access to the page (e.g. it is called from the
+    /// thread that owns the page, or, in the case of `SingleShard`, while the
+    /// lock is held). In order to indicate this, a reference to the page's
+    /// `Local` data is taken by this function; the `Local` argument is not
+    /// actually used, but requiring it ensures that this is only called when
+    /// local access is held.
+    #[cold]
+    fn alloc_page(&self, _: &Local) {
+        debug_assert!(self.slab.with(|s| unsafe { (*s).is_none() }));
+
+        let mut slab = Vec::with_capacity(self.size);
+        slab.extend((1..self.size).map(Slot::new));
+        slab.push(Slot::new(Address::NULL));
+
+        self.slab.with_mut(|s| {
+            // this mut access is safe — it only occurs to initially
+            // allocate the page, which only happens on this thread; if the
+            // page has not yet been allocated, other threads will not try
+            // to access it yet.
+            unsafe {
+                *s = Some(slab.into_boxed_slice());
+            }
+        });
+    }
+
+    pub(crate) fn alloc(&self, local: &Local) -> Option<Address>
{ + let head = local.head(); + + // are there any items on the local free list? (fast path) + let head = if head < self.size { + head + } else { + // if the local free list is empty, pop all the items on the remote + // free list onto the local free list. + self.remote.pop_all()? + }; + + // if the head is still null, both the local and remote free lists are + // empty --- we can't fit any more items on this page. + if head == Address::NULL { + return None; + } + + // do we need to allocate storage for this page? + let page_needs_alloc = self.slab.with(|s| unsafe { (*s).is_none() }); + if page_needs_alloc { + self.alloc_page(local); + } + + let gen = self.slab.with(|slab| { + let slab = unsafe { &*(slab) } + .as_ref() + .expect("page must have been allocated to alloc!"); + + let slot = &slab[head]; + + local.set_head(slot.next()); + slot.generation() + }); + + let index = head + self.prev_sz; + + Some(Address::new(index, gen)) + } + + pub(crate) fn get(&self, addr: Address) -> Option<&T> { + let page_offset = addr.slot() - self.prev_sz; + + self.slab + .with(|slab| unsafe { &*slab }.as_ref()?.get(page_offset)) + .map(|slot| slot.get()) + } + + pub(crate) fn remove_local(&self, local: &Local, addr: Address) { + let offset = addr.slot() - self.prev_sz; + + self.slab.with(|slab| { + let slab = unsafe { &*slab }.as_ref(); + + let slot = if let Some(slot) = slab.and_then(|slab| slab.get(offset)) { + slot + } else { + return; + }; + + if slot.reset(addr.generation()) { + slot.set_next(local.head()); + local.set_head(offset); + } + }) + } + + pub(crate) fn remove_remote(&self, addr: Address) { + let offset = addr.slot() - self.prev_sz; + + self.slab.with(|slab| { + let slab = unsafe { &*slab }.as_ref(); + + let slot = if let Some(slot) = slab.and_then(|slab| slab.get(offset)) { + slot + } else { + return; + }; + + if !slot.reset(addr.generation()) { + return; + } + + self.remote.push(offset, |next| slot.set_next(next)); + }) + } +} + +impl fmt::Debug for Local { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.head.with(|head| { + let head = unsafe { *head }; + f.debug_struct("Local") + .field("head", &format_args!("{:#0x}", head)) + .finish() + }) + } +} + +impl fmt::Debug for Shared { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Shared") + .field("remote", &self.remote) + .field("prev_sz", &self.prev_sz) + .field("size", &self.size) + // .field("slab", &self.slab) + .finish() + } +} diff --git a/tokio/src/util/slab/shard.rs b/tokio/src/util/slab/shard.rs new file mode 100644 index 00000000..711e36d3 --- /dev/null +++ b/tokio/src/util/slab/shard.rs @@ -0,0 +1,108 @@ +use crate::util::slab::{Address, Entry, page, MAX_PAGES}; + +use std::fmt; + +// ┌─────────────┐ ┌────────┐ +// │ page 1 │ │ │ +// ├─────────────┤ ┌───▶│ next──┼─┐ +// │ page 2 │ │ ├────────┤ │ +// │ │ │ │XXXXXXXX│ │ +// │ local_free──┼─┘ ├────────┤ │ +// │ global_free─┼─┐ │ │◀┘ +// ├─────────────┤ └───▶│ next──┼─┐ +// │ page 3 │ ├────────┤ │ +// └─────────────┘ │XXXXXXXX│ │ +// ... ├────────┤ │ +// ┌─────────────┐ │XXXXXXXX│ │ +// │ page n │ ├────────┤ │ +// └─────────────┘ │ │◀┘ +// │ next──┼───▶ +// ├────────┤ +// │XXXXXXXX│ +// └────────┘ +// ... +pub(super) struct Shard { + /// The local free list for each page. + /// + /// These are only ever accessed from this shard's thread, so they are + /// stored separately from the shared state for the page that can be + /// accessed concurrently, to minimize false sharing. 
+    local: Box<[page::Local]>,
+    /// The shared state for each page in this shard.
+    ///
+    /// This consists of the page's metadata (size, previous size), remote free
+    /// list, and a pointer to the actual array backing that page.
+    shared: Box<[page::Shared<T>]>,
+}
+
+impl<T: Entry> Shard<T> {
+    pub(super) fn new() -> Shard<T> {
+        let mut total_sz = 0;
+        let shared = (0..MAX_PAGES)
+            .map(|page_num| {
+                let sz = page::size(page_num);
+                let prev_sz = total_sz;
+                total_sz += sz;
+                page::Shared::new(sz, prev_sz)
+            })
+            .collect();
+
+        let local = (0..MAX_PAGES).map(|_| page::Local::new()).collect();
+
+        Shard {
+            local,
+            shared,
+        }
+    }
+
+    pub(super) fn alloc(&self) -> Option<Address> {
+        // Can we fit the value into an existing page?
+        for (page_idx, page) in self.shared.iter().enumerate() {
+            let local = self.local(page_idx);
+
+            if let Some(page_offset) = page.alloc(local) {
+                return Some(page_offset);
+            }
+        }
+
+        None
+    }
+
+    pub(super) fn get(&self, addr: Address) -> Option<&T> {
+        let page_idx = addr.page();
+
+        if page_idx > self.shared.len() {
+            return None;
+        }
+
+        self.shared[page_idx].get(addr)
+    }
+
+    /// Remove an item on the shard's local thread.
+    pub(super) fn remove_local(&self, addr: Address) {
+        let page_idx = addr.page();
+
+        if let Some(page) = self.shared.get(page_idx) {
+            page.remove_local(self.local(page_idx), addr);
+        }
+    }
+
+    /// Remove an item, while on a different thread from the shard's local thread.
+    pub(super) fn remove_remote(&self, addr: Address) {
+        if let Some(page) = self.shared.get(addr.page()) {
+            page.remove_remote(addr);
+        }
+    }
+
+    fn local(&self, i: usize) -> &page::Local {
+        &self.local[i]
+    }
+}
+
+impl<T> fmt::Debug for Shard<T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("Shard")
+            .field("shared", &self.shared)
+            .finish()
+    }
+}
diff --git a/tokio/src/util/slab/slot.rs b/tokio/src/util/slab/slot.rs
new file mode 100644
index 00000000..59baacf5
--- /dev/null
+++ b/tokio/src/util/slab/slot.rs
@@ -0,0 +1,42 @@
+use crate::loom::cell::CausalCell;
+use crate::util::slab::{Generation, Entry};
+
+/// Stores an entry in the slab.
+pub(super) struct Slot<T> {
+    next: CausalCell<usize>,
+    entry: T,
+}
+
+impl<T: Entry> Slot<T> {
+    /// Initialize a new `Slot` linked to `next`.
+    ///
+    /// The entry is initialized to a default value.
+    pub(super) fn new(next: usize) -> Slot<T> {
+        Slot {
+            next: CausalCell::new(next),
+            entry: T::default(),
+        }
+    }
+
+    pub(super) fn get(&self) -> &T {
+        &self.entry
+    }
+
+    pub(super) fn generation(&self) -> Generation {
+        self.entry.generation()
+    }
+
+    pub(super) fn reset(&self, generation: Generation) -> bool {
+        self.entry.reset(generation)
+    }
+
+    pub(super) fn next(&self) -> usize {
+        self.next.with(|next| unsafe { *next })
+    }
+
+    pub(super) fn set_next(&self, next: usize) {
+        self.next.with_mut(|n| unsafe {
+            (*n) = next;
+        })
+    }
+}
diff --git a/tokio/src/util/slab/stack.rs b/tokio/src/util/slab/stack.rs
new file mode 100644
index 00000000..0ae0d710
--- /dev/null
+++ b/tokio/src/util/slab/stack.rs
@@ -0,0 +1,58 @@
+use crate::loom::sync::atomic::AtomicUsize;
+use crate::util::slab::Address;
+
+use std::fmt;
+use std::sync::atomic::Ordering;
+use std::usize;
+
+pub(super) struct TransferStack {
+    head: AtomicUsize,
+}
+
+impl TransferStack {
+    pub(super) fn new() -> Self {
+        Self {
+            head: AtomicUsize::new(Address::NULL),
+        }
+    }
+
+    pub(super) fn pop_all(&self) -> Option<usize> {
+        let val = self.head.swap(Address::NULL, Ordering::Acquire);
+
+        if val == Address::NULL {
+            None
+        } else {
+            Some(val)
+        }
+    }
+
+    pub(super) fn push(&self, value: usize, before: impl Fn(usize)) {
+        let mut next = self.head.load(Ordering::Relaxed);
+
+        loop {
+            before(next);
+
+            match self
+                .head
+                .compare_exchange(next, value, Ordering::AcqRel, Ordering::Acquire)
+            {
+                // lost the race!
+                Err(actual) => next = actual,
+                Ok(_) => return,
+            }
+        }
+    }
+}
+
+impl fmt::Debug for TransferStack {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        // Loom likes to dump all its internal state in `fmt::Debug` impls, so
+        // we override this to just print the current value in tests.
+ f.debug_struct("TransferStack") + .field( + "head", + &format_args!("{:#x}", self.head.load(Ordering::Relaxed)), + ) + .finish() + } +} diff --git a/tokio/src/util/slab/tests/loom_slab.rs b/tokio/src/util/slab/tests/loom_slab.rs new file mode 100644 index 00000000..8a96a736 --- /dev/null +++ b/tokio/src/util/slab/tests/loom_slab.rs @@ -0,0 +1,327 @@ +use crate::io::driver::ScheduledIo; +use crate::util::slab::{Address, Slab}; + +use loom::sync::{Arc, Condvar, Mutex}; +use loom::thread; + +#[test] +fn local_remove() { + loom::model(|| { + let slab = Arc::new(Slab::new()); + + let s = slab.clone(); + let t1 = thread::spawn(move || { + let idx = store_val(&s, 1); + assert_eq!(get_val(&s, idx), Some(1)); + s.remove(idx); + assert_eq!(get_val(&s, idx), None); + let idx = store_val(&s, 2); + assert_eq!(get_val(&s, idx), Some(2)); + s.remove(idx); + assert_eq!(get_val(&s, idx), None); + }); + + let s = slab.clone(); + let t2 = thread::spawn(move || { + let idx = store_val(&s, 3); + assert_eq!(get_val(&s, idx), Some(3)); + s.remove(idx); + assert_eq!(get_val(&s, idx), None); + let idx = store_val(&s, 4); + s.remove(idx); + assert_eq!(get_val(&s, idx), None); + }); + + let s = slab; + let idx1 = store_val(&s, 5); + assert_eq!(get_val(&s, idx1), Some(5)); + let idx2 = store_val(&s, 6); + assert_eq!(get_val(&s, idx2), Some(6)); + s.remove(idx1); + assert_eq!(get_val(&s, idx1), None); + assert_eq!(get_val(&s, idx2), Some(6)); + s.remove(idx2); + assert_eq!(get_val(&s, idx2), None); + + t1.join().expect("thread 1 should not panic"); + t2.join().expect("thread 2 should not panic"); + }); +} + +#[test] +fn remove_remote() { + loom::model(|| { + let slab = Arc::new(Slab::new()); + + let idx1 = store_val(&slab, 1); + assert_eq!(get_val(&slab, idx1), Some(1)); + + let idx2 = store_val(&slab, 2); + assert_eq!(get_val(&slab, idx2), Some(2)); + + let idx3 = store_val(&slab, 3); + assert_eq!(get_val(&slab, idx3), Some(3)); + + let s = slab.clone(); + let t1 = thread::spawn(move || { + assert_eq!(get_val(&s, idx2), Some(2)); + s.remove(idx2); + assert_eq!(get_val(&s, idx2), None); + }); + + let s = slab.clone(); + let t2 = thread::spawn(move || { + assert_eq!(get_val(&s, idx3), Some(3)); + s.remove(idx3); + assert_eq!(get_val(&s, idx3), None); + }); + + t1.join().expect("thread 1 should not panic"); + t2.join().expect("thread 2 should not panic"); + + assert_eq!(get_val(&slab, idx1), Some(1)); + assert_eq!(get_val(&slab, idx2), None); + assert_eq!(get_val(&slab, idx3), None); + }); +} + +#[test] +fn remove_remote_and_reuse() { + loom::model(|| { + let slab = Arc::new(Slab::new()); + + let idx1 = store_val(&slab, 1); + let idx2 = store_val(&slab, 2); + + assert_eq!(get_val(&slab, idx1), Some(1)); + assert_eq!(get_val(&slab, idx2), Some(2)); + + let s = slab.clone(); + let t1 = thread::spawn(move || { + s.remove(idx1); + let value = get_val(&s, idx1); + + // We may or may not see the new value yet, depending on when + // this occurs, but we must either see the new value or `None`; + // the old value has been removed! 
+ assert!(value == None || value == Some(3)); + }); + + let idx3 = store_when_free(&slab, 3); + t1.join().expect("thread 1 should not panic"); + + assert_eq!(get_val(&slab, idx3), Some(3)); + assert_eq!(get_val(&slab, idx2), Some(2)); + }); +} + +#[test] +fn concurrent_alloc_remove() { + loom::model(|| { + let slab = Arc::new(Slab::new()); + let pair = Arc::new((Mutex::new(None), Condvar::new())); + + let slab2 = slab.clone(); + let pair2 = pair.clone(); + let remover = thread::spawn(move || { + let (lock, cvar) = &*pair2; + for _ in 0..2 { + let mut next = lock.lock().unwrap(); + while next.is_none() { + next = cvar.wait(next).unwrap(); + } + let key = next.take().unwrap(); + slab2.remove(key); + assert_eq!(get_val(&slab2, key), None); + cvar.notify_one(); + } + }); + + let (lock, cvar) = &*pair; + for i in 0..2 { + let key = store_val(&slab, i); + + let mut next = lock.lock().unwrap(); + *next = Some(key); + cvar.notify_one(); + + // Wait for the item to be removed. + while next.is_some() { + next = cvar.wait(next).unwrap(); + } + + assert_eq!(get_val(&slab, key), None); + } + + remover.join().unwrap(); + }) +} + +#[test] +fn concurrent_remove_remote_and_reuse() { + loom::model(|| { + let slab = Arc::new(Slab::new()); + + let idx1 = store_val(&slab, 1); + let idx2 = store_val(&slab, 2); + + assert_eq!(get_val(&slab, idx1), Some(1)); + assert_eq!(get_val(&slab, idx2), Some(2)); + + let s = slab.clone(); + let s2 = slab.clone(); + let t1 = thread::spawn(move || { + s.remove(idx1); + }); + + let t2 = thread::spawn(move || { + s2.remove(idx2); + }); + + let idx3 = store_when_free(&slab, 3); + t1.join().expect("thread 1 should not panic"); + t2.join().expect("thread 1 should not panic"); + + assert!(get_val(&slab, idx1).is_none()); + assert!(get_val(&slab, idx2).is_none()); + assert_eq!(get_val(&slab, idx3), Some(3)); + }); +} + +#[test] +fn alloc_remove_get() { + loom::model(|| { + let slab = Arc::new(Slab::new()); + let pair = Arc::new((Mutex::new(None), Condvar::new())); + + let slab2 = slab.clone(); + let pair2 = pair.clone(); + let t1 = thread::spawn(move || { + let slab = slab2; + let (lock, cvar) = &*pair2; + // allocate one entry just so that we have to use the final one for + // all future allocations. + let _key0 = store_val(&slab, 0); + let key = store_val(&slab, 1); + + let mut next = lock.lock().unwrap(); + *next = Some(key); + cvar.notify_one(); + // remove the second entry + slab.remove(key); + // store a new readiness at the same location (since the slab + // already has an entry in slot 0) + store_val(&slab, 2); + }); + + let (lock, cvar) = &*pair; + // wait for the second entry to be stored... + let mut next = lock.lock().unwrap(); + while next.is_none() { + next = cvar.wait(next).unwrap(); + } + let key = next.unwrap(); + + // our generation will be stale when the second store occurs at that + // index, we must not see the value of that store. + let val = get_val(&slab, key); + assert_ne!(val, Some(2), "generation must have advanced!"); + + t1.join().unwrap(); + }) +} + +#[test] +fn alloc_remove_set() { + loom::model(|| { + let slab = Arc::new(Slab::new()); + let pair = Arc::new((Mutex::new(None), Condvar::new())); + + let slab2 = slab.clone(); + let pair2 = pair.clone(); + let t1 = thread::spawn(move || { + let slab = slab2; + let (lock, cvar) = &*pair2; + // allocate one entry just so that we have to use the final one for + // all future allocations. 
+ let _key0 = store_val(&slab, 0); + let key = store_val(&slab, 1); + + let mut next = lock.lock().unwrap(); + *next = Some(key); + cvar.notify_one(); + + slab.remove(key); + // remove the old entry and insert a new one, with a new generation. + let key2 = slab.alloc().expect("store key 2"); + // after the remove, we must not see the value written with the + // stale index. + assert_eq!( + get_val(&slab, key), + None, + "stale set must no longer be visible" + ); + assert_eq!(get_val(&slab, key2), Some(0)); + key2 + }); + + let (lock, cvar) = &*pair; + + // wait for the second entry to be stored. the index we get from the + // other thread may become stale after a write. + let mut next = lock.lock().unwrap(); + while next.is_none() { + next = cvar.wait(next).unwrap(); + } + let key = next.unwrap(); + + // try to write to the index with our generation + slab.get(key).map(|val| val.set_readiness(key, |_| 2)); + + let key2 = t1.join().unwrap(); + // after the remove, we must not see the value written with the + // stale index either. + assert_eq!( + get_val(&slab, key), + None, + "stale set must no longer be visible" + ); + assert_eq!(get_val(&slab, key2), Some(0)); + }); +} + +fn get_val(slab: &Arc>, address: Address) -> Option { + slab.get(address).and_then(|s| s.get_readiness(address)) +} + +fn store_val(slab: &Arc>, readiness: usize) -> Address { + let key = slab.alloc().expect("allocate slot"); + + if let Some(slot) = slab.get(key) { + slot.set_readiness(key, |_| readiness) + .expect("generation should still be valid!"); + } else { + panic!("slab did not contain a value for {:?}", key); + } + + key +} + +fn store_when_free(slab: &Arc>, readiness: usize) -> Address { + let key = loop { + if let Some(key) = slab.alloc() { + break key; + } + + thread::yield_now(); + }; + + if let Some(slot) = slab.get(key) { + slot.set_readiness(key, |_| readiness) + .expect("generation should still be valid!"); + } else { + panic!("slab did not contain a value for {:?}", key); + } + + key +} diff --git a/tokio/src/util/slab/tests/loom_stack.rs b/tokio/src/util/slab/tests/loom_stack.rs new file mode 100644 index 00000000..d1121bee --- /dev/null +++ b/tokio/src/util/slab/tests/loom_stack.rs @@ -0,0 +1,88 @@ +use crate::util::slab::TransferStack; + +use loom::cell::CausalCell; +use loom::sync::Arc; +use loom::thread; + +#[test] +fn transfer_stack() { + loom::model(|| { + let causalities = [CausalCell::new(None), CausalCell::new(None)]; + let shared = Arc::new((causalities, TransferStack::new())); + let shared1 = shared.clone(); + let shared2 = shared.clone(); + + // Spawn two threads that both try to push to the stack. + let t1 = thread::spawn(move || { + let (causalities, stack) = &*shared1; + stack.push(0, |prev| { + causalities[0].with_mut(|c| unsafe { + *c = Some(prev); + }); + }); + }); + + let t2 = thread::spawn(move || { + let (causalities, stack) = &*shared2; + stack.push(1, |prev| { + causalities[1].with_mut(|c| unsafe { + *c = Some(prev); + }); + }); + }); + + let (causalities, stack) = &*shared; + + // Try to pop from the stack... + let mut idx = stack.pop_all(); + while idx == None { + idx = stack.pop_all(); + thread::yield_now(); + } + let idx = idx.unwrap(); + + let saw_both = causalities[idx].with(|val| { + let val = unsafe { *val }; + assert!( + val.is_some(), + "CausalCell write must happen-before index is pushed to the stack!", + ); + // were there two entries in the stack? if so, check that + // both saw a write. 
+            if let Some(c) = causalities.get(val.unwrap()) {
+                c.with(|val| {
+                    let val = unsafe { *val };
+                    assert!(
+                        val.is_some(),
+                        "CausalCell write must happen-before index is pushed to the stack!",
+                    );
+                });
+                true
+            } else {
+                false
+            }
+        });
+
+        // We only saw one push. Ensure that the other push happens too.
+        if !saw_both {
+            // Try to pop from the stack...
+            let mut idx = stack.pop_all();
+            while idx == None {
+                idx = stack.pop_all();
+                thread::yield_now();
+            }
+            let idx = idx.unwrap();
+
+            causalities[idx].with(|val| {
+                let val = unsafe { *val };
+                assert!(
+                    val.is_some(),
+                    "CausalCell write must happen-before index is pushed to the stack!",
+                );
+            });
+        }
+
+        t1.join().unwrap();
+        t2.join().unwrap();
+    });
+}
diff --git a/tokio/src/util/slab/tests/mod.rs b/tokio/src/util/slab/tests/mod.rs
new file mode 100644
index 00000000..7f793544
--- /dev/null
+++ b/tokio/src/util/slab/tests/mod.rs
@@ -0,0 +1,2 @@
+mod loom_slab;
+mod loom_stack;
--
cgit v1.2.3
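
Note (not part of the patch): the `util::bit::Pack` helper and the index layout documented in `addr.rs` come down to mask-and-shift arithmetic over a single `usize`. The standalone sketch below illustrates that packing scheme; the field widths, constant names, and functions are illustrative assumptions only, not the values Tokio derives from `MAX_PAGES`, `MAX_THREADS`, and the reserved bits.

```rust
// Standalone sketch of the mask/shift packing used by `util::bit::Pack`.
// Field widths here are arbitrary, chosen only for illustration.
const SLOT_BITS: u32 = 12;
const THREAD_BITS: u32 = 12;
const GEN_BITS: u32 = 8;

const SLOT_MASK: usize = (1 << SLOT_BITS) - 1;
const THREAD_MASK: usize = ((1 << THREAD_BITS) - 1) << SLOT_BITS;
const GEN_MASK: usize = ((1 << GEN_BITS) - 1) << (SLOT_BITS + THREAD_BITS);

// Pack slot address, thread ID, and generation into one usize index.
fn pack(slot: usize, thread: usize, generation: usize) -> usize {
    assert!(slot <= SLOT_MASK);
    assert!(thread <= THREAD_MASK >> SLOT_BITS);
    assert!(generation <= GEN_MASK >> (SLOT_BITS + THREAD_BITS));
    slot | (thread << SLOT_BITS) | (generation << (SLOT_BITS + THREAD_BITS))
}

// Recover (slot, thread, generation) from a packed index.
fn unpack(index: usize) -> (usize, usize, usize) {
    (
        index & SLOT_MASK,
        (index & THREAD_MASK) >> SLOT_BITS,
        (index & GEN_MASK) >> (SLOT_BITS + THREAD_BITS),
    )
}

fn main() {
    let index = pack(37, 4, 129);
    assert_eq!(unpack(index), (37, 4, 129));
    println!("packed index: {:#x}", index);
}
```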
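The generation bits exist to guard against the ABA problem: an index handed out before a slot was freed and reused must not be able to read or write the slot's new occupant. Below is a minimal sketch of that guard, assuming a simplified entry that packs a generation and a small value into one `AtomicUsize`; this is not `ScheduledIo`'s actual layout, and all names are illustrative.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Illustrative layout: low 16 bits hold a value (e.g. readiness),
// the bits above hold the slot's generation.
const VALUE_BITS: u32 = 16;
const VALUE_MASK: usize = (1 << VALUE_BITS) - 1;

struct Entry {
    state: AtomicUsize,
}

impl Entry {
    fn new() -> Self {
        Self { state: AtomicUsize::new(0) }
    }

    fn generation(&self) -> usize {
        self.state.load(Ordering::Acquire) >> VALUE_BITS
    }

    // Store `value` only if the caller's generation still matches the entry's
    // current generation; otherwise the token is stale and the write is refused.
    fn set_value(&self, expected_gen: usize, value: usize) -> Result<(), ()> {
        let mut current = self.state.load(Ordering::Acquire);
        loop {
            if current >> VALUE_BITS != expected_gen {
                return Err(()); // stale token: the slot was reused
            }
            let next = (expected_gen << VALUE_BITS) | (value & VALUE_MASK);
            match self
                .state
                .compare_exchange(current, next, Ordering::AcqRel, Ordering::Acquire)
            {
                Ok(_) => return Ok(()),
                Err(actual) => current = actual,
            }
        }
    }

    // Reclaim the slot by advancing the generation, invalidating any
    // outstanding indices that still carry the old generation.
    fn reset(&self, expected_gen: usize) -> bool {
        let current = self.state.load(Ordering::Acquire);
        if current >> VALUE_BITS != expected_gen {
            return false;
        }
        let next = (expected_gen + 1) << VALUE_BITS;
        self.state
            .compare_exchange(current, next, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
    }
}

fn main() {
    let entry = Entry::new();
    let generation = entry.generation();
    assert!(entry.set_value(generation, 0b01).is_ok());
    assert!(entry.reset(generation));
    // A write through the old generation is now rejected.
    assert!(entry.set_value(generation, 0b10).is_err());
}
```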
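The `TransferStack` in `stack.rs` is a Treiber-style stack of free-slot indices: remote threads push by recording the previous head in the slot's `next` field before a compare-exchange publishes it, and the owning thread drains the whole list with a single `swap`. A standalone approximation using `std::sync::atomic` follows; it uses no loom, and the `NULL` sentinel and slot layout are simplified assumptions rather than Tokio's.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

const NULL: usize = usize::MAX;

// Each slot stores the index of the next free slot.
struct Slot {
    next: AtomicUsize,
}

struct TransferStack {
    head: AtomicUsize,
}

impl TransferStack {
    fn new() -> Self {
        Self { head: AtomicUsize::new(NULL) }
    }

    // Push `index`, writing the previous head into the slot before the CAS
    // publishes it (this mirrors the `before` closure in the patch).
    fn push(&self, index: usize, slots: &[Slot]) {
        let mut next = self.head.load(Ordering::Relaxed);
        loop {
            slots[index].next.store(next, Ordering::Relaxed);
            match self
                .head
                .compare_exchange(next, index, Ordering::AcqRel, Ordering::Acquire)
            {
                Ok(_) => return,
                Err(actual) => next = actual, // lost the race, retry
            }
        }
    }

    // Take the entire remote free list in one atomic swap.
    fn pop_all(&self) -> Option<usize> {
        let head = self.head.swap(NULL, Ordering::Acquire);
        if head == NULL { None } else { Some(head) }
    }
}

fn main() {
    let slots: Vec<Slot> = (0..4).map(|_| Slot { next: AtomicUsize::new(NULL) }).collect();
    let stack = TransferStack::new();
    stack.push(2, &slots);
    stack.push(0, &slots);

    // Drain: walk the chain starting at the swapped-out head.
    let mut cur = stack.pop_all();
    while let Some(i) = cur {
        println!("freed slot {}", i);
        let next = slots[i].next.load(Ordering::Relaxed);
        cur = if next == NULL { None } else { Some(next) };
    }
}
```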