diff options
Diffstat (limited to 'tokio-sync/src')
-rw-r--r-- | tokio-sync/src/barrier.rs | 134 | ||||
-rw-r--r-- | tokio-sync/src/lib.rs | 43 | ||||
-rw-r--r-- | tokio-sync/src/loom.rs | 38 | ||||
-rw-r--r-- | tokio-sync/src/mpsc/block.rs | 386 | ||||
-rw-r--r-- | tokio-sync/src/mpsc/bounded.rs | 340 | ||||
-rw-r--r-- | tokio-sync/src/mpsc/chan.rs | 450 | ||||
-rw-r--r-- | tokio-sync/src/mpsc/list.rs | 347 | ||||
-rw-r--r-- | tokio-sync/src/mpsc/mod.rs | 61 | ||||
-rw-r--r-- | tokio-sync/src/mpsc/unbounded.rs | 233 | ||||
-rw-r--r-- | tokio-sync/src/mutex.rs | 148 | ||||
-rw-r--r-- | tokio-sync/src/oneshot.rs | 576 | ||||
-rw-r--r-- | tokio-sync/src/semaphore.rs | 1142 | ||||
-rw-r--r-- | tokio-sync/src/task/atomic_waker.rs | 323 | ||||
-rw-r--r-- | tokio-sync/src/task/mod.rs | 5 | ||||
-rw-r--r-- | tokio-sync/src/watch.rs | 459 |
15 files changed, 0 insertions, 4685 deletions
diff --git a/tokio-sync/src/barrier.rs b/tokio-sync/src/barrier.rs deleted file mode 100644 index 6a409e26..00000000 --- a/tokio-sync/src/barrier.rs +++ /dev/null @@ -1,134 +0,0 @@ -use crate::watch; -use std::sync::Mutex; - -/// A barrier enables multiple threads to synchronize the beginning of some computation. -/// -/// ``` -/// # #[tokio::main] -/// # async fn main() { -/// use std::sync::Arc; -/// use tokio_sync::Barrier; -/// use futures_util::future::join_all; -/// -/// let mut handles = Vec::with_capacity(10); -/// let barrier = Arc::new(Barrier::new(10)); -/// for _ in 0..10 { -/// let c = barrier.clone(); -/// // The same messages will be printed together. -/// // You will NOT see any interleaving. -/// handles.push(async move { -/// println!("before wait"); -/// let wr = c.wait().await; -/// println!("after wait"); -/// wr -/// }); -/// } -/// // Will not resolve until all "before wait" messages have been printed -/// let wrs = join_all(handles).await; -/// // Exactly one barrier will resolve as the "leader" -/// assert_eq!(wrs.into_iter().filter(|wr| wr.is_leader()).count(), 1); -/// # } -/// ``` -#[derive(Debug)] -pub struct Barrier { - state: Mutex<BarrierState>, - wait: watch::Receiver<usize>, - n: usize, -} - -#[derive(Debug)] -struct BarrierState { - waker: watch::Sender<usize>, - arrived: usize, - generation: usize, -} - -impl Barrier { - /// Creates a new barrier that can block a given number of threads. - /// - /// A barrier will block `n`-1 threads which call [`Barrier::wait`] and then wake up all - /// threads at once when the `n`th thread calls `wait`. - pub fn new(mut n: usize) -> Barrier { - let (waker, wait) = crate::watch::channel(0); - - if n == 0 { - // if n is 0, it's not clear what behavior the user wants. - // in std::sync::Barrier, an n of 0 exhibits the same behavior as n == 1, where every - // .wait() immediately unblocks, so we adopt that here as well. - n = 1; - } - - Barrier { - state: Mutex::new(BarrierState { - waker, - arrived: 0, - generation: 1, - }), - n, - wait, - } - } - - /// Does not resolve until all tasks have rendezvoused here. - /// - /// Barriers are re-usable after all threads have rendezvoused once, and can - /// be used continuously. - /// - /// A single (arbitrary) future will receive a [`BarrierWaitResult`] that returns `true` from - /// [`BarrierWaitResult::is_leader`] when returning from this function, and all other threads - /// will receive a result that will return `false` from `is_leader`. - pub async fn wait(&self) -> BarrierWaitResult { - // NOTE: we are taking a _synchronous_ lock here. - // It is okay to do so because the critical section is fast and never yields, so it cannot - // deadlock even if another future is concurrently holding the lock. - // It is _desireable_ to do so as synchronous Mutexes are, at least in theory, faster than - // the asynchronous counter-parts, so we should use them where possible [citation needed]. - // NOTE: the extra scope here is so that the compiler doesn't think `state` is held across - // a yield point, and thus marks the returned future as !Send. - let generation = { - let mut state = self.state.lock().unwrap(); - let generation = state.generation; - state.arrived += 1; - if state.arrived == self.n { - // we are the leader for this generation - // wake everyone, increment the generation, and return - state - .waker - .broadcast(state.generation) - .expect("there is at least one receiver"); - state.arrived = 0; - state.generation += 1; - return BarrierWaitResult(true); - } - - generation - }; - - // we're going to have to wait for the last of the generation to arrive - let mut wait = self.wait.clone(); - - loop { - // note that the first time through the loop, this _will_ yield a generation - // immediately, since we cloned a receiver that has never seen any values. - if wait.recv().await.expect("sender hasn't been closed") >= generation { - break; - } - } - - BarrierWaitResult(false) - } -} - -/// A `BarrierWaitResult` is returned by `wait` when all threads in the `Barrier` have rendezvoused. -#[derive(Debug, Clone)] -pub struct BarrierWaitResult(bool); - -impl BarrierWaitResult { - /// Returns true if this thread from wait is the "leader thread". - /// - /// Only one thread will have `true` returned from their result, all other threads will have - /// `false` returned. - pub fn is_leader(&self) -> bool { - self.0 - } -} diff --git a/tokio-sync/src/lib.rs b/tokio-sync/src/lib.rs deleted file mode 100644 index 6055f024..00000000 --- a/tokio-sync/src/lib.rs +++ /dev/null @@ -1,43 +0,0 @@ -#![doc(html_root_url = "https://docs.rs/tokio-sync/0.2.0-alpha.6")] -#![warn( - missing_debug_implementations, - missing_docs, - rust_2018_idioms, - unreachable_pub -)] -#![deny(intra_doc_link_resolution_failure)] -#![doc(test( - no_crate_inject, - attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) -))] - -//! Asynchronous synchronization primitives. -//! -//! This crate provides primitives for synchronizing asynchronous tasks. - -macro_rules! debug { - ($($t:tt)*) => { - if false { - println!($($t)*); - } - } -} - -macro_rules! if_fuzz { - ($($t:tt)*) => {{ - if false { $($t)* } - }} -} - -mod barrier; -mod loom; -pub mod mpsc; -mod mutex; -pub mod oneshot; -pub mod semaphore; -mod task; -pub mod watch; - -pub use barrier::{Barrier, BarrierWaitResult}; -pub use mutex::{Mutex, MutexGuard}; -pub use task::AtomicWaker; diff --git a/tokio-sync/src/loom.rs b/tokio-sync/src/loom.rs deleted file mode 100644 index 564efc4f..00000000 --- a/tokio-sync/src/loom.rs +++ /dev/null @@ -1,38 +0,0 @@ -pub(crate) mod future { - pub(crate) use crate::task::AtomicWaker; -} - -pub(crate) mod sync { - pub(crate) use std::sync::atomic; - pub(crate) use std::sync::Arc; - - use std::cell::UnsafeCell; - - pub(crate) struct CausalCell<T>(UnsafeCell<T>); - - impl<T> CausalCell<T> { - pub(crate) fn new(data: T) -> CausalCell<T> { - CausalCell(UnsafeCell::new(data)) - } - - pub(crate) fn with<F, R>(&self, f: F) -> R - where - F: FnOnce(*const T) -> R, - { - f(self.0.get()) - } - - pub(crate) fn with_mut<F, R>(&self, f: F) -> R - where - F: FnOnce(*mut T) -> R, - { - f(self.0.get()) - } - } -} - -pub(crate) mod thread { - pub(crate) fn yield_now() { - ::std::sync::atomic::spin_loop_hint(); - } -} diff --git a/tokio-sync/src/mpsc/block.rs b/tokio-sync/src/mpsc/block.rs deleted file mode 100644 index 7d7f2e53..00000000 --- a/tokio-sync/src/mpsc/block.rs +++ /dev/null @@ -1,386 +0,0 @@ -use crate::loom::{ - sync::atomic::{AtomicPtr, AtomicUsize}, - sync::CausalCell, - thread, -}; -use std::mem::MaybeUninit; -use std::ops; -use std::ptr::{self, NonNull}; -use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Release}; - -/// A block in a linked list. -/// -/// Each block in the list can hold up to `BLOCK_CAP` messages. -pub(crate) struct Block<T> { - /// The start index of this block. - /// - /// Slots in this block have indices in `start_index .. start_index + BLOCK_CAP`. - start_index: usize, - - /// The next block in the linked list. - next: AtomicPtr<Block<T>>, - - /// Bitfield tracking slots that are ready to have their values consumed. - ready_slots: AtomicUsize, - - /// The observed `tail_position` value *after* the block has been passed by - /// `block_tail`. - observed_tail_position: CausalCell<usize>, - - /// Array containing values pushed into the block. Values are stored in a - /// continuous array in order to improve cache line behavior when reading. - /// The values must be manually dropped. - values: Values<T>, -} - -pub(crate) enum Read<T> { - Value(T), - Closed, -} - -struct Values<T>([CausalCell<MaybeUninit<T>>; BLOCK_CAP]); - -use super::BLOCK_CAP; - -/// Masks an index to get the block identifier -const BLOCK_MASK: usize = !(BLOCK_CAP - 1); - -/// Masks an index to get the value offset in a block. -const SLOT_MASK: usize = BLOCK_CAP - 1; - -/// Flag tracking that a block has gone through the sender's release routine. -/// -/// When this is set, the receiver may consider freeing the block. -const RELEASED: usize = 1 << BLOCK_CAP; - -/// Flag tracking all senders dropped. -/// -/// When this flag is set, the send half of the channel has closed. -const TX_CLOSED: usize = RELEASED << 1; - -/// Mask covering all bits used to track slot readiness. -const READY_MASK: usize = RELEASED - 1; - -/// Returns the index of the first slot in the block referenced by `slot_index`. -#[inline(always)] -pub(crate) fn start_index(slot_index: usize) -> usize { - BLOCK_MASK & slot_index -} - -/// Returns the offset into the block referenced by `slot_index`. -#[inline(always)] -pub(crate) fn offset(slot_index: usize) -> usize { - SLOT_MASK & slot_index -} - -impl<T> Block<T> { - pub(crate) fn new(start_index: usize) -> Block<T> { - Block { - // The absolute index in the channel of the first slot in the block. - start_index, - - // Pointer to the next block in the linked list. - next: AtomicPtr::new(ptr::null_mut()), - - ready_slots: AtomicUsize::new(0), - - observed_tail_position: CausalCell::new(0), - - // Value storage - values: unsafe { Values::uninitialized() }, - } - } - - /// Returns `true` if the block matches the given index - pub(crate) fn is_at_index(&self, index: usize) -> bool { - debug_assert!(offset(index) == 0); - self.start_index == index - } - - /// Returns the number of blocks between `self` and the block at the - /// specified index. - /// - /// `start_index` must represent a block *after* `self`. - pub(crate) fn distance(&self, other_index: usize) -> usize { - debug_assert!(offset(other_index) == 0); - other_index.wrapping_sub(self.start_index) / BLOCK_CAP - } - - /// Read the value at the given offset. - /// - /// Returns `None` if the slot is empty. - /// - /// # Safety - /// - /// To maintain safety, the caller must ensure: - /// - /// * No concurrent access to the slot. - pub(crate) unsafe fn read(&self, slot_index: usize) -> Option<Read<T>> { - let offset = offset(slot_index); - - let ready_bits = self.ready_slots.load(Acquire); - - if !is_ready(ready_bits, offset) { - if is_tx_closed(ready_bits) { - return Some(Read::Closed); - } - - return None; - } - - // Get the value - let value = self.values[offset].with(|ptr| ptr::read(ptr)); - - Some(Read::Value(value.assume_init())) - } - - /// Write a value to the block at the given offset. - /// - /// # Safety - /// - /// To maintain safety, the caller must ensure: - /// - /// * The slot is empty. - /// * No concurrent access to the slot. - pub(crate) unsafe fn write(&self, slot_index: usize, value: T) { - // Get the offset into the block - let slot_offset = offset(slot_index); - - self.values[slot_offset].with_mut(|ptr| { - ptr::write(ptr, MaybeUninit::new(value)); - }); - - // Release the value. After this point, the slot ref may no longer - // be used. It is possible for the receiver to free the memory at - // any point. - self.set_ready(slot_offset); - } - - /// Signal to the receiver that the sender half of the list is closed. - pub(crate) unsafe fn tx_close(&self) { - self.ready_slots.fetch_or(TX_CLOSED, Release); - } - - /// Reset the block to a blank state. This enables reusing blocks in the - /// channel. - /// - /// # Safety - /// - /// To maintain safety, the caller must ensure: - /// - /// * All slots are empty. - /// * The caller holds a unique pointer to the block. - pub(crate) unsafe fn reclaim(&mut self) { - self.start_index = 0; - self.next = AtomicPtr::new(ptr::null_mut()); - self.ready_slots = AtomicUsize::new(0); - } - - /// Release the block to the rx half for freeing. - /// - /// This function is called by the tx half once it can be guaranteed that no - /// more senders will attempt to access the block. - /// - /// # Safety - /// - /// To maintain safety, the caller must ensure: - /// - /// * The block will no longer be accessed by any sender. - pub(crate) unsafe fn tx_release(&self, tail_position: usize) { - // Track the observed tail_position. Any sender targetting a greater - // tail_position is guaranteed to not access this block. - self.observed_tail_position - .with_mut(|ptr| *ptr = tail_position); - - // Set the released bit, signalling to the receiver that it is safe to - // free the block's memory as soon as all slots **prior** to - // `observed_tail_position` have been filled. - self.ready_slots.fetch_or(RELEASED, Release); - } - - /// Mark a slot as ready - fn set_ready(&self, slot: usize) { - let mask = 1 << slot; - self.ready_slots.fetch_or(mask, Release); - } - - /// Returns `true` when all slots have their `ready` bits set. - /// - /// This indicates that the block is in its final state and will no longer - /// be mutated. - /// - /// # Implementation - /// - /// The implementation walks each slot checking the `ready` flag. It might - /// be that it would make more sense to coalesce ready flags as bits in a - /// single atomic cell. However, this could have negative impact on cache - /// behavior as there would be many more mutations to a single slot. - pub(crate) fn is_final(&self) -> bool { - self.ready_slots.load(Acquire) & READY_MASK == READY_MASK - } - - /// Returns the `observed_tail_position` value, if set - pub(crate) fn observed_tail_position(&self) -> Option<usize> { - if 0 == RELEASED & self.ready_slots.load(Acquire) { - None - } else { - Some(self.observed_tail_position.with(|ptr| unsafe { *ptr })) - } - } - - /// Load the next block - pub(crate) fn load_next(&self, ordering: Ordering) -> Option<NonNull<Block<T>>> { - let ret = NonNull::new(self.next.load(ordering)); - - debug_assert!(unsafe { - ret.map(|block| block.as_ref().start_index == self.start_index.wrapping_add(BLOCK_CAP)) - .unwrap_or(true) - }); - - ret - } - - /// Push `block` as the next block in the link. - /// - /// Returns Ok if successful, otherwise, a pointer to the next block in - /// the list is returned. - /// - /// This requires that the next pointer is null. - /// - /// # Ordering - /// - /// This performs a compare-and-swap on `next` using AcqRel ordering. - /// - /// # Safety - /// - /// To maintain safety, the caller must ensure: - /// - /// * `block` is not freed until it has been removed from the list. - pub(crate) unsafe fn try_push( - &self, - block: &mut NonNull<Block<T>>, - ordering: Ordering, - ) -> Result<(), NonNull<Block<T>>> { - block.as_mut().start_index = self.start_index.wrapping_add(BLOCK_CAP); - - let next_ptr = self - .next - .compare_and_swap(ptr::null_mut(), block.as_ptr(), ordering); - - match NonNull::new(next_ptr) { - Some(next_ptr) => Err(next_ptr), - None => Ok(()), - } - } - - /// Grow the `Block` linked list by allocating and appending a new block. - /// - /// The next block in the linked list is returned. This may or may not be - /// the one allocated by the function call. - /// - /// # Implementation - /// - /// It is assumed that `self.next` is null. A new block is allocated with - /// `start_index` set to be the next block. A compare-and-swap is performed - /// with AcqRel memory ordering. If the compare-and-swap is successful, the - /// newly allocated block is released to other threads walking the block - /// linked list. If the compare-and-swap fails, the current thread acquires - /// the next block in the linked list, allowing the current thread to access - /// the slots. - pub(crate) fn grow(&self) -> NonNull<Block<T>> { - // Create the new block. It is assumed that the block will become the - // next one after `&self`. If this turns out to not be the case, - // `start_index` is updated accordingly. - let new_block = Box::new(Block::new(self.start_index + BLOCK_CAP)); - - let mut new_block = unsafe { NonNull::new_unchecked(Box::into_raw(new_block)) }; - - // Attempt to store the block. The first compare-and-swap attempt is - // "unrolled" due to minor differences in logic - // - // `AcqRel` is used as the ordering **only** when attempting the - // compare-and-swap on self.next. - // - // If the compare-and-swap fails, then the actual value of the cell is - // returned from this function and accessed by the caller. Given this, - // the memory must be acquired. - // - // `Release` ensures that the newly allocated block is available to - // other threads acquiring the next pointer. - let next = NonNull::new(self.next.compare_and_swap( - ptr::null_mut(), - new_block.as_ptr(), - AcqRel, - )); - - let next = match next { - Some(next) => next, - None => { - // The compare-and-swap succeeded and the newly allocated block - // is successfully pushed. - return new_block; - } - }; - - // There already is a next block in the linked list. The newly allocated - // block could be dropped and the discovered next block returned; - // however, that would be wasteful. Instead, the linked list is walked - // by repeatedly attempting to compare-and-swap the pointer into the - // `next` register until the compare-and-swap succeed. - // - // Care is taken to update new_block's start_index field as appropriate. - - let mut curr = next; - - // TODO: Should this iteration be capped? - loop { - let actual = unsafe { curr.as_ref().try_push(&mut new_block, AcqRel) }; - - curr = match actual { - Ok(_) => { - return next; - } - Err(curr) => curr, - }; - - // When running outside of loom, this calls `spin_loop_hint`. - thread::yield_now(); - } - } -} - -/// Returns `true` if the specificed slot has a value ready to be consumed. -fn is_ready(bits: usize, slot: usize) -> bool { - let mask = 1 << slot; - mask == mask & bits -} - -/// Returns `true` if the closed flag has been set. -fn is_tx_closed(bits: usize) -> bool { - TX_CLOSED == bits & TX_CLOSED -} - -impl<T> Values<T> { - unsafe fn uninitialized() -> Values<T> { - let mut vals = MaybeUninit::uninit(); - - // When fuzzing, `CausalCell` needs to be initialized. - if_fuzz! { - let p = vals.as_mut_ptr() as *mut CausalCell<MaybeUninit<T>>; - for i in 0..BLOCK_CAP { - p.add(i) - .write(CausalCell::new(MaybeUninit::uninit())); - } - } - - Values(vals.assume_init()) - } -} - -impl<T> ops::Index<usize> for Values<T> { - type Output = CausalCell<MaybeUninit<T>>; - - fn index(&self, index: usize) -> &Self::Output { - self.0.index(index) - } -} diff --git a/tokio-sync/src/mpsc/bounded.rs b/tokio-sync/src/mpsc/bounded.rs deleted file mode 100644 index 711173ae..00000000 --- a/tokio-sync/src/mpsc/bounded.rs +++ /dev/null @@ -1,340 +0,0 @@ -use super::chan; - -use std::fmt; -use std::task::{Context, Poll}; - -#[cfg(feature = "async-traits")] -use std::pin::Pin; - -/// Send values to the associated `Receiver`. -/// -/// Instances are created by the [`channel`](fn.channel.html) function. -pub struct Sender<T> { - chan: chan::Tx<T, Semaphore>, -} - -impl<T> Clone for Sender<T> { - fn clone(&self) -> Self { - Sender { - chan: self.chan.clone(), - } - } -} - -impl<T> fmt::Debug for Sender<T> { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Sender") - .field("chan", &self.chan) - .finish() - } -} - -/// Receive values from the associated `Sender`. -/// -/// Instances are created by the [`channel`](fn.channel.html) function. -pub struct Receiver<T> { - /// The channel receiver - chan: chan::Rx<T, Semaphore>, -} - -impl<T> fmt::Debug for Receiver<T> { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Receiver") - .field("chan", &self.chan) - .finish() - } -} - -/// Error returned by the `Sender`. -#[derive(Debug)] -pub struct SendError(()); - -/// Error returned by `Sender::try_send`. -#[derive(Debug)] -pub struct TrySendError<T> { - kind: ErrorKind, - value: T, -} - -#[derive(Debug)] -enum ErrorKind { - Closed, - NoCapacity, -} - -/// Error returned by `Receiver`. -#[derive(Debug)] -pub struct RecvError(()); - -/// Create a bounded mpsc channel for communicating between asynchronous tasks, -/// returning the sender/receiver halves. -/// -/// All data sent on `Sender` will become available on `Receiver` in the same -/// order as it was sent. -/// -/// The `Sender` can be cloned to `send` to the same channel from multiple code -/// locations. Only one `Receiver` is supported. -/// -/// If the `Receiver` is disconnected while trying to `send`, the `send` method -/// will return a `SendError`. Similarly, if `Sender` is disconnected while -/// trying to `recv`, the `recv` method will return a `RecvError`. -/// -/// # Examples -/// -/// ```rust -/// use tokio::sync::mpsc; -/// -/// #[tokio::main] -/// async fn main() { -/// let (mut tx, mut rx) = mpsc::channel(100); -/// -/// tokio::spawn(async move { -/// for i in 0..10 { -/// if let Err(_) = tx.send(i).await { -/// println!("receiver dropped"); -/// return; -/// } -/// } -/// }); -/// -/// while let Some(i) = rx.recv().await { -/// println!("got = {}", i); -/// } -/// } -/// ``` -pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) { - assert!(buffer > 0, "mpsc bounded channel requires buffer > 0"); - let semaphore = (crate::semaphore::Semaphore::new(buffer), buffer); - let (tx, rx) = chan::channel(semaphore); - - let tx = Sender::new(tx); - let rx = Receiver::new(rx); - - (tx, rx) -} - -/// Channel semaphore is a tuple of the semaphore implementation and a `usize` -/// representing the channel bound. -type Semaphore = (crate::semaphore::Semaphore, usize); - -impl<T> Receiver<T> { - pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> Receiver<T> { - Receiver { chan } - } - - /// Receive the next value for this receiver. - /// - /// `None` is returned when all `Sender` halves have dropped, indicating - /// that no further values can be sent on the channel. - /// - /// # Examples - /// - /// ``` - /// use tokio::sync::mpsc; - /// - /// #[tokio::main] - /// async fn main() { - /// let (mut tx, mut rx) = mpsc::channel(100); - /// - /// tokio::spawn(async move { - /// tx.send("hello").await.unwrap(); - /// }); - /// - /// assert_eq!(Some("hello"), rx.recv().await); - /// assert_eq!(None, rx.recv().await); - /// } - /// ``` - /// - /// Values are buffered: - /// - /// ``` - /// use tokio::sync::mpsc; - /// - /// #[tokio::main] - /// async fn main() { - /// let (mut tx, mut rx) = mpsc::channel(100); - /// - /// tx.send("hello").await.unwrap(); - /// tx.send("world").await.unwrap(); - /// - /// assert_eq!(Some("hello"), rx.recv().await); - /// assert_eq!(Some("world"), rx.recv().await); - /// } - /// ``` - pub async fn recv(&mut self) -> Option<T> { - use futures_util::future::poll_fn; - - poll_fn(|cx| self.poll_recv(cx)).await - } - - #[doc(hidden)] // TODO: remove - pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { - self.chan.recv(cx) - } - - /// Closes the receiving half of a channel, without dropping it. - /// - /// This prevents any further messages from being sent on the channel while - /// still enabling the receiver to drain messages that are buffered. - pub fn close(&mut self) { - self.chan.close(); - } -} - -#[cfg(feature = "async-traits")] -impl<T> futures_core::Stream for Receiver<T> { - type Item = T; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { - self.get_mut().poll_recv(cx) - } -} - -impl<T> Sender<T> { - pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> Sender<T> { - Sender { chan } - } - - #[doc(hidden)] // TODO: remove - pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> { - self.chan.poll_ready(cx).map_err(|_| SendError(())) - } - - /// Attempts to send a message on this `Sender`, returning the message - /// if there was an error. - pub fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> { - self.chan.try_send(message)?; - Ok(()) - } - - /// Send a value, waiting until there is capacity. - /// - /// # Examples - /// - /// In the following example, each call to `send` will block until the - /// previously sent value was received. - /// - /// ```rust - /// use tokio::sync::mpsc; - /// - /// #[tokio::main] - /// async fn main() { - /// let (mut tx, mut rx) = mpsc::channel(1); - /// - /// tokio::spawn(async move { - /// for i in 0..10 { - /// if let Err(_) = tx.send(i).await { - /// println!("receiver dropped"); - /// return; - /// } - /// } - /// }); - /// - /// while let Some(i) = rx.recv().await { - /// println!("got = {}", i); - /// } - /// } - /// ``` - pub async fn send(&mut self, value: T) -> Result<(), SendError> { - use futures_util::future::poll_fn; - - poll_fn(|cx| self.poll_ready(cx)).await?; - - self.try_send(value).map_err(|_| SendError(())) - } -} - -#[cfg(feature = "async-traits")] -impl<T> futures_sink::Sink<T> for Sender<T> { - type Error = SendError; - - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - Sender::poll_ready(self.get_mut(), cx) - } - - fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { - self.as_mut().try_send(msg).map_err(|err| { - assert!(err.is_full(), "call `poll_ready` before sending"); - SendError(()) - }) - } - - fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - Poll::Ready(Ok(())) - } - - fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - Poll::Ready(Ok(())) - } -} - -// ===== impl SendError ===== - -impl fmt::Display for SendError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "channel closed") - } -} - -impl ::std::error::Error for SendError {} - -// ===== impl TrySendError ===== - -impl<T> TrySendError<T> { - /// Get the inner value. - pub fn into_inner(self) -> T { - self.value - } - - /// Did the send fail because the channel has been closed? - pub fn is_closed(&self) -> bool { - if let ErrorKind::Closed = self.kind { - true - } else { - false - } - } - - /// Did the send fail because the channel was at capacity? - pub fn is_full(&self) -> bool { - if let ErrorKind::NoCapacity = self.kind { - true - } else { - false - } - } -} - -impl<T: fmt::Debug> fmt::Display for TrySendError<T> { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - let descr = match self.kind { - ErrorKind::Closed => "channel closed", - ErrorKind::NoCapacity => "no available capacity", - }; - write!(fmt, "{}", descr) - } -} - -impl<T: fmt::Debug> ::std::error::Error for TrySendError<T> {} - -impl<T> From<(T, chan::TrySendError)> for TrySendError<T> { - fn from((value, err): (T, chan::TrySendError)) -> TrySendError<T> { - TrySendError { - value, - kind: match err { - chan::TrySendError::Closed => ErrorKind::Closed, - chan::TrySendError::NoPermits => ErrorKind::NoCapacity, - }, - } - } -} - -// ===== impl RecvError ===== - -impl fmt::Display for RecvError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "channel closed") - } -} - -impl ::std::error::Error for RecvError {} diff --git a/tokio-sync/src/mpsc/chan.rs b/tokio-sync/src/mpsc/chan.rs deleted file mode 100644 index 122537fc..00000000 --- a/tokio-sync/src/mpsc/chan.rs +++ /dev/null @@ -1,450 +0,0 @@ -use super::list; -use crate::loom::{ - future::AtomicWaker, - sync::atomic::AtomicUsize, - sync::{Arc, CausalCell}, -}; -use std::fmt; -use std::process; -use std::sync::atomic::Ordering::{AcqRel, Relaxed}; -use std::task::Poll::{Pending, Ready}; -use std::task::{Context, Poll}; - -/// Chan |