author     Carl Lerche <me@carllerche.com>   2019-11-20 00:05:14 -0800
committer  GitHub <noreply@github.com>       2019-11-20 00:05:14 -0800
commit     69975fb9601bbb21659db283d888470733bae660
tree       8db7c9a31e4125646634af09e197ae6479e10cc4
parent     7c8b8877d440629ab9a27a2c9dcef859835d3536
Refactor the I/O driver, extracting slab to `tokio::util`. (#1792)
The I/O driver is made private and moved to `tokio::io::driver`. `Registration` is moved to `tokio::io::Registration` and `PollEvented` is moved to `tokio::io::PollEvented`. Additionally, the concurrent slab used by the I/O driver is cleaned up and extracted to `tokio::util::slab`, allowing it to eventually be used by other types.
Diffstat (limited to 'tokio/src/util')
-rw-r--r--  tokio/src/util/bit.rs                    |  86
-rw-r--r--  tokio/src/util/mod.rs                    |  15
-rw-r--r--  tokio/src/util/slab/addr.rs              | 155
-rw-r--r--  tokio/src/util/slab/entry.rs             |   7
-rw-r--r--  tokio/src/util/slab/generation.rs        |  32
-rw-r--r--  tokio/src/util/slab/mod.rs               | 109
-rw-r--r--  tokio/src/util/slab/page.rs              | 187
-rw-r--r--  tokio/src/util/slab/shard.rs             | 108
-rw-r--r--  tokio/src/util/slab/slot.rs              |  42
-rw-r--r--  tokio/src/util/slab/stack.rs             |  58
-rw-r--r--  tokio/src/util/slab/tests/loom_slab.rs   | 327
-rw-r--r--  tokio/src/util/slab/tests/loom_stack.rs  |  88
-rw-r--r--  tokio/src/util/slab/tests/mod.rs         |   2
13 files changed, 1212 insertions(+), 4 deletions(-)
diff --git a/tokio/src/util/bit.rs b/tokio/src/util/bit.rs
new file mode 100644
index 00000000..03fb0cfb
--- /dev/null
+++ b/tokio/src/util/bit.rs
@@ -0,0 +1,86 @@
+use std::fmt;
+
+#[derive(Clone, Copy)]
+pub(crate) struct Pack {
+ mask: usize,
+ shift: u32,
+}
+
+impl Pack {
+ /// Value is packed in the `width` most-significant bits.
+ pub(crate) const fn most_significant(width: u32) -> Pack {
+ let mask = mask_for(width).reverse_bits();
+
+ Pack {
+ mask,
+ shift: mask.trailing_zeros(),
+ }
+ }
+
+ /// Value is packed in the `width` least-significant bits.
+ pub(crate) const fn least_significant(width: u32) -> Pack {
+ let mask = mask_for(width);
+
+ Pack {
+ mask,
+ shift: 0,
+ }
+ }
+
+ /// Value is packed in the next `width` more-significant bits (above `self`).
+ pub(crate) const fn then(&self, width: u32) -> Pack {
+ let shift = pointer_width() - self.mask.leading_zeros();
+ let mask = mask_for(width) << shift;
+
+ Pack {
+ mask,
+ shift,
+ }
+ }
+
+ /// Mask used to unpack value
+ pub(crate) const fn mask(&self) -> usize {
+ self.mask
+ }
+
+ /// Width, in bits, dedicated to storing the value.
+ pub(crate) const fn width(&self) -> u32 {
+ pointer_width() - (self.mask >> self.shift).leading_zeros()
+ }
+
+ /// Max representable value
+ pub(crate) const fn max_value(&self) -> usize {
+ (1 << self.width()) - 1
+ }
+
+ pub(crate) fn pack(&self, value: usize, base: usize) -> usize {
+ assert!(value <= self.max_value());
+ (base & !self.mask) | (value << self.shift)
+ }
+
+ pub(crate) fn unpack(&self, src: usize) -> usize {
+ unpack(src, self.mask, self.shift)
+ }
+}
+
+impl fmt::Debug for Pack {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(fmt, "Pack {{ mask: {:b}, shift: {} }}", self.mask, self.shift)
+ }
+}
+
+/// Returns the width of a pointer in bits
+pub(crate) const fn pointer_width() -> u32 {
+ std::mem::size_of::<usize>() as u32 * 8
+}
+
+/// Returns a `usize` with the right-most `n` bits set.
+pub(crate) const fn mask_for(n: u32) -> usize {
+ let shift = 1usize.wrapping_shl(n - 1);
+ shift | (shift - 1)
+}
+
+/// Unpack a value using a mask & shift
+pub(crate) const fn unpack(src: usize, mask: usize, shift: u32) -> usize {
+ (src & mask) >> shift
+}
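
The `Pack` helper above is crate-private, so nothing outside tokio can call it directly. The standalone sketch below mirrors its mask/shift arithmetic (`mask_for`, the `pack` expression, the `unpack` expression) to show how two fields can share one `usize`; the field layout and names are illustrative only, not taken from the diff.

```rust
// Standalone sketch of the mask/shift arithmetic used by `bit::Pack`.
// These helpers mirror the functions in the diff; they are not the tokio items.

/// A `usize` with the right-most `n` bits set (n >= 1).
const fn mask_for(n: u32) -> usize {
    let shift = 1usize.wrapping_shl(n - 1);
    shift | (shift - 1)
}

fn main() {
    // Lay out two fields: 6 low bits for a slot, then 12 bits for a thread id.
    let slot_mask = mask_for(6); // 0b111111
    let thread_shift = 6u32;
    let thread_mask = mask_for(12) << thread_shift;

    // pack(value, base) = (base & !mask) | (value << shift)
    let mut repr = 0usize;
    repr = (repr & !slot_mask) | 37;                            // slot = 37
    repr = (repr & !thread_mask) | (511usize << thread_shift);  // thread = 511

    // unpack(src, mask, shift) = (src & mask) >> shift
    assert_eq!(repr & slot_mask, 37);
    assert_eq!((repr & thread_mask) >> thread_shift, 511);

    println!("packed repr = {:#b}", repr);
}
```
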
diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs
index 44377fcf..a50b33ac 100644
--- a/tokio/src/util/mod.rs
+++ b/tokio/src/util/mod.rs
@@ -1,5 +1,12 @@
-mod pad;
-pub(crate) use self::pad::CachePadded;
+cfg_io_driver! {
+ pub(crate) mod bit;
+ pub(crate) mod slab;
+}
-mod rand;
-pub(crate) use self::rand::FastRand;
+cfg_rt_threaded! {
+ mod pad;
+ pub(crate) use pad::CachePadded;
+
+ mod rand;
+ pub(crate) use rand::FastRand;
+}
diff --git a/tokio/src/util/slab/addr.rs b/tokio/src/util/slab/addr.rs
new file mode 100644
index 00000000..2efe93ef
--- /dev/null
+++ b/tokio/src/util/slab/addr.rs
@@ -0,0 +1,155 @@
+//! Tracks the location of an entry in a slab.
+//!
+//! # Index packing
+//!
+//! A slab index consists of multiple indices packed into a single `usize` value
+//! that correspond to different parts of the slab.
+//!
+//! The least significant `MAX_PAGES + INITIAL_PAGE_SIZE.trailing_zeros() + 1`
+//! bits store the address within a shard, starting at 0 for the first slot on
+//! the first page. To index a slot within a shard, we first find the index of
+//! the page that the address falls on, and then the offset of the slot within
+//! that page.
+//!
+//! Since every page is twice as large as the previous page, and all page sizes
+//! are powers of two, we can determine the page index that contains a given
+//! address by shifting the address down by the smallest page size and looking
+//! at how many twos places necessary to represent that number, telling us what
+//! power of two page size it fits inside of. We can determine the number of
+//! twos places by counting the number of leading zeros (unused twos places) in
+//! the number's binary representation, and subtracting that count from the
+//! total number of bits in a word.
+//!
+//! Once we know what page contains an address, we can subtract the size of all
+//! previous pages from the address to determine the offset within the page.
+//!
+//! After the page address, the next `MAX_THREADS.trailing_zeros() + 1` least
+//! significant bits are the thread ID. These are used to index the array of
+//! shards to find which shard a slot belongs to. If an entry is being removed
+//! and the thread ID of its index matches that of the current thread, we can
+//! use the `remove_local` fast path; otherwise, we have to use the synchronized
+//! `remove_remote` path.
+//!
+//! Finally, a generation value is packed into the index. The `RESERVED_BITS`
+//! most significant bits are left unused, and the remaining bits between the
+//! last bit of the thread ID and the first reserved bit are used to store the
+//! generation. The generation is used as part of an atomic read-modify-write
+//! loop every time a `ScheduledIo`'s readiness is modified, or when the
+//! resource is removed, to guard against the ABA problem.
+//!
+//! Visualized:
+//!
+//! ```text
+//! ┌──────────┬───────────────┬──────────────────┬──────────────────────────┐
+//! │ reserved │  generation   │    thread ID     │         address          │
+//! └▲─────────┴▲──────────────┴▲─────────────────┴▲────────────────────────▲┘
+//!  │          │               │                  │                        │
+//! bits(usize) │    bits(MAX_THREADS)             │                        0
+//!             │                                  │
+//!      bits(usize) - RESERVED       MAX_PAGES + bits(INITIAL_PAGE_SIZE)
+//! ```
+
+use crate::util::bit;
+use crate::util::slab::{Generation, MAX_PAGES, MAX_THREADS, INITIAL_PAGE_SIZE};
+
+use std::usize;
+
+/// References the location at which an entry is stored in a slab.
+#[derive(Debug, Copy, Clone, Eq, PartialEq)]
+pub(crate) struct Address(usize);
+
+const PAGE_INDEX_SHIFT: u32 = INITIAL_PAGE_SIZE.trailing_zeros() + 1;
+
+/// Address in the shard
+const SLOT: bit::Pack = bit::Pack::least_significant(
+ MAX_PAGES as u32 + PAGE_INDEX_SHIFT);
+
+/// Masks the thread identifier
+const THREAD: bit::Pack = SLOT.then(MAX_THREADS.trailing_zeros() + 1);
+
+/// Masks the generation
+const GENERATION: bit::Pack = THREAD.then(
+ bit::pointer_width().wrapping_sub(RESERVED.width() + THREAD.width() + SLOT.width()));
+
+// Chosen arbitrarily
+const RESERVED: bit::Pack = bit::Pack::most_significant(5);
+
+impl Address {
+ /// Represents no entry, picked to avoid collision with Mio's internals.
+ /// This value should not be passed to mio.
+ pub(crate) const NULL: usize = usize::MAX >> 1;
+
+ /// Re-exported by `Generation`.
+ pub(super) const GENERATION_WIDTH: u32 = GENERATION.width();
+
+ pub(super) fn new(shard_index: usize, generation: Generation) -> Address {
+ let mut repr = 0;
+
+ repr = SLOT.pack(shard_index, repr);
+ repr = GENERATION.pack(generation.to_usize(), repr);
+
+ Address(repr)
+ }
+
+ /// Convert from a `usize` representation.
+ pub(crate) fn from_usize(src: usize) -> Address {
+ assert_ne!(src, Self::NULL);
+
+ Address(src)
+ }
+
+ /// Convert to a `usize` representation
+ pub(crate) fn to_usize(self) -> usize {
+ self.0
+ }
+
+ pub(crate) fn generation(self) -> Generation {
+ Generation::new(GENERATION.unpack(self.0))
+ }
+
+ /// Returns the page index
+ pub(super) fn page(self) -> usize {
+ // Since every page is twice as large as the previous page, and all page
+ // sizes are powers of two, we can determine the page index that
+ // contains a given address by shifting the address down by the smallest
+ // page size and looking at how many twos places are necessary to represent
+ // that number, telling us what power of two page size it fits inside
+ // of. We can determine the number of twos places by counting the number
+ // of leading zeros (unused twos places) in the number's binary
+ // representation, and subtracting that count from the total number of
+ // bits in a word.
+ let slot_shifted = (self.slot() + INITIAL_PAGE_SIZE) >> PAGE_INDEX_SHIFT;
+ (bit::pointer_width() - slot_shifted.leading_zeros()) as usize
+ }
+
+ /// Returns the slot index
+ pub(super) fn slot(self) -> usize {
+ SLOT.unpack(self.0)
+ }
+}
+
+#[cfg(test)]
+cfg_not_loom! {
+ use proptest::proptest;
+
+ #[test]
+ fn test_pack_format() {
+ assert_eq!(5, RESERVED.width());
+ assert_eq!(0b11111, RESERVED.max_value());
+ }
+
+ proptest! {
+ #[test]
+ fn address_roundtrips(
+ slot in 0usize..SLOT.max_value(),
+ generation in 0usize..Generation::MAX,
+ ) {
+ let address = Address::new(slot, Generation::new(generation));
+ // Round trip
+ let address = Address::from_usize(address.to_usize());
+
+ assert_eq!(address.slot(), slot);
+ assert_eq!(address.generation().to_usize(), generation);
+ }
+ }
+}
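
The page-index arithmetic described in the module docs is easy to sanity-check in isolation. The sketch below reproduces the `Address::page` computation outside the crate, assuming the non-loom constants (`INITIAL_PAGE_SIZE = 32`, hence `PAGE_INDEX_SHIFT = 6`); `page_for` is an illustrative stand-in, not a tokio item.

```rust
// Sketch of the page-index calculation from `Address::page`, assuming the
// non-loom configuration: INITIAL_PAGE_SIZE = 32, PAGE_INDEX_SHIFT = 6.
const INITIAL_PAGE_SIZE: usize = 32;
const PAGE_INDEX_SHIFT: u32 = INITIAL_PAGE_SIZE.trailing_zeros() + 1;

const fn pointer_width() -> u32 {
    std::mem::size_of::<usize>() as u32 * 8
}

fn page_for(slot: usize) -> usize {
    // Shift the slot down by the smallest page size; the number of bits needed
    // to represent the result tells us which power-of-two page the slot is on.
    let slot_shifted = (slot + INITIAL_PAGE_SIZE) >> PAGE_INDEX_SHIFT;
    (pointer_width() - slot_shifted.leading_zeros()) as usize
}

fn main() {
    // Page 0 holds slots 0..32, page 1 holds 32..96, page 2 holds 96..224, ...
    assert_eq!(page_for(0), 0);
    assert_eq!(page_for(31), 0);
    assert_eq!(page_for(32), 1);
    assert_eq!(page_for(95), 1);
    assert_eq!(page_for(96), 2);
    assert_eq!(page_for(223), 2);
    assert_eq!(page_for(224), 3);
}
```
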
diff --git a/tokio/src/util/slab/entry.rs b/tokio/src/util/slab/entry.rs
new file mode 100644
index 00000000..2e0b10b0
--- /dev/null
+++ b/tokio/src/util/slab/entry.rs
@@ -0,0 +1,7 @@
+use crate::util::slab::Generation;
+
+pub(crate) trait Entry: Default {
+ fn generation(&self) -> Generation;
+
+ fn reset(&self, generation: Generation) -> bool;
+}
diff --git a/tokio/src/util/slab/generation.rs b/tokio/src/util/slab/generation.rs
new file mode 100644
index 00000000..4b16b2ca
--- /dev/null
+++ b/tokio/src/util/slab/generation.rs
@@ -0,0 +1,32 @@
+use crate::util::bit;
+use crate::util::slab::Address;
+
+/// A mutation identifier for a slot in the slab. The generation helps prevent
+/// accessing an entry with an outdated token.
+#[derive(Copy, Clone, Debug, PartialEq, Eq, Ord, PartialOrd)]
+pub(crate) struct Generation(usize);
+
+impl Generation {
+ pub(crate) const WIDTH: u32 = Address::GENERATION_WIDTH;
+
+ pub(super) const MAX: usize = bit::mask_for(Address::GENERATION_WIDTH);
+
+ /// Create a new generation
+ ///
+ /// # Panics
+ ///
+ /// Panics if `value` is greater than max generation.
+ pub(crate) fn new(value: usize) -> Generation {
+ assert!(value <= Self::MAX);
+ Generation(value)
+ }
+
+ /// Returns the next generation value
+ pub(crate) fn next(self) -> Generation {
+ Generation((self.0 + 1) & Self::MAX)
+ }
+
+ pub(crate) fn to_usize(self) -> usize {
+ self.0
+ }
+}
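
The generation only has to distinguish a bounded window of slot reuse, so `next` wraps within the packed width instead of growing without bound; a stale `Address` carrying an old generation then fails the entry's `reset` check, which is what guards against ABA. A minimal sketch of the wrap-around, using an illustrative 8-bit width (the real width is whatever the packed address layout leaves over):

```rust
// Minimal sketch of the wrapping counter behaviour `Generation::next` relies on.
// The 8-bit width is illustrative; the real width comes from the address layout.
const WIDTH: u32 = 8;
const MAX: usize = (1 << WIDTH) - 1;

fn next(generation: usize) -> usize {
    (generation + 1) & MAX
}

fn main() {
    assert_eq!(next(0), 1);
    assert_eq!(next(MAX - 1), MAX);
    // Wraps back to zero instead of overflowing the packed field.
    assert_eq!(next(MAX), 0);
}
```
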
diff --git a/tokio/src/util/slab/mod.rs b/tokio/src/util/slab/mod.rs
new file mode 100644
index 00000000..e9b53c53
--- /dev/null
+++ b/tokio/src/util/slab/mod.rs
@@ -0,0 +1,109 @@
+//! A lock-free concurrent slab.
+
+mod addr;
+pub(crate) use addr::Address;
+
+mod entry;
+pub(crate) use entry::Entry;
+
+mod generation;
+pub(crate) use generation::Generation;
+
+mod page;
+
+mod shard;
+use shard::Shard;
+
+mod slot;
+use slot::Slot;
+
+mod stack;
+use stack::TransferStack;
+
+#[cfg(all(loom, test))]
+mod tests;
+
+use crate::loom::sync::Mutex;
+use crate::util::bit;
+
+use std::fmt;
+
+#[cfg(target_pointer_width = "64")]
+const MAX_THREADS: usize = 4096;
+
+#[cfg(target_pointer_width = "32")]
+const MAX_THREADS: usize = 2048;
+
+/// Max number of pages per slab
+const MAX_PAGES: usize = bit::pointer_width() as usize / 4;
+
+cfg_not_loom! {
+ /// Size of first page
+ const INITIAL_PAGE_SIZE: usize = 32;
+}
+
+cfg_loom! {
+ const INITIAL_PAGE_SIZE: usize = 2;
+}
+
+/// A sharded slab.
+pub(crate) struct Slab<T> {
+ // Single shard for now. Eventually there will be more.
+ shard: Shard<T>,
+ local: Mutex<()>,
+}
+
+unsafe impl<T: Send> Send for Slab<T> {}
+unsafe impl<T: Sync> Sync for Slab<T> {}
+
+impl<T: Entry> Slab<T> {
+ /// Returns a new slab with the default configuration parameters.
+ pub(crate) fn new() -> Slab<T> {
+ Slab {
+ shard: Shard::new(),
+ local: Mutex::new(()),
+ }
+ }
+
+ /// Allocates a value in the slab, returning a key that can be used to
+ /// access it.
+ ///
+ /// If this function returns `None`, then the shard for the current thread
+ /// is full and no items can be added until some are removed, or the maximum
+ /// number of shards has been reached.
+ pub(crate) fn alloc(&self) -> Option<Address> {
+ // we must lock the slab to alloc an item.
+ let _local = self.local.lock().unwrap();
+ self.shard.alloc()
+ }
+
+ /// Removes the value associated with the given key from the slab.
+ pub(crate) fn remove(&self, idx: Address) {
+ // try to lock the slab so that we can use `remove_local`.
+ let lock = self.local.try_lock();
+
+ // if we were able to lock the slab, we are "local" and can use the fast
+ // path; otherwise, we will use `remove_remote`.
+ if lock.is_ok() {
+ self.shard.remove_local(idx)
+ } else {
+ self.shard.remove_remote(idx)
+ }
+ }
+
+ /// Return a reference to the value associated with the given key.
+ ///
+ /// If the slab does not contain a value for the given key, `None` is
+ /// returned instead.
+ pub(crate) fn get(&self, token: Address) -> Option<&T> {
+ self.shard.get(token)
+ }
+}
+
+impl<T> fmt::Debug for Slab<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Slab")
+ .field("shard", &self.shard)
+ .finish()
+ }
+}
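
Everything above is `pub(crate)`, and the only consumer introduced by this patch is the I/O driver's `ScheduledIo`. The sketch below, written as if from inside the crate where these items are visible, shows one hypothetical `Entry` implementation plus the alloc/get/remove flow; `MyEntry` and its single-atomic layout are assumptions for illustration, not code from the diff.

```rust
// Hypothetical usage, as if written inside tokio. `MyEntry` is illustrative
// only; the real entry type is the I/O driver's `ScheduledIo`.
use crate::util::slab::{Entry, Generation, Slab};

use std::sync::atomic::{AtomicUsize, Ordering};

#[derive(Default)]
struct MyEntry {
    // Stores only the current generation; a real entry packs more state.
    generation: AtomicUsize,
}

impl Entry for MyEntry {
    fn generation(&self) -> Generation {
        Generation::new(self.generation.load(Ordering::Acquire))
    }

    fn reset(&self, generation: Generation) -> bool {
        // Only reset if the caller's generation is still current; a stale
        // Address (ABA) fails here and the slot is left untouched.
        self.generation
            .compare_exchange(
                generation.to_usize(),
                generation.next().to_usize(),
                Ordering::AcqRel,
                Ordering::Acquire,
            )
            .is_ok()
    }
}

fn demo() {
    let slab: Slab<MyEntry> = Slab::new();

    // `alloc` returns None only if every page in the shard is full.
    let addr = slab.alloc().expect("slab should have capacity");

    // `get` hands back a shared reference to the entry.
    assert!(slab.get(addr).is_some());

    // Removing via the local (lock-held) or remote path frees the slot for reuse.
    slab.remove(addr);
}
```
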
diff --git a/tokio/src/util/slab/page.rs b/tokio/src/util/slab/page.rs
new file mode 100644
index 00000000..6b1c64c0
--- /dev/null
+++ b/tokio/src/util/slab/page.rs
@@ -0,0 +1,187 @@
+use crate::loom::cell::CausalCell;
+use crate::util::slab::{Address, Entry, Slot, TransferStack, INITIAL_PAGE_SIZE};
+
+use std::fmt;
+
+/// Data accessed only by the thread that owns the shard.
+pub(crate) struct Local {
+ head: CausalCell<usize>,
+}
+
+/// Data accessed by any thread.
+pub(crate) struct Shared<T> {
+ remote: TransferStack,
+ size: usize,
+ prev_sz: usize,
+ slab: CausalCell<Option<Box<[Slot<T>]>>>,
+}
+
+/// Returns the size of the page at index `n`
+pub(super) fn size(n: usize) -> usize {
+ INITIAL_PAGE_SIZE << n
+}
+
+impl Local {
+ pub(crate) fn new() -> Self {
+ Self {
+ head: CausalCell::new(0),
+ }
+ }
+
+ fn head(&self) -> usize {
+ self.head.with(|head| unsafe { *head })
+ }
+
+ fn set_head(&self, new_head: usize) {
+ self.head.with_mut(|head| unsafe {
+ *head = new_head;
+ })
+ }
+}
+
+impl<T: Entry> Shared<T> {
+ pub(crate) fn new(size: usize, prev_sz: usize) -> Shared<T> {
+ Self {
+ prev_sz,
+ size,
+ remote: TransferStack::new(),
+ slab: CausalCell::new(None),
+ }
+ }
+
+ /// Allocates storage for this page if it does not already exist.
+ ///
+ /// This requires unique access to the page (e.g. it is called from the
+ /// thread that owns the page, or, in the case of `SingleShard`, while the
+ /// lock is held). In order to indicate this, a reference to the page's
+ /// `Local` data is taken by this function; the `Local` argument is not
+ /// actually used, but requiring it ensures that this is only called when
+ /// local access is held.
+ #[cold]
+ fn alloc_page(&self, _: &Local) {
+ debug_assert!(self.slab.with(|s| unsafe { (*s).is_none() }));
+
+ let mut slab = Vec::with_capacity(self.size);
+ slab.extend((1..self.size).map(Slot::new));
+ slab.push(Slot::new(Address::NULL));
+
+ self.slab.with_mut(|s| {
+ // this mut access is safe — it only occurs to initially
+ // allocate the page, which only happens on this thread; if the
+ // page has not yet been allocated, other threads will not try
+ // to access it yet.
+ unsafe {
+ *s = Some(slab.into_boxed_slice());
+ }
+ });
+ }
+
+ pub(crate) fn alloc(&self, local: &Local) -> Option<Address> {
+ let head = local.head();
+
+ // are there any items on the local free list? (fast path)
+ let head = if head < self.size {
+ head
+ } else {
+ // if the local free list is empty, pop all the items on the remote
+ // free list onto the local free list.
+ self.remote.pop_all()?
+ };
+
+ // if the head is still null, both the local and remote free lists are
+ // empty --- we can't fit any more items on this page.
+ if head == Address::NULL {
+ return None;
+ }
+
+ // do we need to allocate storage for this page?
+ let page_needs_alloc = self.slab.with(|s| unsafe { (*s).is_none() });
+ if page_needs_alloc {
+ self.alloc_page(local);
+ }
+
+ let gen = self.slab.with(|slab| {
+ let slab = unsafe { &*(slab) }
+ .as_ref()
+ .expect("page must have been allocated to alloc!");
+
+ let slot = &slab[head];
+
+ local.set_head(slot.next());
+ slot.generation()
+ });
+
+ let index = head + self.prev_sz;
+
+ Some(Address::new(index, gen))
+ }
+
+ pub(crate) fn get(&self, addr: Address) -> Option<&T> {
+ let page_offset = addr.slot() - self.prev_sz;
+
+ self.slab
+ .with(|slab| unsafe { &*slab }.as_ref()?.get(page_offset))
+ .map(|slot| slot.get())
+ }
+
+ pub(crate) fn remove_local(&self, local: &Local, addr: Address) {
+ let offset = addr.slot() - self.prev_sz;
+
+ self.slab.with(|slab| {
+ let slab = unsafe { &*slab }.as_ref();
+
+ let slot = if let Some(slot) = slab.and_then(|slab| slab.get(offset)) {
+ slot
+ } else {
+ return;
+ };
+
+ if slot.reset(addr.generation()) {
+ slot.set_next(local.head());
+ local.set_head(offset);
+ }
+ })
+ }
+
+ pub(crate) fn remove_remote(&self, addr: Address) {
+ let offset = addr.slot() - self.prev_sz;
+
+ self.slab.with(|slab| {
+ let slab = unsafe { &*slab }.as_ref();
+
+ let slot = if let Some(slot) = slab.and_then(|slab| slab.get(offset)) {
+ slot
+ } else {
+ return;
+ };
+
+ if !slot.reset(addr.generation()) {
+ return;
+ }
+
+ self.remote.push(offset, |next| slot.set_next(next));
+ })
+ }
+}
+
+impl fmt::Debug for Local {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ self.head.with(|head| {
+ let head = unsafe { *head };
+ f.debug_struct("Local")
+ .field("head", &format_args!("{:#0x}", head))
+ .finish()
+ })
+ }
+}
+
+impl<T> fmt::Debug for Shared<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Shared")
+ .field("remote", &self.remote)
+ .field("prev_sz", &self.prev_sz)
+ .field("size", &self.size)
+ // .field("slab", &self.slab)
+ .finish()
+ }
+}
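
Each page is twice the size of the previous one, and `prev_sz` records how many slots all earlier pages hold; that is what lets `alloc` turn a page-local head into a shard-wide index (`head + self.prev_sz`) and lets the `remove_*` paths undo it. A small sketch of that bookkeeping, assuming the non-loom initial page size of 32:

```rust
// Sketch of the size / prev_sz bookkeeping set up by Shard::new, assuming the
// non-loom INITIAL_PAGE_SIZE of 32.
const INITIAL_PAGE_SIZE: usize = 32;

fn main() {
    let mut prev_sz = 0;
    for page_num in 0..4 {
        let size = INITIAL_PAGE_SIZE << page_num; // page::size(page_num)

        // A slot at page-local offset `i` has shard-wide index `prev_sz + i`,
        // so page 0 covers 0..32, page 1 covers 32..96, page 2 covers 96..224.
        println!(
            "page {}: {} slots, indices {}..{}",
            page_num, size, prev_sz, prev_sz + size
        );

        prev_sz += size;
    }
}
```
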
diff --git a/tokio/src/util/slab/shard.rs b/tokio/src/util/slab/shard.rs
new file mode 100644
index 00000000..711e36d3
--- /dev/null
+++ b/tokio/src/util/slab/shard.rs
@@ -0,0 +1,108 @@
+use crate::util::slab::{Address, Entry, page, MAX_PAGES};
+
+use std::fmt;
+
+// ┌─────────────┐ ┌────────┐
+// │ page 1 │ │ │
+// ├─────────────┤ ┌───▶│ next──┼─┐
+// │ page 2 │ │ ├────────┤ │
+// │ │ │ │XXXXXXXX│ │
+// │ local_free──┼─┘ ├────────┤ │
+// │ global_free─┼─┐ │ │◀┘
+// ├─────────────┤ └───▶│ next──┼─┐
+// │ page 3 │ ├────────┤ │
+// └─────────────┘ │XXXXXXXX│ │
+// ... ├────────┤ │
+// ┌─────────────┐ │XXXXXXXX│ │
+// │ page n │ ├────────┤ │
+// └─────────────┘ │ │◀┘
+// │ next──┼───▶
+// ├────────┤
+// │XXXXXXXX│
+// └────────┘
+// ...
+pub(super) struct Shard<T> {
+ /// The local free list for each page.
+ ///
+ /// These are only ever accessed from this shard's thread, so they are
+ /// stored separately from the shared state for the page that can be
+ /// accessed concurrently, to minimize false sharing.
+ local: Box<[page::Local]>,
+ /// The shared state for each page in this shard.
+ ///
+ /// This consists of the page's metadata (size, previous size), remote free
+ /// list, and a pointer to the actual array backing that page.
+ shared: Box<[page::Shared<T>]>,
+}
+
+impl<T: Entry> Shard<T> {
+ pub(super) fn new() -> Shard<T> {
+ let mut total_sz = 0;
+ let shared = (0..MAX_PAGES)
+ .map(|page_num| {
+ let sz = page::size(page_num);
+ let prev_sz = total_sz;
+ total_sz += sz;
+ page::Shared::new(sz, prev_sz)
+ })
+ .collect();
+
+ let local = (0..MAX_PAGES).map(|_| page::Local::new()).collect();
+
+ Shard {
+ local,
+ shared,
+ }
+ }
+
+ pub(super) fn alloc(&self) -> Option<Address> {
+ // Can we fit the value into an existing page?
+ for (page_idx, page) in self.shared.iter().enumerate() {
+ let local = self.local(page_idx);
+
+ if let Some(page_offset) = page.alloc(local) {
+ return Some(page_offset);
+ }
+ }
+
+ None
+ }
+
+ pub(super) fn get(&self, addr: Address) -> Option<&T> {
+ let page_idx = addr.page();
+
+ if page_idx >= self.shared.len() {
+ return None;
+ }
+
+ self.shared[page_idx].get(addr)
+ }
+
+ /// Remove an item on the shard's local thread.
+ pub(super) fn remove_local(&self, addr: Address) {
+ let page_idx = addr.page();
+
+ if let Some(page) = self.shared.get(page_idx) {
+ page.remove_local(self.local(page_idx), addr);
+ }
+ }
+
+ /// Remove an item, while on a different thread from the shard's local thread.
+ pub(super) fn remove_remote(&self, addr: Address) {
+ if let Some(page) = self.shared.get(addr.page()) {
+ page.remove_remote(addr);
+ }
+ }
+
+ fn local(&self, i: usize) -> &page::Local {
+ &self.local[i]
+ }
+}
+
+impl<T> fmt::Debug for Shard<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Shard")
+ .field("shared", &self.shared)
+ .finish()
+ }
+}
diff --git a/tokio/src/util/slab/slot.rs b/tokio/src/util/slab/slot.rs
new file mode 100644
index 00000000..59baacf5
--- /dev/null
+++ b/tokio/src/util/slab/slot.rs
@@ -0,0 +1,42 @@
+use crate::loom::cell::CausalCell;
+use crate::util::slab::{Generation, Entry};
+
+/// Stores an entry in the slab.
+pub(super) struct Slot<T> {
+ next: CausalCell<usize>,
+ entry: T,
+}
+
+impl<T: Entry> Slot<T> {
+ /// Initialize a new `Slot` linked to `next`.
+ ///
+ /// The entry is initialized to a default value.
+ pub(super) fn new(next: usize) -> Slot<T> {
+ Slot {
+ next: CausalCell::new(next),
+ entry: T::default(),
+ }
+ }
+
+ pub(super) fn get(&self) -> &T {
+ &self.entry
+ }
+
+ pub(super) fn generation(&self) -> Generation {
+ self.entry.generation()
+ }
+
+ pub(super) fn reset(&self, generation: Generation) -> bool {
+ self.entry.reset(generation)
+ }
+
+ pub(super) fn next(&self) -> usize {
+ self.next.with(|next| unsafe { *next })
+ }
+
+ pub(super) fn set_next(&self, next: usize) {
+ self.next.with_mut(|n| unsafe {
+ (*n) = next;
+ })
+ }
+}
diff --git a/tokio/src/util/slab/stack.rs b/tokio/src/util/slab/stack.rs
new file mode 100644
index 00000000..0ae0d710
--- /dev/null
+++ b/tokio/src/util/slab/stack.rs
@@ -0,0 +1,58 @@
+use crate::loom::sync::atomic::AtomicUsize;
+use crate::util::slab::Address;
+
+use std::fmt;
+use std::sync::atomic::Ordering;
+use std::usize;
+
+pub(super) struct TransferStack {
+ head: AtomicUsize,
+}
+
+impl TransferStack {
+ pub(super) fn new() -> Self {
+ Self {
+ head: AtomicUsize::new(Address::NULL),
+ }
+ }
+
+ pub(super) fn pop_all(&self) -> Option<usize> {
+ let val = self.head.swap(Address::NULL, Ordering::Acquire);
+
+ if val == Address::NULL {
+ None
+ } else {
+ Some(val)
+ }
+ }
+
+ pub(super) fn push(&self, value: usize, before: impl Fn(usize)) {
+ let mut next = self.head.load(Ordering::Relaxed);
+
+ loop {
+ before(next);
+
+ match self
+ .head
+ .compare_exchange(next, value, Ordering::AcqRel, Ordering::Acquire)
+ {
+ // lost the race!
+ Err(actual) => next = actual,
+ Ok(_) => return,
+ }
+ }
+ }
+}
+
+impl fmt::Debug for TransferStack {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ // Loom likes to dump all its internal state in `fmt::Debug` impls, so
+ // we override this to just print the current value in tests.
+ f.debug_struct("TransferStack")
+ .field(
+ "head",
+ &format_args!("{:#x}", self.head.load(Ordering::Relaxed)),
+ )
+ .finish()
+ }
+}
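
`TransferStack` is a Treiber-style stack of free-slot indices: `push` stores the current head into the slot's `next` link (via the `before` closure) before publishing the slot with a compare-and-swap, and `pop_all` detaches the whole list with a single swap. The standalone sketch below mirrors that protocol with the `next` links kept in a plain `Vec`; it is an illustration, not the tokio type.

```rust
// Standalone sketch of the TransferStack protocol: the stack stores only a
// head index; each slot's `next` link lives alongside the slot itself.
use std::sync::atomic::{AtomicUsize, Ordering};

const NULL: usize = usize::MAX >> 1; // stand-in for Address::NULL

struct Stack {
    head: AtomicUsize,
}

impl Stack {
    fn new() -> Stack {
        Stack { head: AtomicUsize::new(NULL) }
    }

    /// Push `index`, storing the previous head into the slot via `before`
    /// ahead of the CAS that publishes it (same shape as TransferStack::push).
    fn push(&self, index: usize, before: impl Fn(usize)) {
        let mut next = self.head.load(Ordering::Relaxed);
        loop {
            before(next);
            match self
                .head
                .compare_exchange(next, index, Ordering::AcqRel, Ordering::Acquire)
            {
                Err(actual) => next = actual, // lost the race; retry with new head
                Ok(_) => return,
            }
        }
    }

    /// Take the whole list at once, leaving the stack empty.
    fn pop_all(&self) -> Option<usize> {
        let head = self.head.swap(NULL, Ordering::Acquire);
        if head == NULL { None } else { Some(head) }
    }
}

fn main() {
    // `next` links for three slots, indexed by slot number.
    let links: Vec<AtomicUsize> = (0..3).map(|_| AtomicUsize::new(NULL)).collect();
    let stack = Stack::new();

    for i in 0..3 {
        stack.push(i, |next| links[i].store(next, Ordering::Relaxed));
    }

    // Walk the list we just detached: prints "2 1 0".
    let mut head = stack.pop_all();
    while let Some(i) = head {
        print!("{} ", i);
        let next = links[i].load(Ordering::Relaxed);
        head = if next == NULL { None } else { Some(next) };
    }
    println!();
}
```
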
diff --git a/tokio/src/util/slab/tests/loom_slab.rs b/tokio/src/util/slab/tests/loom_slab.rs
new file mode 100644
index 00000000..8a96a736
--- /dev/null
+++ b/tokio/src/util/slab/tests/loom_slab.rs
@@ -0,0 +1,327 @@
+use crate::io::driver::ScheduledIo;
+use crate::util::slab::{Address, Slab};
+
+use loom::sync::{Arc, Condvar, Mutex};
+use loom::thread;
+
+#[test]
+fn local_remove() {
+ loom::model(|| {
+ let slab = Arc::new(Slab::new());
+
+ let s = slab.clone();
+ let t1 = thread::spawn(move || {
+ let idx = store_val(&s, 1);
+ assert_eq!(get_val(&s, idx), Some(1));
+ s.remove(idx);
+ assert_eq!(get_val(&s, idx), None);
+ let idx = store_val(&s, 2);
+ assert_eq!(get_val(&s, idx), Some(2));
+ s.remove(idx);
+ assert_eq!(get_val(&s, idx), None);
+ });
+
+ let s = slab.clone();
+ let t2 = thread::spawn(move || {
+ let idx = store_val(&s, 3);
+ assert_eq!(get_val(&s, idx), Some(3));
+ s.remove(idx);
+ assert_eq!(get_val(&s, idx), None);
+ let idx = store_val(&s, 4);
+ s.remove(idx);
+ assert_eq!(get_val(&s, idx), None);
+ });
+
+ let s = slab;
+ let idx1 = store_val(&s, 5);
+ assert_eq!(get_val(&s, idx1), Some(5));
+ let idx2 = store_val(&s, 6);
+ assert_eq!(get_val(&s, idx2), Some(6));
+ s.remove(idx1);
+ assert_eq!(get_val(&s, idx1), None);
+ assert_eq!(get_val(&s, idx2), Some(6));
+ s.remove(idx2);
+ assert_eq!(get_val(&s, idx2), None);
+
+ t1.join().expect("thread 1 should not panic");
+ t2.join().expect("thread 2 should not panic");
+ });
+}
+
+#[test]
+fn remove_remote() {
+ loom::model(|| {
+ let slab = Arc::new(Slab::new());
+
+ let idx1 = store_val(&slab, 1);
+ assert_eq!(get_val(&slab, idx1), Some(1));
+
+ let idx2 = store_val(&slab, 2);
+ assert_eq!(get_val(&slab, idx2), Some(2));
+
+ let idx3 = store_val(&slab, 3);
+ assert_eq!(get_val(&slab, idx3), Some(3));
+
+ let s = slab.clone();
+ let t1 = thread::spawn(move || {
+ assert_eq!(get_val(&s, idx2), Some(2));
+ s.remove(idx2);
+ assert_eq!(get_val(&s, idx2), None);
+ });
+
+ let s = slab.clone();
+ let t2 = thread::spawn(move || {
+ assert_eq!(get_val(&s, idx3), Some(3));
+ s.remove(idx3);
+ assert_eq!(get_val(&s, idx3), None);
+ });
+
+ t1.join().expect("thread 1 should not panic");
+ t2.join().expect("thread 2 should not panic");
+
+ assert_eq!(get_val(&slab, idx1), Some(1));
+ assert_eq!(get_val(&slab, idx2), None);
+ assert_eq!(get_val(&slab, idx3), None);
+ });
+}
+
+#[test]
+fn remove_remote_and_reuse() {
+ loom::model(|| {
+ let slab = Arc::new(Slab::new());
+
+ let idx1 = store_val(&slab, 1);
+ let idx2 = store_val(&slab, 2);
+
+ assert_eq!(get_val(&slab, idx1), Some(1));
+ assert_eq!(get_val(&slab, idx2), Some(2));
+
+ let s = slab.clone();
+ let t1 = thread::spawn(move || {
+ s.remove(idx1);
+ let value = get_val(&s, idx1);
+
+ // We may or may not see the new value yet, depending on when
+ // this occurs, but we must either see the new value or `None`;
+ // the old value has been removed!
+ assert!(value == None || value == Some(3));
+ });
+
+ let idx3 = store_when_free(&slab, 3);
+ t1.join().expect("thread 1 should not panic");
+
+ assert_eq!(get_val(&slab, idx3), Some(3));
+ assert_eq!(get_val(&slab, idx2), Some(2));
+ });