path: root/tokio/src
author    Carl Lerche <me@carllerche.com>  2020-08-11 22:28:43 -0700
committer GitHub <noreply@github.com>      2020-08-11 22:28:43 -0700
commit    8feebab7cdef2fbeb810d18509b1443b6b9f60b1 (patch)
tree      a76395dd802bae93fc7ba97e50ba59ed153a1991 /tokio/src
parent    674985d9fb87c866d9219297fde35b2e32b40b0b (diff)
io: rewrite slab to support compaction (#2757)
The I/O driver uses a slab to store per-resource state. Doing this provides two benefits. First, allocating state is streamlined. Second, resources may be safely indexed using a `usize` type. The `usize` is passed to the OS's selector when registering to receive events.

The original slab implementation used a `Vec` backed by a `RwLock`. This primarily caused contention when reading state. That implementation also only **grew** the slab's capacity and never shrank it. In #1625, the slab was rewritten to use a lock-free strategy. The lock contention was removed, but the implementation remained grow-only.

This change adds the ability to release memory. Like the previous implementation, it structures the slab as a vector of pages. This enables growing the slab without having to move any existing entries. It also adds the ability to release pages. This is done by introducing a lock around allocating and releasing slab entries. This does not impact benchmarks, primarily because the existing implementation was not "done" and also had a lock around allocating and releasing.

A `Slab::compact()` function is added. It iterates over the pages, and any page found with no slots in use is freed. The `compact()` function is called occasionally by the I/O driver.

Fixes #2505
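To make the "vector of pages" layout concrete, here is a minimal, standalone Rust sketch. The constants mirror the ones the patch defines (19 pages, the first holding 32 slots, each page doubling the previous one), but the `page_for` helper is illustrative and is not the patch's actual bit-shifting `Address` math. The point it demonstrates: a slot's global index maps to a fixed (page, offset) pair for its entire lifetime, so appending new pages never moves existing entries.

    // Standalone sketch: map a global slot index to its (page, offset) pair.
    const NUM_PAGES: usize = 19;
    const PAGE_INITIAL_SIZE: usize = 32;

    fn page_for(index: usize) -> (usize, usize) {
        let mut page_start = 0;
        let mut page_len = PAGE_INITIAL_SIZE;
        for page in 0..NUM_PAGES {
            if index < page_start + page_len {
                // This pair is fixed for the slot's lifetime, so allocating
                // additional pages never invalidates it.
                return (page, index - page_start);
            }
            page_start += page_len;
            page_len *= 2;
        }
        panic!("index exceeds the slab's address space");
    }

    fn main() {
        assert_eq!(page_for(0), (0, 0));
        assert_eq!(page_for(31), (0, 31));  // last slot of the first page
        assert_eq!(page_for(32), (1, 0));   // first slot of the second page
        assert_eq!(page_for(100), (2, 4));  // pages hold 32, 64, 128, ... slots
    }

Because pages are initialized lazily and can be re-created at the same index later, `compact()` can return a page's memory without disturbing this mapping.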
Diffstat (limited to 'tokio/src')
-rw-r--r--  tokio/src/io/driver/mod.rs               211
-rw-r--r--  tokio/src/io/driver/scheduled_io.rs       88
-rw-r--r--  tokio/src/io/registration.rs              48
-rw-r--r--  tokio/src/loom/std/atomic_ptr.rs           8
-rw-r--r--  tokio/src/util/bit.rs                     18
-rw-r--r--  tokio/src/util/slab.rs                   790
-rw-r--r--  tokio/src/util/slab/addr.rs              154
-rw-r--r--  tokio/src/util/slab/entry.rs               7
-rw-r--r--  tokio/src/util/slab/generation.rs         32
-rw-r--r--  tokio/src/util/slab/mod.rs               107
-rw-r--r--  tokio/src/util/slab/page.rs              187
-rw-r--r--  tokio/src/util/slab/shard.rs             105
-rw-r--r--  tokio/src/util/slab/slot.rs               42
-rw-r--r--  tokio/src/util/slab/stack.rs              58
-rw-r--r--  tokio/src/util/slab/tests/loom_slab.rs   327
-rw-r--r--  tokio/src/util/slab/tests/loom_stack.rs   88
-rw-r--r--  tokio/src/util/slab/tests/mod.rs           2
17 files changed, 928 insertions, 1344 deletions
diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs
index d8d17f88..d8c253ab 100644
--- a/tokio/src/io/driver/mod.rs
+++ b/tokio/src/io/driver/mod.rs
@@ -3,23 +3,30 @@ pub(crate) mod platform;
mod scheduled_io;
pub(crate) use scheduled_io::ScheduledIo; // pub(crate) for tests
-use crate::loom::sync::atomic::AtomicUsize;
use crate::park::{Park, Unpark};
use crate::runtime::context;
-use crate::util::slab::{Address, Slab};
+use crate::util::bit;
+use crate::util::slab::{self, Slab};
use mio::event::Evented;
use std::fmt;
use std::io;
-use std::sync::atomic::Ordering::SeqCst;
use std::sync::{Arc, Weak};
use std::task::Waker;
use std::time::Duration;
/// I/O driver, backed by Mio
pub(crate) struct Driver {
+ /// Tracks the number of times `turn` is called. It is safe for this to wrap
+ /// as it is mostly used to determine when to call `compact()`
+ tick: u16,
+
/// Reuse the `mio::Events` value across calls to poll.
- events: mio::Events,
+ events: Option<mio::Events>,
+
+ /// Primary slab handle containing the state for each resource registered
+ /// with this driver.
+ resources: Slab<ScheduledIo>,
/// State shared between the reactor and the handles.
inner: Arc<Inner>,
@@ -37,11 +44,8 @@ pub(super) struct Inner {
/// The underlying system event queue.
io: mio::Poll,
- /// Dispatch slabs for I/O and futures events
- pub(super) io_dispatch: Slab<ScheduledIo>,
-
- /// The number of sources in `io_dispatch`.
- n_sources: AtomicUsize,
+ /// Allocates `ScheduledIo` handles when creating new resources.
+ pub(super) io_dispatch: slab::Allocator<ScheduledIo>,
/// Used to wake up the reactor from a call to `turn`
wakeup: mio::SetReadiness,
@@ -53,7 +57,19 @@ pub(super) enum Direction {
Write,
}
-const TOKEN_WAKEUP: mio::Token = mio::Token(Address::NULL);
+// TODO: Don't use a fake token. Instead, reserve a slot entry for the wakeup
+// token.
+const TOKEN_WAKEUP: mio::Token = mio::Token(1 << 31);
+
+const ADDRESS: bit::Pack = bit::Pack::least_significant(24);
+
+// Packs the generation value in the `readiness` field.
+//
+// The generation prevents a race condition where a slab slot is reused for a
+// new socket while the I/O driver is about to apply a readiness event. The
+// generation value is checked when setting new readiness. If the generations do
+// not match, then the readiness event is discarded.
+const GENERATION: bit::Pack = ADDRESS.then(7);
fn _assert_kinds() {
fn _assert<T: Send + Sync>() {}
@@ -69,6 +85,8 @@ impl Driver {
pub(crate) fn new() -> io::Result<Driver> {
let io = mio::Poll::new()?;
let wakeup_pair = mio::Registration::new2();
+ let slab = Slab::new();
+ let allocator = slab.allocator();
io.register(
&wakeup_pair.0,
@@ -78,12 +96,13 @@ impl Driver {
)?;
Ok(Driver {
- events: mio::Events::with_capacity(1024),
+ tick: 0,
+ events: Some(mio::Events::with_capacity(1024)),
+ resources: slab,
_wakeup_registration: wakeup_pair.0,
inner: Arc::new(Inner {
io,
- io_dispatch: Slab::new(),
- n_sources: AtomicUsize::new(0),
+ io_dispatch: allocator,
wakeup: wakeup_pair.1,
}),
})
@@ -102,16 +121,27 @@ impl Driver {
}
fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<()> {
+ // How often to call `compact()` on the resource slab
+ const COMPACT_INTERVAL: u16 = 256;
+
+ self.tick = self.tick.wrapping_add(1);
+
+ if self.tick % COMPACT_INTERVAL == 0 {
+ self.resources.compact();
+ }
+
+ let mut events = self.events.take().expect("i/o driver event store missing");
+
// Block waiting for an event to happen, peeling out how many events
// happened.
- match self.inner.io.poll(&mut self.events, max_wait) {
+ match self.inner.io.poll(&mut events, max_wait) {
Ok(_) => {}
Err(e) => return Err(e),
}
// Process all the events that came in, dispatching appropriately
- for event in self.events.iter() {
+ for event in events.iter() {
let token = event.token();
if token == TOKEN_WAKEUP {
@@ -124,22 +154,24 @@ impl Driver {
}
}
+ self.events = Some(events);
+
Ok(())
}
- fn dispatch(&self, token: mio::Token, ready: mio::Ready) {
+ fn dispatch(&mut self, token: mio::Token, ready: mio::Ready) {
let mut rd = None;
let mut wr = None;
- let address = Address::from_usize(token.0);
+ let addr = slab::Address::from_usize(ADDRESS.unpack(token.0));
- let io = match self.inner.io_dispatch.get(address) {
+ let io = match self.resources.get(addr) {
Some(io) => io,
None => return,
};
if io
- .set_readiness(address, |curr| curr | ready.as_usize())
+ .set_readiness(Some(token.0), |curr| curr | ready.as_usize())
.is_err()
{
// token no longer valid!
@@ -164,6 +196,18 @@ impl Driver {
}
}
+impl Drop for Driver {
+ fn drop(&mut self) {
+ self.resources.for_each(|io| {
+ // If a task is waiting on the I/O resource, notify it. The task
+ // will then attempt to use the I/O resource and fail due to the
+ // driver being shutdown.
+ io.reader.wake();
+ io.writer.wake();
+ })
+ }
+}
+
impl Park for Driver {
type Unpark = Handle;
type Error = io::Error;
@@ -246,24 +290,20 @@ impl Inner {
&self,
source: &dyn Evented,
ready: mio::Ready,
- ) -> io::Result<Address> {
- let address = self.io_dispatch.alloc().ok_or_else(|| {
+ ) -> io::Result<slab::Ref<ScheduledIo>> {
+ let (address, shared) = self.io_dispatch.allocate().ok_or_else(|| {
io::Error::new(
io::ErrorKind::Other,
"reactor at max registered I/O resources",
)
})?;
- self.n_sources.fetch_add(1, SeqCst);
+ let token = GENERATION.pack(shared.generation(), ADDRESS.pack(address.as_usize(), 0));
- self.io.register(
- source,
- mio::Token(address.to_usize()),
- ready,
- mio::PollOpt::edge(),
- )?;
+ self.io
+ .register(source, mio::Token(token), ready, mio::PollOpt::edge())?;
- Ok(address)
+ Ok(shared)
}
/// Deregisters an I/O resource from the reactor.
@@ -271,21 +311,11 @@ impl Inner {
self.io.deregister(source)
}
- pub(super) fn drop_source(&self, address: Address) {
- self.io_dispatch.remove(address);
- self.n_sources.fetch_sub(1, SeqCst);
- }
-
/// Registers interest in the I/O resource associated with `token`.
- pub(super) fn register(&self, token: Address, dir: Direction, w: Waker) {
- let sched = self
- .io_dispatch
- .get(token)
- .unwrap_or_else(|| panic!("IO resource for token {:?} does not exist!", token));
-
+ pub(super) fn register(&self, io: &slab::Ref<ScheduledIo>, dir: Direction, w: Waker) {
let waker = match dir {
- Direction::Read => &sched.reader,
- Direction::Write => &sched.writer,
+ Direction::Read => &io.reader,
+ Direction::Write => &io.writer,
};
waker.register(w);
@@ -303,100 +333,3 @@ impl Direction {
}
}
}
-
-#[cfg(all(test, loom))]
-mod tests {
- use super::*;
- use loom::thread;
-
- // No-op `Evented` impl just so we can have something to pass to `add_source`.
- struct NotEvented;
-
- impl Evented for NotEvented {
- fn register(
- &self,
- _: &mio::Poll,
- _: mio::Token,
- _: mio::Ready,
- _: mio::PollOpt,
- ) -> io::Result<()> {
- Ok(())
- }
-
- fn reregister(
- &self,
- _: &mio::Poll,
- _: mio::Token,
- _: mio::Ready,
- _: mio::PollOpt,
- ) -> io::Result<()> {
- Ok(())
- }
-
- fn deregister(&self, _: &mio::Poll) -> io::Result<()> {
- Ok(())
- }
- }
-
- #[test]
- fn tokens_unique_when_dropped() {
- loom::model(|| {
- let reactor = Driver::new().unwrap();
- let inner = reactor.inner;
- let inner2 = inner.clone();
-
- let token_1 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap();
- let thread = thread::spawn(move || {
- inner2.drop_source(token_1);
- });
-
- let token_2 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap();
- thread.join().unwrap();
-
- assert!(token_1 != token_2);
- })
- }
-
- #[test]
- fn tokens_unique_when_dropped_on_full_page() {
- loom::model(|| {
- let reactor = Driver::new().unwrap();
- let inner = reactor.inner;
- let inner2 = inner.clone();
- // add sources to fill up the first page so that the dropped index
- // may be reused.
- for _ in 0..31 {
- inner.add_source(&NotEvented, mio::Ready::all()).unwrap();
- }
-
- let token_1 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap();
- let thread = thread::spawn(move || {
- inner2.drop_source(token_1);
- });
-
- let token_2 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap();
- thread.join().unwrap();
-
- assert!(token_1 != token_2);
- })
- }
-
- #[test]
- fn tokens_unique_concurrent_add() {
- loom::model(|| {
- let reactor = Driver::new().unwrap();
- let inner = reactor.inner;
- let inner2 = inner.clone();
-
- let thread = thread::spawn(move || {
- let token_2 = inner2.add_source(&NotEvented, mio::Ready::all()).unwrap();
- token_2
- });
-
- let token_1 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap();
- let token_2 = thread.join().unwrap();
-
- assert!(token_1 != token_2);
- })
- }
-}
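The `tick`/`COMPACT_INTERVAL` logic added to `Driver::turn` above boils down to a wrapping counter that triggers one compaction pass every 256 turns. A self-contained sketch of that cadence follows; the `Ticker` type and the numbers in `main` are illustrative, not Tokio API, while the interval value matches the patch.

    const COMPACT_INTERVAL: u16 = 256;

    struct Ticker {
        tick: u16,
    }

    impl Ticker {
        // Called once per `turn`; wrapping is fine because the counter only
        // gates how often compaction runs.
        fn should_compact(&mut self) -> bool {
            self.tick = self.tick.wrapping_add(1);
            self.tick % COMPACT_INTERVAL == 0
        }
    }

    fn main() {
        let mut ticker = Ticker { tick: 0 };
        let compactions = (0..1024).filter(|_| ticker.should_compact()).count();
        // 1024 turns at an interval of 256 yield 4 compaction passes.
        assert_eq!(compactions, 4);
    }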
diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs
index 7f6446e3..566f7daf 100644
--- a/tokio/src/io/driver/scheduled_io.rs
+++ b/tokio/src/io/driver/scheduled_io.rs
@@ -1,47 +1,30 @@
use crate::loom::future::AtomicWaker;
use crate::loom::sync::atomic::AtomicUsize;
-use crate::util::bit;
-use crate::util::slab::{Address, Entry, Generation};
+use crate::util::slab::Entry;
-use std::sync::atomic::Ordering::{AcqRel, Acquire, SeqCst};
+use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
+/// Stored in the I/O driver resource slab.
#[derive(Debug)]
pub(crate) struct ScheduledIo {
+ /// Packs the resource's readiness with the resource's generation.
readiness: AtomicUsize,
+
+ /// Task waiting on read readiness
pub(crate) reader: AtomicWaker,
+
+ /// Task waiting on write readiness
pub(crate) writer: AtomicWaker,
}
-const PACK: bit::Pack = bit::Pack::most_significant(Generation::WIDTH);
-
impl Entry for ScheduledIo {
- fn generation(&self) -> Generation {
- unpack_generation(self.readiness.load(SeqCst))
- }
-
- fn reset(&self, generation: Generation) -> bool {
- let mut current = self.readiness.load(Acquire);
-
- loop {
- if unpack_generation(current) != generation {
- return false;
- }
-
- let next = PACK.pack(generation.next().to_usize(), 0);
-
- match self
- .readiness
- .compare_exchange(current, next, AcqRel, Acquire)
- {
- Ok(_) => break,
- Err(actual) => current = actual,
- }
- }
+ fn reset(&self) {
+ let state = self.readiness.load(Acquire);
- drop(self.reader.take_waker());
- drop(self.writer.take_waker());
+ let generation = super::GENERATION.unpack(state);
+ let next = super::GENERATION.pack_lossy(generation + 1, 0);
- true
+ self.readiness.store(next, Release);
}
}
@@ -56,24 +39,8 @@ impl Default for ScheduledIo {
}
impl ScheduledIo {
- #[cfg(all(test, loom))]
- /// Returns the current readiness value of this `ScheduledIo`, if the
- /// provided `token` is still a valid access.
- ///
- /// # Returns
- ///
- /// If the given token's generation no longer matches the `ScheduledIo`'s
- /// generation, then the corresponding IO resource has been removed and
- /// replaced with a new resource. In that case, this method returns `None`.
- /// Otherwise, this returns the current readiness.
- pub(crate) fn get_readiness(&self, address: Address) -> Option<usize> {
- let ready = self.readiness.load(Acquire);
-
- if unpack_generation(ready) != address.generation() {
- return None;
- }
-
- Some(ready & !PACK.mask())
+ pub(crate) fn generation(&self) -> usize {
+ super::GENERATION.unpack(self.readiness.load(Acquire))
}
/// Sets the readiness on this `ScheduledIo` by invoking the given closure on
@@ -92,32 +59,35 @@ impl ScheduledIo {
/// Otherwise, this returns the previous readiness.
pub(crate) fn set_readiness(
&self,
- address: Address,
+ token: Option<usize>,
f: impl Fn(usize) -> usize,
) -> Result<usize, ()> {
- let generation = address.generation();
-
let mut current = self.readiness.load(Acquire);
loop {
- // Check that the generation for this access is still the current
- // one.
- if unpack_generation(current) != generation {
- return Err(());
+ let current_generation = super::GENERATION.unpack(current);
+
+ if let Some(token) = token {
+ // Check that the generation for this access is still the
+ // current one.
+ if super::GENERATION.unpack(token) != current_generation {
+ return Err(());
+ }
}
+
// Mask out the generation bits so that the modifying function
// doesn't see them.
let current_readiness = current & mio::Ready::all().as_usize();
let new = f(current_readiness);
debug_assert!(
- new <= !PACK.max_value(),
+ new <= super::ADDRESS.max_value(),
"new readiness value would overwrite generation bits!"
);
match self.readiness.compare_exchange(
current,
- PACK.pack(generation.to_usize(), new),
+ super::GENERATION.pack(current_generation, new),
AcqRel,
Acquire,
) {
@@ -135,7 +105,3 @@ impl Drop for ScheduledIo {
self.reader.wake();
}
}
-
-fn unpack_generation(src: usize) -> Generation {
- Generation::new(PACK.unpack(src))
-}
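The compare-exchange loop in `set_readiness` above follows a standard lock-free read-modify-write pattern: recompute the new state from the freshly observed value on every retry, and never let the caller's closure touch the generation bits. Below is a self-contained sketch using only `std` atomics; the 24-bit split matches the `ADDRESS`/`GENERATION` packing defined in `driver/mod.rs`, but the standalone function is illustrative and omits the stale-token generation check for brevity.

    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering::{AcqRel, Acquire};

    // Readiness (and the slot address) live in the low 24 bits; the
    // generation sits in the bits above them.
    const READY_MASK: usize = (1 << 24) - 1;

    fn set_readiness(state: &AtomicUsize, f: impl Fn(usize) -> usize) -> usize {
        let mut current = state.load(Acquire);
        loop {
            // Apply the caller's update to the readiness bits only, carrying
            // the generation bits through unchanged.
            let next = (current & !READY_MASK) | (f(current & READY_MASK) & READY_MASK);
            match state.compare_exchange(current, next, AcqRel, Acquire) {
                Ok(_) => return current & READY_MASK,
                // Another thread raced us; retry against the value it wrote.
                Err(actual) => current = actual,
            }
        }
    }

    fn main() {
        let state = AtomicUsize::new(3 << 24); // generation 3, nothing ready
        set_readiness(&state, |curr| curr | 0b01); // record read readiness
        assert_eq!(state.load(Acquire), (3 << 24) | 0b01);
    }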
diff --git a/tokio/src/io/registration.rs b/tokio/src/io/registration.rs
index 77fe6dbc..63aaff56 100644
--- a/tokio/src/io/registration.rs
+++ b/tokio/src/io/registration.rs
@@ -1,5 +1,5 @@
-use crate::io::driver::{platform, Direction, Handle};
-use crate::util::slab::Address;
+use crate::io::driver::{platform, Direction, Handle, ScheduledIo};
+use crate::util::slab;
use mio::{self, Evented};
use std::io;
@@ -39,11 +39,17 @@ cfg_io_driver! {
/// [`poll_write_ready`]: method@Self::poll_write_ready`
#[derive(Debug)]
pub struct Registration {
+ /// Handle to the associated driver.
handle: Handle,
- address: Address,
+
+ /// Reference to state stored by the driver.
+ shared: slab::Ref<ScheduledIo>,
}
}
+unsafe impl Send for Registration {}
+unsafe impl Sync for Registration {}
+
// ===== impl Registration =====
impl Registration {
@@ -104,7 +110,7 @@ impl Registration {
T: Evented,
{
let handle = Handle::current();
- let address = if let Some(inner) = handle.inner() {
+ let shared = if let Some(inner) = handle.inner() {
inner.add_source(io, ready)?
} else {
return Err(io::Error::new(
@@ -113,7 +119,7 @@ impl Registration {
));
};
- Ok(Registration { handle, address })
+ Ok(Registration { handle, shared })
}
/// Deregisters the I/O resource from the reactor it is associated with.
@@ -272,14 +278,12 @@ impl Registration {
// If the task should be notified about new events, ensure that it has
// been registered
if let Some(ref cx) = cx {
- inner.register(self.address, direction, cx.waker().clone())
+ inner.register(&self.shared, direction, cx.waker().clone())
}
let mask = direction.mask();
let mask_no_hup = (mask - platform::hup() - platform::error()).as_usize();
- let sched = inner.io_dispatch.get(self.address).unwrap();
-
// This consumes the current readiness state **except** for HUP and
// error. HUP and error are excluded because a) they are final states
// and never transitition out and b) both the read AND the write
@@ -296,9 +300,10 @@ impl Registration {
// AND write. A specific case that `EPOLLERR` occurs is when the read
// end of a pipe is closed. When this occurs, a peer blocked by
// writing to the pipe should be notified.
- let curr_ready = sched
- .set_readiness(self.address, |curr| curr & (!mask_no_hup))
- .unwrap_or_else(|_| panic!("address {:?} no longer valid!", self.address));
+ let curr_ready = self
+ .shared
+ .set_readiness(None, |curr| curr & (!mask_no_hup))
+ .unwrap_or_else(|_| unreachable!());
let mut ready = mask & mio::Ready::from_usize(curr_ready);
@@ -306,14 +311,15 @@ impl Registration {
if let Some(cx) = cx {
// Update the task info
match direction {
- Direction::Read => sched.reader.register_by_ref(cx.waker()),
- Direction::Write => sched.writer.register_by_ref(cx.waker()),
+ Direction::Read => self.shared.reader.register_by_ref(cx.waker()),
+ Direction::Write => self.shared.writer.register_by_ref(cx.waker()),
}
// Try again
- let curr_ready = sched
- .set_readiness(self.address, |curr| curr & (!mask_no_hup))
- .unwrap_or_else(|_| panic!("address {:?} no longer valid!", self.address));
+ let curr_ready = self
+ .shared
+ .set_readiness(None, |curr| curr & (!mask_no_hup))
+ .unwrap();
ready = mask & mio::Ready::from_usize(curr_ready);
}
}
@@ -326,15 +332,9 @@ impl Registration {
}
}
-unsafe impl Send for Registration {}
-unsafe impl Sync for Registration {}
-
impl Drop for Registration {
fn drop(&mut self) {
- let inner = match self.handle.inner() {
- Some(inner) => inner,
- None => return,
- };
- inner.drop_source(self.address);
+ drop(self.shared.reader.take_waker());
+ drop(self.shared.writer.take_waker());
}
}
diff --git a/tokio/src/loom/std/atomic_ptr.rs b/tokio/src/loom/std/atomic_ptr.rs
index f7fd56cc..236645f0 100644
--- a/tokio/src/loom/std/atomic_ptr.rs
+++ b/tokio/src/loom/std/atomic_ptr.rs
@@ -1,5 +1,5 @@
use std::fmt;
-use std::ops::Deref;
+use std::ops::{Deref, DerefMut};
/// `AtomicPtr` providing an additional `load_unsync` function.
pub(crate) struct AtomicPtr<T> {
@@ -21,6 +21,12 @@ impl<T> Deref for AtomicPtr<T> {
}
}
+impl<T> DerefMut for AtomicPtr<T> {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.inner
+ }
+}
+
impl<T> fmt::Debug for AtomicPtr<T> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
self.deref().fmt(fmt)
diff --git a/tokio/src/util/bit.rs b/tokio/src/util/bit.rs
index e61ac216..ee756044 100644
--- a/tokio/src/util/bit.rs
+++ b/tokio/src/util/bit.rs
@@ -7,16 +7,6 @@ pub(crate) struct Pack {
}
impl Pack {
- /// Value is packed in the `width` most-significant bits.
- pub(crate) const fn most_significant(width: u32) -> Pack {
- let mask = mask_for(width).reverse_bits();
-
- Pack {
- mask,
- shift: mask.trailing_zeros(),
- }
- }
-
/// Value is packed in the `width` least-significant bits.
pub(crate) const fn least_significant(width: u32) -> Pack {
let mask = mask_for(width);
@@ -53,6 +43,14 @@ impl Pack {
(base & !self.mask) | (value << self.shift)
}
+ /// Packs the value with `base`, losing any bits of `value` that fit.
+ ///
+ /// If `value` is larger than the max value that can be represented by the
+ /// allotted width, the most significant bits are truncated.
+ pub(crate) fn pack_lossy(&self, value: usize, base: usize) -> usize {
+ self.pack(value & self.max_value(), base)
+ }
+
pub(crate) fn unpack(&self, src: usize) -> usize {
unpack(src, self.mask, self.shift)
}
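The new `Pack::pack_lossy` above exists so that `ScheduledIo::reset` can bump the 7-bit generation without overflow concerns: truncating to the field's width makes the increment wrap around instead of spilling into neighboring bits. A standalone sketch with plain bit operations; the shift and width mirror the `GENERATION` field, while the helper name is illustrative.

    const GENERATION_WIDTH: u32 = 7;
    const GENERATION_SHIFT: u32 = 24;
    const GENERATION_MAX: usize = (1 << GENERATION_WIDTH) - 1;

    // Truncate `generation` to 7 bits, then shift it into position above the
    // 24 address/readiness bits.
    fn pack_generation_lossy(generation: usize) -> usize {
        (generation & GENERATION_MAX) << GENERATION_SHIFT
    }

    fn main() {
        // 127 is the largest representable generation...
        assert_eq!(pack_generation_lossy(127) >> GENERATION_SHIFT, 127);
        // ...so bumping it wraps back to 0 instead of corrupting other bits.
        assert_eq!(pack_generation_lossy(127 + 1), 0);
    }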
diff --git a/tokio/src/util/slab.rs b/tokio/src/util/slab.rs
new file mode 100644
index 00000000..cb7fd5e9
--- /dev/null
+++ b/tokio/src/util/slab.rs
@@ -0,0 +1,790 @@
+use crate::loom::cell::UnsafeCell;
+use crate::loom::sync::atomic::{AtomicBool, AtomicUsize};
+use crate::loom::sync::{Arc, Mutex};
+use crate::util::bit;
+use std::fmt;
+use std::mem;
+use std::ops;
+use std::ptr;
+use std::sync::atomic::Ordering::Relaxed;
+
+/// Amortized allocation for homogeneous data types.
+///
+/// The slab pre-allocates chunks of memory to store values. It uses a similar
+/// growing strategy as `Vec`. When new capacity is needed, the slab grows by
+/// 2x.
+///
+/// # Pages
+///
+/// Unlike `Vec`, growing does not require moving existing elements. Instead of
+/// being a contiguous chunk of memory for all elements, `Slab` is an array of
+/// arrays. The top-level array is an array of pages. Each page is 2x bigger
+/// than the previous one. When the slab grows, a new page is allocated.
+///
+/// Pages are lazily initialized.
+///
+/// # Allocating
+///
+/// When allocating an object, previously used slots are reused first. If no
+/// previously used slot is available, a new slot is initialized in an existing
+/// page. If all pages are full, then a new page is allocated.
+///
+/// When an allocated object is released, it is pushed into its page's free
+/// list. Allocating scans all pages for a free slot.
+///
+/// # Indexing
+///
+/// The slab is able to index values using an address. Even when the indexed
+/// object has been released, it is still safe to index. This is a key ability
+/// for using the slab with the I/O driver. Addresses are registered with the
+/// OS's selector and I/O resources can be released without synchronizing with
+/// the OS.
+///
+/// # Compaction
+///
+/// `Slab::compact` will release pages that have been allocated but are no
+/// longer used. This is done by scanning the pages and finding pages with no
+/// allocated objects. These pages are then freed.
+///
+/// # Synchronization
+///
+/// The `Slab` structure is able to provide (mostly) unsynchronized reads to
+/// values stored in the slab. Insertions and removals are synchronized. Reading
+/// objects via `Ref` is fully unsynchronized. Indexing objects uses amortized
+/// synchronization.
+///
+pub(crate) struct Slab<T> {
+ /// Array of pages. Each page is synchronized.
+ pages: [Arc<Page<T>>; NUM_PAGES],
+
+ /// Caches the array pointer & number of initialized slots.
+ cached: [CachedPage<T>; NUM_PAGES],
+}
+
+/// Allocate values in the associated slab.
+pub(crate) struct Allocator<T> {
+ /// Pages in the slab. The first page has a capacity of 32 elements. Each
+ /// following page has double the capacity of the previous page.
+ ///
+ /// Each returned `Ref` holds a reference count to this `Arc`.
+ pages: [Arc<Page<T>>; NUM_PAGES],
+}
+
+/// References a slot in the slab. Indexing a slot using an `Address` is memory
+/// safe even if the slot has been released or the page has been deallocated.
+/// However, it is not guaranteed that the slot has not been reused and now
+/// represents a different value.
+///
+/// The I/O driver uses a counter to track the slot's generation. When accessing
+/// the slot, the generations are compared. If they match, the value matches the
+/// address.
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+pub(crate) struct Address(usize);
+
+/// An entry in the slab.
+pub(crate) trait Entry: Default {
+ /// Reset the entry's value and track the generation.
+ fn reset(&self);
+}
+
+/// A reference to a value stored in the slab
+pub(crate) struct Ref<T> {
+ value: *const Value<T>,
+}
+
+/// Maximum number of pages a slab can contain.
+const NUM_PAGES: usize = 19;
+
+/// Minimum number of slots a page can contain.
+const PAGE_INITIAL_SIZE: usize = 32;
+const PAGE_INDEX_SHIFT: u32 = PAGE_INITIAL_SIZE.trailing_zeros() + 1;
+
+/// A page in the slab
+struct Page<T> {
+ /// Slots
+ slots: Mutex<Slots<T>>,
+
+ // Number of slots currently being used. This is not guaranteed to be up to
+ // date and should only be used as a hint.
+ used: AtomicUsize,
+
+ // Set to `true` when the page has been allocated.
+ allocated: AtomicBool,
+
+ // The number of slots the page can hold.
+ len: usize,
+
+ // Length of all previous pages combined
+ prev_len: usize,
+}
+
+struct CachedPage<T> {
+ /// Pointer to the page's slots.
+ slots: *const Slot<T>,
+
+ /// Number of initialized slots.
+ init: usize,
+}
+
+/// Page state
+struct Slots<T> {
+ /// Slots
+ slots: Vec<Slot<T>>,
+
+ head: usize,
+
+ /// Number of slots currently in use.
+ used: usize,
+}
+
+unsafe impl<T: Sync> Sync for Page<T> {}
+unsafe impl<T: Sync> Send for Page<T> {}
+unsafe impl<T: Sync> Sync for CachedPage<T> {}
+unsafe impl<T: Sync> Send for CachedPage<T> {}
+
+/// A slot in the slab. Contains slot-specific metadata.
+///
+/// `#[repr(C)]` guarantees that the struct starts w/ `value`. We use pointer
+/// math to map a value pointer to an index in the page.
+#[repr(C)]
+struct Slot<T> {
+ /// Pointed to by `Ref`.
+ value: UnsafeCell<Value<T>>,
+
+ /// Next entry in the free list.
+ next: u32,
+}
+
+/// Value paired with a reference to the page
+struct Value<T> {
+ /// Value stored in the slot
+ value: T,
+
+ /// Pointer to the page containing the slot.
+ ///
+ /// A raw pointer is used as this creates a ref cycle.
+ page: *const Page<T>,
+}
+
+impl<T> Slab<T> {
+ /// Create a new, empty, slab
+ pub(crate) fn new() -> Slab<T> {
+ // Initializing arrays is a bit annoying. Instead of manually writing
+ // out an array and every single entry, `Default::default()` is used to
+ // initialize the array, then the array is iterated and each value is
+ // initialized.
+ let mut slab = Slab {
+ pages: Default::default(),
+ cached: Default::default(),
+ };
+
+ let mut len = PAGE_INITIAL_SIZE;
+ let mut prev_len: usize = 0;
+
+ for page in &mut slab.pages {
+ let page = Arc::get_mut(page).unwrap();
+ page.len = len;
+ page.prev_len = prev_len;
+ len *= 2;
+ prev_len += page.len;
+
+ // Ensure we don't exceed the max address space.
+ debug_assert!(
+ page.len - 1 + page.prev_len < (1 << 24),
+ "max = {:b}",
+ page.len - 1 + page.prev_len
+ );
+ }
+
+ slab
+ }
+
+ /// Returns a new `Allocator`.
+ ///
+ /// The `Allocator` supports concurrent allocation of objects.
+ pub(crate) fn allocator(&self) -> Allocator<T> {
+ Allocator {
+ pages: self.pages.clone(),
+ }
+ }
+
+ /// Returns a reference to the value stored at the given address.
+ ///
+ /// `&mut self` is used as the call may update internal cached state.
+ pub(crate) fn get(&mut self, addr: Address) -> Option<&T> {
+ let page_idx = addr.page();
+ let slot_idx = self.pages[page_idx].slot(addr);
+
+ // If the address references a slot that was last seen as uninitialized,
+ // the `CachedPage` is updated. This requires acquiring the page lock
+ // and updating the slot pointer and initialized offset.
+ if self.cached[page_idx].init <= slot_idx {
+ self.cached[page_idx].refresh(&self.pages[page_idx]);
+ }
+
+ // If the address **still** references an uninitialized slot, then the
+ // address is invalid and `None` is returned.
+ if self.cached[page_idx].init <= slot_idx {
+ return None;
+ }
+
+ // Get a reference to the value. The lifetime of the returned reference
+ // is bound to `&self`. The only way to invalidate the underlying memory
+ // is to call `compact()`. The lifetimes prevent calling `compact()`
+ // while references to values are outstanding.
+ //
+ // The referenced data is never