author | Carl Lerche <me@carllerche.com> | 2019-10-29 15:11:31 -0700
---|---|---
committer | GitHub <noreply@github.com> | 2019-10-29 15:11:31 -0700
commit | 2b909d6805990abf0bc2a5dea9e7267ff87df704 (patch) |
tree | de255969c720c294af754b3840efabff3e6d69a0 /tokio/src |
parent | c62ef2d232dea1535a8e22484fa2ca083f03e903 (diff) |
sync: move into `tokio` crate (#1705)
A step towards collapsing Tokio sub crates into a single `tokio`
crate (#1318).
The sync implementation is now provided by the main `tokio` crate.
Functionality can be opted out of by using the various `sync` related
feature flags.
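
For downstream code the change is mechanical: anything that previously imported from the `tokio_sync` crate now lives under `tokio::sync`. A minimal sketch of the unified path, assuming a `tokio` build with its sync support compiled in (the example itself is illustrative, not part of this commit):

```rust
// Previously: `use tokio_sync::oneshot;`
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();

    tokio::spawn(async move {
        // Sending consumes the sender; it errs only if the receiver is gone.
        let _ = tx.send("hello from tokio::sync");
    });

    // The receiver is a future resolving to the sent value.
    assert_eq!(rx.await.unwrap(), "hello from tokio::sync");
}
```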
Diffstat (limited to 'tokio/src')
31 files changed, 5089 insertions(+), 20 deletions(-)
diff --git a/tokio/src/executor/blocking/mod.rs b/tokio/src/executor/blocking/mod.rs
index 16faa03e..2ad573d8 100644
--- a/tokio/src/executor/blocking/mod.rs
+++ b/tokio/src/executor/blocking/mod.rs
@@ -3,7 +3,7 @@ use crate::executor::loom::sync::{Arc, Condvar, Mutex};
 use crate::executor::loom::thread;
 
 #[cfg(feature = "blocking")]
-use tokio_sync::oneshot;
+use crate::sync::oneshot;
 
 use std::cell::Cell;
 use std::collections::VecDeque;
diff --git a/tokio/src/executor/thread_pool/shutdown.rs b/tokio/src/executor/thread_pool/shutdown.rs
index 40d8f04a..b7c4177f 100644
--- a/tokio/src/executor/thread_pool/shutdown.rs
+++ b/tokio/src/executor/thread_pool/shutdown.rs
@@ -4,8 +4,7 @@
 //! dropped, the `Receiver` receives a notification.
 
 use crate::executor::loom::sync::Arc;
-
-use tokio_sync::oneshot;
+use crate::sync::oneshot;
 
 #[derive(Debug, Clone)]
 pub(super) struct Sender {
diff --git a/tokio/src/net/driver/reactor/dispatch/page/scheduled_io.rs b/tokio/src/net/driver/reactor/dispatch/page/scheduled_io.rs
index 34a07ea8..9cd99a86 100644
--- a/tokio/src/net/driver/reactor/dispatch/page/scheduled_io.rs
+++ b/tokio/src/net/driver/reactor/dispatch/page/scheduled_io.rs
@@ -3,8 +3,7 @@ use crate::loom::{
     atomic::{AtomicUsize, Ordering},
     CausalCell,
 };
-
-use tokio_sync::AtomicWaker;
+use crate::sync::AtomicWaker;
 
 #[derive(Debug)]
 pub(crate) struct ScheduledIo {
diff --git a/tokio/src/net/unix/incoming.rs b/tokio/src/net/unix/incoming.rs
index 542b5e1d..a66f21da 100644
--- a/tokio/src/net/unix/incoming.rs
+++ b/tokio/src/net/unix/incoming.rs
@@ -1,5 +1,3 @@
-#![cfg(feature = "async-traits")]
-
 use super::{UnixListener, UnixStream};
 
 use futures_core::ready;
diff --git a/tokio/src/net/unix/listener.rs b/tokio/src/net/unix/listener.rs
index 3a36dc90..3cf8eff3 100644
--- a/tokio/src/net/unix/listener.rs
+++ b/tokio/src/net/unix/listener.rs
@@ -90,7 +90,6 @@ impl UnixListener {
     ///
     /// This method returns an implementation of the `Stream` trait which
     /// resolves to the sockets the are accepted on this listener.
-    #[cfg(feature = "async-traits")]
     pub fn incoming(self) -> super::Incoming {
         super::Incoming::new(self)
     }
diff --git a/tokio/src/net/unix/mod.rs b/tokio/src/net/unix/mod.rs
index 4447ca5c..977e3a0f 100644
--- a/tokio/src/net/unix/mod.rs
+++ b/tokio/src/net/unix/mod.rs
@@ -6,7 +6,6 @@ mod datagram;
 pub use self::datagram::UnixDatagram;
 
 mod incoming;
-#[cfg(feature = "async-traits")]
 pub use self::incoming::Incoming;
 
 mod listener;
diff --git a/tokio/src/signal/registry.rs b/tokio/src/signal/registry.rs
index 7d2fd7a9..e70d0495 100644
--- a/tokio/src/signal/registry.rs
+++ b/tokio/src/signal/registry.rs
@@ -1,6 +1,6 @@
 use crate::signal::os::{OsExtraData, OsStorage};
 
-use tokio_sync::mpsc::Sender;
+use crate::sync::mpsc::Sender;
 
 use lazy_static::lazy_static;
 use std::ops;
diff --git a/tokio/src/signal/unix.rs b/tokio/src/signal/unix.rs
index 075e788c..87871503 100644
--- a/tokio/src/signal/unix.rs
+++ b/tokio/src/signal/unix.rs
@@ -8,8 +8,7 @@
 use crate::io::AsyncRead;
 use crate::net::util::PollEvented;
 use crate::signal::registry::{globals, EventId, EventInfo, Globals, Init, Storage};
-
-use tokio_sync::mpsc::{channel, Receiver};
+use crate::sync::mpsc::{channel, Receiver};
 
 use futures_core::stream::Stream;
 use libc::c_int;
diff --git a/tokio/src/signal/windows.rs b/tokio/src/signal/windows.rs
index abde334b..1e68d628 100644
--- a/tokio/src/signal/windows.rs
+++ b/tokio/src/signal/windows.rs
@@ -7,9 +7,8 @@
 #![cfg(windows)]
 
-use super::registry::{globals, EventId, EventInfo, Init, Storage};
-
-use tokio_sync::mpsc::{channel, Receiver};
+use crate::signal::registry::{globals, EventId, EventInfo, Init, Storage};
+use crate::sync::mpsc::{channel, Receiver};
 
 use futures_core::stream::Stream;
 use std::convert::TryFrom;
diff --git a/tokio/src/sync/barrier.rs b/tokio/src/sync/barrier.rs
new file mode 100644
index 00000000..1582120e
--- /dev/null
+++ b/tokio/src/sync/barrier.rs
@@ -0,0 +1,135 @@
+use crate::sync::watch;
+
+use std::sync::Mutex;
+
+/// A barrier enables multiple threads to synchronize the beginning of some computation.
+///
+/// ```
+/// # #[tokio::main]
+/// # async fn main() {
+/// use tokio::sync::Barrier;
+/// use std::sync::Arc;
+/// use futures_util::future::join_all;
+///
+/// let mut handles = Vec::with_capacity(10);
+/// let barrier = Arc::new(Barrier::new(10));
+/// for _ in 0..10 {
+///     let c = barrier.clone();
+///     // The same messages will be printed together.
+///     // You will NOT see any interleaving.
+///     handles.push(async move {
+///         println!("before wait");
+///         let wr = c.wait().await;
+///         println!("after wait");
+///         wr
+///     });
+/// }
+/// // Will not resolve until all "before wait" messages have been printed
+/// let wrs = join_all(handles).await;
+/// // Exactly one barrier will resolve as the "leader"
+/// assert_eq!(wrs.into_iter().filter(|wr| wr.is_leader()).count(), 1);
+/// # }
+/// ```
+#[derive(Debug)]
+pub struct Barrier {
+    state: Mutex<BarrierState>,
+    wait: watch::Receiver<usize>,
+    n: usize,
+}
+
+#[derive(Debug)]
+struct BarrierState {
+    waker: watch::Sender<usize>,
+    arrived: usize,
+    generation: usize,
+}
+
+impl Barrier {
+    /// Creates a new barrier that can block a given number of threads.
+    ///
+    /// A barrier will block `n`-1 threads which call [`Barrier::wait`] and then wake up all
+    /// threads at once when the `n`th thread calls `wait`.
+    pub fn new(mut n: usize) -> Barrier {
+        let (waker, wait) = crate::sync::watch::channel(0);
+
+        if n == 0 {
+            // if n is 0, it's not clear what behavior the user wants.
+            // in std::sync::Barrier, an n of 0 exhibits the same behavior as n == 1, where every
+            // .wait() immediately unblocks, so we adopt that here as well.
+            n = 1;
+        }
+
+        Barrier {
+            state: Mutex::new(BarrierState {
+                waker,
+                arrived: 0,
+                generation: 1,
+            }),
+            n,
+            wait,
+        }
+    }
+
+    /// Does not resolve until all tasks have rendezvoused here.
+    ///
+    /// Barriers are re-usable after all threads have rendezvoused once, and can
+    /// be used continuously.
+    ///
+    /// A single (arbitrary) future will receive a [`BarrierWaitResult`] that returns `true` from
+    /// [`BarrierWaitResult::is_leader`] when returning from this function, and all other threads
+    /// will receive a result that will return `false` from `is_leader`.
+    pub async fn wait(&self) -> BarrierWaitResult {
+        // NOTE: we are taking a _synchronous_ lock here.
+        // It is okay to do so because the critical section is fast and never yields, so it cannot
+        // deadlock even if another future is concurrently holding the lock.
+        // It is _desireable_ to do so as synchronous Mutexes are, at least in theory, faster than
+        // the asynchronous counter-parts, so we should use them where possible [citation needed].
+        // NOTE: the extra scope here is so that the compiler doesn't think `state` is held across
+        // a yield point, and thus marks the returned future as !Send.
+        let generation = {
+            let mut state = self.state.lock().unwrap();
+            let generation = state.generation;
+            state.arrived += 1;
+            if state.arrived == self.n {
+                // we are the leader for this generation
+                // wake everyone, increment the generation, and return
+                state
+                    .waker
+                    .broadcast(state.generation)
+                    .expect("there is at least one receiver");
+                state.arrived = 0;
+                state.generation += 1;
+                return BarrierWaitResult(true);
+            }
+
+            generation
+        };
+
+        // we're going to have to wait for the last of the generation to arrive
+        let mut wait = self.wait.clone();
+
+        loop {
+            // note that the first time through the loop, this _will_ yield a generation
+            // immediately, since we cloned a receiver that has never seen any values.
+            if wait.recv().await.expect("sender hasn't been closed") >= generation {
+                break;
+            }
+        }
+
+        BarrierWaitResult(false)
+    }
+}
+
+/// A `BarrierWaitResult` is returned by `wait` when all threads in the `Barrier` have rendezvoused.
+#[derive(Debug, Clone)]
+pub struct BarrierWaitResult(bool);
+
+impl BarrierWaitResult {
+    /// Returns true if this thread from wait is the "leader thread".
+    ///
+    /// Only one thread will have `true` returned from their result, all other threads will have
+    /// `false` returned.
+    pub fn is_leader(&self) -> bool {
+        self.0
+    }
+}
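
Beyond `Barrier` itself, the comments inside `wait` document a reusable pattern: confine a `std::sync::Mutex` guard to an inner scope that closes before any `.await`, so the critical section never spans a yield point and the returned future stays `Send`. A standalone sketch of that pattern under the same assumptions (the counter example is hypothetical, not code from this commit):

```rust
use std::sync::{Arc, Mutex};

async fn increment_then_yield(counter: Arc<Mutex<u64>>) -> u64 {
    // The guard lives only inside this block, so it is provably dropped
    // before the `.await` below; the compiler keeps the future `Send`.
    let snapshot = {
        let mut n = counter.lock().unwrap();
        *n += 1;
        *n
    };

    // Safe to yield here: no lock is held across this await point.
    tokio::task::yield_now().await;
    snapshot
}
```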
diff --git a/tokio/src/sync/loom.rs b/tokio/src/sync/loom.rs
new file mode 100644
index 00000000..1b5a5c9d
--- /dev/null
+++ b/tokio/src/sync/loom.rs
@@ -0,0 +1,48 @@
+#[cfg(not(all(test, loom)))]
+mod imp {
+    pub(crate) mod future {
+        pub(crate) use crate::sync::task::AtomicWaker;
+    }
+
+    pub(crate) mod sync {
+        pub(crate) use std::sync::atomic;
+        pub(crate) use std::sync::Arc;
+
+        use std::cell::UnsafeCell;
+
+        pub(crate) struct CausalCell<T>(UnsafeCell<T>);
+
+        impl<T> CausalCell<T> {
+            pub(crate) fn new(data: T) -> CausalCell<T> {
+                CausalCell(UnsafeCell::new(data))
+            }
+
+            pub(crate) fn with<F, R>(&self, f: F) -> R
+            where
+                F: FnOnce(*const T) -> R,
+            {
+                f(self.0.get())
+            }
+
+            pub(crate) fn with_mut<F, R>(&self, f: F) -> R
+            where
+                F: FnOnce(*mut T) -> R,
+            {
+                f(self.0.get())
+            }
+        }
+    }
+
+    pub(crate) mod thread {
+        pub(crate) fn yield_now() {
+            ::std::sync::atomic::spin_loop_hint();
+        }
+    }
+}
+
+#[cfg(all(test, loom))]
+mod imp {
+    pub(crate) use loom::*;
+}
+
+pub(crate) use self::imp::*;
diff --git a/tokio/src/sync.rs b/tokio/src/sync/mod.rs
index 36847235..84f6bd98 100644
--- a/tokio/src/sync.rs
+++ b/tokio/src/sync/mod.rs
@@ -13,6 +13,46 @@
 //! - [watch](watch/index.html), a single-producer, multi-consumer channel that
 //!   only stores the **most recently** sent value.
 
-pub use tokio_sync::Barrier;
-pub use tokio_sync::{mpsc, oneshot, watch};
-pub use tokio_sync::{Mutex, MutexGuard};
+macro_rules! debug {
+    ($($t:tt)*) => {
+        if false {
+            println!($($t)*);
+        }
+    }
+}
+
+macro_rules! if_loom {
+    ($($t:tt)*) => {{
+        #[cfg(loom)]
+        const LOOM: bool = true;
+        #[cfg(not(loom))]
+        const LOOM: bool = false;
+
+        if LOOM {
+            $($t)*
+        }
+    }}
+}
+
+mod barrier;
+pub use barrier::{Barrier, BarrierWaitResult};
+
+mod loom;
+
+pub mod mpsc;
+
+mod mutex;
+pub use mutex::{Mutex, MutexGuard};
+
+pub mod oneshot;
+
+pub mod semaphore;
+
+mod task;
+pub use task::AtomicWaker;
+
+pub mod watch;
+
+/// Unit tests
+#[cfg(test)]
+mod tests;
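
The `loom.rs` shim above lets the `sync` internals compile against either the real `std` primitives or `loom`'s model-checked doubles, selected by `cfg(loom)`; the `if_loom!` macro in `mod.rs` provides the same switch at expression level. A hypothetical sketch of what a loom permutation test looks like, assuming the external `loom` crate's `model`/`thread` API (not a test from this diff):

```rust
// Run only with something like `RUSTFLAGS="--cfg loom" cargo test`.
#[cfg(loom)]
mod loom_tests {
    use loom::sync::atomic::{AtomicUsize, Ordering};
    use loom::sync::Arc;
    use loom::thread;

    #[test]
    fn concurrent_increments_are_seen() {
        // `loom::model` explores every legal interleaving of the closure.
        loom::model(|| {
            let n = Arc::new(AtomicUsize::new(0));
            let n2 = n.clone();

            let t = thread::spawn(move || {
                n2.fetch_add(1, Ordering::SeqCst);
            });

            n.fetch_add(1, Ordering::SeqCst);
            t.join().unwrap();

            // Must hold under all interleavings, or loom reports the failure.
            assert_eq!(2, n.load(Ordering::SeqCst));
        });
    }
}
```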
diff --git a/tokio/src/sync/mpsc/block.rs b/tokio/src/sync/mpsc/block.rs
new file mode 100644
index 00000000..aea69384
--- /dev/null
+++ b/tokio/src/sync/mpsc/block.rs
@@ -0,0 +1,387 @@
+use crate::sync::loom::{
+    sync::atomic::{AtomicPtr, AtomicUsize},
+    sync::CausalCell,
+    thread,
+};
+
+use std::mem::MaybeUninit;
+use std::ops;
+use std::ptr::{self, NonNull};
+use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Release};
+
+/// A block in a linked list.
+///
+/// Each block in the list can hold up to `BLOCK_CAP` messages.
+pub(crate) struct Block<T> {
+    /// The start index of this block.
+    ///
+    /// Slots in this block have indices in `start_index .. start_index + BLOCK_CAP`.
+    start_index: usize,
+
+    /// The next block in the linked list.
+    next: AtomicPtr<Block<T>>,
+
+    /// Bitfield tracking slots that are ready to have their values consumed.
+    ready_slots: AtomicUsize,
+
+    /// The observed `tail_position` value *after* the block has been passed by
+    /// `block_tail`.
+    observed_tail_position: CausalCell<usize>,
+
+    /// Array containing values pushed into the block. Values are stored in a
+    /// continuous array in order to improve cache line behavior when reading.
+    /// The values must be manually dropped.
+    values: Values<T>,
+}
+
+pub(crate) enum Read<T> {
+    Value(T),
+    Closed,
+}
+
+struct Values<T>([CausalCell<MaybeUninit<T>>; BLOCK_CAP]);
+
+use super::BLOCK_CAP;
+
+/// Masks an index to get the block identifier
+const BLOCK_MASK: usize = !(BLOCK_CAP - 1);
+
+/// Masks an index to get the value offset in a block.
+const SLOT_MASK: usize = BLOCK_CAP - 1;
+
+/// Flag tracking that a block has gone through the sender's release routine.
+///
+/// When this is set, the receiver may consider freeing the block.
+const RELEASED: usize = 1 << BLOCK_CAP;
+
+/// Flag tracking all senders dropped.
+///
+/// When this flag is set, the send half of the channel has closed.
+const TX_CLOSED: usize = RELEASED << 1;
+
+/// Mask covering all bits used to track slot readiness.
+const READY_MASK: usize = RELEASED - 1;
+
+/// Returns the index of the first slot in the block referenced by `slot_index`.
+#[inline(always)]
+pub(crate) fn start_index(slot_index: usize) -> usize {
+    BLOCK_MASK & slot_index
+}
+
+/// Returns the offset into the block referenced by `slot_index`.
+#[inline(always)]
+pub(crate) fn offset(slot_index: usize) -> usize {
+    SLOT_MASK & slot_index
+}
+
+impl<T> Block<T> {
+    pub(crate) fn new(start_index: usize) -> Block<T> {
+        Block {
+            // The absolute index in the channel of the first slot in the block.
+            start_index,
+
+            // Pointer to the next block in the linked list.
+            next: AtomicPtr::new(ptr::null_mut()),
+
+            ready_slots: AtomicUsize::new(0),
+
+            observed_tail_position: CausalCell::new(0),
+
+            // Value storage
+            values: unsafe { Values::uninitialized() },
+        }
+    }
+
+    /// Returns `true` if the block matches the given index
+    pub(crate) fn is_at_index(&self, index: usize) -> bool {
+        debug_assert!(offset(index) == 0);
+        self.start_index == index
+    }
+
+    /// Returns the number of blocks between `self` and the block at the
+    /// specified index.
+    ///
+    /// `start_index` must represent a block *after* `self`.
+    pub(crate) fn distance(&self, other_index: usize) -> usize {
+        debug_assert!(offset(other_index) == 0);
+        other_index.wrapping_sub(self.start_index) / BLOCK_CAP
+    }
+
+    /// Read the value at the given offset.
+    ///
+    /// Returns `None` if the slot is empty.
+    ///
+    /// # Safety
+    ///
+    /// To maintain safety, the caller must ensure:
+    ///
+    /// * No concurrent access to the slot.
+    pub(crate) unsafe fn read(&self, slot_index: usize) -> Option<Read<T>> {
+        let offset = offset(slot_index);
+
+        let ready_bits = self.ready_slots.load(Acquire);
+
+        if !is_ready(ready_bits, offset) {
+            if is_tx_closed(ready_bits) {
+                return Some(Read::Closed);
+            }
+
+            return None;
+        }
+
+        // Get the value
+        let value = self.values[offset].with(|ptr| ptr::read(ptr));
+
+        Some(Read::Value(value.assume_init()))
+    }
+
+    /// Write a value to the block at the given offset.
+    ///
+    /// # Safety
+    ///
+    /// To maintain safety, the caller must ensure:
+    ///
+    /// * The slot is empty.
+    /// * No concurrent access to the slot.
+    pub(crate) unsafe fn write(&self, slot_index: usize, value: T) {
+        // Get the offset into the block
+        let slot_offset = offset(slot_index);
+
+        self.values[slot_offset].with_mut(|ptr| {
+            ptr::write(ptr, MaybeUninit::new(value));
+        });
+
+        // Release the value. After this point, the slot ref may no longer
+        // be used. It is possible for the receiver to free the memory at
+        // any point.
+        self.set_ready(slot_offset);
+    }
+
+    /// Signal to the receiver that the sender half of the list is closed.
+    pub(crate) unsafe fn tx_close(&self) {
+        self.ready_slots.fetch_or(TX_CLOSED, Release);
+    }
+
+    /// Reset the block to a blank state. This enables reusing blocks in the
+    /// channel.
+    ///
+    /// # Safety
+    ///
+    /// To maintain safety, the caller must ensure:
+    ///
+    /// * All slots are empty.
+    /// * The caller holds a unique pointer to the block.
+    pub(crate) unsafe fn reclaim(&mut self) {
+        self.start_index = 0;
+        self.next = AtomicPtr::new(ptr::null_mut());
+        self.ready_slots = AtomicUsize::new(0);
+    }
+
+    /// Release the block to the rx half for freeing.
+    ///
+    /// This function is called by the tx half once it can be guaranteed that no
+    /// more senders will attempt to access the block.
+    ///
+    /// # Safety
+    ///
+    /// To maintain safety, the caller must ensure:
+    ///
+    /// * The block will no longer be accessed by any sender.
+    pub(crate) unsafe fn tx_release(&self, tail_position: usize) {
+        // Track the observed tail_position. Any sender targetting a greater
+        // tail_position is guaranteed to not access this block.
+        self.observed_tail_position
+            .with_mut(|ptr| *ptr = tail_position);
+
+        // Set the released bit, signalling to the receiver that it is safe to
+        // free the block's memory as soon as all slots **prior** to
+        // `observed_tail_position` have been filled.
+        self.ready_slots.fetch_or(RELEASED, Release);
+    }
+
+    /// Mark a slot as ready
+    fn set_ready(&self, slot: usize) {
+        let mask = 1 << slot;
+        self.ready_slots.fetch_or(mask, Release);
+    }
+
+    /// Returns `true` when all slots have their `ready` bits set.
+    ///
+    /// This indicates that the block is in its final state and will no longer
+    /// be mutated.
+    ///
+    /// # Implementation
+    ///
+    /// The implementation walks each slot checking the `ready` flag. It might
+    /// be that it would make more sense to coalesce ready flags as bits in a
+    /// single atomic cell. However, this could have negative impact on cache
+    /// behavior as there would be many more mutations to a single slot.
+    pub(crate) fn is_final(&self) -> bool {
+        self.ready_slots.load(Acquire) & READY_MASK == READY_MASK
+    }
+
+    /// Returns the `observed_tail_position` value, if set
+    pub(crate) fn observed_tail_position(&self) -> Option<usize> {
+        if 0 == RELEASED & self.ready_slots.load(Acquire) {
+            None
+        } else {
+            Some(self.observed_tail_position.with(|ptr| unsafe { *ptr }))
+        }
+    }
+
+    /// Load the next block
+    pub(crate) fn load_next(&self, ordering: Ordering) -> Option<NonNull<Block<T>>> {
+        let ret = NonNull::new(self.next.load(ordering));
+
+        debug_assert!(unsafe {
+            ret.map(|block| block.as_ref().start_index == self.start_index.wrapping_add(BLOCK_CAP))
+                .unwrap_or(true)
+        });
+
+        ret
+    }
+
+    /// Push `block` as the next block in the link.
+    ///
+    /// Returns Ok if successful, otherwise, a pointer to the next block in
+    /// the list is returned.
+    ///
+    /// This requires that the next pointer is null.
+    ///
+    /// # Ordering
+    ///
+    /// This performs a compare-and-swap on `next` using AcqRel ordering.
+    ///
+    /// # Safety
+    ///
+    /// To maintain safety, the caller must ensure:
+    ///
+    /// * `block` is not freed until it has been removed from the list.
+    pub(crate) unsafe fn try_push(
+        &self,
+        block: &mut NonNull<Block<T>>,
+        ordering: Ordering,
+    ) -> Result<(), NonNull<Block<T>>> {
+        block.as_mut().start_index = self.start_index.wrapping_add(BLOCK_CAP);
+
+        let next_ptr = self
+            .next
+            .compare_and_swap(ptr::null_mut(), block.as_ptr(), ordering);
+
+        match NonNull::new(next_ptr) {
+            Some(next_ptr) => Err(next_ptr),
+            None => Ok(()),
+        }
+    }
+
+    /// Grow the `Block` linked list by allocating and appending a new block.
+    ///
+    /// The next block in the linked list is returned. This may or may not be
+    /// the one allocated by the function call.
+    ///
+    /// # Implementation
+    ///
+    /// It is assumed that `self.next` is null. A new block is allocated with
+    /// `start_index` set to be the next block. A compare-and-swap is performed
+    /// with AcqRel memory ordering. If the compare-and-swap is successful, the
+    /// newly allocated block is released to other threads walking the block
+    /// linked list. If the compare-and-swap fails, the current thread acquires
+    /// the next block in the linked list, allowing the current thread to access
+    /// the slots.
+    pub(crate) fn grow(&self) -> NonNull<Block<T>> {
+        // Create the new block. It is assumed that the block will become the
+        // next one after `&self`. If this turns out to not be the case,
+        // `start_index` is updated accordingly.
+        let new_block = Box::new(Block::new(self.start_index + BLOCK_CAP));
+
+        let mut new_block = unsafe { NonNull::new_unchecked(Box::into_raw(new_block)) };
+
+        // Attempt to store the block. The first compare-and-swap attempt is
+        // "unrolled" due to minor differences in logic
+        //
+        // `AcqRel` is used as the ordering **only** when attempting the
+        // compare-and-swap on self.next.
+        //
+        // If the compare-and-swap fails, then the actual value of the cell is
+        // returned from this function and accessed by the caller. Given this,
+        // the memory must be acquired.
+        //
+        // `Release` ensures that the newly allocated block is available to
+        // other threads acquiring the next pointer.
+        let next = NonNull::new(self.next.compare_and_swap(
+            ptr::null_mut(),
+            new_block.as_ptr(),
+            AcqRel,
+        ));
+
+        let next = match next {
+            Some(next) => next,
+            None => {
+                // The compare-and-swap succeeded and the newly allocated block
+                // is successfully pushed.
+                return new_block;
+            }
+        };
+
+        // There already is a next block in the linked list. The newly allocated
+        // block could be dropped and the discovered next block returned;
+        // however, that would be wasteful. Instead, the linked list is walked
+        // by repeatedly attempting to compare-and-swap the pointer into the
+        // `next` register until the compare-and-swap succeed.
+        //
+        // Care is taken to update new_block's start_index field as appropriate.
+
+        let mut curr = next;
+
+        // TODO: Should this iteration be capped?
+        loop {
+            let actual = unsafe { curr.as_ref().try_push(&mut new_block, AcqRel) };
+
+            curr = match actual {
+                Ok(_) => {
+                    return next;
+                }
+                Err(curr) => curr,
+            };
+
+            // When running outside of loom, this calls `spin_loop_hint`.
+            thread::yield_now();
+        }
+    }
+}
+
+/// Returns `true` if the specificed slot has a value ready to be consumed.
+fn is_ready(bits: usize, slot: usize) -> bool {
+    let mask = 1 << slot;
+    mask == mask & bits
+}
+
+/// Returns `true` if the closed flag has been set.
+fn is_tx_closed(bits: usize) -> bool {
+    TX_CLOSED == bits & TX_CLOSED
+}
+
+impl<T> Values<T> {
+    unsafe fn uninitialized() -> Values<T> {
+        let mut vals = MaybeUninit::uninit();
+
+        // When fuzzing, `CausalCell` needs to be initialized.
+        if_loom! {
+            let p = vals.as_mut_ptr() as *mut CausalCell<MaybeUninit<T>>;
+            for i in 0..BLOCK_CAP {
+                p.add(i)
+                    .write(CausalCell::new(MaybeUninit::uninit()));
+            }
+        }
+
+        Values(vals.assume_init())
+    }
+}
+
+impl<T> ops::Index<usize> for Values<T> {
+    type Output = CausalCell<MaybeUninit<T>>;
+
+    fn index(&self, index: usize) -> &Self::Output {
+        self.0.index(index)
+    }
+}
diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs
new file mode 100644
index 00000000..787dd507
--- /dev/null
+++ b/tokio/src/sync/mpsc/bounded.rs
@@ -0,0 +1,337 @@
+use crate::sync::mpsc::chan;
+use crate::sync::semaphore;
+
+use std::fmt;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+/// Send values to the associated `Receiver`.
+///
+/// Instances are created by the [`channel`](fn.channel.html) function.
+pub struct Sender<T> {
+    chan: chan::Tx<T, Semaphore>,
+}
+
+impl<T> Clone for Sender<T> {
+    fn clone(&self) -> Self {
+        Sender {
+            chan: self.chan.clone(),
+        }
+    }
+}
+
+impl<T> fmt::Debug for Sender<T> {
+    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+        fmt.debug_struct("Sender")
+            .field("chan", &self.chan)
+            .finish()
+    }
+}
+
+/// Receive values from the associated `Sender`.
+///
+/// Instances are created by the [`channel`](fn.channel.html) function.
+pub struct Receiver<T> {
+    /// The channel receiver
+    chan: chan::Rx<T, Semaphore>,
+}
+
+impl<T> fmt::Debug for Receiver<T> {
+    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+        fmt.debug_struct("Receiver")
+            .field("chan", &self.chan)
+            .finish()
+    }
+}
+
+/// Error returned by the `Sender`.
+#[derive(Debug)]
+pub struct SendError(());
+
+/// Error returned by `Sender::try_send`.
+#[derive(Debug)]
+pub struct Tr