summaryrefslogtreecommitdiffstats
path: root/tokio/src
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-10-29 15:11:31 -0700
committerGitHub <noreply@github.com>2019-10-29 15:11:31 -0700
commit2b909d6805990abf0bc2a5dea9e7267ff87df704 (patch)
treede255969c720c294af754b3840efabff3e6d69a0 /tokio/src
parentc62ef2d232dea1535a8e22484fa2ca083f03e903 (diff)
sync: move into `tokio` crate (#1705)
A step towards collapsing Tokio sub crates into a single `tokio` crate (#1318). The sync implementation is now provided by the main `tokio` crate. Functionality can be opted out of by using the various net related feature flags.
Diffstat (limited to 'tokio/src')
-rw-r--r--tokio/src/executor/blocking/mod.rs2
-rw-r--r--tokio/src/executor/thread_pool/shutdown.rs3
-rw-r--r--tokio/src/net/driver/reactor/dispatch/page/scheduled_io.rs3
-rw-r--r--tokio/src/net/unix/incoming.rs2
-rw-r--r--tokio/src/net/unix/listener.rs1
-rw-r--r--tokio/src/net/unix/mod.rs1
-rw-r--r--tokio/src/signal/registry.rs2
-rw-r--r--tokio/src/signal/unix.rs3
-rw-r--r--tokio/src/signal/windows.rs5
-rw-r--r--tokio/src/sync/barrier.rs135
-rw-r--r--tokio/src/sync/loom.rs48
-rw-r--r--tokio/src/sync/mod.rs (renamed from tokio/src/sync.rs)46
-rw-r--r--tokio/src/sync/mpsc/block.rs387
-rw-r--r--tokio/src/sync/mpsc/bounded.rs337
-rw-r--r--tokio/src/sync/mpsc/chan.rs451
-rw-r--r--tokio/src/sync/mpsc/list.rs348
-rw-r--r--tokio/src/sync/mpsc/mod.rs67
-rw-r--r--tokio/src/sync/mpsc/unbounded.rs230
-rw-r--r--tokio/src/sync/mutex.rs149
-rw-r--r--tokio/src/sync/oneshot.rs576
-rw-r--r--tokio/src/sync/semaphore.rs1142
-rw-r--r--tokio/src/sync/task/atomic_waker.rs323
-rw-r--r--tokio/src/sync/task/mod.rs4
-rw-r--r--tokio/src/sync/tests/loom_atomic_waker.rs45
-rw-r--r--tokio/src/sync/tests/loom_list.rs52
-rw-r--r--tokio/src/sync/tests/loom_mpsc.rs23
-rw-r--r--tokio/src/sync/tests/loom_oneshot.rs109
-rw-r--r--tokio/src/sync/tests/loom_semaphore.rs151
-rw-r--r--tokio/src/sync/tests/mod.rs7
-rw-r--r--tokio/src/sync/watch.rs454
-rw-r--r--tokio/src/timer/timer/entry.rs3
31 files changed, 5089 insertions, 20 deletions
diff --git a/tokio/src/executor/blocking/mod.rs b/tokio/src/executor/blocking/mod.rs
index 16faa03e..2ad573d8 100644
--- a/tokio/src/executor/blocking/mod.rs
+++ b/tokio/src/executor/blocking/mod.rs
@@ -3,7 +3,7 @@
use crate::executor::loom::sync::{Arc, Condvar, Mutex};
use crate::executor::loom::thread;
#[cfg(feature = "blocking")]
-use tokio_sync::oneshot;
+use crate::sync::oneshot;
use std::cell::Cell;
use std::collections::VecDeque;
diff --git a/tokio/src/executor/thread_pool/shutdown.rs b/tokio/src/executor/thread_pool/shutdown.rs
index 40d8f04a..b7c4177f 100644
--- a/tokio/src/executor/thread_pool/shutdown.rs
+++ b/tokio/src/executor/thread_pool/shutdown.rs
@@ -4,8 +4,7 @@
//! dropped, the `Receiver` receives a notification.
use crate::executor::loom::sync::Arc;
-
-use tokio_sync::oneshot;
+use crate::sync::oneshot;
#[derive(Debug, Clone)]
pub(super) struct Sender {
diff --git a/tokio/src/net/driver/reactor/dispatch/page/scheduled_io.rs b/tokio/src/net/driver/reactor/dispatch/page/scheduled_io.rs
index 34a07ea8..9cd99a86 100644
--- a/tokio/src/net/driver/reactor/dispatch/page/scheduled_io.rs
+++ b/tokio/src/net/driver/reactor/dispatch/page/scheduled_io.rs
@@ -3,8 +3,7 @@ use crate::loom::{
atomic::{AtomicUsize, Ordering},
CausalCell,
};
-
-use tokio_sync::AtomicWaker;
+use crate::sync::AtomicWaker;
#[derive(Debug)]
pub(crate) struct ScheduledIo {
diff --git a/tokio/src/net/unix/incoming.rs b/tokio/src/net/unix/incoming.rs
index 542b5e1d..a66f21da 100644
--- a/tokio/src/net/unix/incoming.rs
+++ b/tokio/src/net/unix/incoming.rs
@@ -1,5 +1,3 @@
-#![cfg(feature = "async-traits")]
-
use super::{UnixListener, UnixStream};
use futures_core::ready;
diff --git a/tokio/src/net/unix/listener.rs b/tokio/src/net/unix/listener.rs
index 3a36dc90..3cf8eff3 100644
--- a/tokio/src/net/unix/listener.rs
+++ b/tokio/src/net/unix/listener.rs
@@ -90,7 +90,6 @@ impl UnixListener {
///
/// This method returns an implementation of the `Stream` trait which
/// resolves to the sockets the are accepted on this listener.
- #[cfg(feature = "async-traits")]
pub fn incoming(self) -> super::Incoming {
super::Incoming::new(self)
}
diff --git a/tokio/src/net/unix/mod.rs b/tokio/src/net/unix/mod.rs
index 4447ca5c..977e3a0f 100644
--- a/tokio/src/net/unix/mod.rs
+++ b/tokio/src/net/unix/mod.rs
@@ -6,7 +6,6 @@ mod datagram;
pub use self::datagram::UnixDatagram;
mod incoming;
-#[cfg(feature = "async-traits")]
pub use self::incoming::Incoming;
mod listener;
diff --git a/tokio/src/signal/registry.rs b/tokio/src/signal/registry.rs
index 7d2fd7a9..e70d0495 100644
--- a/tokio/src/signal/registry.rs
+++ b/tokio/src/signal/registry.rs
@@ -1,6 +1,6 @@
use crate::signal::os::{OsExtraData, OsStorage};
-use tokio_sync::mpsc::Sender;
+use crate::sync::mpsc::Sender;
use lazy_static::lazy_static;
use std::ops;
diff --git a/tokio/src/signal/unix.rs b/tokio/src/signal/unix.rs
index 075e788c..87871503 100644
--- a/tokio/src/signal/unix.rs
+++ b/tokio/src/signal/unix.rs
@@ -8,8 +8,7 @@
use crate::io::AsyncRead;
use crate::net::util::PollEvented;
use crate::signal::registry::{globals, EventId, EventInfo, Globals, Init, Storage};
-
-use tokio_sync::mpsc::{channel, Receiver};
+use crate::sync::mpsc::{channel, Receiver};
use futures_core::stream::Stream;
use libc::c_int;
diff --git a/tokio/src/signal/windows.rs b/tokio/src/signal/windows.rs
index abde334b..1e68d628 100644
--- a/tokio/src/signal/windows.rs
+++ b/tokio/src/signal/windows.rs
@@ -7,9 +7,8 @@
#![cfg(windows)]
-use super::registry::{globals, EventId, EventInfo, Init, Storage};
-
-use tokio_sync::mpsc::{channel, Receiver};
+use crate::signal::registry::{globals, EventId, EventInfo, Init, Storage};
+use crate::sync::mpsc::{channel, Receiver};
use futures_core::stream::Stream;
use std::convert::TryFrom;
diff --git a/tokio/src/sync/barrier.rs b/tokio/src/sync/barrier.rs
new file mode 100644
index 00000000..1582120e
--- /dev/null
+++ b/tokio/src/sync/barrier.rs
@@ -0,0 +1,135 @@
+use crate::sync::watch;
+
+use std::sync::Mutex;
+
+/// A barrier enables multiple threads to synchronize the beginning of some computation.
+///
+/// ```
+/// # #[tokio::main]
+/// # async fn main() {
+/// use tokio::sync::Barrier;
+/// use std::sync::Arc;
+/// use futures_util::future::join_all;
+///
+/// let mut handles = Vec::with_capacity(10);
+/// let barrier = Arc::new(Barrier::new(10));
+/// for _ in 0..10 {
+/// let c = barrier.clone();
+/// // The same messages will be printed together.
+/// // You will NOT see any interleaving.
+/// handles.push(async move {
+/// println!("before wait");
+/// let wr = c.wait().await;
+/// println!("after wait");
+/// wr
+/// });
+/// }
+/// // Will not resolve until all "before wait" messages have been printed
+/// let wrs = join_all(handles).await;
+/// // Exactly one barrier will resolve as the "leader"
+/// assert_eq!(wrs.into_iter().filter(|wr| wr.is_leader()).count(), 1);
+/// # }
+/// ```
+#[derive(Debug)]
+pub struct Barrier {
+ state: Mutex<BarrierState>,
+ wait: watch::Receiver<usize>,
+ n: usize,
+}
+
+#[derive(Debug)]
+struct BarrierState {
+ waker: watch::Sender<usize>,
+ arrived: usize,
+ generation: usize,
+}
+
+impl Barrier {
+ /// Creates a new barrier that can block a given number of threads.
+ ///
+ /// A barrier will block `n`-1 threads which call [`Barrier::wait`] and then wake up all
+ /// threads at once when the `n`th thread calls `wait`.
+ pub fn new(mut n: usize) -> Barrier {
+ let (waker, wait) = crate::sync::watch::channel(0);
+
+ if n == 0 {
+ // if n is 0, it's not clear what behavior the user wants.
+ // in std::sync::Barrier, an n of 0 exhibits the same behavior as n == 1, where every
+ // .wait() immediately unblocks, so we adopt that here as well.
+ n = 1;
+ }
+
+ Barrier {
+ state: Mutex::new(BarrierState {
+ waker,
+ arrived: 0,
+ generation: 1,
+ }),
+ n,
+ wait,
+ }
+ }
+
+ /// Does not resolve until all tasks have rendezvoused here.
+ ///
+ /// Barriers are re-usable after all threads have rendezvoused once, and can
+ /// be used continuously.
+ ///
+ /// A single (arbitrary) future will receive a [`BarrierWaitResult`] that returns `true` from
+ /// [`BarrierWaitResult::is_leader`] when returning from this function, and all other threads
+ /// will receive a result that will return `false` from `is_leader`.
+ pub async fn wait(&self) -> BarrierWaitResult {
+ // NOTE: we are taking a _synchronous_ lock here.
+ // It is okay to do so because the critical section is fast and never yields, so it cannot
+ // deadlock even if another future is concurrently holding the lock.
+ // It is _desireable_ to do so as synchronous Mutexes are, at least in theory, faster than
+ // the asynchronous counter-parts, so we should use them where possible [citation needed].
+ // NOTE: the extra scope here is so that the compiler doesn't think `state` is held across
+ // a yield point, and thus marks the returned future as !Send.
+ let generation = {
+ let mut state = self.state.lock().unwrap();
+ let generation = state.generation;
+ state.arrived += 1;
+ if state.arrived == self.n {
+ // we are the leader for this generation
+ // wake everyone, increment the generation, and return
+ state
+ .waker
+ .broadcast(state.generation)
+ .expect("there is at least one receiver");
+ state.arrived = 0;
+ state.generation += 1;
+ return BarrierWaitResult(true);
+ }
+
+ generation
+ };
+
+ // we're going to have to wait for the last of the generation to arrive
+ let mut wait = self.wait.clone();
+
+ loop {
+ // note that the first time through the loop, this _will_ yield a generation
+ // immediately, since we cloned a receiver that has never seen any values.
+ if wait.recv().await.expect("sender hasn't been closed") >= generation {
+ break;
+ }
+ }
+
+ BarrierWaitResult(false)
+ }
+}
+
+/// A `BarrierWaitResult` is returned by `wait` when all threads in the `Barrier` have rendezvoused.
+#[derive(Debug, Clone)]
+pub struct BarrierWaitResult(bool);
+
+impl BarrierWaitResult {
+ /// Returns true if this thread from wait is the "leader thread".
+ ///
+ /// Only one thread will have `true` returned from their result, all other threads will have
+ /// `false` returned.
+ pub fn is_leader(&self) -> bool {
+ self.0
+ }
+}
diff --git a/tokio/src/sync/loom.rs b/tokio/src/sync/loom.rs
new file mode 100644
index 00000000..1b5a5c9d
--- /dev/null
+++ b/tokio/src/sync/loom.rs
@@ -0,0 +1,48 @@
+#[cfg(not(all(test, loom)))]
+mod imp {
+ pub(crate) mod future {
+ pub(crate) use crate::sync::task::AtomicWaker;
+ }
+
+ pub(crate) mod sync {
+ pub(crate) use std::sync::atomic;
+ pub(crate) use std::sync::Arc;
+
+ use std::cell::UnsafeCell;
+
+ pub(crate) struct CausalCell<T>(UnsafeCell<T>);
+
+ impl<T> CausalCell<T> {
+ pub(crate) fn new(data: T) -> CausalCell<T> {
+ CausalCell(UnsafeCell::new(data))
+ }
+
+ pub(crate) fn with<F, R>(&self, f: F) -> R
+ where
+ F: FnOnce(*const T) -> R,
+ {
+ f(self.0.get())
+ }
+
+ pub(crate) fn with_mut<F, R>(&self, f: F) -> R
+ where
+ F: FnOnce(*mut T) -> R,
+ {
+ f(self.0.get())
+ }
+ }
+ }
+
+ pub(crate) mod thread {
+ pub(crate) fn yield_now() {
+ ::std::sync::atomic::spin_loop_hint();
+ }
+ }
+}
+
+#[cfg(all(test, loom))]
+mod imp {
+ pub(crate) use loom::*;
+}
+
+pub(crate) use self::imp::*;
diff --git a/tokio/src/sync.rs b/tokio/src/sync/mod.rs
index 36847235..84f6bd98 100644
--- a/tokio/src/sync.rs
+++ b/tokio/src/sync/mod.rs
@@ -13,6 +13,46 @@
//! - [watch](watch/index.html), a single-producer, multi-consumer channel that
//! only stores the **most recently** sent value.
-pub use tokio_sync::Barrier;
-pub use tokio_sync::{mpsc, oneshot, watch};
-pub use tokio_sync::{Mutex, MutexGuard};
+macro_rules! debug {
+ ($($t:tt)*) => {
+ if false {
+ println!($($t)*);
+ }
+ }
+}
+
+macro_rules! if_loom {
+ ($($t:tt)*) => {{
+ #[cfg(loom)]
+ const LOOM: bool = true;
+ #[cfg(not(loom))]
+ const LOOM: bool = false;
+
+ if LOOM {
+ $($t)*
+ }
+ }}
+}
+
+mod barrier;
+pub use barrier::{Barrier, BarrierWaitResult};
+
+mod loom;
+
+pub mod mpsc;
+
+mod mutex;
+pub use mutex::{Mutex, MutexGuard};
+
+pub mod oneshot;
+
+pub mod semaphore;
+
+mod task;
+pub use task::AtomicWaker;
+
+pub mod watch;
+
+/// Unit tests
+#[cfg(test)]
+mod tests;
diff --git a/tokio/src/sync/mpsc/block.rs b/tokio/src/sync/mpsc/block.rs
new file mode 100644
index 00000000..aea69384
--- /dev/null
+++ b/tokio/src/sync/mpsc/block.rs
@@ -0,0 +1,387 @@
+use crate::sync::loom::{
+ sync::atomic::{AtomicPtr, AtomicUsize},
+ sync::CausalCell,
+ thread,
+};
+
+use std::mem::MaybeUninit;
+use std::ops;
+use std::ptr::{self, NonNull};
+use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Release};
+
+/// A block in a linked list.
+///
+/// Each block in the list can hold up to `BLOCK_CAP` messages.
+pub(crate) struct Block<T> {
+ /// The start index of this block.
+ ///
+ /// Slots in this block have indices in `start_index .. start_index + BLOCK_CAP`.
+ start_index: usize,
+
+ /// The next block in the linked list.
+ next: AtomicPtr<Block<T>>,
+
+ /// Bitfield tracking slots that are ready to have their values consumed.
+ ready_slots: AtomicUsize,
+
+ /// The observed `tail_position` value *after* the block has been passed by
+ /// `block_tail`.
+ observed_tail_position: CausalCell<usize>,
+
+ /// Array containing values pushed into the block. Values are stored in a
+ /// continuous array in order to improve cache line behavior when reading.
+ /// The values must be manually dropped.
+ values: Values<T>,
+}
+
+pub(crate) enum Read<T> {
+ Value(T),
+ Closed,
+}
+
+struct Values<T>([CausalCell<MaybeUninit<T>>; BLOCK_CAP]);
+
+use super::BLOCK_CAP;
+
+/// Masks an index to get the block identifier
+const BLOCK_MASK: usize = !(BLOCK_CAP - 1);
+
+/// Masks an index to get the value offset in a block.
+const SLOT_MASK: usize = BLOCK_CAP - 1;
+
+/// Flag tracking that a block has gone through the sender's release routine.
+///
+/// When this is set, the receiver may consider freeing the block.
+const RELEASED: usize = 1 << BLOCK_CAP;
+
+/// Flag tracking all senders dropped.
+///
+/// When this flag is set, the send half of the channel has closed.
+const TX_CLOSED: usize = RELEASED << 1;
+
+/// Mask covering all bits used to track slot readiness.
+const READY_MASK: usize = RELEASED - 1;
+
+/// Returns the index of the first slot in the block referenced by `slot_index`.
+#[inline(always)]
+pub(crate) fn start_index(slot_index: usize) -> usize {
+ BLOCK_MASK & slot_index
+}
+
+/// Returns the offset into the block referenced by `slot_index`.
+#[inline(always)]
+pub(crate) fn offset(slot_index: usize) -> usize {
+ SLOT_MASK & slot_index
+}
+
+impl<T> Block<T> {
+ pub(crate) fn new(start_index: usize) -> Block<T> {
+ Block {
+ // The absolute index in the channel of the first slot in the block.
+ start_index,
+
+ // Pointer to the next block in the linked list.
+ next: AtomicPtr::new(ptr::null_mut()),
+
+ ready_slots: AtomicUsize::new(0),
+
+ observed_tail_position: CausalCell::new(0),
+
+ // Value storage
+ values: unsafe { Values::uninitialized() },
+ }
+ }
+
+ /// Returns `true` if the block matches the given index
+ pub(crate) fn is_at_index(&self, index: usize) -> bool {
+ debug_assert!(offset(index) == 0);
+ self.start_index == index
+ }
+
+ /// Returns the number of blocks between `self` and the block at the
+ /// specified index.
+ ///
+ /// `start_index` must represent a block *after* `self`.
+ pub(crate) fn distance(&self, other_index: usize) -> usize {
+ debug_assert!(offset(other_index) == 0);
+ other_index.wrapping_sub(self.start_index) / BLOCK_CAP
+ }
+
+ /// Read the value at the given offset.
+ ///
+ /// Returns `None` if the slot is empty.
+ ///
+ /// # Safety
+ ///
+ /// To maintain safety, the caller must ensure:
+ ///
+ /// * No concurrent access to the slot.
+ pub(crate) unsafe fn read(&self, slot_index: usize) -> Option<Read<T>> {
+ let offset = offset(slot_index);
+
+ let ready_bits = self.ready_slots.load(Acquire);
+
+ if !is_ready(ready_bits, offset) {
+ if is_tx_closed(ready_bits) {
+ return Some(Read::Closed);
+ }
+
+ return None;
+ }
+
+ // Get the value
+ let value = self.values[offset].with(|ptr| ptr::read(ptr));
+
+ Some(Read::Value(value.assume_init()))
+ }
+
+ /// Write a value to the block at the given offset.
+ ///
+ /// # Safety
+ ///
+ /// To maintain safety, the caller must ensure:
+ ///
+ /// * The slot is empty.
+ /// * No concurrent access to the slot.
+ pub(crate) unsafe fn write(&self, slot_index: usize, value: T) {
+ // Get the offset into the block
+ let slot_offset = offset(slot_index);
+
+ self.values[slot_offset].with_mut(|ptr| {
+ ptr::write(ptr, MaybeUninit::new(value));
+ });
+
+ // Release the value. After this point, the slot ref may no longer
+ // be used. It is possible for the receiver to free the memory at
+ // any point.
+ self.set_ready(slot_offset);
+ }
+
+ /// Signal to the receiver that the sender half of the list is closed.
+ pub(crate) unsafe fn tx_close(&self) {
+ self.ready_slots.fetch_or(TX_CLOSED, Release);
+ }
+
+ /// Reset the block to a blank state. This enables reusing blocks in the
+ /// channel.
+ ///
+ /// # Safety
+ ///
+ /// To maintain safety, the caller must ensure:
+ ///
+ /// * All slots are empty.
+ /// * The caller holds a unique pointer to the block.
+ pub(crate) unsafe fn reclaim(&mut self) {
+ self.start_index = 0;
+ self.next = AtomicPtr::new(ptr::null_mut());
+ self.ready_slots = AtomicUsize::new(0);
+ }
+
+ /// Release the block to the rx half for freeing.
+ ///
+ /// This function is called by the tx half once it can be guaranteed that no
+ /// more senders will attempt to access the block.
+ ///
+ /// # Safety
+ ///
+ /// To maintain safety, the caller must ensure:
+ ///
+ /// * The block will no longer be accessed by any sender.
+ pub(crate) unsafe fn tx_release(&self, tail_position: usize) {
+ // Track the observed tail_position. Any sender targetting a greater
+ // tail_position is guaranteed to not access this block.
+ self.observed_tail_position
+ .with_mut(|ptr| *ptr = tail_position);
+
+ // Set the released bit, signalling to the receiver that it is safe to
+ // free the block's memory as soon as all slots **prior** to
+ // `observed_tail_position` have been filled.
+ self.ready_slots.fetch_or(RELEASED, Release);
+ }
+
+ /// Mark a slot as ready
+ fn set_ready(&self, slot: usize) {
+ let mask = 1 << slot;
+ self.ready_slots.fetch_or(mask, Release);
+ }
+
+ /// Returns `true` when all slots have their `ready` bits set.
+ ///
+ /// This indicates that the block is in its final state and will no longer
+ /// be mutated.
+ ///
+ /// # Implementation
+ ///
+ /// The implementation walks each slot checking the `ready` flag. It might
+ /// be that it would make more sense to coalesce ready flags as bits in a
+ /// single atomic cell. However, this could have negative impact on cache
+ /// behavior as there would be many more mutations to a single slot.
+ pub(crate) fn is_final(&self) -> bool {
+ self.ready_slots.load(Acquire) & READY_MASK == READY_MASK
+ }
+
+ /// Returns the `observed_tail_position` value, if set
+ pub(crate) fn observed_tail_position(&self) -> Option<usize> {
+ if 0 == RELEASED & self.ready_slots.load(Acquire) {
+ None
+ } else {
+ Some(self.observed_tail_position.with(|ptr| unsafe { *ptr }))
+ }
+ }
+
+ /// Load the next block
+ pub(crate) fn load_next(&self, ordering: Ordering) -> Option<NonNull<Block<T>>> {
+ let ret = NonNull::new(self.next.load(ordering));
+
+ debug_assert!(unsafe {
+ ret.map(|block| block.as_ref().start_index == self.start_index.wrapping_add(BLOCK_CAP))
+ .unwrap_or(true)
+ });
+
+ ret
+ }
+
+ /// Push `block` as the next block in the link.
+ ///
+ /// Returns Ok if successful, otherwise, a pointer to the next block in
+ /// the list is returned.
+ ///
+ /// This requires that the next pointer is null.
+ ///
+ /// # Ordering
+ ///
+ /// This performs a compare-and-swap on `next` using AcqRel ordering.
+ ///
+ /// # Safety
+ ///
+ /// To maintain safety, the caller must ensure:
+ ///
+ /// * `block` is not freed until it has been removed from the list.
+ pub(crate) unsafe fn try_push(
+ &self,
+ block: &mut NonNull<Block<T>>,
+ ordering: Ordering,
+ ) -> Result<(), NonNull<Block<T>>> {
+ block.as_mut().start_index = self.start_index.wrapping_add(BLOCK_CAP);
+
+ let next_ptr = self
+ .next
+ .compare_and_swap(ptr::null_mut(), block.as_ptr(), ordering);
+
+ match NonNull::new(next_ptr) {
+ Some(next_ptr) => Err(next_ptr),
+ None => Ok(()),
+ }
+ }
+
+ /// Grow the `Block` linked list by allocating and appending a new block.
+ ///
+ /// The next block in the linked list is returned. This may or may not be
+ /// the one allocated by the function call.
+ ///
+ /// # Implementation
+ ///
+ /// It is assumed that `self.next` is null. A new block is allocated with
+ /// `start_index` set to be the next block. A compare-and-swap is performed
+ /// with AcqRel memory ordering. If the compare-and-swap is successful, the
+ /// newly allocated block is released to other threads walking the block
+ /// linked list. If the compare-and-swap fails, the current thread acquires
+ /// the next block in the linked list, allowing the current thread to access
+ /// the slots.
+ pub(crate) fn grow(&self) -> NonNull<Block<T>> {
+ // Create the new block. It is assumed that the block will become the
+ // next one after `&self`. If this turns out to not be the case,
+ // `start_index` is updated accordingly.
+ let new_block = Box::new(Block::new(self.start_index + BLOCK_CAP));
+
+ let mut new_block = unsafe { NonNull::new_unchecked(Box::into_raw(new_block)) };
+
+ // Attempt to store the block. The first compare-and-swap attempt is
+ // "unrolled" due to minor differences in logic
+ //
+ // `AcqRel` is used as the ordering **only** when attempting the
+ // compare-and-swap on self.next.
+ //
+ // If the compare-and-swap fails, then the actual value of the cell is
+ // returned from this function and accessed by the caller. Given this,
+ // the memory must be acquired.
+ //
+ // `Release` ensures that the newly allocated block is available to
+ // other threads acquiring the next pointer.
+ let next = NonNull::new(self.next.compare_and_swap(
+ ptr::null_mut(),
+ new_block.as_ptr(),
+ AcqRel,
+ ));
+
+ let next = match next {
+ Some(next) => next,
+ None => {
+ // The compare-and-swap succeeded and the newly allocated block
+ // is successfully pushed.
+ return new_block;
+ }
+ };
+
+ // There already is a next block in the linked list. The newly allocated
+ // block could be dropped and the discovered next block returned;
+ // however, that would be wasteful. Instead, the linked list is walked
+ // by repeatedly attempting to compare-and-swap the pointer into the
+ // `next` register until the compare-and-swap succeed.
+ //
+ // Care is taken to update new_block's start_index field as appropriate.
+
+ let mut curr = next;
+
+ // TODO: Should this iteration be capped?
+ loop {
+ let actual = unsafe { curr.as_ref().try_push(&mut new_block, AcqRel) };
+
+ curr = match actual {
+ Ok(_) => {
+ return next;
+ }
+ Err(curr) => curr,
+ };
+
+ // When running outside of loom, this calls `spin_loop_hint`.
+ thread::yield_now();
+ }
+ }
+}
+
+/// Returns `true` if the specificed slot has a value ready to be consumed.
+fn is_ready(bits: usize, slot: usize) -> bool {
+ let mask = 1 << slot;
+ mask == mask & bits
+}
+
+/// Returns `true` if the closed flag has been set.
+fn is_tx_closed(bits: usize) -> bool {
+ TX_CLOSED == bits & TX_CLOSED
+}
+
+impl<T> Values<T> {
+ unsafe fn uninitialized() -> Values<T> {
+ let mut vals = MaybeUninit::uninit();
+
+ // When fuzzing, `CausalCell` needs to be initialized.
+ if_loom! {
+ let p = vals.as_mut_ptr() as *mut CausalCell<MaybeUninit<T>>;
+ for i in 0..BLOCK_CAP {
+ p.add(i)
+ .write(CausalCell::new(MaybeUninit::uninit()));
+ }
+ }
+
+ Values(vals.assume_init())
+ }
+}
+
+impl<T> ops::Index<usize> for Values<T> {
+ type Output = CausalCell<MaybeUninit<T>>;
+
+ fn index(&self, index: usize) -> &Self::Output {
+ self.0.index(index)
+ }
+}
diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs
new file mode 100644
index 00000000..787dd507
--- /dev/null
+++ b/tokio/src/sync/mpsc/bounded.rs
@@ -0,0 +1,337 @@
+use crate::sync::mpsc::chan;
+use crate::sync::semaphore;
+
+use std::fmt;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+/// Send values to the associated `Receiver`.
+///
+/// Instances are created by the [`channel`](fn.channel.html) function.
+pub struct Sender<T> {
+ chan: chan::Tx<T, Semaphore>,
+}
+
+impl<T> Clone for Sender<T> {
+ fn clone(&self) -> Self {
+ Sender {
+ chan: self.chan.clone(),
+ }
+ }
+}
+
+impl<T> fmt::Debug for Sender<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Sender")
+ .field("chan", &self.chan)
+ .finish()
+ }
+}
+
+/// Receive values from the associated `Sender`.
+///
+/// Instances are created by the [`channel`](fn.channel.html) function.
+pub struct Receiver<T> {
+ /// The channel receiver
+ chan: chan::Rx<T, Semaphore>,
+}
+
+impl<T> fmt::Debug for Receiver<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Receiver")
+ .field("chan", &self.chan)
+ .finish()
+ }
+}
+
+/// Error returned by the `Sender`.
+#[derive(Debug)]
+pub struct SendError(());
+
+/// Error returned by `Sender::try_send`.
+#[derive(Debug)]
+pub struct Tr