summaryrefslogtreecommitdiffstats
path: root/tokio
diff options
context:
space:
mode:
authorEliza Weisman <eliza@buoyant.io>2019-10-28 11:30:45 -0700
committerGitHub <noreply@github.com>2019-10-28 11:30:45 -0700
commit7eb264a0d0ee68433b20ecafabed53a70a9d43a4 (patch)
treebd909d05af537e75b788b34272870ee972b1fd0d /tokio
parent11952635840298d307dc10b26e6bdc78188c84ef (diff)
net: replace RwLock<Slab> with a lock free slab (#1625)
## Motivation The `tokio_net::driver` module currently stores the state associated with scheduled IO resources in a `Slab` implementation from the `slab` crate. Because inserting items into and removing items from `slab::Slab` requires mutable access, the slab must be placed within a `RwLock`. This has the potential to be a performance bottleneck especially in the context of the work-stealing scheduler where tasks and the reactor are often located on the same thread. `tokio-net` currently reimplements the `ShardedRwLock` type from `crossbeam` on top of `parking_lot`'s `RwLock` in an attempt to squeeze as much performance as possible out of the read-write lock around the slab. This introduces several dependencies that are not used elsewhere. ## Solution This branch replaces the `RwLock<Slab>` with a lock-free sharded slab implementation. The sharded slab is based on the concept of _free list sharding_ described by Leijen, Zorn, and de Moura in [_Mimalloc: Free List Sharding in Action_][mimalloc], which describes the implementation of a concurrent memory allocator. In this approach, the slab is sharded so that each thread has its own thread-local list of slab _pages_. Objects are always inserted into the local slab of the thread where the insertion is performed. Therefore, the insert operation needs not be synchronized. However, since objects can be _removed_ from the slab by threads other than the one on which they were inserted, removal operations can still occur concurrently. Therefore, Leijen et al. introduce a concept of _local_ and _global_ free lists. When an object is removed on the same thread it was originally inserted on, it is placed on the local free list; if it is removed on another thread, it goes on the global free list for the heap of the thread from which it originated. To find a free slot to insert into, the local free list is used first; if it is empty, the entire global free list is popped onto the local free list. Since the local free list is only ever accessed by the thread it belongs to, it does not require synchronization at all, and because the global free list is popped from infrequently, the cost of synchronization has a reduced impact. A majority of insertions can occur without any synchronization at all; and removals only require synchronization when an object has left its parent thread. The sharded slab was initially implemented in a separate crate (soon to be released), vendored in-tree to decrease `tokio-net`'s dependencies. Some code from the original implementation was removed or simplified, since it is only necessary to support `tokio-net`'s use case, rather than to provide a fully generic implementation. [mimalloc]: https://www.microsoft.com/en-us/research/uploads/prod/2019/06/mimalloc-tr-v1.pdf ## Performance These graphs were produced by out-of-tree `criterion` benchmarks of the sharded slab implementation. The first shows the results of a benchmark where an increasing number of items are inserted and then removed into a slab concurrently by five threads. It compares the performance of the sharded slab implementation with a `RwLock<slab::Slab>`: <img width="1124" alt="Screen Shot 2019-10-01 at 5 09 49 PM" src="https://user-images.githubusercontent.com/2796466/66078398-cd6c9f80-e516-11e9-9923-0ed6292e8498.png"> The second graph shows the results of a benchmark where an increasing number of items are inserted and then removed by a _single_ thread. It compares the performance of the sharded slab implementation with an `RwLock<slab::Slab>` and a `mut slab::Slab`. <img width="925" alt="Screen Shot 2019-10-01 at 5 13 45 PM" src="https://user-images.githubusercontent.com/2796466/66078469-f0974f00-e516-11e9-95b5-f65f0aa7e494.png"> Note that while the `mut slab::Slab` (i.e. no read-write lock) is (unsurprisingly) faster than the sharded slab in the single-threaded benchmark, the sharded slab outperforms the un-contended `RwLock<slab::Slab>`. This case, where the lock is uncontended and only accessed from a single thread, represents the best case for the current use of `slab` in `tokio-net`, since the lock cannot be conditionally removed in the single-threaded case. These benchmarks demonstrate that, while the sharded approach introduces a small constant-factor overhead, it offers significantly better performance across concurrent accesses. ## Notes This branch removes the following dependencies `tokio-net`: - `parking_lot` - `num_cpus` - `crossbeam_util` - `slab` This branch adds the following dev-dependencies: - `proptest` - `loom` Note that these dev dependencies were used to implement tests for the sharded-slab crate out-of-tree, and were necessary in order to vendor the existing tests. Alternatively, since the implementation is tested externally, we _could_ remove these tests in order to avoid picking up dev-dependencies. However, this means that we should try to ensure that `tokio-net`'s vendored implementation doesn't diverge significantly from upstream's, since it would be missing a majority of its tests. Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Diffstat (limited to 'tokio')
-rw-r--r--tokio/Cargo.toml10
-rw-r--r--tokio/src/lib.rs4
-rw-r--r--tokio/src/loom.rs45
-rw-r--r--tokio/src/net/driver/mod.rs8
-rw-r--r--tokio/src/net/driver/reactor/dispatch/iter.rs53
-rw-r--r--tokio/src/net/driver/reactor/dispatch/mod.rs36
-rw-r--r--tokio/src/net/driver/reactor/dispatch/pack.rs89
-rw-r--r--tokio/src/net/driver/reactor/dispatch/page/mod.rs257
-rw-r--r--tokio/src/net/driver/reactor/dispatch/page/scheduled_io.rs171
-rw-r--r--tokio/src/net/driver/reactor/dispatch/page/stack.rs149
-rw-r--r--tokio/src/net/driver/reactor/dispatch/sharded_slab.rs274
-rw-r--r--tokio/src/net/driver/reactor/dispatch/tests/loom/mod.rs204
-rw-r--r--tokio/src/net/driver/reactor/dispatch/tests/loom/single_shard.rs181
-rw-r--r--tokio/src/net/driver/reactor/dispatch/tests/loom/small_slab.rs473
-rw-r--r--tokio/src/net/driver/reactor/dispatch/tests/mod.rs30
-rw-r--r--tokio/src/net/driver/reactor/dispatch/tid.rs168
-rw-r--r--tokio/src/net/driver/reactor/mod.rs (renamed from tokio/src/net/driver/reactor.rs)238
-rw-r--r--tokio/src/net/driver/registration.rs16
-rw-r--r--tokio/src/net/driver/sharded_rwlock.rs217
-rw-r--r--tokio/src/process/mod.rs2
-rw-r--r--tokio/src/process/unix/orphan.rs2
-rw-r--r--tokio/src/process/unix/reap.rs2
-rw-r--r--tokio/src/signal/registry.rs2
-rw-r--r--tokio/src/signal/unix.rs2
-rw-r--r--tokio/src/signal/windows.rs2
-rw-r--r--tokio/src/timer/wheel/level.rs2
-rw-r--r--tokio/src/timer/wheel/mod.rs2
27 files changed, 2325 insertions, 314 deletions
diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml
index 6bbbf2c6..aa60ae22 100644
--- a/tokio/Cargo.toml
+++ b/tokio/Cargo.toml
@@ -41,7 +41,7 @@ io-util = ["io-traits", "pin-project", "memchr"]
io = ["io-traits", "io-util"]
macros = ["tokio-macros"]
net-full = ["tcp", "udp", "uds"]
-net-driver = ["mio", "tokio-executor/blocking"]
+net-driver = ["mio", "tokio-executor/blocking", "lazy_static"]
rt-current-thread = [
"timer",
"tokio-executor/current-thread",
@@ -113,6 +113,10 @@ version = "0.3.8"
default-features = false
optional = true
+[target.'cfg(loom)'.dependencies]
+# play nice with loom tests in other crates.
+loom = "0.2.11"
+
[dev-dependencies]
tokio-test = { version = "=0.2.0-alpha.6", path = "../tokio-test" }
tokio-util = { version = "=0.2.0-alpha.6", path = "../tokio-util" }
@@ -130,5 +134,9 @@ serde_json = "1.0"
tempfile = "3.1.0"
time = "0.1"
+# sharded slab tests
+loom = "0.2.11"
+proptest = "0.9.4"
+
[package.metadata.docs.rs]
all-features = true
diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs
index 249bf47a..f5e88b4a 100644
--- a/tokio/src/lib.rs
+++ b/tokio/src/lib.rs
@@ -69,7 +69,6 @@
//! }
//! }
//! ```
-
macro_rules! if_runtime {
($($i:item)*) => ($(
#[cfg(any(
@@ -97,6 +96,9 @@ pub mod io;
#[cfg(feature = "net-driver")]
pub mod net;
+#[cfg(feature = "net-driver")]
+mod loom;
+
pub mod prelude;
#[cfg(feature = "process")]
diff --git a/tokio/src/loom.rs b/tokio/src/loom.rs
new file mode 100644
index 00000000..57ce2df6
--- /dev/null
+++ b/tokio/src/loom.rs
@@ -0,0 +1,45 @@
+//! This module abstracts over `loom` and `std::sync` depending on whether we
+//! are running tests or not.
+pub(crate) use self::inner::*;
+
+#[cfg(all(test, loom))]
+mod inner {
+ pub(crate) use loom::sync::CausalCell;
+ pub(crate) use loom::sync::Mutex;
+ pub(crate) mod atomic {
+ pub(crate) use loom::sync::atomic::*;
+ pub(crate) use std::sync::atomic::Ordering;
+ }
+}
+
+#[cfg(not(all(test, loom)))]
+mod inner {
+ use std::cell::UnsafeCell;
+ pub(crate) use std::sync::atomic;
+ pub(crate) use std::sync::Mutex;
+
+ #[derive(Debug)]
+ pub(crate) struct CausalCell<T>(UnsafeCell<T>);
+
+ impl<T> CausalCell<T> {
+ pub(crate) fn new(data: T) -> CausalCell<T> {
+ CausalCell(UnsafeCell::new(data))
+ }
+
+ #[inline(always)]
+ pub(crate) fn with<F, R>(&self, f: F) -> R
+ where
+ F: FnOnce(*const T) -> R,
+ {
+ f(self.0.get())
+ }
+
+ #[inline(always)]
+ pub(crate) fn with_mut<F, R>(&self, f: F) -> R
+ where
+ F: FnOnce(*mut T) -> R,
+ {
+ f(self.0.get())
+ }
+ }
+}
diff --git a/tokio/src/net/driver/mod.rs b/tokio/src/net/driver/mod.rs
index 9079ccc7..b7f33d02 100644
--- a/tokio/src/net/driver/mod.rs
+++ b/tokio/src/net/driver/mod.rs
@@ -124,7 +124,15 @@
//! [`PollEvented`]: struct.PollEvented.html
//! [`std::io::Read`]: https://doc.rust-lang.org/std/io/trait.Read.html
//! [`std::io::Write`]: https://doc.rust-lang.org/std/io/trait.Write.html
+#[cfg(loom)]
+macro_rules! loom_thread_local {
+ ($($tts:tt)+) => { loom::thread_local!{ $($tts)+ } }
+}
+#[cfg(not(loom))]
+macro_rules! loom_thread_local {
+ ($($tts:tt)+) => { std::thread_local!{ $($tts)+ } }
+}
pub(crate) mod platform;
mod reactor;
mod registration;
diff --git a/tokio/src/net/driver/reactor/dispatch/iter.rs b/tokio/src/net/driver/reactor/dispatch/iter.rs
new file mode 100644
index 00000000..f785f5d4
--- /dev/null
+++ b/tokio/src/net/driver/reactor/dispatch/iter.rs
@@ -0,0 +1,53 @@
+use super::{
+ page::{self, ScheduledIo},
+ Shard,
+};
+use std::slice;
+
+pub(in crate::net::driver::reactor) struct UniqueIter<'a> {
+ pub(super) shards: slice::IterMut<'a, Shard>,
+ pub(super) pages: slice::Iter<'a, page::Shared>,
+ pub(super) slots: Option<page::Iter<'a>>,
+}
+
+impl<'a> Iterator for UniqueIter<'a> {
+ type Item = &'a ScheduledIo;
+ fn next(&mut self) -> Option<Self::Item> {
+ loop {
+ if let Some(item) = self.slots.as_mut().and_then(|slots| slots.next()) {
+ return Some(item);
+ }
+
+ if let Some(page) = self.pages.next() {
+ self.slots = page.iter();
+ }
+
+ if let Some(shard) = self.shards.next() {
+ self.pages = shard.iter();
+ } else {
+ return None;
+ }
+ }
+ }
+}
+
+pub(in crate::net::driver::reactor) struct ShardIter<'a> {
+ pub(super) pages: slice::IterMut<'a, page::Shared>,
+ pub(super) slots: Option<page::Iter<'a>>,
+}
+
+impl<'a> Iterator for ShardIter<'a> {
+ type Item = &'a ScheduledIo;
+ fn next(&mut self) -> Option<Self::Item> {
+ loop {
+ if let Some(item) = self.slots.as_mut().and_then(|slots| slots.next()) {
+ return Some(item);
+ }
+ if let Some(page) = self.pages.next() {
+ self.slots = page.iter();
+ } else {
+ return None;
+ }
+ }
+ }
+}
diff --git a/tokio/src/net/driver/reactor/dispatch/mod.rs b/tokio/src/net/driver/reactor/dispatch/mod.rs
new file mode 100644
index 00000000..d7262a35
--- /dev/null
+++ b/tokio/src/net/driver/reactor/dispatch/mod.rs
@@ -0,0 +1,36 @@
+//! A lock-free concurrent slab.
+
+#[cfg(all(test, loom))]
+macro_rules! test_println {
+ ($($arg:tt)*) => {
+ println!("{:?} {}", crate::net::driver::reactor::dispatch::Tid::current(), format_args!($($arg)*))
+ }
+}
+
+mod iter;
+mod pack;
+mod page;
+mod sharded_slab;
+mod tid;
+
+#[cfg(all(test, loom))]
+// this is used by sub-modules
+use self::tests::test_util;
+use pack::{Pack, WIDTH};
+use sharded_slab::Shard;
+#[cfg(all(test, loom))]
+pub(crate) use sharded_slab::Slab;
+pub(crate) use sharded_slab::{SingleShard, MAX_SOURCES};
+use tid::Tid;
+
+#[cfg(target_pointer_width = "64")]
+const MAX_THREADS: usize = 4096;
+#[cfg(target_pointer_width = "32")]
+const MAX_THREADS: usize = 2048;
+const INITIAL_PAGE_SIZE: usize = 32;
+const MAX_PAGES: usize = WIDTH / 4;
+// Chosen arbitrarily.
+const RESERVED_BITS: usize = 5;
+
+#[cfg(test)]
+mod tests;
diff --git a/tokio/src/net/driver/reactor/dispatch/pack.rs b/tokio/src/net/driver/reactor/dispatch/pack.rs
new file mode 100644
index 00000000..0be44a48
--- /dev/null
+++ b/tokio/src/net/driver/reactor/dispatch/pack.rs
@@ -0,0 +1,89 @@
+pub(super) const WIDTH: usize = std::mem::size_of::<usize>() * 8;
+
+/// Trait encapsulating the calculations required for bit-packing slab indices.
+///
+/// This allows us to avoid manually repeating some calculations when packing
+/// and unpacking indices.
+pub(crate) trait Pack: Sized {
+ // ====== provided by each implementation =================================
+
+ /// The number of bits occupied by this type when packed into a usize.
+ ///
+ /// This must be provided to determine the number of bits into which to pack
+ /// the type.
+ const LEN: usize;
+ /// The type packed on the less significant side of this type.
+ ///
+ /// If this type is packed into the least significant bit of a usize, this
+ /// should be `()`, which occupies no bytes.
+ ///
+ /// This is used to calculate the shift amount for packing this value.
+ type Prev: Pack;
+
+ // ====== calculated automatically ========================================
+
+ /// A number consisting of `Self::LEN` 1 bits, starting at the least
+ /// significant bit.
+ ///
+ /// This is the higest value this type can represent. This number is shifted
+ /// left by `Self::SHIFT` bits to calculate this type's `MASK`.
+ ///
+ /// This is computed automatically based on `Self::LEN`.
+ const BITS: usize = {
+ let shift = 1 << (Self::LEN - 1);
+ shift | (shift - 1)
+ };
+ /// The number of bits to shift a number to pack it into a usize with other
+ /// values.
+ ///
+ /// This is caculated automatically based on the `LEN` and `SHIFT` constants
+ /// of the previous value.
+ const SHIFT: usize = Self::Prev::SHIFT + Self::Prev::LEN;
+
+ /// The mask to extract only this type from a packed `usize`.
+ ///
+ /// This is calculated by shifting `Self::BITS` left by `Self::SHIFT`.
+ const MASK: usize = Self::BITS << Self::SHIFT;
+
+ fn as_usize(&self) -> usize;
+ fn from_usize(val: usize) -> Self;
+
+ #[inline(always)]
+ fn pack(&self, to: usize) -> usize {
+ let value = self.as_usize();
+ debug_assert!(value <= Self::BITS);
+
+ (to & !Self::MASK) | (value << Self::SHIFT)
+ }
+
+ #[inline(always)]
+ fn from_packed(from: usize) -> Self {
+ let value = (from & Self::MASK) >> Self::SHIFT;
+ debug_assert!(value <= Self::BITS);
+ Self::from_usize(value)
+ }
+}
+
+impl Pack for () {
+ const BITS: usize = 0;
+ const LEN: usize = 0;
+ const SHIFT: usize = 0;
+ const MASK: usize = 0;
+
+ type Prev = ();
+
+ fn as_usize(&self) -> usize {
+ unreachable!()
+ }
+ fn from_usize(_val: usize) -> Self {
+ unreachable!()
+ }
+
+ fn pack(&self, _to: usize) -> usize {
+ unreachable!()
+ }
+
+ fn from_packed(_from: usize) -> Self {
+ unreachable!()
+ }
+}
diff --git a/tokio/src/net/driver/reactor/dispatch/page/mod.rs b/tokio/src/net/driver/reactor/dispatch/page/mod.rs
new file mode 100644
index 00000000..0b8d3c4c
--- /dev/null
+++ b/tokio/src/net/driver/reactor/dispatch/page/mod.rs
@@ -0,0 +1,257 @@
+use super::{Pack, INITIAL_PAGE_SIZE, WIDTH};
+use crate::loom::CausalCell;
+
+pub(crate) mod scheduled_io;
+mod stack;
+pub(crate) use self::scheduled_io::ScheduledIo;
+use self::stack::TransferStack;
+use std::fmt;
+
+/// A page address encodes the location of a slot within a shard (the page
+/// number and offset within that page) as a single linear value.
+#[repr(transparent)]
+#[derive(Copy, Clone, Eq, PartialEq, PartialOrd, Ord)]
+pub(crate) struct Addr {
+ addr: usize,
+}
+
+impl Addr {
+ const NULL: usize = Self::BITS + 1;
+ const INDEX_SHIFT: usize = INITIAL_PAGE_SIZE.trailing_zeros() as usize + 1;
+
+ pub(crate) fn index(self) -> usize {
+ // Since every page is twice as large as the previous page, and all page sizes
+ // are powers of two, we can determine the page index that contains a given
+ // address by shifting the address down by the smallest page size and
+ // looking at how many twos places necessary to represent that number,
+ // telling us what power of two page size it fits inside of. We can
+ // determine the number of twos places by counting the number of leading
+ // zeros (unused twos places) in the number's binary representation, and
+ // subtracting that count from the total number of bits in a word.
+ WIDTH - ((self.addr + INITIAL_PAGE_SIZE) >> Self::INDEX_SHIFT).leading_zeros() as usize
+ }
+
+ pub(crate) fn offset(self) -> usize {
+ self.addr
+ }
+}
+
+pub(super) fn size(n: usize) -> usize {
+ INITIAL_PAGE_SIZE * 2usize.pow(n as _)
+}
+
+impl Pack for Addr {
+ const LEN: usize = super::MAX_PAGES + Self::INDEX_SHIFT;
+
+ type Prev = ();
+
+ fn as_usize(&self) -> usize {
+ self.addr
+ }
+
+ fn from_usize(addr: usize) -> Self {
+ debug_assert!(addr <= Self::BITS);
+ Self { addr }
+ }
+}
+
+pub(in crate::net::driver) type Iter<'a> = std::slice::Iter<'a, ScheduledIo>;
+
+pub(crate) struct Local {
+ head: CausalCell<usize>,
+}
+
+pub(crate) struct Shared {
+ remote: TransferStack,
+ size: usize,
+ prev_sz: usize,
+ slab: CausalCell<Option<Box<[ScheduledIo]>>>,
+}
+
+impl Local {
+ pub(crate) fn new() -> Self {
+ Self {
+ head: CausalCell::new(0),
+ }
+ }
+
+ #[inline(always)]
+ fn head(&self) -> usize {
+ self.head.with(|head| unsafe { *head })
+ }
+
+ #[inline(always)]
+ fn set_head(&self, new_head: usize) {
+ self.head.with_mut(|head| unsafe {
+ *head = new_head;
+ })
+ }
+}
+
+impl Shared {
+ const NULL: usize = Addr::NULL;
+
+ pub(crate) fn new(size: usize, prev_sz: usize) -> Self {
+ Self {
+ prev_sz,
+ size,
+ remote: TransferStack::new(),
+ slab: CausalCell::new(None),
+ }
+ }
+
+ /// Allocates storage for this page if it does not allready exist.
+ ///
+ /// This requires unique access to the page (e.g. it is called from the
+ /// thread that owns the page, or, in the case of `SingleShard`, while the
+ /// lock is held). In order to indicate this, a reference to the page's
+ /// `Local` data is taken by this function; the `Local` argument is not
+ /// actually used, but requiring it ensures that this is only called when
+ /// local access is held.
+ #[cold]
+ fn alloc_page(&self, _: &Local) {
+ debug_assert!(self.slab.with(|s| unsafe { (*s).is_none() }));
+
+ let mut slab = Vec::with_capacity(self.size);
+ slab.extend((1..self.size).map(ScheduledIo::new));
+ slab.push(ScheduledIo::new(Self::NULL));
+ self.slab.with_mut(|s| {
+ // this mut access is safe — it only occurs to initially
+ // allocate the page, which only happens on this thread; if the
+ // page has not yet been allocated, other threads will not try
+ // to access it yet.
+ unsafe {
+ *s = Some(slab.into_boxed_slice());
+ }
+ });
+ }
+
+ #[inline]
+ pub(crate) fn alloc(&self, local: &Local) -> Option<usize> {
+ let head = local.head();
+
+ // are there any items on the local free list? (fast path)
+ let head = if head < self.size {
+ head
+ } else {
+ // if the local free list is empty, pop all the items on the remote
+ // free list onto the local free list.
+ self.remote.pop_all()?
+ };
+
+ // if the head is still null, both the local and remote free lists are
+ // empty --- we can't fit any more items on this page.
+ if head == Self::NULL {
+ return None;
+ }
+
+ // do we need to allocate storage for this page?
+ let page_needs_alloc = self.slab.with(|s| unsafe { (*s).is_none() });
+ if page_needs_alloc {
+ self.alloc_page(local);
+ }
+
+ let gen = self.slab.with(|slab| {
+ let slab = unsafe { &*(slab) }
+ .as_ref()
+ .expect("page must have been allocated to alloc!");
+ let slot = &slab[head];
+ local.set_head(slot.next());
+ slot.alloc()
+ });
+
+ let index = head + self.prev_sz;
+ Some(gen.pack(index))
+ }
+
+ #[inline]
+ pub(in crate::net::driver) fn get(&self, addr: Addr) -> Option<&ScheduledIo> {
+ let page_offset = addr.offset() - self.prev_sz;
+ self.slab
+ .with(|slab| unsafe { &*slab }.as_ref()?.get(page_offset))
+ }
+
+ pub(crate) fn remove_local(&self, local: &Local, addr: Addr, idx: usize) {
+ let offset = addr.offset() - self.prev_sz;
+ self.slab.with(|slab| {
+ let slab = unsafe { &*slab }.as_ref();
+ let slot = if let Some(slot) = slab.and_then(|slab| slab.get(offset)) {
+ slot
+ } else {
+ return;
+ };
+ if slot.reset(scheduled_io::Generation::from_packed(idx)) {
+ slot.set_next(local.head());
+ local.set_head(offset);
+ }
+ })
+ }
+
+ pub(crate) fn remove_remote(&self, addr: Addr, idx: usize) {
+ let offset = addr.offset() - self.prev_sz;
+ self.slab.with(|slab| {
+ let slab = unsafe { &*slab }.as_ref();
+ let slot = if let Some(slot) = slab.and_then(|slab| slab.get(offset)) {
+ slot
+ } else {
+ return;
+ };
+ if !slot.reset(scheduled_io::Generation::from_packed(idx)) {
+ return;
+ }
+ self.remote.push(offset, |next| slot.set_next(next));
+ })
+ }
+
+ pub(in crate::net::driver) fn iter(&self) -> Option<Iter<'_>> {
+ let slab = self.slab.with(|slab| unsafe { (&*slab).as_ref() });
+ slab.map(|slab| slab.iter())
+ }
+}
+
+impl fmt::Debug for Local {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ self.head.with(|head| {
+ let head = unsafe { *head };
+ f.debug_struct("Local")
+ .field("head", &format_args!("{:#0x}", head))
+ .finish()
+ })
+ }
+}
+
+impl fmt::Debug for Shared {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Shared")
+ .field("remote", &self.remote)
+ .field("prev_sz", &self.prev_sz)
+ .field("size", &self.size)
+ // .field("slab", &self.slab)
+ .finish()
+ }
+}
+
+impl fmt::Debug for Addr {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Addr")
+ .field("addr", &format_args!("{:#0x}", &self.addr))
+ .field("index", &self.index())
+ .field("offset", &self.offset())
+ .finish()
+ }
+}
+
+#[cfg(all(test, not(loom)))]
+mod test {
+ use super::*;
+ use proptest::prelude::*;
+
+ proptest! {
+ #[test]
+ fn addr_roundtrips(pidx in 0usize..Addr::BITS) {
+ let addr = Addr::from_usize(pidx);
+ let packed = addr.pack(0);
+ assert_eq!(addr, Addr::from_packed(packed));
+ }
+ }
+}
diff --git a/tokio/src/net/driver/reactor/dispatch/page/scheduled_io.rs b/tokio/src/net/driver/reactor/dispatch/page/scheduled_io.rs
new file mode 100644
index 00000000..34a07ea8
--- /dev/null
+++ b/tokio/src/net/driver/reactor/dispatch/page/scheduled_io.rs
@@ -0,0 +1,171 @@
+use super::super::{Pack, Tid, RESERVED_BITS, WIDTH};
+use crate::loom::{
+ atomic::{AtomicUsize, Ordering},
+ CausalCell,
+};
+
+use tokio_sync::AtomicWaker;
+
+#[derive(Debug)]
+pub(crate) struct ScheduledIo {
+ /// The offset of the next item on the free list.
+ next: CausalCell<usize>,
+ readiness: AtomicUsize,
+ pub(in crate::net::driver) reader: AtomicWaker,
+ pub(in crate::net::driver) writer: AtomicWaker,
+}
+
+#[repr(transparent)]
+#[derive(Copy, Clone, Debug, PartialEq, Eq, Ord, PartialOrd)]
+pub(crate) struct Generation {
+ value: usize,
+}
+
+impl Pack for Generation {
+ /// Use all the remaining bits in the word for the generation counter, minus
+ /// any bits reserved by the user.
+ const LEN: usize = (WIDTH - RESERVED_BITS) - Self::SHIFT;
+
+ type Prev = Tid;
+
+ #[inline(always)]
+ fn from_usize(u: usize) -> Self {
+ debug_assert!(u <= Self::BITS);
+ Self::new(u)
+ }
+
+ #[inline(always)]
+ fn as_usize(&self) -> usize {
+ self.value
+ }
+}
+
+impl Generation {
+ const ONE: usize = 1 << Self::SHIFT;
+
+ fn new(value: usize) -> Self {
+ Self { value }
+ }
+
+ fn next(self) -> Self {
+ Self::from_usize((self.value + 1) % Self::BITS)
+ }
+}
+
+impl ScheduledIo {
+ pub(super) fn new(next: usize) -> Self {
+ Self {
+ next: CausalCell::new(next),
+ readiness: AtomicUsize::new(0),
+ reader: AtomicWaker::new(),
+ writer: AtomicWaker::new(),
+ }
+ }
+
+ #[inline]
+ pub(super) fn alloc(&self) -> Generation {
+ Generation::from_packed(self.readiness.load(Ordering::SeqCst))
+ }
+
+ #[inline(always)]
+ pub(super) fn next(&self) -> usize {
+ self.next.with(|next| unsafe { *next })
+ }
+
+ #[inline]
+ pub(super) fn reset(&self, gen: Generation) -> bool {
+ let mut current = self.readiness.load(Ordering::Acquire);
+ loop {
+ if Generation::from_packed(current) != gen {
+ return false;
+ }
+ let next_gen = gen.next().pack(0);
+ match self.readiness.compare_exchange(
+ current,
+ next_gen,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => break,
+ Err(actual) => current = actual,
+ }
+ }
+ drop(self.reader.take_waker());
+ drop(self.writer.take_waker());
+ true
+ }
+
+ #[inline(always)]
+ pub(super) fn set_next(&self, next: usize) {
+ self.next.with_mut(|n| unsafe {
+ (*n) = next;
+ })
+ }
+
+ /// Returns the current readiness value of this `ScheduledIo`, if the
+ /// provided `token` is still a valid access.
+ ///
+ /// # Returns
+ ///
+ /// If the given token's generation no longer matches the `ScheduledIo`'s
+ /// generation, then the corresponding IO resource has been removed and
+ /// replaced with a new resource. In that case, this method returns `None`.
+ /// Otherwise, this returns the current readiness.
+ pub(in crate::net::driver) fn get_readiness(&self, token: usize) -> Option<usize> {
+ let gen = token & Generation::MASK;
+ let ready = self.readiness.load(Ordering::Acquire);
+ if ready & Generation::MASK != gen {
+ return None;
+ }
+ Some(ready & (!Generation::MASK))
+ }
+
+ /// Sets the readiness on this `ScheduledIo` by invoking the given closure on
+ /// the current value, returning the previous readiness value.
+ ///
+ /// # Arguments
+ /// - `token`: the token for this `ScheduledIo`.
+ /// - `f`: a closure returning a new readiness value given the previous
+ /// readiness.
+ ///
+ /// # Returns
+ ///
+ /// If the given token's generation no longer matches the `ScheduledIo`'s
+ /// generation, then the corresponding IO resource has been removed and
+ /// replaced with a new resource. In that case, this method returns `Err`.
+ /// Otherwise, this returns the previous readiness.
+ pub(in crate::net::driver) fn set_readiness(
+ &self,
+ token: usize,
+ f: impl Fn(usize) -> usize,
+ ) -> Result<usize, ()> {
+ let gen = token & Generation::MASK;
+ let mut current = self.readiness.load(Ordering::Acquire);
+ loop {
+ // Check that the generation for this access is still the current
+ // one.
+ if current & Generation::MASK != gen {
+ return Err(());
+ }
+ // Mask out the generation bits so that the modifying function
+ // doesn't see them.
+ let current_readiness = current & mio::Ready::all().as_usize();
+ let new = f(current_readiness);
+ debug_assert!(
+ new < Generation::ONE,
+ "new readiness value would overwrite generation bits!"
+ );
+
+ match self.readiness.compare_exchange(
+ current,
+ new | gen,
+ Ordering: