summaryrefslogtreecommitdiffstats
path: root/tokio-sync/src
diff options
context:
space:
mode:
Diffstat (limited to 'tokio-sync/src')
-rw-r--r--tokio-sync/src/barrier.rs134
-rw-r--r--tokio-sync/src/lib.rs43
-rw-r--r--tokio-sync/src/loom.rs38
-rw-r--r--tokio-sync/src/mpsc/block.rs386
-rw-r--r--tokio-sync/src/mpsc/bounded.rs340
-rw-r--r--tokio-sync/src/mpsc/chan.rs450
-rw-r--r--tokio-sync/src/mpsc/list.rs347
-rw-r--r--tokio-sync/src/mpsc/mod.rs61
-rw-r--r--tokio-sync/src/mpsc/unbounded.rs233
-rw-r--r--tokio-sync/src/mutex.rs148
-rw-r--r--tokio-sync/src/oneshot.rs576
-rw-r--r--tokio-sync/src/semaphore.rs1142
-rw-r--r--tokio-sync/src/task/atomic_waker.rs323
-rw-r--r--tokio-sync/src/task/mod.rs5
-rw-r--r--tokio-sync/src/watch.rs459
15 files changed, 0 insertions, 4685 deletions
diff --git a/tokio-sync/src/barrier.rs b/tokio-sync/src/barrier.rs
deleted file mode 100644
index 6a409e26..00000000
--- a/tokio-sync/src/barrier.rs
+++ /dev/null
@@ -1,134 +0,0 @@
-use crate::watch;
-use std::sync::Mutex;
-
-/// A barrier enables multiple threads to synchronize the beginning of some computation.
-///
-/// ```
-/// # #[tokio::main]
-/// # async fn main() {
-/// use std::sync::Arc;
-/// use tokio_sync::Barrier;
-/// use futures_util::future::join_all;
-///
-/// let mut handles = Vec::with_capacity(10);
-/// let barrier = Arc::new(Barrier::new(10));
-/// for _ in 0..10 {
-/// let c = barrier.clone();
-/// // The same messages will be printed together.
-/// // You will NOT see any interleaving.
-/// handles.push(async move {
-/// println!("before wait");
-/// let wr = c.wait().await;
-/// println!("after wait");
-/// wr
-/// });
-/// }
-/// // Will not resolve until all "before wait" messages have been printed
-/// let wrs = join_all(handles).await;
-/// // Exactly one barrier will resolve as the "leader"
-/// assert_eq!(wrs.into_iter().filter(|wr| wr.is_leader()).count(), 1);
-/// # }
-/// ```
-#[derive(Debug)]
-pub struct Barrier {
- state: Mutex<BarrierState>,
- wait: watch::Receiver<usize>,
- n: usize,
-}
-
-#[derive(Debug)]
-struct BarrierState {
- waker: watch::Sender<usize>,
- arrived: usize,
- generation: usize,
-}
-
-impl Barrier {
- /// Creates a new barrier that can block a given number of threads.
- ///
- /// A barrier will block `n`-1 threads which call [`Barrier::wait`] and then wake up all
- /// threads at once when the `n`th thread calls `wait`.
- pub fn new(mut n: usize) -> Barrier {
- let (waker, wait) = crate::watch::channel(0);
-
- if n == 0 {
- // if n is 0, it's not clear what behavior the user wants.
- // in std::sync::Barrier, an n of 0 exhibits the same behavior as n == 1, where every
- // .wait() immediately unblocks, so we adopt that here as well.
- n = 1;
- }
-
- Barrier {
- state: Mutex::new(BarrierState {
- waker,
- arrived: 0,
- generation: 1,
- }),
- n,
- wait,
- }
- }
-
- /// Does not resolve until all tasks have rendezvoused here.
- ///
- /// Barriers are re-usable after all threads have rendezvoused once, and can
- /// be used continuously.
- ///
- /// A single (arbitrary) future will receive a [`BarrierWaitResult`] that returns `true` from
- /// [`BarrierWaitResult::is_leader`] when returning from this function, and all other threads
- /// will receive a result that will return `false` from `is_leader`.
- pub async fn wait(&self) -> BarrierWaitResult {
- // NOTE: we are taking a _synchronous_ lock here.
- // It is okay to do so because the critical section is fast and never yields, so it cannot
- // deadlock even if another future is concurrently holding the lock.
- // It is _desireable_ to do so as synchronous Mutexes are, at least in theory, faster than
- // the asynchronous counter-parts, so we should use them where possible [citation needed].
- // NOTE: the extra scope here is so that the compiler doesn't think `state` is held across
- // a yield point, and thus marks the returned future as !Send.
- let generation = {
- let mut state = self.state.lock().unwrap();
- let generation = state.generation;
- state.arrived += 1;
- if state.arrived == self.n {
- // we are the leader for this generation
- // wake everyone, increment the generation, and return
- state
- .waker
- .broadcast(state.generation)
- .expect("there is at least one receiver");
- state.arrived = 0;
- state.generation += 1;
- return BarrierWaitResult(true);
- }
-
- generation
- };
-
- // we're going to have to wait for the last of the generation to arrive
- let mut wait = self.wait.clone();
-
- loop {
- // note that the first time through the loop, this _will_ yield a generation
- // immediately, since we cloned a receiver that has never seen any values.
- if wait.recv().await.expect("sender hasn't been closed") >= generation {
- break;
- }
- }
-
- BarrierWaitResult(false)
- }
-}
-
-/// A `BarrierWaitResult` is returned by `wait` when all threads in the `Barrier` have rendezvoused.
-#[derive(Debug, Clone)]
-pub struct BarrierWaitResult(bool);
-
-impl BarrierWaitResult {
- /// Returns true if this thread from wait is the "leader thread".
- ///
- /// Only one thread will have `true` returned from their result, all other threads will have
- /// `false` returned.
- pub fn is_leader(&self) -> bool {
- self.0
- }
-}
diff --git a/tokio-sync/src/lib.rs b/tokio-sync/src/lib.rs
deleted file mode 100644
index 6055f024..00000000
--- a/tokio-sync/src/lib.rs
+++ /dev/null
@@ -1,43 +0,0 @@
-#![doc(html_root_url = "https://docs.rs/tokio-sync/0.2.0-alpha.6")]
-#![warn(
- missing_debug_implementations,
- missing_docs,
- rust_2018_idioms,
- unreachable_pub
-)]
-#![deny(intra_doc_link_resolution_failure)]
-#![doc(test(
- no_crate_inject,
- attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
-))]
-
-//! Asynchronous synchronization primitives.
-//!
-//! This crate provides primitives for synchronizing asynchronous tasks.
-
-macro_rules! debug {
- ($($t:tt)*) => {
- if false {
- println!($($t)*);
- }
- }
-}
-
-macro_rules! if_fuzz {
- ($($t:tt)*) => {{
- if false { $($t)* }
- }}
-}
-
-mod barrier;
-mod loom;
-pub mod mpsc;
-mod mutex;
-pub mod oneshot;
-pub mod semaphore;
-mod task;
-pub mod watch;
-
-pub use barrier::{Barrier, BarrierWaitResult};
-pub use mutex::{Mutex, MutexGuard};
-pub use task::AtomicWaker;
diff --git a/tokio-sync/src/loom.rs b/tokio-sync/src/loom.rs
deleted file mode 100644
index 564efc4f..00000000
--- a/tokio-sync/src/loom.rs
+++ /dev/null
@@ -1,38 +0,0 @@
-pub(crate) mod future {
- pub(crate) use crate::task::AtomicWaker;
-}
-
-pub(crate) mod sync {
- pub(crate) use std::sync::atomic;
- pub(crate) use std::sync::Arc;
-
- use std::cell::UnsafeCell;
-
- pub(crate) struct CausalCell<T>(UnsafeCell<T>);
-
- impl<T> CausalCell<T> {
- pub(crate) fn new(data: T) -> CausalCell<T> {
- CausalCell(UnsafeCell::new(data))
- }
-
- pub(crate) fn with<F, R>(&self, f: F) -> R
- where
- F: FnOnce(*const T) -> R,
- {
- f(self.0.get())
- }
-
- pub(crate) fn with_mut<F, R>(&self, f: F) -> R
- where
- F: FnOnce(*mut T) -> R,
- {
- f(self.0.get())
- }
- }
-}
-
-pub(crate) mod thread {
- pub(crate) fn yield_now() {
- ::std::sync::atomic::spin_loop_hint();
- }
-}
diff --git a/tokio-sync/src/mpsc/block.rs b/tokio-sync/src/mpsc/block.rs
deleted file mode 100644
index 7d7f2e53..00000000
--- a/tokio-sync/src/mpsc/block.rs
+++ /dev/null
@@ -1,386 +0,0 @@
-use crate::loom::{
- sync::atomic::{AtomicPtr, AtomicUsize},
- sync::CausalCell,
- thread,
-};
-use std::mem::MaybeUninit;
-use std::ops;
-use std::ptr::{self, NonNull};
-use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Release};
-
-/// A block in a linked list.
-///
-/// Each block in the list can hold up to `BLOCK_CAP` messages.
-pub(crate) struct Block<T> {
- /// The start index of this block.
- ///
- /// Slots in this block have indices in `start_index .. start_index + BLOCK_CAP`.
- start_index: usize,
-
- /// The next block in the linked list.
- next: AtomicPtr<Block<T>>,
-
- /// Bitfield tracking slots that are ready to have their values consumed.
- ready_slots: AtomicUsize,
-
- /// The observed `tail_position` value *after* the block has been passed by
- /// `block_tail`.
- observed_tail_position: CausalCell<usize>,
-
- /// Array containing values pushed into the block. Values are stored in a
- /// continuous array in order to improve cache line behavior when reading.
- /// The values must be manually dropped.
- values: Values<T>,
-}
-
-pub(crate) enum Read<T> {
- Value(T),
- Closed,
-}
-
-struct Values<T>([CausalCell<MaybeUninit<T>>; BLOCK_CAP]);
-
-use super::BLOCK_CAP;
-
-/// Masks an index to get the block identifier
-const BLOCK_MASK: usize = !(BLOCK_CAP - 1);
-
-/// Masks an index to get the value offset in a block.
-const SLOT_MASK: usize = BLOCK_CAP - 1;
-
-/// Flag tracking that a block has gone through the sender's release routine.
-///
-/// When this is set, the receiver may consider freeing the block.
-const RELEASED: usize = 1 << BLOCK_CAP;
-
-/// Flag tracking all senders dropped.
-///
-/// When this flag is set, the send half of the channel has closed.
-const TX_CLOSED: usize = RELEASED << 1;
-
-/// Mask covering all bits used to track slot readiness.
-const READY_MASK: usize = RELEASED - 1;
-
-/// Returns the index of the first slot in the block referenced by `slot_index`.
-#[inline(always)]
-pub(crate) fn start_index(slot_index: usize) -> usize {
- BLOCK_MASK & slot_index
-}
-
-/// Returns the offset into the block referenced by `slot_index`.
-#[inline(always)]
-pub(crate) fn offset(slot_index: usize) -> usize {
- SLOT_MASK & slot_index
-}
-
-impl<T> Block<T> {
- pub(crate) fn new(start_index: usize) -> Block<T> {
- Block {
- // The absolute index in the channel of the first slot in the block.
- start_index,
-
- // Pointer to the next block in the linked list.
- next: AtomicPtr::new(ptr::null_mut()),
-
- ready_slots: AtomicUsize::new(0),
-
- observed_tail_position: CausalCell::new(0),
-
- // Value storage
- values: unsafe { Values::uninitialized() },
- }
- }
-
- /// Returns `true` if the block matches the given index
- pub(crate) fn is_at_index(&self, index: usize) -> bool {
- debug_assert!(offset(index) == 0);
- self.start_index == index
- }
-
- /// Returns the number of blocks between `self` and the block at the
- /// specified index.
- ///
- /// `start_index` must represent a block *after* `self`.
- pub(crate) fn distance(&self, other_index: usize) -> usize {
- debug_assert!(offset(other_index) == 0);
- other_index.wrapping_sub(self.start_index) / BLOCK_CAP
- }
-
- /// Read the value at the given offset.
- ///
- /// Returns `None` if the slot is empty.
- ///
- /// # Safety
- ///
- /// To maintain safety, the caller must ensure:
- ///
- /// * No concurrent access to the slot.
- pub(crate) unsafe fn read(&self, slot_index: usize) -> Option<Read<T>> {
- let offset = offset(slot_index);
-
- let ready_bits = self.ready_slots.load(Acquire);
-
- if !is_ready(ready_bits, offset) {
- if is_tx_closed(ready_bits) {
- return Some(Read::Closed);
- }
-
- return None;
- }
-
- // Get the value
- let value = self.values[offset].with(|ptr| ptr::read(ptr));
-
- Some(Read::Value(value.assume_init()))
- }
-
- /// Write a value to the block at the given offset.
- ///
- /// # Safety
- ///
- /// To maintain safety, the caller must ensure:
- ///
- /// * The slot is empty.
- /// * No concurrent access to the slot.
- pub(crate) unsafe fn write(&self, slot_index: usize, value: T) {
- // Get the offset into the block
- let slot_offset = offset(slot_index);
-
- self.values[slot_offset].with_mut(|ptr| {
- ptr::write(ptr, MaybeUninit::new(value));
- });
-
- // Release the value. After this point, the slot ref may no longer
- // be used. It is possible for the receiver to free the memory at
- // any point.
- self.set_ready(slot_offset);
- }
-
- /// Signal to the receiver that the sender half of the list is closed.
- pub(crate) unsafe fn tx_close(&self) {
- self.ready_slots.fetch_or(TX_CLOSED, Release);
- }
-
- /// Reset the block to a blank state. This enables reusing blocks in the
- /// channel.
- ///
- /// # Safety
- ///
- /// To maintain safety, the caller must ensure:
- ///
- /// * All slots are empty.
- /// * The caller holds a unique pointer to the block.
- pub(crate) unsafe fn reclaim(&mut self) {
- self.start_index = 0;
- self.next = AtomicPtr::new(ptr::null_mut());
- self.ready_slots = AtomicUsize::new(0);
- }
-
- /// Release the block to the rx half for freeing.
- ///
- /// This function is called by the tx half once it can be guaranteed that no
- /// more senders will attempt to access the block.
- ///
- /// # Safety
- ///
- /// To maintain safety, the caller must ensure:
- ///
- /// * The block will no longer be accessed by any sender.
- pub(crate) unsafe fn tx_release(&self, tail_position: usize) {
- // Track the observed tail_position. Any sender targetting a greater
- // tail_position is guaranteed to not access this block.
- self.observed_tail_position
- .with_mut(|ptr| *ptr = tail_position);
-
- // Set the released bit, signalling to the receiver that it is safe to
- // free the block's memory as soon as all slots **prior** to
- // `observed_tail_position` have been filled.
- self.ready_slots.fetch_or(RELEASED, Release);
- }
-
- /// Mark a slot as ready
- fn set_ready(&self, slot: usize) {
- let mask = 1 << slot;
- self.ready_slots.fetch_or(mask, Release);
- }
-
- /// Returns `true` when all slots have their `ready` bits set.
- ///
- /// This indicates that the block is in its final state and will no longer
- /// be mutated.
- ///
- /// # Implementation
- ///
- /// The implementation walks each slot checking the `ready` flag. It might
- /// be that it would make more sense to coalesce ready flags as bits in a
- /// single atomic cell. However, this could have negative impact on cache
- /// behavior as there would be many more mutations to a single slot.
- pub(crate) fn is_final(&self) -> bool {
- self.ready_slots.load(Acquire) & READY_MASK == READY_MASK
- }
-
- /// Returns the `observed_tail_position` value, if set
- pub(crate) fn observed_tail_position(&self) -> Option<usize> {
- if 0 == RELEASED & self.ready_slots.load(Acquire) {
- None
- } else {
- Some(self.observed_tail_position.with(|ptr| unsafe { *ptr }))
- }
- }
-
- /// Load the next block
- pub(crate) fn load_next(&self, ordering: Ordering) -> Option<NonNull<Block<T>>> {
- let ret = NonNull::new(self.next.load(ordering));
-
- debug_assert!(unsafe {
- ret.map(|block| block.as_ref().start_index == self.start_index.wrapping_add(BLOCK_CAP))
- .unwrap_or(true)
- });
-
- ret
- }
-
- /// Push `block` as the next block in the link.
- ///
- /// Returns Ok if successful, otherwise, a pointer to the next block in
- /// the list is returned.
- ///
- /// This requires that the next pointer is null.
- ///
- /// # Ordering
- ///
- /// This performs a compare-and-swap on `next` using AcqRel ordering.
- ///
- /// # Safety
- ///
- /// To maintain safety, the caller must ensure:
- ///
- /// * `block` is not freed until it has been removed from the list.
- pub(crate) unsafe fn try_push(
- &self,
- block: &mut NonNull<Block<T>>,
- ordering: Ordering,
- ) -> Result<(), NonNull<Block<T>>> {
- block.as_mut().start_index = self.start_index.wrapping_add(BLOCK_CAP);
-
- let next_ptr = self
- .next
- .compare_and_swap(ptr::null_mut(), block.as_ptr(), ordering);
-
- match NonNull::new(next_ptr) {
- Some(next_ptr) => Err(next_ptr),
- None => Ok(()),
- }
- }
-
- /// Grow the `Block` linked list by allocating and appending a new block.
- ///
- /// The next block in the linked list is returned. This may or may not be
- /// the one allocated by the function call.
- ///
- /// # Implementation
- ///
- /// It is assumed that `self.next` is null. A new block is allocated with
- /// `start_index` set to be the next block. A compare-and-swap is performed
- /// with AcqRel memory ordering. If the compare-and-swap is successful, the
- /// newly allocated block is released to other threads walking the block
- /// linked list. If the compare-and-swap fails, the current thread acquires
- /// the next block in the linked list, allowing the current thread to access
- /// the slots.
- pub(crate) fn grow(&self) -> NonNull<Block<T>> {
- // Create the new block. It is assumed that the block will become the
- // next one after `&self`. If this turns out to not be the case,
- // `start_index` is updated accordingly.
- let new_block = Box::new(Block::new(self.start_index + BLOCK_CAP));
-
- let mut new_block = unsafe { NonNull::new_unchecked(Box::into_raw(new_block)) };
-
- // Attempt to store the block. The first compare-and-swap attempt is
- // "unrolled" due to minor differences in logic
- //
- // `AcqRel` is used as the ordering **only** when attempting the
- // compare-and-swap on self.next.
- //
- // If the compare-and-swap fails, then the actual value of the cell is
- // returned from this function and accessed by the caller. Given this,
- // the memory must be acquired.
- //
- // `Release` ensures that the newly allocated block is available to
- // other threads acquiring the next pointer.
- let next = NonNull::new(self.next.compare_and_swap(
- ptr::null_mut(),
- new_block.as_ptr(),
- AcqRel,
- ));
-
- let next = match next {
- Some(next) => next,
- None => {
- // The compare-and-swap succeeded and the newly allocated block
- // is successfully pushed.
- return new_block;
- }
- };
-
- // There already is a next block in the linked list. The newly allocated
- // block could be dropped and the discovered next block returned;
- // however, that would be wasteful. Instead, the linked list is walked
- // by repeatedly attempting to compare-and-swap the pointer into the
- // `next` register until the compare-and-swap succeed.
- //
- // Care is taken to update new_block's start_index field as appropriate.
-
- let mut curr = next;
-
- // TODO: Should this iteration be capped?
- loop {
- let actual = unsafe { curr.as_ref().try_push(&mut new_block, AcqRel) };
-
- curr = match actual {
- Ok(_) => {
- return next;
- }
- Err(curr) => curr,
- };
-
- // When running outside of loom, this calls `spin_loop_hint`.
- thread::yield_now();
- }
- }
-}
-
-/// Returns `true` if the specificed slot has a value ready to be consumed.
-fn is_ready(bits: usize, slot: usize) -> bool {
- let mask = 1 << slot;
- mask == mask & bits
-}
-
-/// Returns `true` if the closed flag has been set.
-fn is_tx_closed(bits: usize) -> bool {
- TX_CLOSED == bits & TX_CLOSED
-}
-
-impl<T> Values<T> {
- unsafe fn uninitialized() -> Values<T> {
- let mut vals = MaybeUninit::uninit();
-
- // When fuzzing, `CausalCell` needs to be initialized.
- if_fuzz! {
- let p = vals.as_mut_ptr() as *mut CausalCell<MaybeUninit<T>>;
- for i in 0..BLOCK_CAP {
- p.add(i)
- .write(CausalCell::new(MaybeUninit::uninit()));
- }
- }
-
- Values(vals.assume_init())
- }
-}
-
-impl<T> ops::Index<usize> for Values<T> {
- type Output = CausalCell<MaybeUninit<T>>;
-
- fn index(&self, index: usize) -> &Self::Output {
- self.0.index(index)
- }
-}
diff --git a/tokio-sync/src/mpsc/bounded.rs b/tokio-sync/src/mpsc/bounded.rs
deleted file mode 100644
index 711173ae..00000000
--- a/tokio-sync/src/mpsc/bounded.rs
+++ /dev/null
@@ -1,340 +0,0 @@
-use super::chan;
-
-use std::fmt;
-use std::task::{Context, Poll};
-
-#[cfg(feature = "async-traits")]
-use std::pin::Pin;
-
-/// Send values to the associated `Receiver`.
-///
-/// Instances are created by the [`channel`](fn.channel.html) function.
-pub struct Sender<T> {
- chan: chan::Tx<T, Semaphore>,
-}
-
-impl<T> Clone for Sender<T> {
- fn clone(&self) -> Self {
- Sender {
- chan: self.chan.clone(),
- }
- }
-}
-
-impl<T> fmt::Debug for Sender<T> {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- fmt.debug_struct("Sender")
- .field("chan", &self.chan)
- .finish()
- }
-}
-
-/// Receive values from the associated `Sender`.
-///
-/// Instances are created by the [`channel`](fn.channel.html) function.
-pub struct Receiver<T> {
- /// The channel receiver
- chan: chan::Rx<T, Semaphore>,
-}
-
-impl<T> fmt::Debug for Receiver<T> {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- fmt.debug_struct("Receiver")
- .field("chan", &self.chan)
- .finish()
- }
-}
-
-/// Error returned by the `Sender`.
-#[derive(Debug)]
-pub struct SendError(());
-
-/// Error returned by `Sender::try_send`.
-#[derive(Debug)]
-pub struct TrySendError<T> {
- kind: ErrorKind,
- value: T,
-}
-
-#[derive(Debug)]
-enum ErrorKind {
- Closed,
- NoCapacity,
-}
-
-/// Error returned by `Receiver`.
-#[derive(Debug)]
-pub struct RecvError(());
-
-/// Create a bounded mpsc channel for communicating between asynchronous tasks,
-/// returning the sender/receiver halves.
-///
-/// All data sent on `Sender` will become available on `Receiver` in the same
-/// order as it was sent.
-///
-/// The `Sender` can be cloned to `send` to the same channel from multiple code
-/// locations. Only one `Receiver` is supported.
-///
-/// If the `Receiver` is disconnected while trying to `send`, the `send` method
-/// will return a `SendError`. Similarly, if `Sender` is disconnected while
-/// trying to `recv`, the `recv` method will return a `RecvError`.
-///
-/// # Examples
-///
-/// ```rust
-/// use tokio::sync::mpsc;
-///
-/// #[tokio::main]
-/// async fn main() {
-/// let (mut tx, mut rx) = mpsc::channel(100);
-///
-/// tokio::spawn(async move {
-/// for i in 0..10 {
-/// if let Err(_) = tx.send(i).await {
-/// println!("receiver dropped");
-/// return;
-/// }
-/// }
-/// });
-///
-/// while let Some(i) = rx.recv().await {
-/// println!("got = {}", i);
-/// }
-/// }
-/// ```
-pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
- assert!(buffer > 0, "mpsc bounded channel requires buffer > 0");
- let semaphore = (crate::semaphore::Semaphore::new(buffer), buffer);
- let (tx, rx) = chan::channel(semaphore);
-
- let tx = Sender::new(tx);
- let rx = Receiver::new(rx);
-
- (tx, rx)
-}
-
-/// Channel semaphore is a tuple of the semaphore implementation and a `usize`
-/// representing the channel bound.
-type Semaphore = (crate::semaphore::Semaphore, usize);
-
-impl<T> Receiver<T> {
- pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> Receiver<T> {
- Receiver { chan }
- }
-
- /// Receive the next value for this receiver.
- ///
- /// `None` is returned when all `Sender` halves have dropped, indicating
- /// that no further values can be sent on the channel.
- ///
- /// # Examples
- ///
- /// ```
- /// use tokio::sync::mpsc;
- ///
- /// #[tokio::main]
- /// async fn main() {
- /// let (mut tx, mut rx) = mpsc::channel(100);
- ///
- /// tokio::spawn(async move {
- /// tx.send("hello").await.unwrap();
- /// });
- ///
- /// assert_eq!(Some("hello"), rx.recv().await);
- /// assert_eq!(None, rx.recv().await);
- /// }
- /// ```
- ///
- /// Values are buffered:
- ///
- /// ```
- /// use tokio::sync::mpsc;
- ///
- /// #[tokio::main]
- /// async fn main() {
- /// let (mut tx, mut rx) = mpsc::channel(100);
- ///
- /// tx.send("hello").await.unwrap();
- /// tx.send("world").await.unwrap();
- ///
- /// assert_eq!(Some("hello"), rx.recv().await);
- /// assert_eq!(Some("world"), rx.recv().await);
- /// }
- /// ```
- pub async fn recv(&mut self) -> Option<T> {
- use futures_util::future::poll_fn;
-
- poll_fn(|cx| self.poll_recv(cx)).await
- }
-
- #[doc(hidden)] // TODO: remove
- pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
- self.chan.recv(cx)
- }
-
- /// Closes the receiving half of a channel, without dropping it.
- ///
- /// This prevents any further messages from being sent on the channel while
- /// still enabling the receiver to drain messages that are buffered.
- pub fn close(&mut self) {
- self.chan.close();
- }
-}
-
-#[cfg(feature = "async-traits")]
-impl<T> futures_core::Stream for Receiver<T> {
- type Item = T;
-
- fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
- self.get_mut().poll_recv(cx)
- }
-}
-
-impl<T> Sender<T> {
- pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> Sender<T> {
- Sender { chan }
- }
-
- #[doc(hidden)] // TODO: remove
- pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
- self.chan.poll_ready(cx).map_err(|_| SendError(()))
- }
-
- /// Attempts to send a message on this `Sender`, returning the message
- /// if there was an error.
- pub fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
- self.chan.try_send(message)?;
- Ok(())
- }
-
- /// Send a value, waiting until there is capacity.
- ///
- /// # Examples
- ///
- /// In the following example, each call to `send` will block until the
- /// previously sent value was received.
- ///
- /// ```rust
- /// use tokio::sync::mpsc;
- ///
- /// #[tokio::main]
- /// async fn main() {
- /// let (mut tx, mut rx) = mpsc::channel(1);
- ///
- /// tokio::spawn(async move {
- /// for i in 0..10 {
- /// if let Err(_) = tx.send(i).await {
- /// println!("receiver dropped");
- /// return;
- /// }
- /// }
- /// });
- ///
- /// while let Some(i) = rx.recv().await {
- /// println!("got = {}", i);
- /// }
- /// }
- /// ```
- pub async fn send(&mut self, value: T) -> Result<(), SendError> {
- use futures_util::future::poll_fn;
-
- poll_fn(|cx| self.poll_ready(cx)).await?;
-
- self.try_send(value).map_err(|_| SendError(()))
- }
-}
-
-#[cfg(feature = "async-traits")]
-impl<T> futures_sink::Sink<T> for Sender<T> {
- type Error = SendError;
-
- fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- Sender::poll_ready(self.get_mut(), cx)
- }
-
- fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
- self.as_mut().try_send(msg).map_err(|err| {
- assert!(err.is_full(), "call `poll_ready` before sending");
- SendError(())
- })
- }
-
- fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- Poll::Ready(Ok(()))
- }
-
- fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- Poll::Ready(Ok(()))
- }
-}
-
-// ===== impl SendError =====
-
-impl fmt::Display for SendError {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(fmt, "channel closed")
- }
-}
-
-impl ::std::error::Error for SendError {}
-
-// ===== impl TrySendError =====
-
-impl<T> TrySendError<T> {
- /// Get the inner value.
- pub fn into_inner(self) -> T {
- self.value
- }
-
- /// Did the send fail because the channel has been closed?
- pub fn is_closed(&self) -> bool {
- if let ErrorKind::Closed = self.kind {
- true
- } else {
- false
- }
- }
-
- /// Did the send fail because the channel was at capacity?
- pub fn is_full(&self) -> bool {
- if let ErrorKind::NoCapacity = self.kind {
- true
- } else {
- false
- }
- }
-}
-
-impl<T: fmt::Debug> fmt::Display for TrySendError<T> {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- let descr = match self.kind {
- ErrorKind::Closed => "channel closed",
- ErrorKind::NoCapacity => "no available capacity",
- };
- write!(fmt, "{}", descr)
- }
-}
-
-impl<T: fmt::Debug> ::std::error::Error for TrySendError<T> {}
-
-impl<T> From<(T, chan::TrySendError)> for TrySendError<T> {
- fn from((value, err): (T, chan::TrySendError)) -> TrySendError<T> {
- TrySendError {
- value,
- kind: match err {
- chan::TrySendError::Closed => ErrorKind::Closed,
- chan::TrySendError::NoPermits => ErrorKind::NoCapacity,
- },
- }
- }
-}
-
-// ===== impl RecvError =====
-
-impl fmt::Display for RecvError {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(fmt, "channel closed")
- }
-}
-
-impl ::std::error::Error for RecvError {}
diff --git a/tokio-sync/src/mpsc/chan.rs b/tokio-sync/src/mpsc/chan.rs
deleted file mode 100644
index 122537fc..00000000
--- a/tokio-sync/src/mpsc/chan.rs
+++ /dev/null
@@ -1,450 +0,0 @@
-use super::list;
-use crate::loom::{
- future::AtomicWaker,
- sync::atomic::AtomicUsize,
- sync::{Arc, CausalCell},
-};
-use std::fmt;
-use std::process;
-use std::sync::atomic::Ordering::{AcqRel, Relaxed};
-use std::task::Poll::{Pending, Ready};
-use std::task::{Context, Poll};
-
-/// Chan