path: root/tokio/src/sync
author     Carl Lerche <me@carllerche.com>    2019-10-29 15:11:31 -0700
committer  GitHub <noreply@github.com>        2019-10-29 15:11:31 -0700
commit     2b909d6805990abf0bc2a5dea9e7267ff87df704 (patch)
tree       de255969c720c294af754b3840efabff3e6d69a0 /tokio/src/sync
parent     c62ef2d232dea1535a8e22484fa2ca083f03e903 (diff)
sync: move into `tokio` crate (#1705)
A step towards collapsing the Tokio sub-crates into a single `tokio` crate (#1318). The sync implementation is now provided by the main `tokio` crate. The functionality can be opted out of via feature flags; the primitives in this module are gated behind the `sync` feature flag.
Diffstat (limited to 'tokio/src/sync')
-rw-r--r--  tokio/src/sync/barrier.rs                    135
-rw-r--r--  tokio/src/sync/loom.rs                        48
-rw-r--r--  tokio/src/sync/mod.rs                         58
-rw-r--r--  tokio/src/sync/mpsc/block.rs                 387
-rw-r--r--  tokio/src/sync/mpsc/bounded.rs               337
-rw-r--r--  tokio/src/sync/mpsc/chan.rs                  451
-rw-r--r--  tokio/src/sync/mpsc/list.rs                  348
-rw-r--r--  tokio/src/sync/mpsc/mod.rs                    67
-rw-r--r--  tokio/src/sync/mpsc/unbounded.rs             230
-rw-r--r--  tokio/src/sync/mutex.rs                      149
-rw-r--r--  tokio/src/sync/oneshot.rs                    576
-rw-r--r--  tokio/src/sync/semaphore.rs                 1142
-rw-r--r--  tokio/src/sync/task/atomic_waker.rs          323
-rw-r--r--  tokio/src/sync/task/mod.rs                     4
-rw-r--r--  tokio/src/sync/tests/loom_atomic_waker.rs     45
-rw-r--r--  tokio/src/sync/tests/loom_list.rs             52
-rw-r--r--  tokio/src/sync/tests/loom_mpsc.rs             23
-rw-r--r--  tokio/src/sync/tests/loom_oneshot.rs         109
-rw-r--r--  tokio/src/sync/tests/loom_semaphore.rs       151
-rw-r--r--  tokio/src/sync/tests/mod.rs                    7
-rw-r--r--  tokio/src/sync/watch.rs                      454
21 files changed, 5096 insertions, 0 deletions
diff --git a/tokio/src/sync/barrier.rs b/tokio/src/sync/barrier.rs
new file mode 100644
index 00000000..1582120e
--- /dev/null
+++ b/tokio/src/sync/barrier.rs
@@ -0,0 +1,135 @@
+use crate::sync::watch;
+
+use std::sync::Mutex;
+
+/// A barrier enables multiple threads to synchronize the beginning of some computation.
+///
+/// ```
+/// # #[tokio::main]
+/// # async fn main() {
+/// use tokio::sync::Barrier;
+/// use std::sync::Arc;
+/// use futures_util::future::join_all;
+///
+/// let mut handles = Vec::with_capacity(10);
+/// let barrier = Arc::new(Barrier::new(10));
+/// for _ in 0..10 {
+/// let c = barrier.clone();
+/// // The same messages will be printed together.
+/// // You will NOT see any interleaving.
+/// handles.push(async move {
+/// println!("before wait");
+/// let wr = c.wait().await;
+/// println!("after wait");
+/// wr
+/// });
+/// }
+/// // Will not resolve until all "before wait" messages have been printed
+/// let wrs = join_all(handles).await;
+/// // Exactly one barrier will resolve as the "leader"
+/// assert_eq!(wrs.into_iter().filter(|wr| wr.is_leader()).count(), 1);
+/// # }
+/// ```
+#[derive(Debug)]
+pub struct Barrier {
+ state: Mutex<BarrierState>,
+ wait: watch::Receiver<usize>,
+ n: usize,
+}
+
+#[derive(Debug)]
+struct BarrierState {
+ waker: watch::Sender<usize>,
+ arrived: usize,
+ generation: usize,
+}
+
+impl Barrier {
+ /// Creates a new barrier that can block a given number of threads.
+ ///
+ /// A barrier will block `n`-1 threads which call [`Barrier::wait`] and then wake up all
+ /// threads at once when the `n`th thread calls `wait`.
+ pub fn new(mut n: usize) -> Barrier {
+ let (waker, wait) = crate::sync::watch::channel(0);
+
+ if n == 0 {
+ // if n is 0, it's not clear what behavior the user wants.
+ // in std::sync::Barrier, an n of 0 exhibits the same behavior as n == 1, where every
+ // .wait() immediately unblocks, so we adopt that here as well.
+ n = 1;
+ }
+
+ Barrier {
+ state: Mutex::new(BarrierState {
+ waker,
+ arrived: 0,
+ generation: 1,
+ }),
+ n,
+ wait,
+ }
+ }
+
+ /// Does not resolve until all tasks have rendezvoused here.
+ ///
+ /// Barriers are reusable after all tasks have rendezvoused once, and can
+ /// be used continuously.
+ ///
+ /// A single (arbitrary) future will receive a [`BarrierWaitResult`] that returns `true` from
+ /// [`BarrierWaitResult::is_leader`] when returning from this function, and all other futures
+ /// will receive a result that returns `false` from `is_leader`.
+ pub async fn wait(&self) -> BarrierWaitResult {
+ // NOTE: we are taking a _synchronous_ lock here.
+ // It is okay to do so because the critical section is fast and never yields, so it cannot
+ // deadlock even if another future is concurrently holding the lock.
+ // It is _desirable_ to do so as synchronous Mutexes are, at least in theory, faster than
+ // their asynchronous counterparts, so we should use them where possible [citation needed].
+ // NOTE: the extra scope here is so that the compiler doesn't think `state` is held across
+ // a yield point, and thus marks the returned future as !Send.
+ let generation = {
+ let mut state = self.state.lock().unwrap();
+ let generation = state.generation;
+ state.arrived += 1;
+ if state.arrived == self.n {
+ // we are the leader for this generation
+ // wake everyone, increment the generation, and return
+ state
+ .waker
+ .broadcast(state.generation)
+ .expect("there is at least one receiver");
+ state.arrived = 0;
+ state.generation += 1;
+ return BarrierWaitResult(true);
+ }
+
+ generation
+ };
+
+ // we're going to have to wait for the last of the generation to arrive
+ let mut wait = self.wait.clone();
+
+ loop {
+ // note that the first time through the loop, this _will_ yield a generation
+ // immediately, since we cloned a receiver that has never seen any values.
+ if wait.recv().await.expect("sender hasn't been closed") >= generation {
+ break;
+ }
+ }
+
+ BarrierWaitResult(false)
+ }
+}
+
+/// A `BarrierWaitResult` is returned by `wait` when all threads in the `Barrier` have rendezvoused.
+#[derive(Debug, Clone)]
+pub struct BarrierWaitResult(bool);
+
+impl BarrierWaitResult {
+ /// Returns `true` if this call to `wait` was the "leader", that is, the final call that
+ /// completed the rendezvous.
+ ///
+ /// Only one caller will have `true` returned from its result; all other callers will have
+ /// `false` returned.
+ pub fn is_leader(&self) -> bool {
+ self.0
+ }
+}
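
Editorial note (not part of the diff): the `wait` implementation above deliberately scopes the `std::sync::Mutex` guard so it is dropped before any `.await`. The standalone sketch below, with hypothetical names, shows why that matters: a future holding the guard across a yield point would not be `Send`, while the scoped version is.

use std::sync::Mutex;

// Stand-in for any `.await` point, e.g. `watch::Receiver::recv`.
async fn yield_point() {}

// Mirrors the structure of `Barrier::wait`: read the state under the lock,
// drop the guard, then await.
async fn scoped(shared: &Mutex<usize>) -> usize {
    let generation = {
        let state = shared.lock().unwrap();
        *state
    }; // the guard is dropped here, before the yield point
    yield_point().await;
    generation
}

fn assert_send<T: Send>(_: T) {}

fn main() {
    let shared = Mutex::new(0);
    // Compiles only because no `MutexGuard` is live across an `.await`.
    assert_send(scoped(&shared));
}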
diff --git a/tokio/src/sync/loom.rs b/tokio/src/sync/loom.rs
new file mode 100644
index 00000000..1b5a5c9d
--- /dev/null
+++ b/tokio/src/sync/loom.rs
@@ -0,0 +1,48 @@
+#[cfg(not(all(test, loom)))]
+mod imp {
+ pub(crate) mod future {
+ pub(crate) use crate::sync::task::AtomicWaker;
+ }
+
+ pub(crate) mod sync {
+ pub(crate) use std::sync::atomic;
+ pub(crate) use std::sync::Arc;
+
+ use std::cell::UnsafeCell;
+
+ pub(crate) struct CausalCell<T>(UnsafeCell<T>);
+
+ impl<T> CausalCell<T> {
+ pub(crate) fn new(data: T) -> CausalCell<T> {
+ CausalCell(UnsafeCell::new(data))
+ }
+
+ pub(crate) fn with<F, R>(&self, f: F) -> R
+ where
+ F: FnOnce(*const T) -> R,
+ {
+ f(self.0.get())
+ }
+
+ pub(crate) fn with_mut<F, R>(&self, f: F) -> R
+ where
+ F: FnOnce(*mut T) -> R,
+ {
+ f(self.0.get())
+ }
+ }
+ }
+
+ pub(crate) mod thread {
+ pub(crate) fn yield_now() {
+ ::std::sync::atomic::spin_loop_hint();
+ }
+ }
+}
+
+#[cfg(all(test, loom))]
+mod imp {
+ pub(crate) use loom::*;
+}
+
+pub(crate) use self::imp::*;
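
Editorial note: under `cfg(all(test, loom))` the module above swaps in the external `loom` crate, so the same `sync::atomic`, `Arc`, and `thread` paths get model-checked. A minimal sketch of the shape such a test takes (this assumes the external `loom` crate's `model`/`thread` API and is not one of the tests added in this diff):

// Assumes a dev-dependency on the external `loom` crate.
#[cfg(all(test, loom))]
mod loom_sketch {
    use loom::sync::atomic::{AtomicUsize, Ordering};
    use loom::sync::Arc;
    use loom::thread;

    #[test]
    fn store_is_visible_after_join() {
        // `loom::model` explores the possible interleavings of the closure.
        loom::model(|| {
            let flag = Arc::new(AtomicUsize::new(0));
            let flag2 = flag.clone();

            let th = thread::spawn(move || {
                flag2.store(1, Ordering::Release);
            });

            th.join().unwrap();
            assert_eq!(1, flag.load(Ordering::Acquire));
        });
    }
}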
diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs
new file mode 100644
index 00000000..84f6bd98
--- /dev/null
+++ b/tokio/src/sync/mod.rs
@@ -0,0 +1,58 @@
+//! Future-aware synchronization
+//!
+//! This module is enabled with the **`sync`** feature flag.
+//!
+//! Tasks sometimes need to communicate with each other. This module contains
+//! the following basic abstractions for doing so:
+//!
+//! - [oneshot](oneshot/index.html), a way of sending a single value
+//! from one task to another.
+//! - [mpsc](mpsc/index.html), a multi-producer, single-consumer channel for
+//! sending values between tasks.
+//! - [`Mutex`](struct.Mutex.html), an asynchronous `Mutex`-like type.
+//! - [watch](watch/index.html), a single-producer, multi-consumer channel that
+//! only stores the **most recently** sent value.
+
+macro_rules! debug {
+ ($($t:tt)*) => {
+ if false {
+ println!($($t)*);
+ }
+ }
+}
+
+macro_rules! if_loom {
+ ($($t:tt)*) => {{
+ #[cfg(loom)]
+ const LOOM: bool = true;
+ #[cfg(not(loom))]
+ const LOOM: bool = false;
+
+ if LOOM {
+ $($t)*
+ }
+ }}
+}
+
+mod barrier;
+pub use barrier::{Barrier, BarrierWaitResult};
+
+mod loom;
+
+pub mod mpsc;
+
+mod mutex;
+pub use mutex::{Mutex, MutexGuard};
+
+pub mod oneshot;
+
+pub mod semaphore;
+
+mod task;
+pub use task::AtomicWaker;
+
+pub mod watch;
+
+/// Unit tests
+#[cfg(test)]
+mod tests;
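
Editorial note: the module docs above list the primitives this diff brings into `tokio::sync`. As a quick orientation, the sketch below wires the smallest of them, `oneshot`, end to end; it assumes the `tokio` crate with the `sync` feature and the `#[tokio::main]` macro, as used by the doc examples elsewhere in this diff.

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();

    tokio::spawn(async move {
        // A oneshot sender is consumed by `send`; exactly one value ever flows.
        let _ = tx.send("hello");
    });

    // The receiver is itself a future that resolves to the sent value,
    // or to an error if the sender was dropped without sending.
    assert_eq!("hello", rx.await.unwrap());
}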
diff --git a/tokio/src/sync/mpsc/block.rs b/tokio/src/sync/mpsc/block.rs
new file mode 100644
index 00000000..aea69384
--- /dev/null
+++ b/tokio/src/sync/mpsc/block.rs
@@ -0,0 +1,387 @@
+use crate::sync::loom::{
+ sync::atomic::{AtomicPtr, AtomicUsize},
+ sync::CausalCell,
+ thread,
+};
+
+use std::mem::MaybeUninit;
+use std::ops;
+use std::ptr::{self, NonNull};
+use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Release};
+
+/// A block in a linked list.
+///
+/// Each block in the list can hold up to `BLOCK_CAP` messages.
+pub(crate) struct Block<T> {
+ /// The start index of this block.
+ ///
+ /// Slots in this block have indices in `start_index .. start_index + BLOCK_CAP`.
+ start_index: usize,
+
+ /// The next block in the linked list.
+ next: AtomicPtr<Block<T>>,
+
+ /// Bitfield tracking slots that are ready to have their values consumed.
+ ready_slots: AtomicUsize,
+
+ /// The observed `tail_position` value *after* the block has been passed by
+ /// `block_tail`.
+ observed_tail_position: CausalCell<usize>,
+
+ /// Array containing values pushed into the block. Values are stored in a
+ /// contiguous array in order to improve cache line behavior when reading.
+ /// The values must be manually dropped.
+ values: Values<T>,
+}
+
+pub(crate) enum Read<T> {
+ Value(T),
+ Closed,
+}
+
+struct Values<T>([CausalCell<MaybeUninit<T>>; BLOCK_CAP]);
+
+use super::BLOCK_CAP;
+
+/// Masks an index to get the block identifier
+const BLOCK_MASK: usize = !(BLOCK_CAP - 1);
+
+/// Masks an index to get the value offset in a block.
+const SLOT_MASK: usize = BLOCK_CAP - 1;
+
+/// Flag tracking that a block has gone through the sender's release routine.
+///
+/// When this is set, the receiver may consider freeing the block.
+const RELEASED: usize = 1 << BLOCK_CAP;
+
+/// Flag tracking that all senders have been dropped.
+///
+/// When this flag is set, the send half of the channel has closed.
+const TX_CLOSED: usize = RELEASED << 1;
+
+/// Mask covering all bits used to track slot readiness.
+const READY_MASK: usize = RELEASED - 1;
+
+/// Returns the index of the first slot in the block referenced by `slot_index`.
+#[inline(always)]
+pub(crate) fn start_index(slot_index: usize) -> usize {
+ BLOCK_MASK & slot_index
+}
+
+/// Returns the offset into the block referenced by `slot_index`.
+#[inline(always)]
+pub(crate) fn offset(slot_index: usize) -> usize {
+ SLOT_MASK & slot_index
+}
+
+impl<T> Block<T> {
+ pub(crate) fn new(start_index: usize) -> Block<T> {
+ Block {
+ // The absolute index in the channel of the first slot in the block.
+ start_index,
+
+ // Pointer to the next block in the linked list.
+ next: AtomicPtr::new(ptr::null_mut()),
+
+ ready_slots: AtomicUsize::new(0),
+
+ observed_tail_position: CausalCell::new(0),
+
+ // Value storage
+ values: unsafe { Values::uninitialized() },
+ }
+ }
+
+ /// Returns `true` if the block matches the given index
+ pub(crate) fn is_at_index(&self, index: usize) -> bool {
+ debug_assert!(offset(index) == 0);
+ self.start_index == index
+ }
+
+ /// Returns the number of blocks between `self` and the block at the
+ /// specified index.
+ ///
+ /// `other_index` must represent a block *after* `self`.
+ pub(crate) fn distance(&self, other_index: usize) -> usize {
+ debug_assert!(offset(other_index) == 0);
+ other_index.wrapping_sub(self.start_index) / BLOCK_CAP
+ }
+
+ /// Read the value at the given offset.
+ ///
+ /// Returns `None` if the slot is empty.
+ ///
+ /// # Safety
+ ///
+ /// To maintain safety, the caller must ensure:
+ ///
+ /// * No concurrent access to the slot.
+ pub(crate) unsafe fn read(&self, slot_index: usize) -> Option<Read<T>> {
+ let offset = offset(slot_index);
+
+ let ready_bits = self.ready_slots.load(Acquire);
+
+ if !is_ready(ready_bits, offset) {
+ if is_tx_closed(ready_bits) {
+ return Some(Read::Closed);
+ }
+
+ return None;
+ }
+
+ // Get the value
+ let value = self.values[offset].with(|ptr| ptr::read(ptr));
+
+ Some(Read::Value(value.assume_init()))
+ }
+
+ /// Write a value to the block at the given offset.
+ ///
+ /// # Safety
+ ///
+ /// To maintain safety, the caller must ensure:
+ ///
+ /// * The slot is empty.
+ /// * No concurrent access to the slot.
+ pub(crate) unsafe fn write(&self, slot_index: usize, value: T) {
+ // Get the offset into the block
+ let slot_offset = offset(slot_index);
+
+ self.values[slot_offset].with_mut(|ptr| {
+ ptr::write(ptr, MaybeUninit::new(value));
+ });
+
+ // Release the value. After this point, the slot ref may no longer
+ // be used. It is possible for the receiver to free the memory at
+ // any point.
+ self.set_ready(slot_offset);
+ }
+
+ /// Signal to the receiver that the sender half of the list is closed.
+ pub(crate) unsafe fn tx_close(&self) {
+ self.ready_slots.fetch_or(TX_CLOSED, Release);
+ }
+
+ /// Reset the block to a blank state. This enables reusing blocks in the
+ /// channel.
+ ///
+ /// # Safety
+ ///
+ /// To maintain safety, the caller must ensure:
+ ///
+ /// * All slots are empty.
+ /// * The caller holds a unique pointer to the block.
+ pub(crate) unsafe fn reclaim(&mut self) {
+ self.start_index = 0;
+ self.next = AtomicPtr::new(ptr::null_mut());
+ self.ready_slots = AtomicUsize::new(0);
+ }
+
+ /// Release the block to the rx half for freeing.
+ ///
+ /// This function is called by the tx half once it can be guaranteed that no
+ /// more senders will attempt to access the block.
+ ///
+ /// # Safety
+ ///
+ /// To maintain safety, the caller must ensure:
+ ///
+ /// * The block will no longer be accessed by any sender.
+ pub(crate) unsafe fn tx_release(&self, tail_position: usize) {
+ // Track the observed tail_position. Any sender targeting a greater
+ // tail_position is guaranteed to not access this block.
+ self.observed_tail_position
+ .with_mut(|ptr| *ptr = tail_position);
+
+ // Set the released bit, signalling to the receiver that it is safe to
+ // free the block's memory as soon as all slots **prior** to
+ // `observed_tail_position` have been filled.
+ self.ready_slots.fetch_or(RELEASED, Release);
+ }
+
+ /// Mark a slot as ready
+ fn set_ready(&self, slot: usize) {
+ let mask = 1 << slot;
+ self.ready_slots.fetch_or(mask, Release);
+ }
+
+ /// Returns `true` when all slots have their `ready` bits set.
+ ///
+ /// This indicates that the block is in its final state and will no longer
+ /// be mutated.
+ ///
+ /// # Implementation
+ ///
+ /// Ready flags are coalesced as bits in the single `ready_slots` atomic, so this
+ /// check is a single load followed by a mask against `READY_MASK`.
+ pub(crate) fn is_final(&self) -> bool {
+ self.ready_slots.load(Acquire) & READY_MASK == READY_MASK
+ }
+
+ /// Returns the `observed_tail_position` value, if set
+ pub(crate) fn observed_tail_position(&self) -> Option<usize> {
+ if 0 == RELEASED & self.ready_slots.load(Acquire) {
+ None
+ } else {
+ Some(self.observed_tail_position.with(|ptr| unsafe { *ptr }))
+ }
+ }
+
+ /// Load the next block
+ pub(crate) fn load_next(&self, ordering: Ordering) -> Option<NonNull<Block<T>>> {
+ let ret = NonNull::new(self.next.load(ordering));
+
+ debug_assert!(unsafe {
+ ret.map(|block| block.as_ref().start_index == self.start_index.wrapping_add(BLOCK_CAP))
+ .unwrap_or(true)
+ });
+
+ ret
+ }
+
+ /// Push `block` as the next block in the linked list.
+ ///
+ /// Returns `Ok(())` if successful; otherwise, a pointer to the next block in
+ /// the list is returned.
+ ///
+ /// This requires that the next pointer is null.
+ ///
+ /// # Ordering
+ ///
+ /// This performs a compare-and-swap on `next` using AcqRel ordering.
+ ///
+ /// # Safety
+ ///
+ /// To maintain safety, the caller must ensure:
+ ///
+ /// * `block` is not freed until it has been removed from the list.
+ pub(crate) unsafe fn try_push(
+ &self,
+ block: &mut NonNull<Block<T>>,
+ ordering: Ordering,
+ ) -> Result<(), NonNull<Block<T>>> {
+ block.as_mut().start_index = self.start_index.wrapping_add(BLOCK_CAP);
+
+ let next_ptr = self
+ .next
+ .compare_and_swap(ptr::null_mut(), block.as_ptr(), ordering);
+
+ match NonNull::new(next_ptr) {
+ Some(next_ptr) => Err(next_ptr),
+ None => Ok(()),
+ }
+ }
+
+ /// Grow the `Block` linked list by allocating and appending a new block.
+ ///
+ /// The next block in the linked list is returned. This may or may not be
+ /// the one allocated by the function call.
+ ///
+ /// # Implementation
+ ///
+ /// It is assumed that `self.next` is null. A new block is allocated with
+ /// `start_index` set to be the next block. A compare-and-swap is performed
+ /// with AcqRel memory ordering. If the compare-and-swap is successful, the
+ /// newly allocated block is released to other threads walking the block
+ /// linked list. If the compare-and-swap fails, the current thread acquires
+ /// the next block in the linked list, allowing the current thread to access
+ /// the slots.
+ pub(crate) fn grow(&self) -> NonNull<Block<T>> {
+ // Create the new block. It is assumed that the block will become the
+ // next one after `&self`. If this turns out to not be the case,
+ // `start_index` is updated accordingly.
+ let new_block = Box::new(Block::new(self.start_index + BLOCK_CAP));
+
+ let mut new_block = unsafe { NonNull::new_unchecked(Box::into_raw(new_block)) };
+
+ // Attempt to store the block. The first compare-and-swap attempt is
+ // "unrolled" due to minor differences in logic
+ //
+ // `AcqRel` is used as the ordering **only** when attempting the
+ // compare-and-swap on self.next.
+ //
+ // If the compare-and-swap fails, then the actual value of the cell is
+ // returned from this function and accessed by the caller. Given this,
+ // the memory must be acquired.
+ //
+ // `Release` ensures that the newly allocated block is available to
+ // other threads acquiring the next pointer.
+ let next = NonNull::new(self.next.compare_and_swap(
+ ptr::null_mut(),
+ new_block.as_ptr(),
+ AcqRel,
+ ));
+
+ let next = match next {
+ Some(next) => next,
+ None => {
+ // The compare-and-swap succeeded and the newly allocated block
+ // is successfully pushed.
+ return new_block;
+ }
+ };
+
+ // There already is a next block in the linked list. The newly allocated
+ // block could be dropped and the discovered next block returned;
+ // however, that would be wasteful. Instead, the linked list is walked
+ // by repeatedly attempting to compare-and-swap the pointer into the
+ // `next` register until the compare-and-swap succeeds.
+ //
+ // Care is taken to update new_block's start_index field as appropriate.
+
+ let mut curr = next;
+
+ // TODO: Should this iteration be capped?
+ loop {
+ let actual = unsafe { curr.as_ref().try_push(&mut new_block, AcqRel) };
+
+ curr = match actual {
+ Ok(_) => {
+ return next;
+ }
+ Err(curr) => curr,
+ };
+
+ // When running outside of loom, this calls `spin_loop_hint`.
+ thread::yield_now();
+ }
+ }
+}
+
+/// Returns `true` if the specified slot has a value ready to be consumed.
+fn is_ready(bits: usize, slot: usize) -> bool {
+ let mask = 1 << slot;
+ mask == mask & bits
+}
+
+/// Returns `true` if the closed flag has been set.
+fn is_tx_closed(bits: usize) -> bool {
+ TX_CLOSED == bits & TX_CLOSED
+}
+
+impl<T> Values<T> {
+ unsafe fn uninitialized() -> Values<T> {
+ let mut vals = MaybeUninit::uninit();
+
+ // When fuzzing, `CausalCell` needs to be initialized.
+ if_loom! {
+ let p = vals.as_mut_ptr() as *mut CausalCell<MaybeUninit<T>>;
+ for i in 0..BLOCK_CAP {
+ p.add(i)
+ .write(CausalCell::new(MaybeUninit::uninit()));
+ }
+ }
+
+ Values(vals.assume_init())
+ }
+}
+
+impl<T> ops::Index<usize> for Values<T> {
+ type Output = CausalCell<MaybeUninit<T>>;
+
+ fn index(&self, index: usize) -> &Self::Output {
+ self.0.index(index)
+ }
+}
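
Editorial note: the constants near the top of `block.rs` (`BLOCK_MASK`, `SLOT_MASK`, `RELEASED`, `TX_CLOSED`, `READY_MASK`) split a slot index into a block start plus an offset, with the control flags packed above the per-slot ready bits. A standalone sketch of that arithmetic follows; `BLOCK_CAP = 32` is an assumption for illustration only, since the real constant lives in `mpsc/mod.rs` and is not shown in this diff.

// Assumed capacity for illustration; the real value is defined in
// tokio/src/sync/mpsc/mod.rs.
const BLOCK_CAP: usize = 32;

const BLOCK_MASK: usize = !(BLOCK_CAP - 1);
const SLOT_MASK: usize = BLOCK_CAP - 1;
const RELEASED: usize = 1 << BLOCK_CAP;
const TX_CLOSED: usize = RELEASED << 1;
const READY_MASK: usize = RELEASED - 1;

fn main() {
    // Slot index 70 lives in the block starting at 64, at offset 6.
    assert_eq!(BLOCK_MASK & 70, 64);
    assert_eq!(SLOT_MASK & 70, 6);

    // The ready bits occupy the low BLOCK_CAP bits; the RELEASED and
    // TX_CLOSED flags sit just above them and never collide with them.
    assert_eq!(READY_MASK.count_ones() as usize, BLOCK_CAP);
    assert_eq!(READY_MASK & RELEASED, 0);
    assert_eq!(READY_MASK & TX_CLOSED, 0);
}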
diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs
new file mode 100644
index 00000000..787dd507
--- /dev/null
+++ b/tokio/src/sync/mpsc/bounded.rs
@@ -0,0 +1,337 @@
+use crate::sync::mpsc::chan;
+use crate::sync::semaphore;
+
+use std::fmt;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+/// Send values to the associated `Receiver`.
+///
+/// Instances are created by the [`channel`](fn.channel.html) function.
+pub struct Sender<T> {
+ chan: chan::Tx<T, Semaphore>,
+}
+
+impl<T> Clone for Sender<T> {
+ fn clone(&self) -> Self {
+ Sender {
+ chan: self.chan.clone(),
+ }
+ }
+}
+
+impl<T> fmt::Debug for Sender<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Sender")
+ .field("chan", &self.chan)
+ .finish()
+ }
+}
+
+/// Receive values from the associated `Sender`.
+///
+/// Instances are created by the [`channel`](fn.channel.html) function.
+pub struct Receiver<T> {
+ /// The channel receiver
+ chan: chan::Rx<T, Semaphore>,
+}
+
+impl<T> fmt::Debug for Receiver<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Receiver")
+ .field("chan", &self.chan)
+ .finish()
+ }
+}
+
+/// Error returned by the `Sender`.
+#[derive(Debug)]
+pub struct SendError(());
+
+/// Error returned by `Sender::try_send`.
+#[derive(Debug)]
+pub struct TrySendError<T> {
+ kind: ErrorKind,
+ value: T,
+}
+
+#[derive(Debug)]
+enum ErrorKind {
+ Closed,
+ NoCapacity,
+}
+
+/// Error returned by `Receiver`.
+#[derive(Debug)]
+pub struct RecvError(());
+
+/// Create a bounded mpsc channel for communicating between asynchronous tasks,
+/// returning the sender/receiver halves.
+///
+/// All data sent on `Sender` will become available on `Receiver` in the same
+/// order as it was sent.
+///
+/// The `Sender` can be cloned to `send` to the same channel from multiple code
+/// locations. Only one `Receiver` is supported.
+///
+/// If the `Receiver` is disconnected while trying to `send`, the `send` method
+/// will return a `SendError`. Similarly, if all `Sender` handles are disconnected
+/// while trying to `recv`, the `recv` method will return `None`.
+///
+/// # Examples
+///
+/// ```rust
+/// use tokio::sync::mpsc;
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let (mut tx, mut rx) = mpsc::channel(100);
+///
+/// tokio::spawn(async move {
+/// for i in 0..10 {
+/// if let Err(_) = tx.send(i).await {
+/// println!("receiver dropped");
+/// return;
+/// }
+/// }
+/// });
+///
+/// while let Some(i) = rx.recv().await {
+/// println!("got = {}", i);
+/// }
+/// }
+/// ```
+pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
+ assert!(buffer > 0, "mpsc bounded channel requires buffer > 0");
+ let semaphore = (semaphore::Semaphore::new(buffer), buffer);
+ let (tx, rx) = chan::channel(semaphore);
+
+ let tx = Sender::new(tx);
+ let rx = Receiver::new(rx);
+
+ (tx, rx)
+}
+
+/// Channel semaphore is a tuple of the semaphore implementation and a `usize`
+/// representing the channel bound.
+type Semaphore = (semaphore::Semaphore, usize);
+
+impl<T> Receiver<T> {
+ pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> Receiver<T> {
+ Receiver { chan }
+ }
+
+ /// Receive the next value for this receiver.
+ ///
+ /// `None` is returned when all `Sender` halves have dropped, indicating
+ /// that no further values can be sent on the channel.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (mut tx, mut rx) = mpsc::channel(100);
+ ///
+ /// tokio::spawn(async move {
+ /// tx.send("hello").await.unwrap();
+ /// });
+ ///
+ /// assert_eq!(Some("hello"), rx.recv().await);
+ /// assert_eq!(None, rx.recv().await);
+ /// }
+ /// ```
+ ///
+ /// Values are buffered:
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (mut tx, mut rx) = mpsc::channel(100);
+ ///
+ /// tx.send("hello").await.unwrap();
+ /// tx.send("world").await.unwrap();
+ ///
+ /// assert_eq!(Some("hello"), rx.recv().await);
+ /// assert_eq!(Some("world"), rx.recv().await);
+ /// }
+ /// ```
+ pub async fn recv(&mut self) -> Option<T> {
+ use futures_util::future::poll_fn;
+
+ poll_fn(|cx| self.poll_recv(cx)).await
+ }
+
+ #[doc(hidden)] // TODO: remove
+ pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ self.chan.recv(cx)
+ }
+
+ /// Closes the receiving half of a channel, without dropping it.
+ ///
+ /// This prevents any further messages from being sent on the channel while
+ /// still enabling the receiver to drain messages that are buffered.
+ pub fn close(&mut self) {
+ self.chan.close();
+ }
+}
+
+impl<T> futures_core::Stream for Receiver<T> {
+ type Item = T;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ self.get_mut().poll_recv(cx)
+ }
+}
+
+impl<T> Sender<T> {
+ pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> Sender<T> {
+ Sender { chan }
+ }
+
+ #[doc(hidden)] // TODO: remove
+ pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
+ self.chan.poll_ready(cx).map_err(|_| SendError(()))
+ }
+
+ /// Attempts to send a message on this `Sender`, returning the message
+ /// if there was an error.
+ pub fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
+ self.chan.try_send(message)?;
+ Ok(())
+ }
+
+ /// Send a value, waiting until there is capacity.
+ ///
+ /// # Examples
+ ///
+ /// In the following example, each call to `send` will wait until the
+ /// previously sent value has been received.
+ ///
+ /// ```rust
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (mut tx, mut rx) = mpsc::channel(1);
+ ///
+ /// tokio::spawn(async move {
+ /// for i in 0..10 {
+ /// if let Err(_) = tx.send(i).await {
+ /// println!("receiver dropped");
+ /// return;
+ /// }
+ /// }
+ /// });
+ ///
+ /// while let Some(i) = rx.recv().await {
+ /// println!("got = {}", i);
+ /// }
+ /// }
+ /// ```
+ pub async fn send(&mut self, value: T) -> Result<(), SendError> {
+ use futures_util::future::poll_fn;
+
+ poll_fn(|cx| self.poll_ready(cx)).await?;
+
+ self.try_send(value).map_err(|_| SendError(()))
+ }
+}
+
+impl<T> futures_sink::Sink<T> for Sender<T> {
+ type Error = SendError;
+
+ fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Sender::poll_ready(self.get_mut(), cx)
+ }
+
+ fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
+ self.as_mut().try_send(msg).map_err(|err| {
+ assert!(err.is_full(), "call `poll_ready` before sending");
+ SendError(())
+ })
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
+}
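
Editorial note: `try_send` above returns a `TrySendError<T>` whose `ErrorKind` is either `Closed` or `NoCapacity`, and the `Sink` impl relies on `is_full()` to tell them apart. The short sketch below shows how that surfaces to callers; it assumes the public `tokio::sync::mpsc` API re-exported by this diff.

use tokio::sync::mpsc;

fn main() {
    let (mut tx, rx) = mpsc::channel::<u32>(1);

    // The single buffer slot accepts the first message.
    assert!(tx.try_send(1).is_ok());

    // The second attempt fails immediately with the `NoCapacity` kind
    // (the `Sink` impl above distinguishes this case via `is_full`).
    assert!(tx.try_send(2).is_err());

    // Dropping the receiver closes the channel, so further sends fail
    // with the `Closed` kind.
    drop(rx);
    assert!(tx.try_send(3).is_err());
}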
+
+// ===== impl SendError =====