use crate::loom::cell::CausalCell;
use crate::loom::future::AtomicWaker;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Arc;
use crate::sync::mpsc::error::ClosedError;
use crate::sync::mpsc::{error, list};

use std::fmt;
use std::process;
use std::sync::atomic::Ordering::{AcqRel, Relaxed};
use std::task::Poll::{Pending, Ready};
use std::task::{Context, Poll};

/// Channel sender
pub(crate) struct Tx<T, S: Semaphore> {
    inner: Arc<Chan<T, S>>,
    permit: S::Permit,
}

impl<T, S: Semaphore> fmt::Debug for Tx<T, S>
where
    S::Permit: fmt::Debug,
    S: fmt::Debug,
{
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Tx")
            .field("inner", &self.inner)
            .field("permit", &self.permit)
            .finish()
    }
}

/// Channel receiver
pub(crate) struct Rx<T, S: Semaphore> {
    inner: Arc<Chan<T, S>>,
}

impl<T, S: Semaphore> fmt::Debug for Rx<T, S>
where
    S: fmt::Debug,
{
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Rx").field("inner", &self.inner).finish()
    }
}

#[derive(Debug, Eq, PartialEq)]
pub(crate) enum TrySendError {
    Closed,
    Full,
}

impl<T> From<(T, TrySendError)> for error::SendError<T> {
    fn from(src: (T, TrySendError)) -> error::SendError<T> {
        match src.1 {
            TrySendError::Closed => error::SendError(src.0),
            TrySendError::Full => unreachable!(),
        }
    }
}

impl<T> From<(T, TrySendError)> for error::TrySendError<T> {
    fn from(src: (T, TrySendError)) -> error::TrySendError<T> {
        match src.1 {
            TrySendError::Closed => error::TrySendError::Closed(src.0),
            TrySendError::Full => error::TrySendError::Full(src.0),
        }
    }
}

pub(crate) trait Semaphore {
    type Permit;

    fn new_permit() -> Self::Permit;

    /// The permit is dropped without a value being sent. In this case, the
    /// permit must be returned to the semaphore.
    fn drop_permit(&self, permit: &mut Self::Permit);

    fn is_idle(&self) -> bool;

    fn add_permit(&self);

    fn poll_acquire(
        &self,
        cx: &mut Context<'_>,
        permit: &mut Self::Permit,
    ) -> Poll<Result<(), ClosedError>>;

    fn try_acquire(&self, permit: &mut Self::Permit) -> Result<(), TrySendError>;

    /// A value was sent into the channel and the permit held by `tx` is
    /// dropped. In this case, the permit should not immediately be returned to
    /// the semaphore. Instead, the permit is returned to the semaphore once
    /// the sent value is read by the rx handle.
    fn forget(&self, permit: &mut Self::Permit);

    fn close(&self);
}

struct Chan<T, S> {
    /// Handle to the push half of the lock-free list.
    tx: list::Tx<T>,

    /// Coordinates access to channel's capacity.
    semaphore: S,

    /// Receiver waker. Notified when a value is pushed into the channel.
    rx_waker: AtomicWaker,

    /// Tracks the number of outstanding sender handles.
    ///
    /// When this drops to zero, the send half of the channel is closed.
    tx_count: AtomicUsize,

    /// Only accessed by `Rx` handle.
    rx_fields: CausalCell<RxFields<T>>,
}

impl<T, S> fmt::Debug for Chan<T, S>
where
    S: fmt::Debug,
{
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Chan")
            .field("tx", &self.tx)
            .field("semaphore", &self.semaphore)
            .field("rx_waker", &self.rx_waker)
            .field("tx_count", &self.tx_count)
            .field("rx_fields", &"...")
            .finish()
    }
}

/// Fields only accessed by `Rx` handle.
struct RxFields<T> {
    /// Channel receiver. This field is only accessed by the `Receiver` type.
    list: list::Rx<T>,

    /// `true` if `Rx::close` is called.
    rx_closed: bool,
}

impl<T> fmt::Debug for RxFields<T> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("RxFields")
            .field("list", &self.list)
            .field("rx_closed", &self.rx_closed)
            .finish()
    }
}

unsafe impl<T: Send, S: Send> Send for Chan<T, S> {}
unsafe impl<T: Send, S: Sync> Sync for Chan<T, S> {}

pub(crate) fn channel<T, S>(semaphore: S) -> (Tx<T, S>, Rx<T, S>)
where
    S: Semaphore,
{
    let (tx, rx) = list::channel();

    let chan = Arc::new(Chan {
        tx,
        semaphore,
        rx_waker: AtomicWaker::new(),
        tx_count: AtomicUsize::new(1),
        rx_fields: CausalCell::new(RxFields {
            list: rx,
            rx_closed: false,
        }),
    });

    (Tx::new(chan.clone()), Rx::new(chan))
}

// ===== impl Tx =====

impl<T, S> Tx<T, S>
where
    S: Semaphore,
{
    fn new(chan: Arc<Chan<T, S>>) -> Tx<T, S> {
        Tx {
            inner: chan,
            permit: S::new_permit(),
        }
    }

    pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ClosedError>> {
        self.inner.semaphore.poll_acquire(cx, &mut self.permit)
    }

    /// Send a message and notify the receiver.
    pub(crate) fn try_send(&mut self, value: T) -> Result<(), (T, TrySendError)> {
        self.inner.try_send(value, &mut self.permit)
    }
}

impl<T> Tx<T, AtomicUsize> {
    pub(crate) fn send_unbounded(&self, value: T) -> Result<(), (T, TrySendError)> {
        self.inner.try_send(value, &mut ())
    }
}

impl<T, S> Clone for Tx<T, S>
where
    S: Semaphore,
{
    fn clone(&self) -> Tx<T, S> {
        // Using a Relaxed ordering here is sufficient as the caller holds a
        // strong ref to `self`, preventing a concurrent decrement to zero.
        self.inner.tx_count.fetch_add(1, Relaxed);

        Tx {
            inner: self.inner.clone(),
            permit: S::new_permit(),
        }
    }
}

impl<T, S> Drop for Tx<T, S>
where
    S: Semaphore,
{
    fn drop(&mut self) {
        self.inner.semaphore.drop_permit(&mut self.permit);

        if self.inner.tx_count.fetch_sub(1, AcqRel) != 1 {
            return;
        }

        // Close the list, which sends a `Close` message
        self.inner.tx.close();

        // Notify the receiver
        self.inner.rx_waker.wake();
    }
}

// ===== impl Rx =====

impl<T, S> Rx<T, S>
where
    S: Semaphore,
{
    fn new(chan: Arc<Chan<T, S>>) -> Rx<T, S> {
        Rx { inner: chan }
    }

    pub(crate) fn close(&mut self) {
        self.inner.rx_fields.with_mut(|rx_fields_ptr| {
            let rx_fields = unsafe { &mut *rx_fields_ptr };

            if rx_fields.rx_closed {
                return;
            }

            rx_fields.rx_closed = true;
        });

        self.inner.semaphore.close();
    }

    /// Receive the next value
    pub(crate) fn recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        use super::block::Read::*;

        self.inner.rx_fields.with_mut(|rx_fields_ptr| {
            let rx_fields = unsafe { &mut *rx_fields_ptr };

            macro_rules! try_recv {
                () => {
                    match rx_fields.list.pop(&self.inner.tx) {
                        Some(Value(value)) => {
                            self.inner.semaphore.add_permit();
                            return Ready(Some(value));
                        }
                        Some(Closed) => {
                            // TODO: This check may not be required as it most
                            // likely can only return `true` at this point. A
                            // channel is closed when all tx handles are
                            // dropped. Dropping a tx handle releases memory,
                            // which ensures that if dropping the tx handle is
                            // visible, then all messages sent are also visible.
                            assert!(self.inner.semaphore.is_idle());
                            return Ready(None);
                        }
                        None => {} // fall through
                    }
                };
            }

            try_recv!();

            self.inner.rx_waker.register_by_ref(cx.waker());

            // It is possible that a value was pushed between attempting to read
            // and registering the task, so we have to check the channel a
            // second time here.
            try_recv!();

            debug!(
                "recv; rx_closed = {:?}; is_idle = {:?}",
                rx_fields.rx_closed,
                self.inner.semaphore.is_idle()
            );

            if rx_fields.rx_closed && self.inner.semaphore.is_idle() {
                Ready(None)
            } else {
                Pending
            }
        })
    }
}

impl<T, S> Drop for Rx<T, S>
where
    S: Semaphore,
{
    fn drop(&mut self) {
        use super::block::Read::Value;

        self.close();

        self.inner.rx_fields.with_mut(|rx_fields_ptr| {
            let rx_fields = unsafe { &mut *rx_fields_ptr };

            while let Some(Value(_)) = rx_fields.list.pop(&self.inner.tx) {
                self.inner.semaphore.add_permit();
            }
        })
    }
}

// ===== impl Chan =====

impl<T, S> Chan<T, S>
where
    S: Semaphore,
{
    fn try_send(&self, value: T, permit: &mut S::Permit) -> Result<(), (T, TrySendError)> {
        if let Err(e) = self.semaphore.try_acquire(permit) {
            return Err((value, e));
        }

        // Push the value
        self.tx.push(value);

        // Notify the rx task
        self.rx_waker.wake();

        // Release the permit
        self.semaphore.forget(permit);

        Ok(())
    }
}

impl<T, S> Drop for Chan<T, S> {
    fn drop(&mut self) {
        use super::block::Read::Value;

        // Safety: the only owner of the rx fields is Chan, and being
        // inside its own Drop means we're the last ones to touch it.
        self.rx_fields.with_mut(|rx_fields_ptr| {
            let rx_fields = unsafe { &mut *rx_fields_ptr };

            while let Some(Value(_)) = rx_fields.list.pop(&self.tx) {}
            unsafe { rx_fields.list.free_blocks() };
        });
    }
}

use crate::sync::semaphore::TryAcquireError;

impl From<TryAcquireError> for TrySendError {
    fn from(src: TryAcquireError) -> TrySendError {
        if src.is_closed() {
            TrySendError::Closed
        } else if src.is_no_permits() {
            TrySendError::Full
        } else {
            unreachable!();
        }
    }
}

// ===== impl Semaphore for (::Semaphore, capacity) =====

use crate::sync::semaphore::Permit;

impl Semaphore for (crate::sync::semaphore::Semaphore, usize) {
    type Permit = Permit;

    fn new_permit() -> Permit {
        Permit::new()
    }

    fn drop_permit(&self, permit: &mut Permit) {
        permit.release(&self.0);
    }

    fn add_permit(&self) {
        self.0.add_permits(1)
    }

    fn is_idle(&self) -> bool {
        self.0.available_permits() == self.1
    }

    fn poll_acquire(
        &self,
        cx: &mut Context<'_>,
        permit: &mut Permit,
    ) -> Poll<Result<(), ClosedError>> {
        permit
            .poll_acquire(cx, &self.0)
            .map_err(|_| ClosedError::new())
    }

    fn try_acquire(&self, permit: &mut Permit) -> Result<(), TrySendError> {
        permit.try_acquire(&self.0)?;
        Ok(())
    }

    fn forget(&self, permit: &mut Self::Permit) {
        permit.forget()
    }

    fn close(&self) {
        self.0.close();
    }
}

// ===== impl Semaphore for AtomicUsize =====

use std::sync::atomic::Ordering::{Acquire, Release};
use std::usize;

impl Semaphore for AtomicUsize {
    type Permit = ();

    fn new_permit() {}

    fn drop_permit(&self, _permit: &mut ()) {}

    fn add_permit(&self) {
        let prev = self.fetch_sub(2, Release);

        if prev >> 1 == 0 {
            // Something went wrong
            process::abort();
        }
    }

    fn is_idle(&self) -> bool {
        self.load(Acquire) >> 1 == 0
    }

    fn poll_acquire(
        &self,
        _cx: &mut Context<'_>,
        permit: &mut (),
    ) -> Poll<Result<(), ClosedError>> {
        Ready(self.try_acquire(permit).map_err(|_| ClosedError::new()))
    }

    fn try_acquire(&self, _permit: &mut ()) -> Result<(), TrySendError> {
        let mut curr = self.load(Acquire);

        loop {
            if curr & 1 == 1 {
                return Err(TrySendError::Closed);
            }

            if curr == usize::MAX ^ 1 {
                // Overflowed the ref count. There is no safe way to recover, so
                // abort the process. In practice, this should never happen.
                process::abort()
            }

            match self.compare_exchange(curr, curr + 2, AcqRel, Acquire) {
                Ok(_) => return Ok(()),
                Err(actual) => {
                    curr = actual;
                }
            }
        }
    }

    fn forget(&self, _permit: &mut ()) {}

    fn close(&self) {
        self.fetch_or(1, Release);
    }
}
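
// A minimal test sketch (not part of the original module) exercising the
// `Semaphore` impl for `AtomicUsize` above. It assumes that, outside of a
// loom model, `crate::loom::sync::atomic::AtomicUsize` behaves like the
// standard library atomic. The state layout being exercised: bit 0 is the
// "closed" flag, and the remaining bits count in-flight messages, stepped
// in units of 2.
#[cfg(test)]
mod unbounded_semaphore_sketch {
    use super::{Semaphore, TrySendError};
    use crate::loom::sync::atomic::AtomicUsize;

    #[test]
    fn permit_lifecycle_and_close_bit() {
        let sem = AtomicUsize::new(0);
        assert!(sem.is_idle());

        // A send acquires a permit, recording one in-flight message
        // (state goes 0 -> 2).
        sem.try_acquire(&mut ()).unwrap();
        assert!(!sem.is_idle());

        // Once the receiver consumes the value, the permit is returned
        // (state goes 2 -> 0).
        sem.add_permit();
        assert!(sem.is_idle());

        // Closing sets bit 0; subsequent acquires fail with `Closed`.
        sem.close();
        assert_eq!(sem.try_acquire(&mut ()), Err(TrySendError::Closed));
    }
}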