author     Carl Lerche <me@carllerche.com>   2019-10-29 15:11:31 -0700
committer  GitHub <noreply@github.com>       2019-10-29 15:11:31 -0700
commit     2b909d6805990abf0bc2a5dea9e7267ff87df704 (patch)
tree       de255969c720c294af754b3840efabff3e6d69a0 /tokio/src/sync/mpsc
parent     c62ef2d232dea1535a8e22484fa2ca083f03e903 (diff)
sync: move into `tokio` crate (#1705)
A step towards collapsing Tokio sub-crates into a single `tokio` crate (#1318). The sync implementation is now provided by the main `tokio` crate. Functionality can be opted out of by using the various sync-related feature flags.
Diffstat (limited to 'tokio/src/sync/mpsc')
-rw-r--r--  tokio/src/sync/mpsc/block.rs      387
-rw-r--r--  tokio/src/sync/mpsc/bounded.rs    337
-rw-r--r--  tokio/src/sync/mpsc/chan.rs       451
-rw-r--r--  tokio/src/sync/mpsc/list.rs       348
-rw-r--r--  tokio/src/sync/mpsc/mod.rs         67
-rw-r--r--  tokio/src/sync/mpsc/unbounded.rs  230
6 files changed, 1820 insertions, 0 deletions
diff --git a/tokio/src/sync/mpsc/block.rs b/tokio/src/sync/mpsc/block.rs
new file mode 100644
index 00000000..aea69384
--- /dev/null
+++ b/tokio/src/sync/mpsc/block.rs
@@ -0,0 +1,387 @@
+use crate::sync::loom::{
+ sync::atomic::{AtomicPtr, AtomicUsize},
+ sync::CausalCell,
+ thread,
+};
+
+use std::mem::MaybeUninit;
+use std::ops;
+use std::ptr::{self, NonNull};
+use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Release};
+
+/// A block in a linked list.
+///
+/// Each block in the list can hold up to `BLOCK_CAP` messages.
+pub(crate) struct Block<T> {
+ /// The start index of this block.
+ ///
+ /// Slots in this block have indices in `start_index .. start_index + BLOCK_CAP`.
+ start_index: usize,
+
+ /// The next block in the linked list.
+ next: AtomicPtr<Block<T>>,
+
+ /// Bitfield tracking slots that are ready to have their values consumed.
+ ready_slots: AtomicUsize,
+
+ /// The observed `tail_position` value *after* the block has been passed by
+ /// `block_tail`.
+ observed_tail_position: CausalCell<usize>,
+
+ /// Array containing values pushed into the block. Values are stored in a
+ /// contiguous array in order to improve cache line behavior when reading.
+ /// The values must be manually dropped.
+ values: Values<T>,
+}
+
+pub(crate) enum Read<T> {
+ Value(T),
+ Closed,
+}
+
+struct Values<T>([CausalCell<MaybeUninit<T>>; BLOCK_CAP]);
+
+use super::BLOCK_CAP;
+
+/// Masks an index to get the block identifier
+const BLOCK_MASK: usize = !(BLOCK_CAP - 1);
+
+/// Masks an index to get the value offset in a block.
+const SLOT_MASK: usize = BLOCK_CAP - 1;
+
+/// Flag tracking that a block has gone through the sender's release routine.
+///
+/// When this is set, the receiver may consider freeing the block.
+const RELEASED: usize = 1 << BLOCK_CAP;
+
+/// Flag tracking that all senders have been dropped.
+///
+/// When this flag is set, the send half of the channel has closed.
+const TX_CLOSED: usize = RELEASED << 1;
+
+/// Mask covering all bits used to track slot readiness.
+const READY_MASK: usize = RELEASED - 1;
+
+/// Returns the index of the first slot in the block referenced by `slot_index`.
+#[inline(always)]
+pub(crate) fn start_index(slot_index: usize) -> usize {
+ BLOCK_MASK & slot_index
+}
+
+/// Returns the offset into the block referenced by `slot_index`.
+#[inline(always)]
+pub(crate) fn offset(slot_index: usize) -> usize {
+ SLOT_MASK & slot_index
+}
+
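+// A quick sketch of the index math above (illustrative only; it assumes a
+// power-of-two `BLOCK_CAP` of 32 purely for the example):
+//
+//     let block_mask: usize = !(32 - 1);   // BLOCK_MASK for BLOCK_CAP = 32
+//     let slot_mask: usize = 32 - 1;       // SLOT_MASK for BLOCK_CAP = 32
+//     assert_eq!(70 & block_mask, 64);     // start_index(70): block starting at 64
+//     assert_eq!(70 & slot_mask, 6);       // offset(70): 7th slot in that block
+//
+// The flags above then sit just past the ready bits in `ready_slots`: bits
+// `0..BLOCK_CAP` are the per-slot ready bits (`READY_MASK`), bit `BLOCK_CAP`
+// is `RELEASED`, and bit `BLOCK_CAP + 1` is `TX_CLOSED`.
+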
+impl<T> Block<T> {
+ pub(crate) fn new(start_index: usize) -> Block<T> {
+ Block {
+ // The absolute index in the channel of the first slot in the block.
+ start_index,
+
+ // Pointer to the next block in the linked list.
+ next: AtomicPtr::new(ptr::null_mut()),
+
+ ready_slots: AtomicUsize::new(0),
+
+ observed_tail_position: CausalCell::new(0),
+
+ // Value storage
+ values: unsafe { Values::uninitialized() },
+ }
+ }
+
+ /// Returns `true` if the block matches the given index
+ pub(crate) fn is_at_index(&self, index: usize) -> bool {
+ debug_assert!(offset(index) == 0);
+ self.start_index == index
+ }
+
+ /// Returns the number of blocks between `self` and the block at the
+ /// specified index.
+ ///
+ /// `other_index` must reference a block *after* `self`.
+ pub(crate) fn distance(&self, other_index: usize) -> usize {
+ debug_assert!(offset(other_index) == 0);
+ other_index.wrapping_sub(self.start_index) / BLOCK_CAP
+ }
+
+ /// Read the value at the given offset.
+ ///
+ /// Returns `None` if the slot is empty.
+ ///
+ /// # Safety
+ ///
+ /// To maintain safety, the caller must ensure:
+ ///
+ /// * No concurrent access to the slot.
+ pub(crate) unsafe fn read(&self, slot_index: usize) -> Option<Read<T>> {
+ let offset = offset(slot_index);
+
+ let ready_bits = self.ready_slots.load(Acquire);
+
+ if !is_ready(ready_bits, offset) {
+ if is_tx_closed(ready_bits) {
+ return Some(Read::Closed);
+ }
+
+ return None;
+ }
+
+ // Get the value
+ let value = self.values[offset].with(|ptr| ptr::read(ptr));
+
+ Some(Read::Value(value.assume_init()))
+ }
+
+ /// Write a value to the block at the given offset.
+ ///
+ /// # Safety
+ ///
+ /// To maintain safety, the caller must ensure:
+ ///
+ /// * The slot is empty.
+ /// * No concurrent access to the slot.
+ pub(crate) unsafe fn write(&self, slot_index: usize, value: T) {
+ // Get the offset into the block
+ let slot_offset = offset(slot_index);
+
+ self.values[slot_offset].with_mut(|ptr| {
+ ptr::write(ptr, MaybeUninit::new(value));
+ });
+
+ // Release the value. After this point, the slot ref may no longer
+ // be used. It is possible for the receiver to free the memory at
+ // any point.
+ self.set_ready(slot_offset);
+ }
+
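+ // The `write` / `read` pair above is a release/acquire publication
+ // handoff. A minimal sketch with plain `std` atomics (illustrative only;
+ // `slot_ptr` and `slot_offset` stand in for the real slot pointer and
+ // offset, and this is not part of the original patch):
+ //
+ //     // sender: store the value, then publish the slot's ready bit.
+ //     unsafe { ptr::write(slot_ptr, MaybeUninit::new(value)) };
+ //     ready_slots.fetch_or(1 << slot_offset, Ordering::Release);
+ //
+ //     // receiver: observe the ready bit with `Acquire` before touching
+ //     // the slot, which guarantees the write above is visible.
+ //     if ready_slots.load(Ordering::Acquire) & (1 << slot_offset) != 0 {
+ //         let value = unsafe { ptr::read(slot_ptr).assume_init() };
+ //     }
+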
+ /// Signal to the receiver that the sender half of the list is closed.
+ pub(crate) unsafe fn tx_close(&self) {
+ self.ready_slots.fetch_or(TX_CLOSED, Release);
+ }
+
+ /// Reset the block to a blank state. This enables reusing blocks in the
+ /// channel.
+ ///
+ /// # Safety
+ ///
+ /// To maintain safety, the caller must ensure:
+ ///
+ /// * All slots are empty.
+ /// * The caller holds a unique pointer to the block.
+ pub(crate) unsafe fn reclaim(&mut self) {
+ self.start_index = 0;
+ self.next = AtomicPtr::new(ptr::null_mut());
+ self.ready_slots = AtomicUsize::new(0);
+ }
+
+ /// Release the block to the rx half for freeing.
+ ///
+ /// This function is called by the tx half once it can be guaranteed that no
+ /// more senders will attempt to access the block.
+ ///
+ /// # Safety
+ ///
+ /// To maintain safety, the caller must ensure:
+ ///
+ /// * The block will no longer be accessed by any sender.
+ pub(crate) unsafe fn tx_release(&self, tail_position: usize) {
+ // Track the observed `tail_position`. Any sender targeting a greater
+ // `tail_position` is guaranteed to not access this block.
+ self.observed_tail_position
+ .with_mut(|ptr| *ptr = tail_position);
+
+ // Set the released bit, signalling to the receiver that it is safe to
+ // free the block's memory as soon as all slots **prior** to
+ // `observed_tail_position` have been filled.
+ self.ready_slots.fetch_or(RELEASED, Release);
+ }
+
+ /// Mark a slot as ready
+ fn set_ready(&self, slot: usize) {
+ let mask = 1 << slot;
+ self.ready_slots.fetch_or(mask, Release);
+ }
+
+ /// Returns `true` when all slots have their `ready` bits set.
+ ///
+ /// This indicates that the block is in its final state and will no longer
+ /// be mutated.
+ ///
+ /// # Implementation
+ ///
+ /// The ready flags for all slots are coalesced as bits in the single
+ /// `ready_slots` atomic cell, so this check is one `Acquire` load masked
+ /// by `READY_MASK` rather than a walk over the individual slots.
+ pub(crate) fn is_final(&self) -> bool {
+ self.ready_slots.load(Acquire) & READY_MASK == READY_MASK
+ }
+
+ /// Returns the `observed_tail_position` value, if set
+ pub(crate) fn observed_tail_position(&self) -> Option<usize> {
+ if 0 == RELEASED & self.ready_slots.load(Acquire) {
+ None
+ } else {
+ Some(self.observed_tail_position.with(|ptr| unsafe { *ptr }))
+ }
+ }
+
+ /// Load the next block
+ pub(crate) fn load_next(&self, ordering: Ordering) -> Option<NonNull<Block<T>>> {
+ let ret = NonNull::new(self.next.load(ordering));
+
+ debug_assert!(unsafe {
+ ret.map(|block| block.as_ref().start_index == self.start_index.wrapping_add(BLOCK_CAP))
+ .unwrap_or(true)
+ });
+
+ ret
+ }
+
+ /// Push `block` as the next block in the link.
+ ///
+ /// Returns `Ok` if successful; otherwise, a pointer to the actual next
+ /// block in the list is returned.
+ ///
+ /// This requires that the next pointer is null.
+ ///
+ /// # Ordering
+ ///
+ /// This performs a compare-and-swap on `next` using AcqRel ordering.
+ ///
+ /// # Safety
+ ///
+ /// To maintain safety, the caller must ensure:
+ ///
+ /// * `block` is not freed until it has been removed from the list.
+ pub(crate) unsafe fn try_push(
+ &self,
+ block: &mut NonNull<Block<T>>,
+ ordering: Ordering,
+ ) -> Result<(), NonNull<Block<T>>> {
+ block.as_mut().start_index = self.start_index.wrapping_add(BLOCK_CAP);
+
+ let next_ptr = self
+ .next
+ .compare_and_swap(ptr::null_mut(), block.as_ptr(), ordering);
+
+ match NonNull::new(next_ptr) {
+ Some(next_ptr) => Err(next_ptr),
+ None => Ok(()),
+ }
+ }
+
+ /// Grow the `Block` linked list by allocating and appending a new block.
+ ///
+ /// The next block in the linked list is returned. This may or may not be
+ /// the one allocated by the function call.
+ ///
+ /// # Implementation
+ ///
+ /// It is assumed that `self.next` is null. A new block is allocated with
+ /// `start_index` set to be the next block. A compare-and-swap is performed
+ /// with AcqRel memory ordering. If the compare-and-swap is successful, the
+ /// newly allocated block is released to other threads walking the block
+ /// linked list. If the compare-and-swap fails, the current thread acquires
+ /// the next block in the linked list, allowing the current thread to access
+ /// the slots.
+ pub(crate) fn grow(&self) -> NonNull<Block<T>> {
+ // Create the new block. It is assumed that the block will become the
+ // next one after `&self`. If this turns out to not be the case,
+ // `start_index` is updated accordingly.
+ let new_block = Box::new(Block::new(self.start_index + BLOCK_CAP));
+
+ let mut new_block = unsafe { NonNull::new_unchecked(Box::into_raw(new_block)) };
+
+ // Attempt to store the block. The first compare-and-swap attempt is
+ // "unrolled" due to minor differences in logic
+ //
+ // `AcqRel` is used as the ordering **only** when attempting the
+ // compare-and-swap on self.next.
+ //
+ // If the compare-and-swap fails, then the actual value of the cell is
+ // returned from this function and accessed by the caller. Given this,
+ // the memory must be acquired.
+ //
+ // `Release` ensures that the newly allocated block is available to
+ // other threads acquiring the next pointer.
+ let next = NonNull::new(self.next.compare_and_swap(
+ ptr::null_mut(),
+ new_block.as_ptr(),
+ AcqRel,
+ ));
+
+ let next = match next {
+ Some(next) => next,
+ None => {
+ // The compare-and-swap succeeded and the newly allocated block
+ // is successfully pushed.
+ return new_block;
+ }
+ };
+
+ // There already is a next block in the linked list. The newly allocated
+ // block could be dropped and the discovered next block returned;
+ // however, that would be wasteful. Instead, the linked list is walked
+ // by repeatedly attempting to compare-and-swap the pointer into the
+ // `next` register until the compare-and-swap succeeds.
+ //
+ // Care is taken to update new_block's start_index field as appropriate.
+
+ let mut curr = next;
+
+ // TODO: Should this iteration be capped?
+ loop {
+ let actual = unsafe { curr.as_ref().try_push(&mut new_block, AcqRel) };
+
+ curr = match actual {
+ Ok(_) => {
+ return next;
+ }
+ Err(curr) => curr,
+ };
+
+ // When running outside of loom, this calls `spin_loop_hint`.
+ thread::yield_now();
+ }
+ }
+}
+
+/// Returns `true` if the specified slot has a value ready to be consumed.
+fn is_ready(bits: usize, slot: usize) -> bool {
+ let mask = 1 << slot;
+ mask == mask & bits
+}
+
+/// Returns `true` if the closed flag has been set.
+fn is_tx_closed(bits: usize) -> bool {
+ TX_CLOSED == bits & TX_CLOSED
+}
+
+impl<T> Values<T> {
+ unsafe fn uninitialized() -> Values<T> {
+ let mut vals = MaybeUninit::uninit();
+
+ // When fuzzing, `CausalCell` needs to be initialized.
+ if_loom! {
+ let p = vals.as_mut_ptr() as *mut CausalCell<MaybeUninit<T>>;
+ for i in 0..BLOCK_CAP {
+ p.add(i)
+ .write(CausalCell::new(MaybeUninit::uninit()));
+ }
+ }
+
+ Values(vals.assume_init())
+ }
+}
+
+impl<T> ops::Index<usize> for Values<T> {
+ type Output = CausalCell<MaybeUninit<T>>;
+
+ fn index(&self, index: usize) -> &Self::Output {
+ self.0.index(index)
+ }
+}
diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs
new file mode 100644
index 00000000..787dd507
--- /dev/null
+++ b/tokio/src/sync/mpsc/bounded.rs
@@ -0,0 +1,337 @@
+use crate::sync::mpsc::chan;
+use crate::sync::semaphore;
+
+use std::fmt;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+/// Send values to the associated `Receiver`.
+///
+/// Instances are created by the [`channel`](fn.channel.html) function.
+pub struct Sender<T> {
+ chan: chan::Tx<T, Semaphore>,
+}
+
+impl<T> Clone for Sender<T> {
+ fn clone(&self) -> Self {
+ Sender {
+ chan: self.chan.clone(),
+ }
+ }
+}
+
+impl<T> fmt::Debug for Sender<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Sender")
+ .field("chan", &self.chan)
+ .finish()
+ }
+}
+
+/// Receive values from the associated `Sender`.
+///
+/// Instances are created by the [`channel`](fn.channel.html) function.
+pub struct Receiver<T> {
+ /// The channel receiver
+ chan: chan::Rx<T, Semaphore>,
+}
+
+impl<T> fmt::Debug for Receiver<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Receiver")
+ .field("chan", &self.chan)
+ .finish()
+ }
+}
+
+/// Error returned by the `Sender`.
+#[derive(Debug)]
+pub struct SendError(());
+
+/// Error returned by `Sender::try_send`.
+#[derive(Debug)]
+pub struct TrySendError<T> {
+ kind: ErrorKind,
+ value: T,
+}
+
+#[derive(Debug)]
+enum ErrorKind {
+ Closed,
+ NoCapacity,
+}
+
+/// Error returned by `Receiver`.
+#[derive(Debug)]
+pub struct RecvError(());
+
+/// Create a bounded mpsc channel for communicating between asynchronous tasks,
+/// returning the sender/receiver halves.
+///
+/// All data sent on `Sender` will become available on `Receiver` in the same
+/// order as it was sent.
+///
+/// The `Sender` can be cloned to `send` to the same channel from multiple code
+/// locations. Only one `Receiver` is supported.
+///
+/// If the `Receiver` is disconnected while trying to `send`, the `send` method
+/// will return a `SendError`. Similarly, if `Sender` is disconnected while
+/// trying to `recv`, the `recv` method will return a `RecvError`.
+///
+/// # Examples
+///
+/// ```rust
+/// use tokio::sync::mpsc;
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let (mut tx, mut rx) = mpsc::channel(100);
+///
+/// tokio::spawn(async move {
+/// for i in 0..10 {
+/// if let Err(_) = tx.send(i).await {
+/// println!("receiver dropped");
+/// return;
+/// }
+/// }
+/// });
+///
+/// while let Some(i) = rx.recv().await {
+/// println!("got = {}", i);
+/// }
+/// }
+/// ```
+pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
+ assert!(buffer > 0, "mpsc bounded channel requires buffer > 0");
+ let semaphore = (semaphore::Semaphore::new(buffer), buffer);
+ let (tx, rx) = chan::channel(semaphore);
+
+ let tx = Sender::new(tx);
+ let rx = Receiver::new(rx);
+
+ (tx, rx)
+}
+
+/// Channel semaphore is a tuple of the semaphore implementation and a `usize`
+/// representing the channel bound.
+type Semaphore = (semaphore::Semaphore, usize);
+
+impl<T> Receiver<T> {
+ pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> Receiver<T> {
+ Receiver { chan }
+ }
+
+ /// Receive the next value for this receiver.
+ ///
+ /// `None` is returned when all `Sender` halves have dropped, indicating
+ /// that no further values can be sent on the channel.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (mut tx, mut rx) = mpsc::channel(100);
+ ///
+ /// tokio::spawn(async move {
+ /// tx.send("hello").await.unwrap();
+ /// });
+ ///
+ /// assert_eq!(Some("hello"), rx.recv().await);
+ /// assert_eq!(None, rx.recv().await);
+ /// }
+ /// ```
+ ///
+ /// Values are buffered:
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (mut tx, mut rx) = mpsc::channel(100);
+ ///
+ /// tx.send("hello").await.unwrap();
+ /// tx.send("world").await.unwrap();
+ ///
+ /// assert_eq!(Some("hello"), rx.recv().await);
+ /// assert_eq!(Some("world"), rx.recv().await);
+ /// }
+ /// ```
+ pub async fn recv(&mut self) -> Option<T> {
+ use futures_util::future::poll_fn;
+
+ poll_fn(|cx| self.poll_recv(cx)).await
+ }
+
+ #[doc(hidden)] // TODO: remove
+ pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ self.chan.recv(cx)
+ }
+
+ /// Closes the receiving half of a channel, without dropping it.
+ ///
+ /// This prevents any further messages from being sent on the channel while
+ /// still enabling the receiver to drain messages that are buffered.
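+ ///
+ /// # Examples
+ ///
+ /// A sketch of the intended close-then-drain pattern (illustrative; not
+ /// taken from the original patch):
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (mut tx, mut rx) = mpsc::channel(10);
+ ///
+ /// tx.send("buffered").await.unwrap();
+ /// rx.close();
+ ///
+ /// // Further sends are rejected once the receiver is closed...
+ /// assert!(tx.try_send("rejected").is_err());
+ ///
+ /// // ...but values already in the channel can still be drained.
+ /// assert_eq!(Some("buffered"), rx.recv().await);
+ /// }
+ /// ```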
+ pub fn close(&mut self) {
+ self.chan.close();
+ }
+}
+
+impl<T> futures_core::Stream for Receiver<T> {
+ type Item = T;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ self.get_mut().poll_recv(cx)
+ }
+}
+
+impl<T> Sender<T> {
+ pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> Sender<T> {
+ Sender { chan }
+ }
+
+ #[doc(hidden)] // TODO: remove
+ pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
+ self.chan.poll_ready(cx).map_err(|_| SendError(()))
+ }
+
+ /// Attempts to send a message on this `Sender` without waiting for
+ /// capacity. If the channel is full or closed, the error returns
+ /// ownership of the message.
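+ ///
+ /// # Examples
+ ///
+ /// A minimal sketch of the expected behavior (illustrative; not part of
+ /// the original patch):
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (mut tx, mut rx) = mpsc::channel(1);
+ ///
+ /// // The first message fits within the channel's capacity...
+ /// assert!(tx.try_send("first").is_ok());
+ ///
+ /// // ...while the second is rejected because the buffer is full.
+ /// let err = tx.try_send("second").unwrap_err();
+ /// assert!(err.is_full());
+ /// assert_eq!("second", err.into_inner());
+ ///
+ /// assert_eq!(Some("first"), rx.recv().await);
+ /// }
+ /// ```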
+ pub fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
+ self.chan.try_send(message)?;
+ Ok(())
+ }
+
+ /// Send a value, waiting until there is capacity.
+ ///
+ /// # Examples
+ ///
+ /// In the following example, each call to `send` will wait until the
+ /// previously sent value has been received.
+ ///
+ /// ```rust
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (mut tx, mut rx) = mpsc::channel(1);
+ ///
+ /// tokio::spawn(async move {
+ /// for i in 0..10 {
+ /// if let Err(_) = tx.send(i).await {
+ /// println!("receiver dropped");
+ /// return;
+ /// }
+ /// }
+ /// });
+ ///
+ /// while let Some(i) = rx.recv().await {
+ /// println!("got = {}", i);
+ /// }
+ /// }
+ /// ```
+ pub async fn send(&mut self, value: T) -> Result<(), SendError> {
+ use futures_util::future::poll_fn;
+
+ poll_fn(|cx| self.poll_ready(cx)).await?;
+
+ self.try_send(value).map_err(|_| SendError(()))
+ }
+}
+
+impl<T> futures_sink::Sink<T> for Sender<T> {
+ type Error = SendError;
+
+ fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Sender::poll_ready(self.get_mut(), cx)
+ }
+
+ fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
+ self.as_mut().try_send(msg).map_err(|err| {
+ assert!(err.is_full(), "call `poll_ready` before sending");
+ SendError(())
+ })
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
+}
+
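+// The `Sink` implementation above can be driven with the `futures` crate's
+// `SinkExt` combinators (illustrative sketch; `futures` is assumed to be
+// available, and the trait method is named explicitly because the inherent
+// `Sender::send` would otherwise shadow it):
+//
+//     use futures::SinkExt;
+//
+//     let (mut tx, mut rx) = mpsc::channel(1);
+//     SinkExt::send(&mut tx, "via sink").await.unwrap();
+//     assert_eq!(Some("via sink"), rx.recv().await);
+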
+// ===== impl SendError =====
+
+impl fmt::Display for SendError {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(fmt, "channel closed")
+ }
+}
+
+impl ::std::error::Error for SendError {}
+
+// ===== impl TrySendError =====
+
+impl<T> TrySendError<T> {
+ /// Get the inner value.
+ pub fn into_inner(self) -> T {
+ self.value
+ }
+
+ /// Did the send fail because the channel has been closed?
+ pub fn is_closed(&self) -> bool {
+ if let ErrorKind::Closed = self.kind {
+ true
+ } else {
+ false
+ }
+ }
+
+ /// Did the send fail because the channel was at capacity?
+ pub fn is_full(&self) -> bool {
+ if let ErrorKind::NoCapacity = self.kind {
+ true
+ } else {
+ false
+ }
+ }
+}
+
+impl<T: fmt::Debug> fmt::Display for TrySendError<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ let descr = match self.kind {
+ ErrorKind::Closed => "channel closed",
+ ErrorKind::NoCapacity => "no available capacity",
+ };
+ write!(fmt, "{}", descr)
+ }
+}
+
+impl<T: fmt::Debug> ::std::error::Error for TrySendError<T> {}
+
+impl<T> From<(T, chan::TrySendError)> for TrySendError<T> {
+ fn from((value, err): (T, chan::TrySendError)) -> TrySendError<T> {
+ TrySendError {
+ value,
+ kind: match err {
+ chan::TrySendError::Closed => ErrorKind::Closed,
+ chan::TrySendError::NoPermits => ErrorKind::NoCapacity,
+ },
+ }
+ }
+}
+
+// ===== impl RecvError =====
+
+impl fmt::Display for RecvError {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(fmt, "channel closed")
+ }
+}
+
+impl ::std::error::Error for RecvError {}
diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs
new file mode 100644
index 00000000..fe5ff904
--- /dev/null
+++ b/tokio/src/sync/mpsc/chan.rs
@@ -0,0 +1,451 @@
+use crate::sync::loom::{
+ future::AtomicWaker,
+ sync::atomic::AtomicUsize,
+ sync::{Arc, CausalCell},
+};
+use crate::sync::mpsc::list;
+
+use std::fmt;
+use std::process;
+use std::sync::atomic::Ordering::{AcqRel, Relaxed};
+use std::task::Poll::{Pending, Ready};
+use std::task::{Context, Poll};
+
+/// Channel sender
+pub(crate) struct Tx<T, S: Semaphore> {
+ inner: Arc<Chan<T, S>>,
+ permit: S::Permit,
+}
+
+impl<T, S: Semaphore> fmt::Debug for Tx<T, S>
+where
+ S::Permit: fmt::Debug,
+ S: fmt::Debug,
+{
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Tx")
+ .field("inner", &self.inner)
+ .field("permit", &self.permit)
+ .finish()
+ }
+}
+
+/// Channel receiver
+pub(crate) struct Rx<T, S: Semaphore> {
+ inner: Arc<Chan<T, S>>,
+}
+
+impl<T, S: Semaphore> fmt::Debug for Rx<T, S>
+where
+ S: fmt::Debug,
+{
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Rx").field("inner", &self.inner).finish()
+ }
+}
+
+#[derive(Debug, Eq, PartialEq)]
+pub(crate) enum TrySendError {
+ Closed,
+ NoPermits,
+}
+
+pub(crate) trait Semaphore {
+ type Permit;
+
+ fn new_permit() -> Self::Permit;
+
+ /// The permit is dropped without a value being sent. In this case, the
+ /// permit must be returned to the semaphore.
+ fn drop_permit(&self, permit: &mut Self::Permit);
+
+ fn is_idle(&self) -> bool;
+
+ fn add_permit(&self);
+
+ fn poll_acquire(&self, cx: &mut Context<'_>, permit: &mut Self::Permit)
+ -> Poll<Result<(), ()>>;
+
+ fn try_acquire(&self, permit: &mut Self::Permit) -> Result<(), TrySendError>;
+
+ /// A value was sent into the channel and the permit held by `tx` is
+ /// dropped. In this case, the permit should not immediately be returned to
+ /// the semaphore. Instead, the permit is returned to the semaphore once
+ /// the sent value is read by the rx handle.
+ fn forget(&self, permit: &mut Self::Permit);
+
+ fn close(&self);
+}
+
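+// Permit lifecycle, as wired up by `Tx::try_send` and `Rx::recv` below
+// (illustrative summary, not part of the original patch):
+//
+//     let mut permit = S::new_permit();
+//     semaphore.try_acquire(&mut permit)?;   // reserve one slot of capacity
+//     list.push(value);                      // publish the value
+//     semaphore.forget(&mut permit);         // keep the slot reserved...
+//
+// ...until the receiver pops the value and calls `add_permit`, handing the
+// slot back to the semaphore. A `Tx` dropped while still holding an unused
+// permit calls `drop_permit` to release the reservation instead.
+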
+struct Chan<T, S> {
+ /// Handle to the push half of the lock-free list.
+ tx: list::Tx<T>,
+
+ /// Coordinates access to channel's capacity.
+ semaphore: S,
+
+ /// Receiver waker. Notified when a value is pushed into the channel.
+ rx_waker: AtomicWaker,
+
+ /// Tracks the number of outstanding sender handles.
+ ///
+ /// When this drops to zero, the send half of the channel is closed.
+ tx_count: AtomicUsize,
+
+ /// Only accessed by `Rx` handle.
+ rx_fields: CausalCell<RxFields<T>>,
+}
+
+impl<T, S> fmt::Debug for Chan<T, S>
+where
+ S: fmt::Debug,
+{
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Chan")
+ .field("tx", &self.tx)
+ .field("semaphore", &self.semaphore)
+ .field("rx_waker", &self.rx_waker)
+ .field("tx_count", &self.tx_count)
+ .field("rx_fields", &"...")
+ .finish()
+ }
+}
+
+/// Fields only accessed by `Rx` handle.
+struct RxFields<T> {
+ /// Channel receiver. This field is only accessed by the `Receiver` type.
+ list: list::Rx<T>,
+
+ /// `true` if `Rx::close` is called.
+ rx_closed: bool,
+}
+
+impl<T> fmt::Debug for RxFields<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("RxFields")
+ .field("list", &self.list)
+ .field("rx_closed", &self.rx_closed)
+ .finish()
+ }
+}
+
+unsafe impl<T: Send, S: Send> Send for Chan<T, S> {}
+unsafe impl<T: Send, S: Sync> Sync for Chan<T, S> {}
+
+pub(crate) fn channel<T, S>(semaphore: S) -> (Tx<T, S>, Rx<T, S>)
+where
+ S: Semaphore,
+{
+ let (tx, rx) = list::channel();
+
+ let chan = Arc::new(Chan {
+ tx,
+ semaphore,
+ rx_waker: AtomicWaker::new(),
+ tx_count: AtomicUsize::new(1),
+ rx_fields: CausalCell::new(RxFields {
+ list: rx,
+ rx_closed: false,
+ }),
+ });
+
+ (Tx::new(chan.clone()), Rx::new(chan))
+}
+
+// ===== impl Tx =====
+
+impl<T, S> Tx<T, S>
+where
+ S: Semaphore,
+{
+ fn new(chan: Arc<Chan<T, S>>) -> Tx<T, S> {
+ Tx {
+ inner: chan,
+ permit: S::new_permit(),
+ }
+ }
+
+ pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ()>> {
+ self.inner.semaphore.poll_acquire(cx, &mut self.permit)
+ }
+
+ /// Send a message and notify the receiver.
+ pub(crate) fn try_send(&mut self, value: T) -> Result<(), (T, TrySendError)> {
+ if let Err(e) = self.inner.semaphore.try_acquire(&mut self.permit) {
+ return Err((value, e));
+ }
+
+ // Push the value
+ self.inner.tx.push(value);
+
+ // Notify the rx task
+ self.inner.rx_waker.wake();
+
+ // Release the permit
+ self.inner.semaphore.forget(&mut self.permit);
+
+ Ok(())
+ }
+}
+
+impl<T, S> Clone for Tx<T, S>
+where
+ S: Semaphore,
+{
+ fn clone(&self) -> Tx<T, S> {
+ // Using a Relaxed ordering here is sufficient as the caller holds a
+ // strong ref to `self`, preventing a concurrent decrement to zero.
+ self.inner.tx_count.fetch_add(1, Relaxed);
+
+ Tx {
+ inner: self.inner.clone(),
+ permit: S::new_permit(),
+ }
+ }
+}
+
+impl<T, S> Drop for Tx<T, S>
+where
+ S: Semaphore,
+{
+ fn drop(&mut self) {
+ self.inner.semaphore.drop_permit(&mut self.permit);
+
+ if self.inner.tx_count.fetch_sub(1, AcqRel) != 1 {
+ return;
+ }
+
+ // Close the list, which sends a `Close` message
+ self.inner.tx.close();
+
+ // Notify the receiver
+ self.inner.rx_waker.wake();
+ }
+}
+
+// ===== impl Rx =====
+
+impl<T, S> Rx<T, S>
+where
+ S: Semaphore,
+{
+ fn new(chan: Arc<Chan<T, S>>) -> Rx<T, S> {
+ Rx { inner: chan }
+ }
+
+ pub(crate) fn close(&mut self) {
+ self.inner.rx_fields.with_mut(|rx_fields_ptr| {
+ let rx_fields = unsafe { &mut *rx_fields_ptr };
+
+ if rx_fields.rx_closed {
+ return;
+ }
+
+ rx_fields.rx_closed = true;
+ });
+
+ self.inner.semaphore.close();
+ }
+
+ /// Receive the next value
+ pub(crate) fn recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ use super::block::Read::*;
+
+ self.inner.rx_fields.with_mut(|rx_fields_ptr| {
+ let rx_fields = unsafe { &mut *rx_fields_ptr };
+
+ macro_rules! try_recv {
+ () => {
+ match rx_fields.list.pop(&self.inner.tx) {
+ Some(Value(value)) => {
+ self.inner.semaphore.add_permit();
+ return Ready(Some(value));
+ }
+ Some(Closed) => {
+ // TODO: This check may not be required as it most
+ // likely can only return `true` at this point. A
+ // channel is closed when all tx handles are
+ // dropped. Dropping a tx handle releases memory,
+ // which ensures that if dropping the tx handle is
+ // visible, then all messages sent are also visible.
+ assert!(self.inner.semaphore.is_idle());
+ return Ready(None);
+ }
+ None => {} // fall through
+ }
+ };
+ }
+
+ try_recv!();
+
+ self.inner.rx_waker.register_by_ref(cx.waker());
+
+ // It is possible that a value was pushed between attempting to read
+ // and registering the task, so we have to check the channel a
+ // second time here.
+ try_recv!();
+
+ debug!(
+ "recv; rx_closed = {:?}; is_idle = {:?}",
+ rx_fields.rx_closed,
+ self.inner.semaphore.is_idle()
+ );
+
+ if rx_fields.rx_closed && self.inner.semaphore.is_idle() {
+ Ready(None)
+ } else {
+ Pending
+ }
+ })
+ }
+}
+
+impl<T, S> Drop for Rx<T, S>
+where
+ S: Semaphore,
+{
+ fn drop(&mut self) {
+ use super::block::Read::Value;
+
+ self.close();
+
+