summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--azure-pipelines.yml1
-rw-r--r--tokio/Cargo.toml10
-rw-r--r--tokio/src/lib.rs4
-rw-r--r--tokio/src/loom.rs45
-rw-r--r--tokio/src/net/driver/mod.rs8
-rw-r--r--tokio/src/net/driver/reactor/dispatch/iter.rs53
-rw-r--r--tokio/src/net/driver/reactor/dispatch/mod.rs36
-rw-r--r--tokio/src/net/driver/reactor/dispatch/pack.rs89
-rw-r--r--tokio/src/net/driver/reactor/dispatch/page/mod.rs257
-rw-r--r--tokio/src/net/driver/reactor/dispatch/page/scheduled_io.rs171
-rw-r--r--tokio/src/net/driver/reactor/dispatch/page/stack.rs149
-rw-r--r--tokio/src/net/driver/reactor/dispatch/sharded_slab.rs274
-rw-r--r--tokio/src/net/driver/reactor/dispatch/tests/loom/mod.rs204
-rw-r--r--tokio/src/net/driver/reactor/dispatch/tests/loom/single_shard.rs181
-rw-r--r--tokio/src/net/driver/reactor/dispatch/tests/loom/small_slab.rs473
-rw-r--r--tokio/src/net/driver/reactor/dispatch/tests/mod.rs30
-rw-r--r--tokio/src/net/driver/reactor/dispatch/tid.rs168
-rw-r--r--tokio/src/net/driver/reactor/mod.rs (renamed from tokio/src/net/driver/reactor.rs)238
-rw-r--r--tokio/src/net/driver/registration.rs16
-rw-r--r--tokio/src/net/driver/sharded_rwlock.rs217
-rw-r--r--tokio/src/process/mod.rs2
-rw-r--r--tokio/src/process/unix/orphan.rs2
-rw-r--r--tokio/src/process/unix/reap.rs2
-rw-r--r--tokio/src/signal/registry.rs2
-rw-r--r--tokio/src/signal/unix.rs2
-rw-r--r--tokio/src/signal/windows.rs2
-rw-r--r--tokio/src/timer/wheel/level.rs2
-rw-r--r--tokio/src/timer/wheel/mod.rs2
28 files changed, 2326 insertions, 314 deletions
diff --git a/azure-pipelines.yml b/azure-pipelines.yml
index f9b0d505..5b3188ea 100644
--- a/azure-pipelines.yml
+++ b/azure-pipelines.yml
@@ -82,6 +82,7 @@ jobs:
rust: beta
crates:
- tokio-executor
+ - tokio
# Try cross compiling
- template: ci/azure-cross-compile.yml
diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml
index 6bbbf2c6..aa60ae22 100644
--- a/tokio/Cargo.toml
+++ b/tokio/Cargo.toml
@@ -41,7 +41,7 @@ io-util = ["io-traits", "pin-project", "memchr"]
io = ["io-traits", "io-util"]
macros = ["tokio-macros"]
net-full = ["tcp", "udp", "uds"]
-net-driver = ["mio", "tokio-executor/blocking"]
+net-driver = ["mio", "tokio-executor/blocking", "lazy_static"]
rt-current-thread = [
"timer",
"tokio-executor/current-thread",
@@ -113,6 +113,10 @@ version = "0.3.8"
default-features = false
optional = true
+[target.'cfg(loom)'.dependencies]
+# play nice with loom tests in other crates.
+loom = "0.2.11"
+
[dev-dependencies]
tokio-test = { version = "=0.2.0-alpha.6", path = "../tokio-test" }
tokio-util = { version = "=0.2.0-alpha.6", path = "../tokio-util" }
@@ -130,5 +134,9 @@ serde_json = "1.0"
tempfile = "3.1.0"
time = "0.1"
+# sharded slab tests
+loom = "0.2.11"
+proptest = "0.9.4"
+
[package.metadata.docs.rs]
all-features = true
diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs
index 249bf47a..f5e88b4a 100644
--- a/tokio/src/lib.rs
+++ b/tokio/src/lib.rs
@@ -69,7 +69,6 @@
//! }
//! }
//! ```
-
macro_rules! if_runtime {
($($i:item)*) => ($(
#[cfg(any(
@@ -97,6 +96,9 @@ pub mod io;
#[cfg(feature = "net-driver")]
pub mod net;
+#[cfg(feature = "net-driver")]
+mod loom;
+
pub mod prelude;
#[cfg(feature = "process")]
diff --git a/tokio/src/loom.rs b/tokio/src/loom.rs
new file mode 100644
index 00000000..57ce2df6
--- /dev/null
+++ b/tokio/src/loom.rs
@@ -0,0 +1,45 @@
+//! This module abstracts over `loom` and `std::sync` depending on whether we
+//! are running tests or not.
+pub(crate) use self::inner::*;
+
+#[cfg(all(test, loom))]
+mod inner {
+ pub(crate) use loom::sync::CausalCell;
+ pub(crate) use loom::sync::Mutex;
+ pub(crate) mod atomic {
+ pub(crate) use loom::sync::atomic::*;
+ pub(crate) use std::sync::atomic::Ordering;
+ }
+}
+
+#[cfg(not(all(test, loom)))]
+mod inner {
+ use std::cell::UnsafeCell;
+ pub(crate) use std::sync::atomic;
+ pub(crate) use std::sync::Mutex;
+
+ #[derive(Debug)]
+ pub(crate) struct CausalCell<T>(UnsafeCell<T>);
+
+ impl<T> CausalCell<T> {
+ pub(crate) fn new(data: T) -> CausalCell<T> {
+ CausalCell(UnsafeCell::new(data))
+ }
+
+ #[inline(always)]
+ pub(crate) fn with<F, R>(&self, f: F) -> R
+ where
+ F: FnOnce(*const T) -> R,
+ {
+ f(self.0.get())
+ }
+
+ #[inline(always)]
+ pub(crate) fn with_mut<F, R>(&self, f: F) -> R
+ where
+ F: FnOnce(*mut T) -> R,
+ {
+ f(self.0.get())
+ }
+ }
+}
diff --git a/tokio/src/net/driver/mod.rs b/tokio/src/net/driver/mod.rs
index 9079ccc7..b7f33d02 100644
--- a/tokio/src/net/driver/mod.rs
+++ b/tokio/src/net/driver/mod.rs
@@ -124,7 +124,15 @@
//! [`PollEvented`]: struct.PollEvented.html
//! [`std::io::Read`]: https://doc.rust-lang.org/std/io/trait.Read.html
//! [`std::io::Write`]: https://doc.rust-lang.org/std/io/trait.Write.html
+#[cfg(loom)]
+macro_rules! loom_thread_local {
+ ($($tts:tt)+) => { loom::thread_local!{ $($tts)+ } }
+}
+#[cfg(not(loom))]
+macro_rules! loom_thread_local {
+ ($($tts:tt)+) => { std::thread_local!{ $($tts)+ } }
+}
pub(crate) mod platform;
mod reactor;
mod registration;
diff --git a/tokio/src/net/driver/reactor/dispatch/iter.rs b/tokio/src/net/driver/reactor/dispatch/iter.rs
new file mode 100644
index 00000000..f785f5d4
--- /dev/null
+++ b/tokio/src/net/driver/reactor/dispatch/iter.rs
@@ -0,0 +1,53 @@
+use super::{
+ page::{self, ScheduledIo},
+ Shard,
+};
+use std::slice;
+
+pub(in crate::net::driver::reactor) struct UniqueIter<'a> {
+ pub(super) shards: slice::IterMut<'a, Shard>,
+ pub(super) pages: slice::Iter<'a, page::Shared>,
+ pub(super) slots: Option<page::Iter<'a>>,
+}
+
+impl<'a> Iterator for UniqueIter<'a> {
+ type Item = &'a ScheduledIo;
+ fn next(&mut self) -> Option<Self::Item> {
+ loop {
+ if let Some(item) = self.slots.as_mut().and_then(|slots| slots.next()) {
+ return Some(item);
+ }
+
+ if let Some(page) = self.pages.next() {
+ self.slots = page.iter();
+ }
+
+ if let Some(shard) = self.shards.next() {
+ self.pages = shard.iter();
+ } else {
+ return None;
+ }
+ }
+ }
+}
+
+pub(in crate::net::driver::reactor) struct ShardIter<'a> {
+ pub(super) pages: slice::IterMut<'a, page::Shared>,
+ pub(super) slots: Option<page::Iter<'a>>,
+}
+
+impl<'a> Iterator for ShardIter<'a> {
+ type Item = &'a ScheduledIo;
+ fn next(&mut self) -> Option<Self::Item> {
+ loop {
+ if let Some(item) = self.slots.as_mut().and_then(|slots| slots.next()) {
+ return Some(item);
+ }
+ if let Some(page) = self.pages.next() {
+ self.slots = page.iter();
+ } else {
+ return None;
+ }
+ }
+ }
+}
diff --git a/tokio/src/net/driver/reactor/dispatch/mod.rs b/tokio/src/net/driver/reactor/dispatch/mod.rs
new file mode 100644
index 00000000..d7262a35
--- /dev/null
+++ b/tokio/src/net/driver/reactor/dispatch/mod.rs
@@ -0,0 +1,36 @@
+//! A lock-free concurrent slab.
+
+#[cfg(all(test, loom))]
+macro_rules! test_println {
+ ($($arg:tt)*) => {
+ println!("{:?} {}", crate::net::driver::reactor::dispatch::Tid::current(), format_args!($($arg)*))
+ }
+}
+
+mod iter;
+mod pack;
+mod page;
+mod sharded_slab;
+mod tid;
+
+#[cfg(all(test, loom))]
+// this is used by sub-modules
+use self::tests::test_util;
+use pack::{Pack, WIDTH};
+use sharded_slab::Shard;
+#[cfg(all(test, loom))]
+pub(crate) use sharded_slab::Slab;
+pub(crate) use sharded_slab::{SingleShard, MAX_SOURCES};
+use tid::Tid;
+
+#[cfg(target_pointer_width = "64")]
+const MAX_THREADS: usize = 4096;
+#[cfg(target_pointer_width = "32")]
+const MAX_THREADS: usize = 2048;
+const INITIAL_PAGE_SIZE: usize = 32;
+const MAX_PAGES: usize = WIDTH / 4;
+// Chosen arbitrarily.
+const RESERVED_BITS: usize = 5;
+
+#[cfg(test)]
+mod tests;
diff --git a/tokio/src/net/driver/reactor/dispatch/pack.rs b/tokio/src/net/driver/reactor/dispatch/pack.rs
new file mode 100644
index 00000000..0be44a48
--- /dev/null
+++ b/tokio/src/net/driver/reactor/dispatch/pack.rs
@@ -0,0 +1,89 @@
+pub(super) const WIDTH: usize = std::mem::size_of::<usize>() * 8;
+
+/// Trait encapsulating the calculations required for bit-packing slab indices.
+///
+/// This allows us to avoid manually repeating some calculations when packing
+/// and unpacking indices.
+pub(crate) trait Pack: Sized {
+ // ====== provided by each implementation =================================
+
+ /// The number of bits occupied by this type when packed into a usize.
+ ///
+ /// This must be provided to determine the number of bits into which to pack
+ /// the type.
+ const LEN: usize;
+ /// The type packed on the less significant side of this type.
+ ///
+ /// If this type is packed into the least significant bit of a usize, this
+ /// should be `()`, which occupies no bytes.
+ ///
+ /// This is used to calculate the shift amount for packing this value.
+ type Prev: Pack;
+
+ // ====== calculated automatically ========================================
+
+ /// A number consisting of `Self::LEN` 1 bits, starting at the least
+ /// significant bit.
+ ///
+ /// This is the higest value this type can represent. This number is shifted
+ /// left by `Self::SHIFT` bits to calculate this type's `MASK`.
+ ///
+ /// This is computed automatically based on `Self::LEN`.
+ const BITS: usize = {
+ let shift = 1 << (Self::LEN - 1);
+ shift | (shift - 1)
+ };
+ /// The number of bits to shift a number to pack it into a usize with other
+ /// values.
+ ///
+ /// This is caculated automatically based on the `LEN` and `SHIFT` constants
+ /// of the previous value.
+ const SHIFT: usize = Self::Prev::SHIFT + Self::Prev::LEN;
+
+ /// The mask to extract only this type from a packed `usize`.
+ ///
+ /// This is calculated by shifting `Self::BITS` left by `Self::SHIFT`.
+ const MASK: usize = Self::BITS << Self::SHIFT;
+
+ fn as_usize(&self) -> usize;
+ fn from_usize(val: usize) -> Self;
+
+ #[inline(always)]
+ fn pack(&self, to: usize) -> usize {
+ let value = self.as_usize();
+ debug_assert!(value <= Self::BITS);
+
+ (to & !Self::MASK) | (value << Self::SHIFT)
+ }
+
+ #[inline(always)]
+ fn from_packed(from: usize) -> Self {
+ let value = (from & Self::MASK) >> Self::SHIFT;
+ debug_assert!(value <= Self::BITS);
+ Self::from_usize(value)
+ }
+}
+
+impl Pack for () {
+ const BITS: usize = 0;
+ const LEN: usize = 0;
+ const SHIFT: usize = 0;
+ const MASK: usize = 0;
+
+ type Prev = ();
+
+ fn as_usize(&self) -> usize {
+ unreachable!()
+ }
+ fn from_usize(_val: usize) -> Self {
+ unreachable!()
+ }
+
+ fn pack(&self, _to: usize) -> usize {
+ unreachable!()
+ }
+
+ fn from_packed(_from: usize) -> Self {
+ unreachable!()
+ }
+}
diff --git a/tokio/src/net/driver/reactor/dispatch/page/mod.rs b/tokio/src/net/driver/reactor/dispatch/page/mod.rs
new file mode 100644
index 00000000..0b8d3c4c
--- /dev/null
+++ b/tokio/src/net/driver/reactor/dispatch/page/mod.rs
@@ -0,0 +1,257 @@
+use super::{Pack, INITIAL_PAGE_SIZE, WIDTH};
+use crate::loom::CausalCell;
+
+pub(crate) mod scheduled_io;
+mod stack;
+pub(crate) use self::scheduled_io::ScheduledIo;
+use self::stack::TransferStack;
+use std::fmt;
+
+/// A page address encodes the location of a slot within a shard (the page
+/// number and offset within that page) as a single linear value.
+#[repr(transparent)]
+#[derive(Copy, Clone, Eq, PartialEq, PartialOrd, Ord)]
+pub(crate) struct Addr {
+ addr: usize,
+}
+
+impl Addr {
+ const NULL: usize = Self::BITS + 1;
+ const INDEX_SHIFT: usize = INITIAL_PAGE_SIZE.trailing_zeros() as usize + 1;
+
+ pub(crate) fn index(self) -> usize {
+ // Since every page is twice as large as the previous page, and all page sizes
+ // are powers of two, we can determine the page index that contains a given
+ // address by shifting the address down by the smallest page size and
+ // looking at how many twos places necessary to represent that number,
+ // telling us what power of two page size it fits inside of. We can
+ // determine the number of twos places by counting the number of leading
+ // zeros (unused twos places) in the number's binary representation, and
+ // subtracting that count from the total number of bits in a word.
+ WIDTH - ((self.addr + INITIAL_PAGE_SIZE) >> Self::INDEX_SHIFT).leading_zeros() as usize
+ }
+
+ pub(crate) fn offset(self) -> usize {
+ self.addr
+ }
+}
+
+pub(super) fn size(n: usize) -> usize {
+ INITIAL_PAGE_SIZE * 2usize.pow(n as _)
+}
+
+impl Pack for Addr {
+ const LEN: usize = super::MAX_PAGES + Self::INDEX_SHIFT;
+
+ type Prev = ();
+
+ fn as_usize(&self) -> usize {
+ self.addr
+ }
+
+ fn from_usize(addr: usize) -> Self {
+ debug_assert!(addr <= Self::BITS);
+ Self { addr }
+ }
+}
+
+pub(in crate::net::driver) type Iter<'a> = std::slice::Iter<'a, ScheduledIo>;
+
+pub(crate) struct Local {
+ head: CausalCell<usize>,
+}
+
+pub(crate) struct Shared {
+ remote: TransferStack,
+ size: usize,
+ prev_sz: usize,
+ slab: CausalCell<Option<Box<[ScheduledIo]>>>,
+}
+
+impl Local {
+ pub(crate) fn new() -> Self {
+ Self {
+ head: CausalCell::new(0),
+ }
+ }
+
+ #[inline(always)]
+ fn head(&self) -> usize {
+ self.head.with(|head| unsafe { *head })
+ }
+
+ #[inline(always)]
+ fn set_head(&self, new_head: usize) {
+ self.head.with_mut(|head| unsafe {
+ *head = new_head;
+ })
+ }
+}
+
+impl Shared {
+ const NULL: usize = Addr::NULL;
+
+ pub(crate) fn new(size: usize, prev_sz: usize) -> Self {
+ Self {
+ prev_sz,
+ size,
+ remote: TransferStack::new(),
+ slab: CausalCell::new(None),
+ }
+ }
+
+ /// Allocates storage for this page if it does not allready exist.
+ ///
+ /// This requires unique access to the page (e.g. it is called from the
+ /// thread that owns the page, or, in the case of `SingleShard`, while the
+ /// lock is held). In order to indicate this, a reference to the page's
+ /// `Local` data is taken by this function; the `Local` argument is not
+ /// actually used, but requiring it ensures that this is only called when
+ /// local access is held.
+ #[cold]
+ fn alloc_page(&self, _: &Local) {
+ debug_assert!(self.slab.with(|s| unsafe { (*s).is_none() }));
+
+ let mut slab = Vec::with_capacity(self.size);
+ slab.extend((1..self.size).map(ScheduledIo::new));
+ slab.push(ScheduledIo::new(Self::NULL));
+ self.slab.with_mut(|s| {
+ // this mut access is safe — it only occurs to initially
+ // allocate the page, which only happens on this thread; if the
+ // page has not yet been allocated, other threads will not try
+ // to access it yet.
+ unsafe {
+ *s = Some(slab.into_boxed_slice());
+ }
+ });
+ }
+
+ #[inline]
+ pub(crate) fn alloc(&self, local: &Local) -> Option<usize> {
+ let head = local.head();
+
+ // are there any items on the local free list? (fast path)
+ let head = if head < self.size {
+ head
+ } else {
+ // if the local free list is empty, pop all the items on the remote
+ // free list onto the local free list.
+ self.remote.pop_all()?
+ };
+
+ // if the head is still null, both the local and remote free lists are
+ // empty --- we can't fit any more items on this page.
+ if head == Self::NULL {
+ return None;
+ }
+
+ // do we need to allocate storage for this page?
+ let page_needs_alloc = self.slab.with(|s| unsafe { (*s).is_none() });
+ if page_needs_alloc {
+ self.alloc_page(local);
+ }
+
+ let gen = self.slab.with(|slab| {
+ let slab = unsafe { &*(slab) }
+ .as_ref()
+ .expect("page must have been allocated to alloc!");
+ let slot = &slab[head];
+ local.set_head(slot.next());
+ slot.alloc()
+ });
+
+ let index = head + self.prev_sz;
+ Some(gen.pack(index))
+ }
+
+ #[inline]
+ pub(in crate::net::driver) fn get(&self, addr: Addr) -> Option<&ScheduledIo> {
+ let page_offset = addr.offset() - self.prev_sz;
+ self.slab
+ .with(|slab| unsafe { &*slab }.as_ref()?.get(page_offset))
+ }
+
+ pub(crate) fn remove_local(&self, local: &Local, addr: Addr, idx: usize) {
+ let offset = addr.offset() - self.prev_sz;
+ self.slab.with(|slab| {
+ let slab = unsafe { &*slab }.as_ref();
+ let slot = if let Some(slot) = slab.and_then(|slab| slab.get(offset)) {
+ slot
+ } else {
+ return;
+ };
+ if slot.reset(scheduled_io::Generation::from_packed(idx)) {
+ slot.set_next(local.head());
+ local.set_head(offset);
+ }
+ })
+ }
+
+ pub(crate) fn remove_remote(&self, addr: Addr, idx: usize) {
+ let offset = addr.offset() - self.prev_sz;
+ self.slab.with(|slab| {
+ let slab = unsafe { &*slab }.as_ref();
+ let slot = if let Some(slot) = slab.and_then(|slab| slab.get(offset)) {
+ slot
+ } else {
+ return;
+ };
+ if !slot.reset(scheduled_io::Generation::from_packed(idx)) {
+ return;
+ }
+ self.remote.push(offset, |next| slot.set_next(next));
+ })
+ }
+
+ pub(in crate::net::driver) fn iter(&self) -> Option<Iter<'_>> {
+ let slab = self.slab.with(|slab| unsafe { (&*slab).as_ref() });
+ slab.map(|slab| slab.iter())
+ }
+}
+
+impl fmt::Debug for Local {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ self.head.with(|head| {
+ let head = unsafe { *head };
+ f.debug_struct("Local")
+ .field("head", &format_args!("{:#0x}", head))
+ .finish()
+ })
+ }
+}
+
+impl fmt::Debug for Shared {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Shared")
+ .field("remote", &self.remote)
+ .field("prev_sz", &self.prev_sz)
+ .field("size", &self.size)
+ // .field("slab", &self.slab)
+ .finish()
+ }
+}
+
+impl fmt::Debug for Addr {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Addr")
+ .field("addr", &format_args!("{:#0x}", &self.addr))
+ .field("index", &self.index())
+ .field("offset", &self.offset())
+ .finish()
+ }
+}
+
+#[cfg(all(test, not(loom)))]
+mod test {
+ use super::*;
+ use proptest::prelude::*;
+
+ proptest! {
+ #[test]
+ fn addr_roundtrips(pidx in 0usize..Addr::BITS) {
+ let addr = Addr::from_usize(pidx);
+ let packed = addr.pack(0);
+ assert_eq!(addr, Addr::from_packed(packed));
+ }
+ }
+}
diff --git a/tokio/src/net/driver/reactor/dispatch/page/scheduled_io.rs b/tokio/src/net/driver/reactor/dispatch/page/scheduled_io.rs
new file mode 100644
index 00000000..34a07ea8
--- /dev/null
+++ b/tokio/src/net/driver/reactor/dispatch/page/scheduled_io.rs
@@ -0,0 +1,171 @@
+use super::super::{Pack, Tid, RESERVED_BITS, WIDTH};
+use crate::loom::{
+ atomic::{AtomicUsize, Ordering},
+ CausalCell,
+};
+
+use tokio_sync::AtomicWaker;
+
+#[derive(Debug)]
+pub(crate) struct ScheduledIo {
+ /// The offset of the next item on the free list.
+ next: CausalCell<usize>,
+ readiness: AtomicUsize,
+ pub(in crate::net::driver) reader: AtomicWaker,
+ pub(in crate::net::driver) writer: AtomicWaker,
+}
+
+#[repr(transparent)]
+#[derive(Copy, Clone, Debug, PartialEq, Eq, Ord, PartialOrd)]
+pub(crate) struct Generation {
+ value: usize,
+}
+
+impl Pack for Generation {
+ /// Use all the remaining bits in the word for the generation counter, minus
+ /// any bits reserved by the user.
+ const LEN: usize = (WIDTH - RESERVED_BITS) - Self::SHIFT;
+
+ type Prev = Tid;
+
+ #[inline(always)]
+ fn from_usize(u: usize) -> Self {
+ debug_assert!(u <= Self::BITS);
+ Self::new(u)
+ }
+
+ #[inline(always)]
+ fn as_usize(&self) -> usize {
+ self.value
+ }
+}
+
+impl Generation {
+ const ONE: usize = 1 << Self::SHIFT;
+
+ fn new(value: usize) -> Self {
+ Self { value }
+ }
+
+ fn next(self) -> Self {
+ Self::from_usize((self.value + 1) % Self::BITS)
+ }
+}
+
+impl ScheduledIo {
+ pub(super) fn new(next: usize) -> Self {
+ Self {
+ next: CausalCell::new(next),
+ readiness: AtomicUsize::new(0),
+ reader: AtomicWaker::new(),
+ writer: AtomicWaker::new(),
+ }
+ }
+
+ #[inline]
+ pub(super) fn alloc(&self) -> Generation {
+ Generation::from_packed(self.readiness.load(Ordering::SeqCst))
+ }
+
+ #[inline(always)]
+ pub(super) fn next(&self) -> usize {
+ self.next.with(|next| unsafe { *next })
+ }
+
+ #[inline]
+ pub(super) fn reset(&self, gen: Generation) -> bool {
+ let mut current = self.readiness.load(Ordering::Acquire);
+ loop {
+ if Generation::from_packed(current) != gen {
+ return false;
+ }
+ let next_gen = gen.next().pack(0);
+ match self.readiness.compare_exchange(
+ current,
+ next_gen,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => break,
+ Err(actual) => current = actual,
+ }
+ }
+ drop(self.reader.take_waker());
+ drop(self.writer.take_waker());
+ true
+ }
+
+ #[inline(always)]
+ pub(super) fn set_next(&self, next: usize) {
+ self.next.with_mut(|n| unsafe {
+ (*n) = next;
+ })
+ }
+
+ /// Returns the current readiness value of this `ScheduledIo`, if the
+ /// provided `token` is still a valid access.
+ ///
+ /// # Returns
+ ///
+ /// If the given token's generation no longer matches the `ScheduledIo`'s
+ /// generation, then the corresponding IO resource has been removed and
+ /// replaced with a new resource. In that case, this method returns `None`.
+ /// Otherwise, this returns the current readiness.
+ pub(in crate::net::driver) fn get_readiness(&self, token: usize) -> Option<usize> {
+ let gen = token & Generation::MASK;
+ let ready = self.readiness.load(Ordering::Acquire);
+ if ready & Generation::MASK != gen {
+ return None;
+ }
+ Some(ready & (!Generation::MASK))
+ }
+
+ /// Sets the readiness on this `ScheduledIo` by invoking the given closure on
+ /// the current value, returning the previous readiness value.
+ ///
+ /// # Arguments
+ /// - `token`: the token for this `ScheduledIo`.
+ /// - `f`: a closure returning a new readiness value given the previous
+ /// readiness.
+ ///
+ /// # Returns
+ ///
+ /// If the given token's generation no longer matches the `ScheduledIo`'s
+ /// generation, then the corresponding IO resource has been removed and
+ /// replaced with a new resource. In that case, this method returns `Err`.
+ /// Otherwise, this returns the previous readiness.
+ pub(in crate::net::driver) fn set_readiness(
+ &self,
+ token: usize,
+ f: impl Fn(usize) -> usize,
+ ) -> Result<usize, ()> {
+ let gen = token & Generation::MASK;
+ let mut current = self.readiness.load(Ordering::Acquire);
+ loop {
+ // Check that the generation for this access is still the current
+ // one.
+ if current & Generation::MASK != gen {
+ return Err(());
+ }
+ // Mask out the generation bits so that the modifying function
+ // doesn't see them.
+ let current_readiness = current & mio::Ready::all().as_usize();
+ let new = f(current_readiness);
+ debug_assert!(
+ new < Generation::ONE,
+ "new readiness value would overwrite generation bits!"
+ );
+
+ match self.readiness.compare_exchange(
+ current,
+ new | gen,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => return Ok(current),
+ // we lost the race, retry!
+ Err(actual) => current = actual,
+ }
+ }
+ }
+}
diff --git a/tokio/src/net/driver/reactor/dispatch/page/stack.rs b/tokio/src/net/driver/reactor/dispatch/page/stack.rs
new file mode 100644
index 00000000..26597dc2
--- /dev/null
+++ b/tokio/src/net/driver/reactor/dispatch/page/stack.rs
@@ -0,0 +1,149 @@
+use crate::loom::atomic::{AtomicUsize, Ordering};
+use std::fmt;
+
+pub(super) struct TransferStack {
+ head: AtomicUsize,
+}
+
+impl TransferStack {
+ pub(super) fn new() -> Self {
+ Self {
+ head: AtomicUsize::new(super::Addr::NULL),
+ }
+ }
+
+ pub(super) fn pop_all(&self) -> Option<usize> {
+ let val = self.head.swap(super::Addr::NULL, Ordering::Acquire);
+ if val == super::Addr::NULL {
+ None
+ } else {
+ Some(val)
+ }
+ }
+
+ pub(super) fn push(&self, value: usize, before: impl Fn(usize)) {
+ let mut next = self.head.load(Ordering::Relaxed);
+ loop {
+ before(next);
+
+ match self
+ .head
+ .compare_exchange(next, value, Ordering::AcqRel, Ordering::Acquire)
+ {
+ // lost the race!
+ Err(actual) => next = actual,
+ Ok(_) => return,
+ }
+ }
+ }
+}
+
+impl fmt::Debug for TransferStack {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ // Loom likes to dump all its internal state in `fmt::Debug` impls, so
+ // we override this to just print the current value in tests.
+ f.debug_struct("TransferStack")
+ .field(
+ "head",
+ &format_args!("{:#x}", self.head.load(Ordering::Relaxed)),
+ )
+ .finish()
+ }
+}
+
+#[cfg(all(test, loom))]
+mod test {
+ use super::super::super::test_util;
+ use super::*;
+ use crate::loom::CausalCell;
+ use loom::thread;
+ use std::sync::Arc;
+
+ #[test]
+ fn transfer_stack() {
+ test_util::run_model("transfer_stack", || {
+ let causalities = [CausalCell::new(None), CausalCell::new(None)];
+ let shared = Arc::new((causalities, TransferStack::new()));
+ let shared1 = shared.clone();
+ let shared2 = shared.clone();
+
+ // Spawn two threads that both try to push to the stack.
+ let t1 = thread::spawn(move || {
+ let (causalities, stack) = &*shared1;
+ stack.push(0, |prev| {
+ causalities[0].with_mut(|c| unsafe {
+ *c = Some(prev);
+ });
+ test_println!("prev={:#x}", prev)
+ });
+ });
+
+ let t2 = thread::spawn(move || {
+ let (causalities, stack) = &*shared2;
+ stack.push(1, |prev| {
+ causalities[1].with_mut(|c| unsafe {
+ *c = Some(prev);
+ });
+ test_println!("prev={:#x}", prev)
+ });
+ });
+
+ let (causalities, stack) = &*shared;
+
+ // Try to pop from the stack...
+ let mut idx = stack.pop_all();
+ while idx == None {
+ idx = stack.pop_all();
+ thread::yield_now();
+ }
+ let idx = idx.unwrap();
+ test_println!("popped {:#x}", idx);
+
+ let saw_both = causalities[idx].with(|val| {
+ let val = unsafe { *val };
+ assert!(
+ val.is_some(),
+ "CausalCell write must happen-before index is pushed to the stack!",
+ );
+ // were there two entries in the stack? if so, check that
+ // both saw a write.
+ if let Some(c) = causalities.get(val.unwrap()) {
+ test_println!("saw both entries!");
+ c.with(|val| {
+ let val = unsafe { *val };
+ assert!(
+ val.is_some(),</