author    | Carl Lerche <me@carllerche.com> | 2020-08-11 22:28:43 -0700
committer | GitHub <noreply@github.com> | 2020-08-11 22:28:43 -0700
commit    | 8feebab7cdef2fbeb810d18509b1443b6b9f60b1 (patch)
tree      | a76395dd802bae93fc7ba97e50ba59ed153a1991 /tokio/src
parent    | 674985d9fb87c866d9219297fde35b2e32b40b0b (diff)
io: rewrite slab to support compaction (#2757)
The I/O driver uses a slab to store per-resource state. Doing this
provides two benefits. First, allocating state is streamlined. Second,
resources may be safely indexed using a `usize` type. The `usize` is
passed to the OS's selector when registering to receive events.
The original slab implementation used a `Vec` backed by `RwLock`. This
primarily caused contention when reading state. This implementation also
only **grew** the slab capacity but never shrank. In #1625, the slab was
rewritten to use a lock-free strategy. The lock contention was removed
but this implementation was still grow-only.
This change adds the ability to release memory. Like the previous
implementation, it structures the slab as a vector of pages, which
allows the slab to grow without moving any existing entries. It also
adds the ability to release pages. This is done by introducing a lock
around allocating and releasing slab entries. The lock does not impact
benchmarks, primarily because the existing implementation was not
"done" and also took a lock around allocating and releasing.
A `Slab::compact()` function is added. It iterates over the pages and
frees any page that has no slots in use. The I/O driver calls
`compact()` periodically.
Fixes #2505
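The design described above can be pictured with a much-simplified, hypothetical model (not the actual tokio types: the real slab shares a fixed array of pages via `Arc`, keeps per-page free lists instead of scanning, and hands out `Ref` handles). It shows the page layout, the lock taken on allocate/release, and compaction:

```rust
use std::sync::Mutex;

// Illustrative model only, not the actual tokio types.
struct Page<T> {
    capacity: usize,
    slots: Mutex<Vec<Option<T>>>, // empty until the page is first used
}

struct Slab<T> {
    // Page `i` holds twice as many slots as page `i - 1`, so growing
    // appends a page and never moves existing entries.
    pages: Vec<Page<T>>,
}

impl<T: Default> Slab<T> {
    fn new() -> Slab<T> {
        let pages = (0..8)
            .map(|i| Page {
                capacity: 16usize << i,
                slots: Mutex::new(Vec::new()),
            })
            .collect();
        Slab { pages }
    }

    /// Allocate a slot, taking the page lock only for the duration of
    /// the allocation. Returns a `(page, slot)` address.
    fn alloc(&self) -> Option<(usize, usize)> {
        for (pi, page) in self.pages.iter().enumerate() {
            let mut slots = page.slots.lock().unwrap();
            if slots.is_empty() {
                // Lazily initialize the page's storage.
                slots.resize_with(page.capacity, || None);
            }
            if let Some(si) = slots.iter().position(|s| s.is_none()) {
                slots[si] = Some(T::default());
                return Some((pi, si));
            }
        }
        None // every page is full
    }

    fn release(&self, (pi, si): (usize, usize)) {
        self.pages[pi].slots.lock().unwrap()[si] = None;
    }

    /// Free the backing storage of every page with no slots in use.
    fn compact(&self) {
        for page in &self.pages {
            let mut slots = page.slots.lock().unwrap();
            if !slots.is_empty() && slots.iter().all(|s| s.is_none()) {
                *slots = Vec::new(); // return the page's memory
            }
        }
    }
}
```

In the diff below, the driver invokes the real `compact()` every 256 iterations of its `turn` loop (`COMPACT_INTERVAL`).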
Diffstat (limited to 'tokio/src')
-rw-r--r-- | tokio/src/io/driver/mod.rs | 211
-rw-r--r-- | tokio/src/io/driver/scheduled_io.rs | 88
-rw-r--r-- | tokio/src/io/registration.rs | 48
-rw-r--r-- | tokio/src/loom/std/atomic_ptr.rs | 8
-rw-r--r-- | tokio/src/util/bit.rs | 18
-rw-r--r-- | tokio/src/util/slab.rs | 790
-rw-r--r-- | tokio/src/util/slab/addr.rs | 154
-rw-r--r-- | tokio/src/util/slab/entry.rs | 7
-rw-r--r-- | tokio/src/util/slab/generation.rs | 32
-rw-r--r-- | tokio/src/util/slab/mod.rs | 107
-rw-r--r-- | tokio/src/util/slab/page.rs | 187
-rw-r--r-- | tokio/src/util/slab/shard.rs | 105
-rw-r--r-- | tokio/src/util/slab/slot.rs | 42
-rw-r--r-- | tokio/src/util/slab/stack.rs | 58
-rw-r--r-- | tokio/src/util/slab/tests/loom_slab.rs | 327
-rw-r--r-- | tokio/src/util/slab/tests/loom_stack.rs | 88
-rw-r--r-- | tokio/src/util/slab/tests/mod.rs | 2
17 files changed, 928 insertions, 1344 deletions
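Before the diff itself, it may help to see the token layout it introduces: `ADDRESS = bit::Pack::least_significant(24)` holds the slot address, and `GENERATION = ADDRESS.then(7)` holds a 7-bit generation used to detect stale readiness events. A minimal, standalone sketch of that packing follows; the constants and helper functions are illustrative stand-ins, not tokio's `bit::Pack` API:

```rust
// Token layout used by the diff below: slot address in the low 24
// bits, the slot's generation in the next 7 bits.
const ADDR_BITS: u32 = 24;
const GEN_BITS: u32 = 7;
const ADDR_MASK: usize = (1 << ADDR_BITS) - 1;
const GEN_MASK: usize = (1 << GEN_BITS) - 1;

fn token(addr: usize, generation: usize) -> usize {
    debug_assert!(addr <= ADDR_MASK);
    // Masking the generation lets it wrap, mirroring `pack_lossy`.
    ((generation & GEN_MASK) << ADDR_BITS) | addr
}

fn addr(token: usize) -> usize {
    token & ADDR_MASK
}

fn generation(token: usize) -> usize {
    (token >> ADDR_BITS) & GEN_MASK
}

fn main() {
    let t = token(0x1234, 3);
    assert_eq!(addr(t), 0x1234);
    assert_eq!(generation(t), 3);

    // A readiness event carrying a stale generation is detected by
    // comparing against the slot's current generation, and discarded.
    let current = token(0x1234, 4); // slot was reused once
    assert_ne!(generation(t), generation(current));
}
```

Because only 7 bits are available, `reset()` in `scheduled_io.rs` advances the generation with the new `pack_lossy()`, letting the counter wrap rather than bleed into the address bits.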
diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs
index d8d17f88..d8c253ab 100644
--- a/tokio/src/io/driver/mod.rs
+++ b/tokio/src/io/driver/mod.rs
@@ -3,23 +3,30 @@
 pub(crate) mod platform;
 mod scheduled_io;
 pub(crate) use scheduled_io::ScheduledIo; // pub(crate) for tests
 
-use crate::loom::sync::atomic::AtomicUsize;
 use crate::park::{Park, Unpark};
 use crate::runtime::context;
-use crate::util::slab::{Address, Slab};
+use crate::util::bit;
+use crate::util::slab::{self, Slab};
 
 use mio::event::Evented;
 use std::fmt;
 use std::io;
-use std::sync::atomic::Ordering::SeqCst;
 use std::sync::{Arc, Weak};
 use std::task::Waker;
 use std::time::Duration;
 
 /// I/O driver, backed by Mio
 pub(crate) struct Driver {
+    /// Tracks the number of times `turn` is called. It is safe for this to wrap
+    /// as it is mostly used to determine when to call `compact()`
+    tick: u16,
+
     /// Reuse the `mio::Events` value across calls to poll.
-    events: mio::Events,
+    events: Option<mio::Events>,
+
+    /// Primary slab handle containing the state for each resource registered
+    /// with this driver.
+    resources: Slab<ScheduledIo>,
 
     /// State shared between the reactor and the handles.
     inner: Arc<Inner>,
@@ -37,11 +44,8 @@ pub(super) struct Inner {
     /// The underlying system event queue.
     io: mio::Poll,
 
-    /// Dispatch slabs for I/O and futures events
-    pub(super) io_dispatch: Slab<ScheduledIo>,
-
-    /// The number of sources in `io_dispatch`.
-    n_sources: AtomicUsize,
+    /// Allocates `ScheduledIo` handles when creating new resources.
+    pub(super) io_dispatch: slab::Allocator<ScheduledIo>,
 
     /// Used to wake up the reactor from a call to `turn`
     wakeup: mio::SetReadiness,
@@ -53,7 +57,19 @@ pub(super) enum Direction {
     Write,
 }
 
-const TOKEN_WAKEUP: mio::Token = mio::Token(Address::NULL);
+// TODO: Don't use a fake token. Instead, reserve a slot entry for the wakeup
+// token.
+const TOKEN_WAKEUP: mio::Token = mio::Token(1 << 31);
+
+const ADDRESS: bit::Pack = bit::Pack::least_significant(24);
+
+// Packs the generation value in the `readiness` field.
+//
+// The generation prevents a race condition where a slab slot is reused for a
+// new socket while the I/O driver is about to apply a readiness event. The
+// generation value is checked when setting new readiness. If the generations
+// do not match, then the readiness event is discarded.
+const GENERATION: bit::Pack = ADDRESS.then(7);
 
 fn _assert_kinds() {
     fn _assert<T: Send + Sync>() {}
@@ -69,6 +85,8 @@ impl Driver {
     pub(crate) fn new() -> io::Result<Driver> {
         let io = mio::Poll::new()?;
         let wakeup_pair = mio::Registration::new2();
+        let slab = Slab::new();
+        let allocator = slab.allocator();
 
         io.register(
             &wakeup_pair.0,
@@ -78,12 +96,13 @@ impl Driver {
         )?;
 
         Ok(Driver {
-            events: mio::Events::with_capacity(1024),
+            tick: 0,
+            events: Some(mio::Events::with_capacity(1024)),
+            resources: slab,
             _wakeup_registration: wakeup_pair.0,
             inner: Arc::new(Inner {
                 io,
-                io_dispatch: Slab::new(),
-                n_sources: AtomicUsize::new(0),
+                io_dispatch: allocator,
                 wakeup: wakeup_pair.1,
             }),
         })
@@ -102,16 +121,27 @@ impl Driver {
     }
 
     fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<()> {
+        // How often to call `compact()` on the resource slab
+        const COMPACT_INTERVAL: u16 = 256;
+
+        self.tick = self.tick.wrapping_add(1);
+
+        if self.tick % COMPACT_INTERVAL == 0 {
+            self.resources.compact();
+        }
+
+        let mut events = self.events.take().expect("i/o driver event store missing");
+
         // Block waiting for an event to happen, peeling out how many events
         // happened.
-        match self.inner.io.poll(&mut self.events, max_wait) {
+        match self.inner.io.poll(&mut events, max_wait) {
             Ok(_) => {}
             Err(e) => return Err(e),
         }
 
         // Process all the events that came in, dispatching appropriately
-        for event in self.events.iter() {
+        for event in events.iter() {
             let token = event.token();
 
             if token == TOKEN_WAKEUP {
@@ -124,22 +154,24 @@ impl Driver {
             }
         }
 
+        self.events = Some(events);
+
         Ok(())
     }
 
-    fn dispatch(&self, token: mio::Token, ready: mio::Ready) {
+    fn dispatch(&mut self, token: mio::Token, ready: mio::Ready) {
         let mut rd = None;
         let mut wr = None;
 
-        let address = Address::from_usize(token.0);
+        let addr = slab::Address::from_usize(ADDRESS.unpack(token.0));
 
-        let io = match self.inner.io_dispatch.get(address) {
+        let io = match self.resources.get(addr) {
            Some(io) => io,
            None => return,
        };
 
         if io
-            .set_readiness(address, |curr| curr | ready.as_usize())
+            .set_readiness(Some(token.0), |curr| curr | ready.as_usize())
             .is_err()
         {
             // token no longer valid!
@@ -164,6 +196,18 @@ impl Driver {
     }
 }
 
+impl Drop for Driver {
+    fn drop(&mut self) {
+        self.resources.for_each(|io| {
+            // If a task is waiting on the I/O resource, notify it. The task
+            // will then attempt to use the I/O resource and fail due to the
+            // driver being shutdown.
+            io.reader.wake();
+            io.writer.wake();
+        })
+    }
+}
+
 impl Park for Driver {
     type Unpark = Handle;
     type Error = io::Error;
@@ -246,24 +290,20 @@ impl Inner {
         &self,
         source: &dyn Evented,
         ready: mio::Ready,
-    ) -> io::Result<Address> {
-        let address = self.io_dispatch.alloc().ok_or_else(|| {
+    ) -> io::Result<slab::Ref<ScheduledIo>> {
+        let (address, shared) = self.io_dispatch.allocate().ok_or_else(|| {
             io::Error::new(
                 io::ErrorKind::Other,
                 "reactor at max registered I/O resources",
             )
         })?;
 
-        self.n_sources.fetch_add(1, SeqCst);
+        let token = GENERATION.pack(shared.generation(), ADDRESS.pack(address.as_usize(), 0));
 
-        self.io.register(
-            source,
-            mio::Token(address.to_usize()),
-            ready,
-            mio::PollOpt::edge(),
-        )?;
+        self.io
+            .register(source, mio::Token(token), ready, mio::PollOpt::edge())?;
 
-        Ok(address)
+        Ok(shared)
     }
 
     /// Deregisters an I/O resource from the reactor.
@@ -271,21 +311,11 @@ impl Inner {
         self.io.deregister(source)
     }
 
-    pub(super) fn drop_source(&self, address: Address) {
-        self.io_dispatch.remove(address);
-        self.n_sources.fetch_sub(1, SeqCst);
-    }
-
     /// Registers interest in the I/O resource associated with `token`.
-    pub(super) fn register(&self, token: Address, dir: Direction, w: Waker) {
-        let sched = self
-            .io_dispatch
-            .get(token)
-            .unwrap_or_else(|| panic!("IO resource for token {:?} does not exist!", token));
-
+    pub(super) fn register(&self, io: &slab::Ref<ScheduledIo>, dir: Direction, w: Waker) {
         let waker = match dir {
-            Direction::Read => &sched.reader,
-            Direction::Write => &sched.writer,
+            Direction::Read => &io.reader,
+            Direction::Write => &io.writer,
         };
 
         waker.register(w);
@@ -303,100 +333,3 @@ impl Direction {
         }
     }
 }
-
-#[cfg(all(test, loom))]
-mod tests {
-    use super::*;
-    use loom::thread;
-
-    // No-op `Evented` impl just so we can have something to pass to `add_source`.
-    struct NotEvented;
-
-    impl Evented for NotEvented {
-        fn register(
-            &self,
-            _: &mio::Poll,
-            _: mio::Token,
-            _: mio::Ready,
-            _: mio::PollOpt,
-        ) -> io::Result<()> {
-            Ok(())
-        }
-
-        fn reregister(
-            &self,
-            _: &mio::Poll,
-            _: mio::Token,
-            _: mio::Ready,
-            _: mio::PollOpt,
-        ) -> io::Result<()> {
-            Ok(())
-        }
-
-        fn deregister(&self, _: &mio::Poll) -> io::Result<()> {
-            Ok(())
-        }
-    }
-
-    #[test]
-    fn tokens_unique_when_dropped() {
-        loom::model(|| {
-            let reactor = Driver::new().unwrap();
-            let inner = reactor.inner;
-            let inner2 = inner.clone();
-
-            let token_1 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap();
-            let thread = thread::spawn(move || {
-                inner2.drop_source(token_1);
-            });
-
-            let token_2 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap();
-            thread.join().unwrap();
-
-            assert!(token_1 != token_2);
-        })
-    }
-
-    #[test]
-    fn tokens_unique_when_dropped_on_full_page() {
-        loom::model(|| {
-            let reactor = Driver::new().unwrap();
-            let inner = reactor.inner;
-            let inner2 = inner.clone();
-
-            // add sources to fill up the first page so that the dropped index
-            // may be reused.
-            for _ in 0..31 {
-                inner.add_source(&NotEvented, mio::Ready::all()).unwrap();
-            }
-
-            let token_1 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap();
-            let thread = thread::spawn(move || {
-                inner2.drop_source(token_1);
-            });
-
-            let token_2 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap();
-            thread.join().unwrap();
-
-            assert!(token_1 != token_2);
-        })
-    }
-
-    #[test]
-    fn tokens_unique_concurrent_add() {
-        loom::model(|| {
-            let reactor = Driver::new().unwrap();
-            let inner = reactor.inner;
-            let inner2 = inner.clone();
-
-            let thread = thread::spawn(move || {
-                let token_2 = inner2.add_source(&NotEvented, mio::Ready::all()).unwrap();
-                token_2
-            });
-
-            let token_1 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap();
-            let token_2 = thread.join().unwrap();
-
-            assert!(token_1 != token_2);
-        })
-    }
-}
diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs
index 7f6446e3..566f7daf 100644
--- a/tokio/src/io/driver/scheduled_io.rs
+++ b/tokio/src/io/driver/scheduled_io.rs
@@ -1,47 +1,30 @@
 use crate::loom::future::AtomicWaker;
 use crate::loom::sync::atomic::AtomicUsize;
-use crate::util::bit;
-use crate::util::slab::{Address, Entry, Generation};
+use crate::util::slab::Entry;
 
-use std::sync::atomic::Ordering::{AcqRel, Acquire, SeqCst};
+use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
 
+/// Stored in the I/O driver resource slab.
 #[derive(Debug)]
 pub(crate) struct ScheduledIo {
+    /// Packs the resource's readiness with the resource's generation.
     readiness: AtomicUsize,
+
+    /// Task waiting on read readiness
     pub(crate) reader: AtomicWaker,
+
+    /// Task waiting on write readiness
     pub(crate) writer: AtomicWaker,
 }
 
-const PACK: bit::Pack = bit::Pack::most_significant(Generation::WIDTH);
-
 impl Entry for ScheduledIo {
-    fn generation(&self) -> Generation {
-        unpack_generation(self.readiness.load(SeqCst))
-    }
-
-    fn reset(&self, generation: Generation) -> bool {
-        let mut current = self.readiness.load(Acquire);
-
-        loop {
-            if unpack_generation(current) != generation {
-                return false;
-            }
-
-            let next = PACK.pack(generation.next().to_usize(), 0);
-
-            match self
-                .readiness
-                .compare_exchange(current, next, AcqRel, Acquire)
-            {
-                Ok(_) => break,
-                Err(actual) => current = actual,
-            }
-        }
+    fn reset(&self) {
+        let state = self.readiness.load(Acquire);
 
-        drop(self.reader.take_waker());
-        drop(self.writer.take_waker());
+        let generation = super::GENERATION.unpack(state);
+        let next = super::GENERATION.pack_lossy(generation + 1, 0);
 
-        true
+        self.readiness.store(next, Release);
     }
 }
 
@@ -56,24 +39,8 @@ impl Default for ScheduledIo {
 }
 
 impl ScheduledIo {
-    #[cfg(all(test, loom))]
-    /// Returns the current readiness value of this `ScheduledIo`, if the
-    /// provided `token` is still a valid access.
-    ///
-    /// # Returns
-    ///
-    /// If the given token's generation no longer matches the `ScheduledIo`'s
-    /// generation, then the corresponding IO resource has been removed and
-    /// replaced with a new resource. In that case, this method returns `None`.
-    /// Otherwise, this returns the current readiness.
-    pub(crate) fn get_readiness(&self, address: Address) -> Option<usize> {
-        let ready = self.readiness.load(Acquire);
-
-        if unpack_generation(ready) != address.generation() {
-            return None;
-        }
-
-        Some(ready & !PACK.mask())
+    pub(crate) fn generation(&self) -> usize {
+        super::GENERATION.unpack(self.readiness.load(Acquire))
     }
 
     /// Sets the readiness on this `ScheduledIo` by invoking the given closure on
@@ -92,32 +59,35 @@ impl ScheduledIo {
     /// Otherwise, this returns the previous readiness.
     pub(crate) fn set_readiness(
         &self,
-        address: Address,
+        token: Option<usize>,
         f: impl Fn(usize) -> usize,
     ) -> Result<usize, ()> {
-        let generation = address.generation();
-
         let mut current = self.readiness.load(Acquire);
 
         loop {
-            // Check that the generation for this access is still the current
-            // one.
-            if unpack_generation(current) != generation {
-                return Err(());
+            let current_generation = super::GENERATION.unpack(current);
+
+            if let Some(token) = token {
+                // Check that the generation for this access is still the
+                // current one.
+                if super::GENERATION.unpack(token) != current_generation {
+                    return Err(());
+                }
             }
+
             // Mask out the generation bits so that the modifying function
             // doesn't see them.
             let current_readiness = current & mio::Ready::all().as_usize();
             let new = f(current_readiness);
 
             debug_assert!(
-                new <= !PACK.max_value(),
+                new <= super::ADDRESS.max_value(),
                 "new readiness value would overwrite generation bits!"
             );
 
             match self.readiness.compare_exchange(
                 current,
-                PACK.pack(generation.to_usize(), new),
+                super::GENERATION.pack(current_generation, new),
                 AcqRel,
                 Acquire,
             ) {
@@ -135,7 +105,3 @@ impl Drop for ScheduledIo {
         self.reader.wake();
     }
 }
-
-fn unpack_generation(src: usize) -> Generation {
-    Generation::new(PACK.unpack(src))
-}
diff --git a/tokio/src/io/registration.rs b/tokio/src/io/registration.rs
index 77fe6dbc..63aaff56 100644
--- a/tokio/src/io/registration.rs
+++ b/tokio/src/io/registration.rs
@@ -1,5 +1,5 @@
-use crate::io::driver::{platform, Direction, Handle};
-use crate::util::slab::Address;
+use crate::io::driver::{platform, Direction, Handle, ScheduledIo};
+use crate::util::slab;
 
 use mio::{self, Evented};
 use std::io;
@@ -39,11 +39,17 @@ cfg_io_driver! {
     /// [`poll_write_ready`]: method@Self::poll_write_ready`
     #[derive(Debug)]
     pub struct Registration {
+        /// Handle to the associated driver.
         handle: Handle,
-        address: Address,
+
+        /// Reference to state stored by the driver.
+        shared: slab::Ref<ScheduledIo>,
     }
 }
 
+unsafe impl Send for Registration {}
+unsafe impl Sync for Registration {}
+
 // ===== impl Registration =====
 
 impl Registration {
@@ -104,7 +110,7 @@ impl Registration {
         T: Evented,
     {
         let handle = Handle::current();
-        let address = if let Some(inner) = handle.inner() {
+        let shared = if let Some(inner) = handle.inner() {
             inner.add_source(io, ready)?
         } else {
             return Err(io::Error::new(
@@ -113,7 +119,7 @@ impl Registration {
             ));
         };
 
-        Ok(Registration { handle, address })
+        Ok(Registration { handle, shared })
     }
 
     /// Deregisters the I/O resource from the reactor it is associated with.
@@ -272,14 +278,12 @@ impl Registration {
         // If the task should be notified about new events, ensure that it has
         // been registered
         if let Some(ref cx) = cx {
-            inner.register(self.address, direction, cx.waker().clone())
+            inner.register(&self.shared, direction, cx.waker().clone())
         }
 
         let mask = direction.mask();
         let mask_no_hup = (mask - platform::hup() - platform::error()).as_usize();
 
-        let sched = inner.io_dispatch.get(self.address).unwrap();
-
         // This consumes the current readiness state **except** for HUP and
         // error. HUP and error are excluded because a) they are final states
         // and never transition out and b) both the read AND the write
@@ -296,9 +300,10 @@ impl Registration {
         // AND write. A specific case that `EPOLLERR` occurs is when the read
         // end of a pipe is closed. When this occurs, a peer blocked by
         // writing to the pipe should be notified.
-        let curr_ready = sched
-            .set_readiness(self.address, |curr| curr & (!mask_no_hup))
-            .unwrap_or_else(|_| panic!("address {:?} no longer valid!", self.address));
+        let curr_ready = self
+            .shared
+            .set_readiness(None, |curr| curr & (!mask_no_hup))
+            .unwrap_or_else(|_| unreachable!());
 
         let mut ready = mask & mio::Ready::from_usize(curr_ready);
 
@@ -306,14 +311,15 @@ impl Registration {
             if let Some(cx) = cx {
                 // Update the task info
                 match direction {
-                    Direction::Read => sched.reader.register_by_ref(cx.waker()),
-                    Direction::Write => sched.writer.register_by_ref(cx.waker()),
+                    Direction::Read => self.shared.reader.register_by_ref(cx.waker()),
+                    Direction::Write => self.shared.writer.register_by_ref(cx.waker()),
                 }
 
                 // Try again
-                let curr_ready = sched
-                    .set_readiness(self.address, |curr| curr & (!mask_no_hup))
-                    .unwrap_or_else(|_| panic!("address {:?} no longer valid!", self.address));
+                let curr_ready = self
+                    .shared
+                    .set_readiness(None, |curr| curr & (!mask_no_hup))
+                    .unwrap();
                 ready = mask & mio::Ready::from_usize(curr_ready);
             }
         }
@@ -326,15 +332,9 @@ impl Registration {
     }
 }
 
-unsafe impl Send for Registration {}
-unsafe impl Sync for Registration {}
-
 impl Drop for Registration {
     fn drop(&mut self) {
-        let inner = match self.handle.inner() {
-            Some(inner) => inner,
-            None => return,
-        };
-        inner.drop_source(self.address);
+        drop(self.shared.reader.take_waker());
+        drop(self.shared.writer.take_waker());
     }
 }
diff --git a/tokio/src/loom/std/atomic_ptr.rs b/tokio/src/loom/std/atomic_ptr.rs
index f7fd56cc..236645f0 100644
--- a/tokio/src/loom/std/atomic_ptr.rs
+++ b/tokio/src/loom/std/atomic_ptr.rs
@@ -1,5 +1,5 @@
 use std::fmt;
-use std::ops::Deref;
+use std::ops::{Deref, DerefMut};
 
 /// `AtomicPtr` providing an additional `load_unsync` function.
 pub(crate) struct AtomicPtr<T> {
@@ -21,6 +21,12 @@ impl<T> Deref for AtomicPtr<T> {
     }
 }
 
+impl<T> DerefMut for AtomicPtr<T> {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.inner
+    }
+}
+
 impl<T> fmt::Debug for AtomicPtr<T> {
     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
         self.deref().fmt(fmt)
diff --git a/tokio/src/util/bit.rs b/tokio/src/util/bit.rs
index e61ac216..ee756044 100644
--- a/tokio/src/util/bit.rs
+++ b/tokio/src/util/bit.rs
@@ -7,16 +7,6 @@ pub(crate) struct Pack {
 }
 
 impl Pack {
-    /// Value is packed in the `width` most-significant bits.
-    pub(crate) const fn most_significant(width: u32) -> Pack {
-        let mask = mask_for(width).reverse_bits();
-
-        Pack {
-            mask,
-            shift: mask.trailing_zeros(),
-        }
-    }
-
     /// Value is packed in the `width` least-significant bits.
     pub(crate) const fn least_significant(width: u32) -> Pack {
         let mask = mask_for(width);
@@ -53,6 +43,14 @@ impl Pack {
         (base & !self.mask) | (value << self.shift)
     }
 
+    /// Packs the value with `base`, losing any bits of `value` that don't fit.
+    ///
+    /// If `value` is larger than the max value that can be represented by the
+    /// allotted width, the most significant bits are truncated.
+    pub(crate) fn pack_lossy(&self, value: usize, base: usize) -> usize {
+        self.pack(value & self.max_value(), base)
+    }
+
     pub(crate) fn unpack(&self, src: usize) -> usize {
         unpack(src, self.mask, self.shift)
     }
diff --git a/tokio/src/util/slab.rs b/tokio/src/util/slab.rs
new file mode 100644
index 00000000..cb7fd5e9
--- /dev/null
+++ b/tokio/src/util/slab.rs
@@ -0,0 +1,790 @@
+use crate::loom::cell::UnsafeCell;
+use crate::loom::sync::atomic::{AtomicBool, AtomicUsize};
+use crate::loom::sync::{Arc, Mutex};
+use crate::util::bit;
+
+use std::fmt;
+use std::mem;
+use std::ops;
+use std::ptr;
+use std::sync::atomic::Ordering::Relaxed;
+
+/// Amortized allocation for homogeneous data types.
+///
+/// The slab pre-allocates chunks of memory to store values. It uses a growing
+/// strategy similar to `Vec`. When new capacity is needed, the slab grows by
+/// 2x.
+///
+/// # Pages
+///
+/// Unlike `Vec`, growing does not require moving existing elements. Instead of
+/// being a contiguous chunk of memory for all elements, `Slab` is an array of
+/// arrays. The top-level array is an array of pages. Each page is 2x bigger
+/// than the previous one. When the slab grows, a new page is allocated.
+///
+/// Pages are lazily initialized.
+///
+/// # Allocating
+///
+/// When allocating an object, previously used slots are reused first. If no
+/// previously used slot is available, a new slot is initialized in an existing
+/// page. If all pages are full, then a new page is allocated.
+///
+/// When an allocated object is released, it is pushed into its page's free
+/// list. Allocating scans all pages for a free slot.
+///
+/// # Indexing
+///
+/// The slab is able to index values using an address. Even when the indexed
+/// object has been released, it is still safe to index. This is a key ability
+/// for using the slab with the I/O driver. Addresses are registered with the
+/// OS's selector and I/O resources can be released without synchronizing with
+/// the OS.
+///
+/// # Compaction
+///
+/// `Slab::compact` will release pages that have been allocated but are no
+/// longer used. This is done by scanning the pages and finding pages with no
+/// allocated objects. These pages are then freed.
+///
+/// # Synchronization
+///
+/// The `Slab` structure is able to provide (mostly) unsynchronized reads to
+/// values stored in the slab. Insertions and removals are synchronized. Reading
+/// objects via `Ref` is fully unsynchronized. Indexing objects uses amortized
+/// synchronization.
+///
+pub(crate) struct Slab<T> {
+    /// Array of pages. Each page is synchronized.
+    pages: [Arc<Page<T>>; NUM_PAGES],
+
+    /// Caches the array pointer & number of initialized slots.
+    cached: [CachedPage<T>; NUM_PAGES],
+}
+
+/// Allocate values in the associated slab.
+pub(crate) struct Allocator<T> {
+    /// Pages in the slab. The first page has a capacity of 16 elements. Each
+    /// following page has double the capacity of the previous page.
+    ///
+    /// Each returned `Ref` holds a reference count to this `Arc`.
+    pages: [Arc<Page<T>>; NUM_PAGES],
+}
+
+/// References a slot in the slab. Indexing a slot using an `Address` is memory
+/// safe even if the slot has been released or the page has been deallocated.
+/// However, it is not guaranteed that the slot has not been reused and now
+/// represents a different value.
+///
+/// The I/O driver uses a counter to track the slot's generation. When accessing
+/// the slot, the generations are compared. If they match, the value matches the
+/// address.
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+pub(crate) struct Address(usize);
+
+/// An entry in the slab.
+pub(crate) trait Entry: Default {
+    /// Reset the entry's value and track the generation.
+    fn reset(&self);
+}
+
+/// A reference to a value stored in the slab
+pub(crate) struct Ref<T> {
+    value: *const Value<T>,
+}
+
+/// Maximum number of pages a slab can contain.
+const NUM_PAGES: usize = 19;
+
+/// Minimum number of slots a page can contain.
+const PAGE_INITIAL_SIZE: usize = 32;
+const PAGE_INDEX_SHIFT: u32 = PAGE_INITIAL_SIZE.trailing_zeros() + 1;
+
+/// A page in the slab
+struct Page<T> {
+    /// Slots
+    slots: Mutex<Slots<T>>,
+
+    // Number of slots currently being used. This is not guaranteed to be up to
+    // date and should only be used as a hint.
+    used: AtomicUsize,
+
+    // Set to `true` when the page has been allocated.
+    allocated: AtomicBool,
+
+    // The number of slots the page can hold.
+    len: usize,
+
+    // Length of all previous pages combined
+    prev_len: usize,
+}
+
+struct CachedPage<T> {
+    /// Pointer to the page's slots.
+    slots: *const Slot<T>,
+
+    /// Number of initialized slots.
+    init: usize,
+}
+
+/// Page state
+struct Slots<T> {
+    /// Slots
+    slots: Vec<Slot<T>>,
+
+    head: usize,
+
+    /// Number of slots currently in use.
+    used: usize,
+}
+
+unsafe impl<T: Sync> Sync for Page<T> {}
+unsafe impl<T: Sync> Send for Page<T> {}
+unsafe impl<T: Sync> Sync for CachedPage<T> {}
+unsafe impl<T: Sync> Send for CachedPage<T> {}
+
+/// A slot in the slab. Contains slot-specific metadata.
+///
+/// `#[repr(C)]` guarantees that the struct starts w/ `value`. We use pointer
+/// math to map a value pointer to an index in the page.
+#[repr(C)]
+struct Slot<T> {
+    /// Pointed to by `Ref`.
+    value: UnsafeCell<Value<T>>,
+
+    /// Next entry in the free list.
+    next: u32,
+}
+
+/// Value paired with a reference to the page
+struct Value<T> {
+    /// Value stored in the slot
+    value: T,
+
+    /// Pointer to the page containing the slot.
+    ///
+    /// A raw pointer is used as this creates a ref cycle.
+    page: *const Page<T>,
+}
+
+impl<T> Slab<T> {
+    /// Create a new, empty slab
+    pub(crate) fn new() -> Slab<T> {
+        // Initializing arrays is a bit annoying. Instead of manually writing
+        // out an array and every single entry, `Default::default()` is used to
+        // initialize the array, then the array is iterated and each value is
+        // initialized.
+        let mut slab = Slab {
+            pages: Default::default(),
+            cached: Default::default(),
+        };
+
+        let mut len = PAGE_INITIAL_SIZE;
+        let mut prev_len: usize = 0;
+
+        for page in &mut slab.pages {
+            let page = Arc::get_mut(page).unwrap();
+            page.len = len;
+            page.prev_len = prev_len;
+            len *= 2;
+            prev_len += page.len;
+
+            // Ensure we don't exceed the max address space.
+            debug_assert!(
+                page.len - 1 + page.prev_len < (1 << 24),
+                "max = {:b}",
+                page.len - 1 + page.prev_len
+            );
+        }
+
+        slab
+    }
+
+    /// Returns a new `Allocator`.
+    ///
+    /// The `Allocator` supports concurrent allocation of objects.
+    pub(crate) fn allocator(&self) -> Allocator<T> {
+        Allocator {
+            pages: self.pages.clone(),
+        }
+    }
+
+    /// Returns a reference to the value stored at the given address.
+    ///
+    /// `&mut self` is used as the call may update internal cached state.
+    pub(crate) fn get(&mut self, addr: Address) -> Option<&T> {
+        let page_idx = addr.page();
+        let slot_idx = self.pages[page_idx].slot(addr);
+
+        // If the address references a slot that was last seen as uninitialized,
+        // the `CachedPage` is updated. This requires acquiring the page lock
+        // and updating the slot pointer and initialized offset.
+        if self.cached[page_idx].init <= slot_idx {
+            self.cached[page_idx].refresh(&self.pages[page_idx]);
+        }
+
+        // If the address **still** references an uninitialized slot, then the
+        // address is invalid and `None` is returned.
+        if self.cached[page_idx].init <= slot_idx {
+            return None;
+        }
+
+        // Get a reference to the value. The lifetime of the returned reference
+        // is bound to `&self`. The only way to invalidate the underlying memory
+        // is to call `compact()`. The lifetimes prevent calling `compact()`
+        // while references to values are outstanding.
+        //
+        // The referenced data is never