From 2b909d6805990abf0bc2a5dea9e7267ff87df704 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Tue, 29 Oct 2019 15:11:31 -0700 Subject: sync: move into `tokio` crate (#1705) A step towards collapsing Tokio sub crates into a single `tokio` crate (#1318). The sync implementation is now provided by the main `tokio` crate. Functionality can be opted out of by using the various net related feature flags. --- tokio/src/sync/mpsc/block.rs | 387 +++++++++++++++++++++++++++++++++ tokio/src/sync/mpsc/bounded.rs | 337 +++++++++++++++++++++++++++++ tokio/src/sync/mpsc/chan.rs | 451 +++++++++++++++++++++++++++++++++++++++ tokio/src/sync/mpsc/list.rs | 348 ++++++++++++++++++++++++++++++ tokio/src/sync/mpsc/mod.rs | 67 ++++++ tokio/src/sync/mpsc/unbounded.rs | 230 ++++++++++++++++++++ 6 files changed, 1820 insertions(+) create mode 100644 tokio/src/sync/mpsc/block.rs create mode 100644 tokio/src/sync/mpsc/bounded.rs create mode 100644 tokio/src/sync/mpsc/chan.rs create mode 100644 tokio/src/sync/mpsc/list.rs create mode 100644 tokio/src/sync/mpsc/mod.rs create mode 100644 tokio/src/sync/mpsc/unbounded.rs (limited to 'tokio/src/sync/mpsc') diff --git a/tokio/src/sync/mpsc/block.rs b/tokio/src/sync/mpsc/block.rs new file mode 100644 index 00000000..aea69384 --- /dev/null +++ b/tokio/src/sync/mpsc/block.rs @@ -0,0 +1,387 @@ +use crate::sync::loom::{ + sync::atomic::{AtomicPtr, AtomicUsize}, + sync::CausalCell, + thread, +}; + +use std::mem::MaybeUninit; +use std::ops; +use std::ptr::{self, NonNull}; +use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Release}; + +/// A block in a linked list. +/// +/// Each block in the list can hold up to `BLOCK_CAP` messages. +pub(crate) struct Block { + /// The start index of this block. + /// + /// Slots in this block have indices in `start_index .. start_index + BLOCK_CAP`. + start_index: usize, + + /// The next block in the linked list. + next: AtomicPtr>, + + /// Bitfield tracking slots that are ready to have their values consumed. + ready_slots: AtomicUsize, + + /// The observed `tail_position` value *after* the block has been passed by + /// `block_tail`. + observed_tail_position: CausalCell, + + /// Array containing values pushed into the block. Values are stored in a + /// continuous array in order to improve cache line behavior when reading. + /// The values must be manually dropped. + values: Values, +} + +pub(crate) enum Read { + Value(T), + Closed, +} + +struct Values([CausalCell>; BLOCK_CAP]); + +use super::BLOCK_CAP; + +/// Masks an index to get the block identifier +const BLOCK_MASK: usize = !(BLOCK_CAP - 1); + +/// Masks an index to get the value offset in a block. +const SLOT_MASK: usize = BLOCK_CAP - 1; + +/// Flag tracking that a block has gone through the sender's release routine. +/// +/// When this is set, the receiver may consider freeing the block. +const RELEASED: usize = 1 << BLOCK_CAP; + +/// Flag tracking all senders dropped. +/// +/// When this flag is set, the send half of the channel has closed. +const TX_CLOSED: usize = RELEASED << 1; + +/// Mask covering all bits used to track slot readiness. +const READY_MASK: usize = RELEASED - 1; + +/// Returns the index of the first slot in the block referenced by `slot_index`. +#[inline(always)] +pub(crate) fn start_index(slot_index: usize) -> usize { + BLOCK_MASK & slot_index +} + +/// Returns the offset into the block referenced by `slot_index`. 
+#[inline(always)] +pub(crate) fn offset(slot_index: usize) -> usize { + SLOT_MASK & slot_index +} + +impl Block { + pub(crate) fn new(start_index: usize) -> Block { + Block { + // The absolute index in the channel of the first slot in the block. + start_index, + + // Pointer to the next block in the linked list. + next: AtomicPtr::new(ptr::null_mut()), + + ready_slots: AtomicUsize::new(0), + + observed_tail_position: CausalCell::new(0), + + // Value storage + values: unsafe { Values::uninitialized() }, + } + } + + /// Returns `true` if the block matches the given index + pub(crate) fn is_at_index(&self, index: usize) -> bool { + debug_assert!(offset(index) == 0); + self.start_index == index + } + + /// Returns the number of blocks between `self` and the block at the + /// specified index. + /// + /// `start_index` must represent a block *after* `self`. + pub(crate) fn distance(&self, other_index: usize) -> usize { + debug_assert!(offset(other_index) == 0); + other_index.wrapping_sub(self.start_index) / BLOCK_CAP + } + + /// Read the value at the given offset. + /// + /// Returns `None` if the slot is empty. + /// + /// # Safety + /// + /// To maintain safety, the caller must ensure: + /// + /// * No concurrent access to the slot. + pub(crate) unsafe fn read(&self, slot_index: usize) -> Option> { + let offset = offset(slot_index); + + let ready_bits = self.ready_slots.load(Acquire); + + if !is_ready(ready_bits, offset) { + if is_tx_closed(ready_bits) { + return Some(Read::Closed); + } + + return None; + } + + // Get the value + let value = self.values[offset].with(|ptr| ptr::read(ptr)); + + Some(Read::Value(value.assume_init())) + } + + /// Write a value to the block at the given offset. + /// + /// # Safety + /// + /// To maintain safety, the caller must ensure: + /// + /// * The slot is empty. + /// * No concurrent access to the slot. + pub(crate) unsafe fn write(&self, slot_index: usize, value: T) { + // Get the offset into the block + let slot_offset = offset(slot_index); + + self.values[slot_offset].with_mut(|ptr| { + ptr::write(ptr, MaybeUninit::new(value)); + }); + + // Release the value. After this point, the slot ref may no longer + // be used. It is possible for the receiver to free the memory at + // any point. + self.set_ready(slot_offset); + } + + /// Signal to the receiver that the sender half of the list is closed. + pub(crate) unsafe fn tx_close(&self) { + self.ready_slots.fetch_or(TX_CLOSED, Release); + } + + /// Reset the block to a blank state. This enables reusing blocks in the + /// channel. + /// + /// # Safety + /// + /// To maintain safety, the caller must ensure: + /// + /// * All slots are empty. + /// * The caller holds a unique pointer to the block. + pub(crate) unsafe fn reclaim(&mut self) { + self.start_index = 0; + self.next = AtomicPtr::new(ptr::null_mut()); + self.ready_slots = AtomicUsize::new(0); + } + + /// Release the block to the rx half for freeing. + /// + /// This function is called by the tx half once it can be guaranteed that no + /// more senders will attempt to access the block. + /// + /// # Safety + /// + /// To maintain safety, the caller must ensure: + /// + /// * The block will no longer be accessed by any sender. + pub(crate) unsafe fn tx_release(&self, tail_position: usize) { + // Track the observed tail_position. Any sender targetting a greater + // tail_position is guaranteed to not access this block. 
+ self.observed_tail_position + .with_mut(|ptr| *ptr = tail_position); + + // Set the released bit, signalling to the receiver that it is safe to + // free the block's memory as soon as all slots **prior** to + // `observed_tail_position` have been filled. + self.ready_slots.fetch_or(RELEASED, Release); + } + + /// Mark a slot as ready + fn set_ready(&self, slot: usize) { + let mask = 1 << slot; + self.ready_slots.fetch_or(mask, Release); + } + + /// Returns `true` when all slots have their `ready` bits set. + /// + /// This indicates that the block is in its final state and will no longer + /// be mutated. + /// + /// # Implementation + /// + /// The implementation walks each slot checking the `ready` flag. It might + /// be that it would make more sense to coalesce ready flags as bits in a + /// single atomic cell. However, this could have negative impact on cache + /// behavior as there would be many more mutations to a single slot. + pub(crate) fn is_final(&self) -> bool { + self.ready_slots.load(Acquire) & READY_MASK == READY_MASK + } + + /// Returns the `observed_tail_position` value, if set + pub(crate) fn observed_tail_position(&self) -> Option { + if 0 == RELEASED & self.ready_slots.load(Acquire) { + None + } else { + Some(self.observed_tail_position.with(|ptr| unsafe { *ptr })) + } + } + + /// Load the next block + pub(crate) fn load_next(&self, ordering: Ordering) -> Option>> { + let ret = NonNull::new(self.next.load(ordering)); + + debug_assert!(unsafe { + ret.map(|block| block.as_ref().start_index == self.start_index.wrapping_add(BLOCK_CAP)) + .unwrap_or(true) + }); + + ret + } + + /// Push `block` as the next block in the link. + /// + /// Returns Ok if successful, otherwise, a pointer to the next block in + /// the list is returned. + /// + /// This requires that the next pointer is null. + /// + /// # Ordering + /// + /// This performs a compare-and-swap on `next` using AcqRel ordering. + /// + /// # Safety + /// + /// To maintain safety, the caller must ensure: + /// + /// * `block` is not freed until it has been removed from the list. + pub(crate) unsafe fn try_push( + &self, + block: &mut NonNull>, + ordering: Ordering, + ) -> Result<(), NonNull>> { + block.as_mut().start_index = self.start_index.wrapping_add(BLOCK_CAP); + + let next_ptr = self + .next + .compare_and_swap(ptr::null_mut(), block.as_ptr(), ordering); + + match NonNull::new(next_ptr) { + Some(next_ptr) => Err(next_ptr), + None => Ok(()), + } + } + + /// Grow the `Block` linked list by allocating and appending a new block. + /// + /// The next block in the linked list is returned. This may or may not be + /// the one allocated by the function call. + /// + /// # Implementation + /// + /// It is assumed that `self.next` is null. A new block is allocated with + /// `start_index` set to be the next block. A compare-and-swap is performed + /// with AcqRel memory ordering. If the compare-and-swap is successful, the + /// newly allocated block is released to other threads walking the block + /// linked list. If the compare-and-swap fails, the current thread acquires + /// the next block in the linked list, allowing the current thread to access + /// the slots. + pub(crate) fn grow(&self) -> NonNull> { + // Create the new block. It is assumed that the block will become the + // next one after `&self`. If this turns out to not be the case, + // `start_index` is updated accordingly. 
+ let new_block = Box::new(Block::new(self.start_index + BLOCK_CAP)); + + let mut new_block = unsafe { NonNull::new_unchecked(Box::into_raw(new_block)) }; + + // Attempt to store the block. The first compare-and-swap attempt is + // "unrolled" due to minor differences in logic + // + // `AcqRel` is used as the ordering **only** when attempting the + // compare-and-swap on self.next. + // + // If the compare-and-swap fails, then the actual value of the cell is + // returned from this function and accessed by the caller. Given this, + // the memory must be acquired. + // + // `Release` ensures that the newly allocated block is available to + // other threads acquiring the next pointer. + let next = NonNull::new(self.next.compare_and_swap( + ptr::null_mut(), + new_block.as_ptr(), + AcqRel, + )); + + let next = match next { + Some(next) => next, + None => { + // The compare-and-swap succeeded and the newly allocated block + // is successfully pushed. + return new_block; + } + }; + + // There already is a next block in the linked list. The newly allocated + // block could be dropped and the discovered next block returned; + // however, that would be wasteful. Instead, the linked list is walked + // by repeatedly attempting to compare-and-swap the pointer into the + // `next` register until the compare-and-swap succeed. + // + // Care is taken to update new_block's start_index field as appropriate. + + let mut curr = next; + + // TODO: Should this iteration be capped? + loop { + let actual = unsafe { curr.as_ref().try_push(&mut new_block, AcqRel) }; + + curr = match actual { + Ok(_) => { + return next; + } + Err(curr) => curr, + }; + + // When running outside of loom, this calls `spin_loop_hint`. + thread::yield_now(); + } + } +} + +/// Returns `true` if the specificed slot has a value ready to be consumed. +fn is_ready(bits: usize, slot: usize) -> bool { + let mask = 1 << slot; + mask == mask & bits +} + +/// Returns `true` if the closed flag has been set. +fn is_tx_closed(bits: usize) -> bool { + TX_CLOSED == bits & TX_CLOSED +} + +impl Values { + unsafe fn uninitialized() -> Values { + let mut vals = MaybeUninit::uninit(); + + // When fuzzing, `CausalCell` needs to be initialized. + if_loom! { + let p = vals.as_mut_ptr() as *mut CausalCell>; + for i in 0..BLOCK_CAP { + p.add(i) + .write(CausalCell::new(MaybeUninit::uninit())); + } + } + + Values(vals.assume_init()) + } +} + +impl ops::Index for Values { + type Output = CausalCell>; + + fn index(&self, index: usize) -> &Self::Output { + self.0.index(index) + } +} diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs new file mode 100644 index 00000000..787dd507 --- /dev/null +++ b/tokio/src/sync/mpsc/bounded.rs @@ -0,0 +1,337 @@ +use crate::sync::mpsc::chan; +use crate::sync::semaphore; + +use std::fmt; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// Send values to the associated `Receiver`. +/// +/// Instances are created by the [`channel`](fn.channel.html) function. +pub struct Sender { + chan: chan::Tx, +} + +impl Clone for Sender { + fn clone(&self) -> Self { + Sender { + chan: self.chan.clone(), + } + } +} + +impl fmt::Debug for Sender { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Sender") + .field("chan", &self.chan) + .finish() + } +} + +/// Receive values from the associated `Sender`. +/// +/// Instances are created by the [`channel`](fn.channel.html) function. 
+pub struct Receiver { + /// The channel receiver + chan: chan::Rx, +} + +impl fmt::Debug for Receiver { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Receiver") + .field("chan", &self.chan) + .finish() + } +} + +/// Error returned by the `Sender`. +#[derive(Debug)] +pub struct SendError(()); + +/// Error returned by `Sender::try_send`. +#[derive(Debug)] +pub struct TrySendError { + kind: ErrorKind, + value: T, +} + +#[derive(Debug)] +enum ErrorKind { + Closed, + NoCapacity, +} + +/// Error returned by `Receiver`. +#[derive(Debug)] +pub struct RecvError(()); + +/// Create a bounded mpsc channel for communicating between asynchronous tasks, +/// returning the sender/receiver halves. +/// +/// All data sent on `Sender` will become available on `Receiver` in the same +/// order as it was sent. +/// +/// The `Sender` can be cloned to `send` to the same channel from multiple code +/// locations. Only one `Receiver` is supported. +/// +/// If the `Receiver` is disconnected while trying to `send`, the `send` method +/// will return a `SendError`. Similarly, if `Sender` is disconnected while +/// trying to `recv`, the `recv` method will return a `RecvError`. +/// +/// # Examples +/// +/// ```rust +/// use tokio::sync::mpsc; +/// +/// #[tokio::main] +/// async fn main() { +/// let (mut tx, mut rx) = mpsc::channel(100); +/// +/// tokio::spawn(async move { +/// for i in 0..10 { +/// if let Err(_) = tx.send(i).await { +/// println!("receiver dropped"); +/// return; +/// } +/// } +/// }); +/// +/// while let Some(i) = rx.recv().await { +/// println!("got = {}", i); +/// } +/// } +/// ``` +pub fn channel(buffer: usize) -> (Sender, Receiver) { + assert!(buffer > 0, "mpsc bounded channel requires buffer > 0"); + let semaphore = (semaphore::Semaphore::new(buffer), buffer); + let (tx, rx) = chan::channel(semaphore); + + let tx = Sender::new(tx); + let rx = Receiver::new(rx); + + (tx, rx) +} + +/// Channel semaphore is a tuple of the semaphore implementation and a `usize` +/// representing the channel bound. +type Semaphore = (semaphore::Semaphore, usize); + +impl Receiver { + pub(crate) fn new(chan: chan::Rx) -> Receiver { + Receiver { chan } + } + + /// Receive the next value for this receiver. + /// + /// `None` is returned when all `Sender` halves have dropped, indicating + /// that no further values can be sent on the channel. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (mut tx, mut rx) = mpsc::channel(100); + /// + /// tokio::spawn(async move { + /// tx.send("hello").await.unwrap(); + /// }); + /// + /// assert_eq!(Some("hello"), rx.recv().await); + /// assert_eq!(None, rx.recv().await); + /// } + /// ``` + /// + /// Values are buffered: + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (mut tx, mut rx) = mpsc::channel(100); + /// + /// tx.send("hello").await.unwrap(); + /// tx.send("world").await.unwrap(); + /// + /// assert_eq!(Some("hello"), rx.recv().await); + /// assert_eq!(Some("world"), rx.recv().await); + /// } + /// ``` + pub async fn recv(&mut self) -> Option { + use futures_util::future::poll_fn; + + poll_fn(|cx| self.poll_recv(cx)).await + } + + #[doc(hidden)] // TODO: remove + pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { + self.chan.recv(cx) + } + + /// Closes the receiving half of a channel, without dropping it. 
+ /// + /// This prevents any further messages from being sent on the channel while + /// still enabling the receiver to drain messages that are buffered. + pub fn close(&mut self) { + self.chan.close(); + } +} + +impl futures_core::Stream for Receiver { + type Item = T; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.get_mut().poll_recv(cx) + } +} + +impl Sender { + pub(crate) fn new(chan: chan::Tx) -> Sender { + Sender { chan } + } + + #[doc(hidden)] // TODO: remove + pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.chan.poll_ready(cx).map_err(|_| SendError(())) + } + + /// Attempts to send a message on this `Sender`, returning the message + /// if there was an error. + pub fn try_send(&mut self, message: T) -> Result<(), TrySendError> { + self.chan.try_send(message)?; + Ok(()) + } + + /// Send a value, waiting until there is capacity. + /// + /// # Examples + /// + /// In the following example, each call to `send` will block until the + /// previously sent value was received. + /// + /// ```rust + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (mut tx, mut rx) = mpsc::channel(1); + /// + /// tokio::spawn(async move { + /// for i in 0..10 { + /// if let Err(_) = tx.send(i).await { + /// println!("receiver dropped"); + /// return; + /// } + /// } + /// }); + /// + /// while let Some(i) = rx.recv().await { + /// println!("got = {}", i); + /// } + /// } + /// ``` + pub async fn send(&mut self, value: T) -> Result<(), SendError> { + use futures_util::future::poll_fn; + + poll_fn(|cx| self.poll_ready(cx)).await?; + + self.try_send(value).map_err(|_| SendError(())) + } +} + +impl futures_sink::Sink for Sender { + type Error = SendError; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Sender::poll_ready(self.get_mut(), cx) + } + + fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { + self.as_mut().try_send(msg).map_err(|err| { + assert!(err.is_full(), "call `poll_ready` before sending"); + SendError(()) + }) + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } +} + +// ===== impl SendError ===== + +impl fmt::Display for SendError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "channel closed") + } +} + +impl ::std::error::Error for SendError {} + +// ===== impl TrySendError ===== + +impl TrySendError { + /// Get the inner value. + pub fn into_inner(self) -> T { + self.value + } + + /// Did the send fail because the channel has been closed? + pub fn is_closed(&self) -> bool { + if let ErrorKind::Closed = self.kind { + true + } else { + false + } + } + + /// Did the send fail because the channel was at capacity? 
+ pub fn is_full(&self) -> bool { + if let ErrorKind::NoCapacity = self.kind { + true + } else { + false + } + } +} + +impl fmt::Display for TrySendError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + let descr = match self.kind { + ErrorKind::Closed => "channel closed", + ErrorKind::NoCapacity => "no available capacity", + }; + write!(fmt, "{}", descr) + } +} + +impl ::std::error::Error for TrySendError {} + +impl From<(T, chan::TrySendError)> for TrySendError { + fn from((value, err): (T, chan::TrySendError)) -> TrySendError { + TrySendError { + value, + kind: match err { + chan::TrySendError::Closed => ErrorKind::Closed, + chan::TrySendError::NoPermits => ErrorKind::NoCapacity, + }, + } + } +} + +// ===== impl RecvError ===== + +impl fmt::Display for RecvError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "channel closed") + } +} + +impl ::std::error::Error for RecvError {} diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs new file mode 100644 index 00000000..fe5ff904 --- /dev/null +++ b/tokio/src/sync/mpsc/chan.rs @@ -0,0 +1,451 @@ +use crate::sync::loom::{ + future::AtomicWaker, + sync::atomic::AtomicUsize, + sync::{Arc, CausalCell}, +}; +use crate::sync::mpsc::list; + +use std::fmt; +use std::process; +use std::sync::atomic::Ordering::{AcqRel, Relaxed}; +use std::task::Poll::{Pending, Ready}; +use std::task::{Context, Poll}; + +/// Channel sender +pub(crate) struct Tx { + inner: Arc>, + permit: S::Permit, +} + +impl fmt::Debug for Tx +where + S::Permit: fmt::Debug, + S: fmt::Debug, +{ + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Tx") + .field("inner", &self.inner) + .field("permit", &self.permit) + .finish() + } +} + +/// Channel receiver +pub(crate) struct Rx { + inner: Arc>, +} + +impl fmt::Debug for Rx +where + S: fmt::Debug, +{ + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Rx").field("inner", &self.inner).finish() + } +} + +#[derive(Debug, Eq, PartialEq)] +pub(crate) enum TrySendError { + Closed, + NoPermits, +} + +pub(crate) trait Semaphore { + type Permit; + + fn new_permit() -> Self::Permit; + + /// The permit is dropped without a value being sent. In this case, the + /// permit must be returned to the semaphore. + fn drop_permit(&self, permit: &mut Self::Permit); + + fn is_idle(&self) -> bool; + + fn add_permit(&self); + + fn poll_acquire(&self, cx: &mut Context<'_>, permit: &mut Self::Permit) + -> Poll>; + + fn try_acquire(&self, permit: &mut Self::Permit) -> Result<(), TrySendError>; + + /// A value was sent into the channel and the permit held by `tx` is + /// dropped. In this case, the permit should not immeditely be returned to + /// the semaphore. Instead, the permit is returnred to the semaphore once + /// the sent value is read by the rx handle. + fn forget(&self, permit: &mut Self::Permit); + + fn close(&self); +} + +struct Chan { + /// Handle to the push half of the lock-free list. + tx: list::Tx, + + /// Coordinates access to channel's capacity. + semaphore: S, + + /// Receiver waker. Notified when a value is pushed into the channel. + rx_waker: AtomicWaker, + + /// Tracks the number of outstanding sender handles. + /// + /// When this drops to zero, the send half of the channel is closed. + tx_count: AtomicUsize, + + /// Only accessed by `Rx` handle. 
+ rx_fields: CausalCell>, +} + +impl fmt::Debug for Chan +where + S: fmt::Debug, +{ + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Chan") + .field("tx", &self.tx) + .field("semaphore", &self.semaphore) + .field("rx_waker", &self.rx_waker) + .field("tx_count", &self.tx_count) + .field("rx_fields", &"...") + .finish() + } +} + +/// Fields only accessed by `Rx` handle. +struct RxFields { + /// Channel receiver. This field is only accessed by the `Receiver` type. + list: list::Rx, + + /// `true` if `Rx::close` is called. + rx_closed: bool, +} + +impl fmt::Debug for RxFields { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("RxFields") + .field("list", &self.list) + .field("rx_closed", &self.rx_closed) + .finish() + } +} + +unsafe impl Send for Chan {} +unsafe impl Sync for Chan {} + +pub(crate) fn channel(semaphore: S) -> (Tx, Rx) +where + S: Semaphore, +{ + let (tx, rx) = list::channel(); + + let chan = Arc::new(Chan { + tx, + semaphore, + rx_waker: AtomicWaker::new(), + tx_count: AtomicUsize::new(1), + rx_fields: CausalCell::new(RxFields { + list: rx, + rx_closed: false, + }), + }); + + (Tx::new(chan.clone()), Rx::new(chan)) +} + +// ===== impl Tx ===== + +impl Tx +where + S: Semaphore, +{ + fn new(chan: Arc>) -> Tx { + Tx { + inner: chan, + permit: S::new_permit(), + } + } + + pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.semaphore.poll_acquire(cx, &mut self.permit) + } + + /// Send a message and notify the receiver. + pub(crate) fn try_send(&mut self, value: T) -> Result<(), (T, TrySendError)> { + if let Err(e) = self.inner.semaphore.try_acquire(&mut self.permit) { + return Err((value, e)); + } + + // Push the value + self.inner.tx.push(value); + + // Notify the rx task + self.inner.rx_waker.wake(); + + // Release the permit + self.inner.semaphore.forget(&mut self.permit); + + Ok(()) + } +} + +impl Clone for Tx +where + S: Semaphore, +{ + fn clone(&self) -> Tx { + // Using a Relaxed ordering here is sufficient as the caller holds a + // strong ref to `self`, preventing a concurrent decrement to zero. + self.inner.tx_count.fetch_add(1, Relaxed); + + Tx { + inner: self.inner.clone(), + permit: S::new_permit(), + } + } +} + +impl Drop for Tx +where + S: Semaphore, +{ + fn drop(&mut self) { + self.inner.semaphore.drop_permit(&mut self.permit); + + if self.inner.tx_count.fetch_sub(1, AcqRel) != 1 { + return; + } + + // Close the list, which sends a `Close` message + self.inner.tx.close(); + + // Notify the receiver + self.inner.rx_waker.wake(); + } +} + +// ===== impl Rx ===== + +impl Rx +where + S: Semaphore, +{ + fn new(chan: Arc>) -> Rx { + Rx { inner: chan } + } + + pub(crate) fn close(&mut self) { + self.inner.rx_fields.with_mut(|rx_fields_ptr| { + let rx_fields = unsafe { &mut *rx_fields_ptr }; + + if rx_fields.rx_closed { + return; + } + + rx_fields.rx_closed = true; + }); + + self.inner.semaphore.close(); + } + + /// Receive the next value + pub(crate) fn recv(&mut self, cx: &mut Context<'_>) -> Poll> { + use super::block::Read::*; + + self.inner.rx_fields.with_mut(|rx_fields_ptr| { + let rx_fields = unsafe { &mut *rx_fields_ptr }; + + macro_rules! try_recv { + () => { + match rx_fields.list.pop(&self.inner.tx) { + Some(Value(value)) => { + self.inner.semaphore.add_permit(); + return Ready(Some(value)); + } + Some(Closed) => { + // TODO: This check may not be required as it most + // likely can only return `true` at this point. 
A + // channel is closed when all tx handles are + // dropped. Dropping a tx handle releases memory, + // which ensures that if dropping the tx handle is + // visible, then all messages sent are also visible. + assert!(self.inner.semaphore.is_idle()); + return Ready(None); + } + None => {} // fall through + } + }; + } + + try_recv!(); + + self.inner.rx_waker.register_by_ref(cx.waker()); + + // It is possible that a value was pushed between attempting to read + // and registering the task, so we have to check the channel a + // second time here. + try_recv!(); + + debug!( + "recv; rx_closed = {:?}; is_idle = {:?}", + rx_fields.rx_closed, + self.inner.semaphore.is_idle() + ); + + if rx_fields.rx_closed && self.inner.semaphore.is_idle() { + Ready(None) + } else { + Pending + } + }) + } +} + +impl Drop for Rx +where + S: Semaphore, +{ + fn drop(&mut self) { + use super::block::Read::Value; + + self.close(); + + self.inner.rx_fields.with_mut(|rx_fields_ptr| { + let rx_fields = unsafe { &mut *rx_fields_ptr }; + + while let Some(Value(_)) = rx_fields.list.pop(&self.inner.tx) { + self.inner.semaphore.add_permit(); + } + }) + } +} + +// ===== impl Chan ===== + +impl Drop for Chan { + fn drop(&mut self) { + use super::block::Read::Value; + + // Safety: the only owner of the rx fields is Chan, and eing + // inside its own Drop means we're the last ones to touch it. + self.rx_fields.with_mut(|rx_fields_ptr| { + let rx_fields = unsafe { &mut *rx_fields_ptr }; + + while let Some(Value(_)) = rx_fields.list.pop(&self.tx) {} + unsafe { rx_fields.list.free_blocks() }; + }); + } +} + +use crate::sync::semaphore::TryAcquireError; + +impl From for TrySendError { + fn from(src: TryAcquireError) -> TrySendError { + if src.is_closed() { + TrySendError::Closed + } else if src.is_no_permits() { + TrySendError::NoPermits + } else { + unreachable!(); + } + } +} + +// ===== impl Semaphore for (::Semaphore, capacity) ===== + +use crate::sync::semaphore::Permit; + +impl Semaphore for (crate::sync::semaphore::Semaphore, usize) { + type Permit = Permit; + + fn new_permit() -> Permit { + Permit::new() + } + + fn drop_permit(&self, permit: &mut Permit) { + permit.release(&self.0); + } + + fn add_permit(&self) { + self.0.add_permits(1) + } + + fn is_idle(&self) -> bool { + self.0.available_permits() == self.1 + } + + fn poll_acquire(&self, cx: &mut Context<'_>, permit: &mut Permit) -> Poll> { + permit.poll_acquire(cx, &self.0).map_err(|_| ()) + } + + fn try_acquire(&self, permit: &mut Permit) -> Result<(), TrySendError> { + permit.try_acquire(&self.0)?; + Ok(()) + } + + fn forget(&self, permit: &mut Self::Permit) { + permit.forget() + } + + fn close(&self) { + self.0.close(); + } +} + +// ===== impl Semaphore for AtomicUsize ===== + +use std::sync::atomic::Ordering::{Acquire, Release}; +use std::usize; + +impl Semaphore for AtomicUsize { + type Permit = (); + + fn new_permit() {} + + fn drop_permit(&self, _permit: &mut ()) {} + + fn add_permit(&self) { + let prev = self.fetch_sub(2, Release); + + if prev >> 1 == 0 { + // Something went wrong + process::abort(); + } + } + + fn is_idle(&self) -> bool { + self.load(Acquire) >> 1 == 0 + } + + fn poll_acquire(&self, _cx: &mut Context<'_>, permit: &mut ()) -> Poll> { + Ready(self.try_acquire(permit).map_err(|_| ())) + } + + fn try_acquire(&self, _permit: &mut ()) -> Result<(), TrySendError> { + let mut curr = self.load(Acquire); + + loop { + if curr & 1 == 1 { + return Err(TrySendError::Closed); + } + + if curr == usize::MAX ^ 1 { + // Overflowed the ref count. 
There is no safe way to recover, so + // abort the process. In practice, this should never happen. + process::abort() + } + + match self.compare_exchange(curr, curr + 2, AcqRel, Acquire) { + Ok(_) => return Ok(()), + Err(actual) => { + curr = actual; + } + } + } + } + + fn forget(&self, _permit: &mut ()) {} + + fn close(&self) { + self.fetch_or(1, Release); + } +} diff --git a/tokio/src/sync/mpsc/list.rs b/tokio/src/sync/mpsc/list.rs new file mode 100644 index 00000000..a1295bb0 --- /dev/null +++ b/tokio/src/sync/mpsc/list.rs @@ -0,0 +1,348 @@ +//! A concurrent, lock-free, FIFO list. + +use crate::sync::loom::{ + sync::atomic::{AtomicPtr, AtomicUsize}, + thread, +}; +use crate::sync::mpsc::block::{self, Block}; + +use std::fmt; +use std::ptr::NonNull; +use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release}; + +/// List queue transmit handle +pub(crate) struct Tx { + /// Tail in the `Block` mpmc list. + block_tail: AtomicPtr>, + + /// Position to push the next message. This reference a block and offset + /// into the block. + tail_position: AtomicUsize, +} + +/// List queue receive handle +pub(crate) struct Rx { + /// Pointer to the block being processed + head: NonNull>, + + /// Next slot index to process + index: usize, + + /// Pointer to the next block pending release + free_head: NonNull>, +} + +pub(crate) fn channel() -> (Tx, Rx) { + // Create the initial block shared between the tx and rx halves. + let initial_block = Box::new(Block::new(0)); + let initial_block_ptr = Box::into_raw(initial_block); + + let tx = Tx { + block_tail: AtomicPtr::new(initial_block_ptr), + tail_position: AtomicUsize::new(0), + }; + + let head = NonNull::new(initial_block_ptr).unwrap(); + + let rx = Rx { + head, + index: 0, + free_head: head, + }; + + (tx, rx) +} + +impl Tx { + /// Push a value into the list. + pub(crate) fn push(&self, value: T) { + // First, claim a slot for the value. `Acquire` is used here to + // synchronize with the `fetch_add` in `reclaim_blocks`. + let slot_index = self.tail_position.fetch_add(1, Acquire); + + // Load the current block and write the value + let block = self.find_block(slot_index); + + unsafe { + // Write the value to the block + block.as_ref().write(slot_index, value); + } + } + + /// Close the send half of the list + /// + /// Similar process as pushing a value, but instead of writing the value & + /// setting the ready flag, the TX_CLOSED flag is set on the block. + pub(crate) fn close(&self) { + // First, claim a slot for the value. This is the last slot that will be + // claimed. + let slot_index = self.tail_position.fetch_add(1, Acquire); + + let block = self.find_block(slot_index); + + unsafe { block.as_ref().tx_close() } + } + + fn find_block(&self, slot_index: usize) -> NonNull> { + // The start index of the block that contains `index`. + let start_index = block::start_index(slot_index); + + // The index offset into the block + let offset = block::offset(slot_index); + + // Load the current head of the block + let mut block_ptr = self.block_tail.load(Acquire); + + let block = unsafe { &*block_ptr }; + + // Calculate the distance between the tail ptr and the target block + let distance = block.distance(start_index); + + // Decide if this call to `find_block` should attempt to update the + // `block_tail` pointer. + // + // Updating `block_tail` is not always performed in order to reduce + // contention. + // + // When set, as the routine walks the linked list, it attempts to update + // `block_tail`. 
If the update cannot be performed, `try_updating_tail` + // is unset. + let mut try_updating_tail = distance > offset; + + // Walk the linked list of blocks until the block with `start_index` is + // found. + loop { + let block = unsafe { &(*block_ptr) }; + + if block.is_at_index(start_index) { + return unsafe { NonNull::new_unchecked(block_ptr) }; + } + + let next_block = block + .load_next(Acquire) + // There is no allocated next block, grow the linked list. + .unwrap_or_else(|| block.grow()); + + // If the block is **not** final, then the tail pointer cannot be + // advanced any more. + try_updating_tail &= block.is_final(); + + if try_updating_tail { + // Advancing `block_tail` must happen when walking the linked + // list. `block_tail` may not advance passed any blocks that are + // not "final". At the point a block is finalized, it is unknown + // if there are any prior blocks that are unfinalized, which + // makes it impossible to advance `block_tail`. + // + // While walking the linked list, `block_tail` can be advanced + // as long as finalized blocks are traversed. + // + // Release ordering is used to ensure that any subsequent reads + // are able to see the memory pointed to by `block_tail`. + // + // Acquire is not needed as any "actual" value is not accessed. + // At this point, the linked list is walked to acquire blocks. + let actual = + self.block_tail + .compare_and_swap(block_ptr, next_block.as_ptr(), Release); + + if actual == block_ptr { + // Synchronize with any senders + let tail_position = self.tail_position.fetch_add(0, Release); + + unsafe { + block.tx_release(tail_position); + } + } else { + // A concurrent sender is also working on advancing + // `block_tail` and this thread is falling behind. + // + // Stop trying to advance the tail pointer + try_updating_tail = false; + } + } + + block_ptr = next_block.as_ptr(); + + thread::yield_now(); + } + } + + pub(crate) unsafe fn reclaim_block(&self, mut block: NonNull>) { + debug!("+ reclaim_block({:p})", block); + // The block has been removed from the linked list and ownership + // is reclaimed. + // + // Before dropping the block, see if it can be reused by + // inserting it back at the end of the linked list. 
+ // + // First, reset the data + block.as_mut().reclaim(); + + let mut reused = false; + + // Attempt to insert the block at the end + // + // Walk at most three times + // + let curr_ptr = self.block_tail.load(Acquire); + + // The pointer can never be null + debug_assert!(!curr_ptr.is_null()); + + let mut curr = NonNull::new_unchecked(curr_ptr); + + // TODO: Unify this logic with Block::grow + for _ in 0..3 { + match curr.as_ref().try_push(&mut block, AcqRel) { + Ok(_) => { + reused = true; + break; + } + Err(next) => { + curr = next; + } + } + } + + if !reused { + debug!(" + block freed {:p}", block); + let _ = Box::from_raw(block.as_ptr()); + } + } +} + +impl fmt::Debug for Tx { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Tx") + .field("block_tail", &self.block_tail.load(Relaxed)) + .field("tail_position", &self.tail_position.load(Relaxed)) + .finish() + } +} + +impl Rx { + /// Pop the next value off the queue + pub(crate) fn pop(&mut self, tx: &Tx) -> Option> { + // Advance `head`, if needed + if !self.try_advancing_head() { + debug!("+ !self.try_advancing_head() -> false"); + return None; + } + + self.reclaim_blocks(tx); + + unsafe { + let block = self.head.as_ref(); + + let ret = block.read(self.index); + + if let Some(block::Read::Value(..)) = ret { + self.index = self.index.wrapping_add(1); + } + + ret + } + } + + /// Try advancing the block pointer to the block referenced by `self.index`. + /// + /// Returns `true` if successful, `false` if there is no next block to load. + fn try_advancing_head(&mut self) -> bool { + let block_index = block::start_index(self.index); + + loop { + let next_block = { + let block = unsafe { self.head.as_ref() }; + + if block.is_at_index(block_index) { + return true; + } + + block.load_next(Acquire) + }; + + let next_block = match next_block { + Some(next_block) => next_block, + None => { + return false; + } + }; + + self.head = next_block; + + thread::yield_now(); + } + } + + fn reclaim_blocks(&mut self, tx: &Tx) { + debug!("+ reclaim_blocks()"); + + while self.free_head != self.head { + unsafe { + // Get a handle to the block that will be freed and update + // `free_head` to point to the next block. + let block = self.free_head; + + let observed_tail_position = block.as_ref().observed_tail_position(); + + let required_index = match observed_tail_position { + Some(i) => i, + None => return, + }; + + if required_index > self.index { + return; + } + + // We may read the next pointer with `Relaxed` ordering as it is + // guaranteed that the `reclaim_blocks` routine trails the `recv` + // routine. Any memory accessed by `reclaim_blocks` has already + // been acquired by `recv`. + let next_block = block.as_ref().load_next(Relaxed); + + // Update the free list head + self.free_head = next_block.unwrap(); + + // Push the emptied block onto the back of the queue, making it + // available to senders. + tx.reclaim_block(block); + } + + thread::yield_now(); + } + } + + /// Effectively `Drop` all the blocks. Should only be called once, when + /// the list is dropping. + pub(super) unsafe fn free_blocks(&mut self) { + debug!("+ free_blocks()"); + debug_assert_ne!(self.free_head, NonNull::dangling()); + + let mut cur = Some(self.free_head); + + #[cfg(debug_assertions)] + { + // to trigger the debug assert above so as to catch that we + // don't call `free_blocks` more than once. 
+ self.free_head = NonNull::dangling(); + self.head = NonNull::dangling(); + } + + while let Some(block) = cur { + cur = block.as_ref().load_next(Relaxed); + debug!(" + free: block = {:p}", block); + drop(Box::from_raw(block.as_ptr())); + } + } +} + +impl fmt::Debug for Rx { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Rx") + .field("head", &self.head) + .field("index", &self.index) + .field("free_head", &self.free_head) + .finish() + } +} diff --git a/tokio/src/sync/mpsc/mod.rs b/tokio/src/sync/mpsc/mod.rs new file mode 100644 index 00000000..3b95b954 --- /dev/null +++ b/tokio/src/sync/mpsc/mod.rs @@ -0,0 +1,67 @@ +//! A multi-producer, single-consumer queue for sending values across +//! asynchronous tasks. +//! +//! Similar to `std`, channel creation provides [`Receiver`] and [`Sender`] +//! handles. [`Receiver`] implements `Stream` and allows a task to read values +//! out of the channel. If there is no message to read, the current task will be +//! notified when a new value is sent. [`Sender`] implements the `Sink` trait +//! and allows sending messages into the channel. If the channel is at capacity, +//! the send is rejected and the task will be notified when additional capacity +//! is available. In other words, the channel provides backpressure. +//! +//! Unbounded channels are also available using the `unbounded_channel` +//! constructor. +//! +//! # Disconnection +//! +//! When all [`Sender`] handles have been dropped, it is no longer +//! possible to send values into the channel. This is considered the termination +//! event of the stream. As such, `Receiver::poll` returns `Ok(Ready(None))`. +//! +//! If the [`Receiver`] handle is dropped, then messages can no longer +//! be read out of the channel. In this case, all further attempts to send will +//! result in an error. +//! +//! # Clean Shutdown +//! +//! When the [`Receiver`] is dropped, it is possible for unprocessed messages to +//! remain in the channel. Instead, it is usually desirable to perform a "clean" +//! shutdown. To do this, the receiver first calls `close`, which will prevent +//! any further messages to be sent into the channel. Then, the receiver +//! consumes the channel to completion, at which point the receiver can be +//! dropped. +//! +//! [`Sender`]: struct.Sender.html +//! [`Receiver`]: struct.Receiver.html + +pub(super) mod block; + +mod bounded; +pub use self::bounded::{channel, Receiver, Sender}; + +mod chan; + +pub(super) mod list; + +mod unbounded; +pub use self::unbounded::{unbounded_channel, UnboundedReceiver, UnboundedSender}; + +pub mod error { + //! Channel error types + + pub use super::bounded::{RecvError, SendError, TrySendError}; + pub use super::unbounded::{UnboundedRecvError, UnboundedSendError, UnboundedTrySendError}; +} + +/// The number of values a block can contain. +/// +/// This value must be a power of 2. It also must be smaller than the number of +/// bits in `usize`. 
+#[cfg(all(target_pointer_width = "64", not(loom)))] +const BLOCK_CAP: usize = 32; + +#[cfg(all(not(target_pointer_width = "64"), not(loom)))] +const BLOCK_CAP: usize = 16; + +#[cfg(loom)] +const BLOCK_CAP: usize = 2; diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs new file mode 100644 index 00000000..5a73771e --- /dev/null +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -0,0 +1,230 @@ +use crate::sync::loom::sync::atomic::AtomicUsize; +use crate::sync::mpsc::chan; + +use std::fmt; +use std::task::{Context, Poll}; + +use std::pin::Pin; + +/// Send values to the associated `UnboundedReceiver`. +/// +/// Instances are created by the +/// [`unbounded_channel`](fn.unbounded_channel.html) function. +pub struct UnboundedSender { + chan: chan::Tx, +} + +impl Clone for UnboundedSender { + fn clone(&self) -> Self { + UnboundedSender { + chan: self.chan.clone(), + } + } +} + +impl fmt::Debug for UnboundedSender { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("UnboundedSender") + .field("chan", &self.chan) + .finish() + } +} + +/// Receive values from the associated `UnboundedSender`. +/// +/// Instances are created by the +/// [`unbounded_channel`](fn.unbounded_channel.html) function. +pub struct UnboundedReceiver { + /// The channel receiver + chan: chan::Rx, +} + +impl fmt::Debug for UnboundedReceiver { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("UnboundedReceiver") + .field("chan", &self.chan) + .finish() + } +} + +/// Error returned by the `UnboundedSender`. +#[derive(Debug)] +pub struct UnboundedSendError(()); + +/// Returned by `UnboundedSender::try_send` when the channel has been closed. +#[derive(Debug)] +pub struct UnboundedTrySendError(T); + +/// Error returned by `UnboundedReceiver`. +#[derive(Debug)] +pub struct UnboundedRecvError(()); + +/// Create an unbounded mpsc channel for communicating between asynchronous +/// tasks. +/// +/// A `send` on this channel will always succeed as long as the receive half has +/// not been closed. If the receiver falls behind, messages will be arbitrarily +/// buffered. +/// +/// **Note** that the amount of available system memory is an implicit bound to +/// the channel. Using an `unbounded` channel has the ability of causing the +/// process to run out of memory. In this case, the process will be aborted. +pub fn unbounded_channel() -> (UnboundedSender, UnboundedReceiver) { + let (tx, rx) = chan::channel(AtomicUsize::new(0)); + + let tx = UnboundedSender::new(tx); + let rx = UnboundedReceiver::new(rx); + + (tx, rx) +} + +/// No capacity +type Semaphore = AtomicUsize; + +impl UnboundedReceiver { + pub(crate) fn new(chan: chan::Rx) -> UnboundedReceiver { + UnboundedReceiver { chan } + } + + #[doc(hidden)] // TODO: remove + pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { + self.chan.recv(cx) + } + + /// Receive the next value for this receiver. + /// + /// `None` is returned when all `Sender` halves have dropped, indicating + /// that no further values can be sent on the channel. 
+ /// + /// # Examples + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (mut tx, mut rx) = mpsc::unbounded_channel(); + /// + /// tokio::spawn(async move { + /// tx.try_send("hello").unwrap(); + /// }); + /// + /// assert_eq!(Some("hello"), rx.recv().await); + /// assert_eq!(None, rx.recv().await); + /// } + /// ``` + /// + /// Values are buffered: + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (mut tx, mut rx) = mpsc::unbounded_channel(); + /// + /// tx.try_send("hello").unwrap(); + /// tx.try_send("world").unwrap(); + /// + /// assert_eq!(Some("hello"), rx.recv().await); + /// assert_eq!(Some("world"), rx.recv().await); + /// } + /// ``` + pub async fn recv(&mut self) -> Option { + use futures_util::future::poll_fn; + + poll_fn(|cx| self.poll_recv(cx)).await + } + + /// Closes the receiving half of a channel, without dropping it. + /// + /// This prevents any further messages from being sent on the channel while + /// still enabling the receiver to drain messages that are buffered. + pub fn close(&mut self) { + self.chan.close(); + } +} + +impl futures_core::Stream for UnboundedReceiver { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.chan.recv(cx) + } +} + +impl UnboundedSender { + pub(crate) fn new(chan: chan::Tx) -> UnboundedSender { + UnboundedSender { chan } + } + + /// Attempts to send a message on this `UnboundedSender` without blocking. + pub fn try_send(&mut self, message: T) -> Result<(), UnboundedTrySendError> { + self.chan.try_send(message)?; + Ok(()) + } +} + +impl futures_sink::Sink for UnboundedSender { + type Error = UnboundedSendError; + + fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { + self.try_send(msg).map_err(|_| UnboundedSendError(())) + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } +} + +// ===== impl UnboundedSendError ===== + +impl fmt::Display for UnboundedSendError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "channel closed") + } +} + +impl ::std::error::Error for UnboundedSendError {} + +// ===== impl TrySendError ===== + +impl UnboundedTrySendError { + /// Get the inner value. + pub fn into_inner(self) -> T { + self.0 + } +} + +impl fmt::Display for UnboundedTrySendError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "channel closed") + } +} + +impl ::std::error::Error for UnboundedTrySendError {} + +impl From<(T, chan::TrySendError)> for UnboundedTrySendError { + fn from((value, err): (T, chan::TrySendError)) -> UnboundedTrySendError { + assert_eq!(chan::TrySendError::Closed, err); + UnboundedTrySendError(value) + } +} + +// ===== impl UnboundedRecvError ===== + +impl fmt::Display for UnboundedRecvError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "channel closed") + } +} + +impl ::std::error::Error for UnboundedRecvError {} -- cgit v1.2.3
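Worth spelling out, since the constants are split between block.rs and mod.rs: the index arithmetic only works because BLOCK_CAP is a power of two, so BLOCK_MASK recovers a block's start index, SLOT_MASK recovers the offset inside the block, and the low BLOCK_CAP bits of ready_slots are per-slot ready flags with RELEASED and TX_CLOSED packed above them. Below is a minimal standalone sketch of that arithmetic, with the constants redefined locally and BLOCK_CAP fixed at 32 (the 64-bit, non-loom value from mod.rs); it is illustration only, not part of the patch.

    // Assumes a 64-bit target, matching the BLOCK_CAP = 32 case in mod.rs.
    const BLOCK_CAP: usize = 32;
    const BLOCK_MASK: usize = !(BLOCK_CAP - 1);
    const SLOT_MASK: usize = BLOCK_CAP - 1;
    const RELEASED: usize = 1 << BLOCK_CAP; // sender is done with the block
    const TX_CLOSED: usize = RELEASED << 1; // all senders dropped
    const READY_MASK: usize = RELEASED - 1; // low BLOCK_CAP bits = ready flags

    fn start_index(slot_index: usize) -> usize {
        BLOCK_MASK & slot_index
    }

    fn offset(slot_index: usize) -> usize {
        SLOT_MASK & slot_index
    }

    fn main() {
        // Absolute slot index 70 lands in the block starting at 64, offset 6.
        assert_eq!(start_index(70), 64);
        assert_eq!(offset(70), 6);

        // `set_ready(slot)` ORs in `1 << slot`; once every slot bit is set,
        // `is_final` sees `ready_slots & READY_MASK == READY_MASK`.
        let mut ready_slots = 0usize;
        for slot in 0..BLOCK_CAP {
            ready_slots |= 1 << slot;
        }
        assert_eq!(ready_slots & READY_MASK, READY_MASK);

        // The two flag bits live above the per-slot bits, so they never collide.
        assert_eq!(RELEASED & READY_MASK, 0);
        assert_eq!(TX_CLOSED & (READY_MASK | RELEASED), 0);
    }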
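The unbounded flavour reuses the same chan machinery by implementing the private Semaphore trait for a bare AtomicUsize: bit 0 is the closed flag and the bits above it count messages that have been sent but not yet received, so try_acquire adds 2 per send and the receiver hands the "permit" back by subtracting 2. Here is a sketch of just that encoding; the trait itself, the loom wrappers, and the overflow abort are left out, and the UnboundedState name is invented for illustration.

    use std::sync::atomic::{AtomicUsize, Ordering::{AcqRel, Acquire, Release}};

    // Bit 0: closed flag. Bits 1..: messages sent but not yet received.
    struct UnboundedState(AtomicUsize);

    impl UnboundedState {
        fn try_acquire(&self) -> Result<(), &'static str> {
            let mut curr = self.0.load(Acquire);
            loop {
                if curr & 1 == 1 {
                    return Err("closed"); // TrySendError::Closed in the patch
                }
                // Adding 2 bumps the message count without touching bit 0.
                match self.0.compare_exchange(curr, curr + 2, AcqRel, Acquire) {
                    Ok(_) => return Ok(()),
                    Err(actual) => curr = actual,
                }
            }
        }

        fn add_permit(&self) {
            // Called by the receiver after reading a value: one message drained.
            self.0.fetch_sub(2, Release);
        }

        fn is_idle(&self) -> bool {
            // No in-flight messages, regardless of the closed bit.
            self.0.load(Acquire) >> 1 == 0
        }

        fn close(&self) {
            self.0.fetch_or(1, Release);
        }
    }

    fn main() {
        let state = UnboundedState(AtomicUsize::new(0));
        state.try_acquire().unwrap(); // one message in flight
        assert!(!state.is_idle());
        state.add_permit();           // receiver consumed it
        assert!(state.is_idle());
        state.close();
        assert!(state.try_acquire().is_err());
    }

Packing the closed flag into the low bit is what lets close() be a single fetch_or while try_acquire rejects new sends and bumps the count in one compare-exchange loop.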
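Since Receiver implements futures_core::Stream (and Sender implements Sink), the usual combinators apply on top of the recv/send examples already in the docs. A usage sketch, assuming a futures-util version compatible with this pre-release tokio is available; the channel size and values are arbitrary.

    use futures_util::stream::StreamExt;
    use tokio::sync::mpsc;

    #[tokio::main]
    async fn main() {
        let (mut tx, rx) = mpsc::channel(16);

        tokio::spawn(async move {
            for i in 0..3 {
                // `send` waits for capacity; it errors only if the receiver is gone.
                if tx.send(i).await.is_err() {
                    return;
                }
            }
            // Dropping `tx` here closes the channel and ends the stream.
        });

        // `Receiver: Stream`, so `collect` drains it until all senders drop.
        let received: Vec<i32> = rx.collect().await;
        assert_eq!(received, vec![0, 1, 2]);
    }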