diff options
Diffstat (limited to 'tokio/src/sync/mpsc/chan.rs')
-rw-r--r-- | tokio/src/sync/mpsc/chan.rs | 268 |
1 files changed, 35 insertions, 233 deletions
diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index 0a53cda2..2d3f0149 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -2,8 +2,8 @@ use crate::loom::cell::UnsafeCell; use crate::loom::future::AtomicWaker; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::Arc; -use crate::sync::mpsc::error::{ClosedError, TryRecvError}; -use crate::sync::mpsc::{error, list}; +use crate::sync::mpsc::error::TryRecvError; +use crate::sync::mpsc::list; use std::fmt; use std::process; @@ -12,21 +12,13 @@ use std::task::Poll::{Pending, Ready}; use std::task::{Context, Poll}; /// Channel sender -pub(crate) struct Tx<T, S: Semaphore> { +pub(crate) struct Tx<T, S> { inner: Arc<Chan<T, S>>, - permit: S::Permit, } -impl<T, S: Semaphore> fmt::Debug for Tx<T, S> -where - S::Permit: fmt::Debug, - S: fmt::Debug, -{ +impl<T, S: fmt::Debug> fmt::Debug for Tx<T, S> { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Tx") - .field("inner", &self.inner) - .field("permit", &self.permit) - .finish() + fmt.debug_struct("Tx").field("inner", &self.inner).finish() } } @@ -35,71 +27,20 @@ pub(crate) struct Rx<T, S: Semaphore> { inner: Arc<Chan<T, S>>, } -impl<T, S: Semaphore> fmt::Debug for Rx<T, S> -where - S: fmt::Debug, -{ +impl<T, S: Semaphore + fmt::Debug> fmt::Debug for Rx<T, S> { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("Rx").field("inner", &self.inner).finish() } } -#[derive(Debug, Eq, PartialEq)] -pub(crate) enum TrySendError { - Closed, - Full, -} - -impl<T> From<(T, TrySendError)> for error::SendError<T> { - fn from(src: (T, TrySendError)) -> error::SendError<T> { - match src.1 { - TrySendError::Closed => error::SendError(src.0), - TrySendError::Full => unreachable!(), - } - } -} - -impl<T> From<(T, TrySendError)> for error::TrySendError<T> { - fn from(src: (T, TrySendError)) -> error::TrySendError<T> { - match src.1 { - TrySendError::Closed => error::TrySendError::Closed(src.0), - TrySendError::Full => error::TrySendError::Full(src.0), - } - } -} - pub(crate) trait Semaphore { - type Permit; - - fn new_permit() -> Self::Permit; - - /// The permit is dropped without a value being sent. In this case, the - /// permit must be returned to the semaphore. - /// - /// # Return - /// - /// Returns true if the permit was acquired. - fn drop_permit(&self, permit: &mut Self::Permit) -> bool; - fn is_idle(&self) -> bool; fn add_permit(&self); - fn poll_acquire( - &self, - cx: &mut Context<'_>, - permit: &mut Self::Permit, - ) -> Poll<Result<(), ClosedError>>; - - fn try_acquire(&self, permit: &mut Self::Permit) -> Result<(), TrySendError>; - - /// A value was sent into the channel and the permit held by `tx` is - /// dropped. In this case, the permit should not immeditely be returned to - /// the semaphore. Instead, the permit is returnred to the semaphore once - /// the sent value is read by the rx handle. - fn forget(&self, permit: &mut Self::Permit); - fn close(&self); + + fn is_closed(&self) -> bool; } struct Chan<T, S> { @@ -157,10 +98,7 @@ impl<T> fmt::Debug for RxFields<T> { unsafe impl<T: Send, S: Send> Send for Chan<T, S> {} unsafe impl<T: Send, S: Sync> Sync for Chan<T, S> {} -pub(crate) fn channel<T, S>(semaphore: S) -> (Tx<T, S>, Rx<T, S>) -where - S: Semaphore, -{ +pub(crate) fn channel<T, S: Semaphore>(semaphore: S) -> (Tx<T, S>, Rx<T, S>) { let (tx, rx) = list::channel(); let chan = Arc::new(Chan { @@ -179,48 +117,27 @@ where // ===== impl Tx ===== -impl<T, S> Tx<T, S> -where - S: Semaphore, -{ +impl<T, S> Tx<T, S> { fn new(chan: Arc<Chan<T, S>>) -> Tx<T, S> { - Tx { - inner: chan, - permit: S::new_permit(), - } + Tx { inner: chan } } - pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ClosedError>> { - self.inner.semaphore.poll_acquire(cx, &mut self.permit) - } - - pub(crate) fn disarm(&mut self) { - // TODO: should this error if not acquired? - self.inner.semaphore.drop_permit(&mut self.permit); + pub(super) fn semaphore(&self) -> &S { + &self.inner.semaphore } /// Send a message and notify the receiver. - pub(crate) fn try_send(&mut self, value: T) -> Result<(), (T, TrySendError)> { - self.inner.try_send(value, &mut self.permit) - } -} - -impl<T> Tx<T, (crate::sync::semaphore_ll::Semaphore, usize)> { - pub(crate) fn is_ready(&self) -> bool { - self.permit.is_acquired() + pub(crate) fn send(&self, value: T) { + self.inner.send(value); } -} -impl<T> Tx<T, AtomicUsize> { - pub(crate) fn send_unbounded(&self, value: T) -> Result<(), (T, TrySendError)> { - self.inner.try_send(value, &mut ()) + /// Wake the receive half + pub(crate) fn wake_rx(&self) { + self.inner.rx_waker.wake(); } } -impl<T, S> Clone for Tx<T, S> -where - S: Semaphore, -{ +impl<T, S> Clone for Tx<T, S> { fn clone(&self) -> Tx<T, S> { // Using a Relaxed ordering here is sufficient as the caller holds a // strong ref to `self`, preventing a concurrent decrement to zero. @@ -228,22 +145,12 @@ where Tx { inner: self.inner.clone(), - permit: S::new_permit(), } } } -impl<T, S> Drop for Tx<T, S> -where - S: Semaphore, -{ +impl<T, S> Drop for Tx<T, S> { fn drop(&mut self) { - let notify = self.inner.semaphore.drop_permit(&mut self.permit); - - if notify && self.inner.semaphore.is_idle() { - self.inner.rx_waker.wake(); - } - if self.inner.tx_count.fetch_sub(1, AcqRel) != 1 { return; } @@ -252,16 +159,13 @@ where self.inner.tx.close(); // Notify the receiver - self.inner.rx_waker.wake(); + self.wake_rx(); } } // ===== impl Rx ===== -impl<T, S> Rx<T, S> -where - S: Semaphore, -{ +impl<T, S: Semaphore> Rx<T, S> { fn new(chan: Arc<Chan<T, S>>) -> Rx<T, S> { Rx { inner: chan } } @@ -349,10 +253,7 @@ where } } -impl<T, S> Drop for Rx<T, S> -where - S: Semaphore, -{ +impl<T, S: Semaphore> Drop for Rx<T, S> { fn drop(&mut self) { use super::block::Read::Value; @@ -370,25 +271,13 @@ where // ===== impl Chan ===== -impl<T, S> Chan<T, S> -where - S: Semaphore, -{ - fn try_send(&self, value: T, permit: &mut S::Permit) -> Result<(), (T, TrySendError)> { - if let Err(e) = self.semaphore.try_acquire(permit) { - return Err((value, e)); - } - +impl<T, S> Chan<T, S> { + fn send(&self, value: T) { // Push the value self.tx.push(value); // Notify the rx task self.rx_waker.wake(); - - // Release the permit - self.semaphore.forget(permit); - - Ok(()) } } @@ -407,74 +296,24 @@ impl<T, S> Drop for Chan<T, S> { } } -use crate::sync::semaphore_ll::TryAcquireError; - -impl From<TryAcquireError> for TrySendError { - fn from(src: TryAcquireError) -> TrySendError { - if src.is_closed() { - TrySendError::Closed - } else if src.is_no_permits() { - TrySendError::Full - } else { - unreachable!(); - } - } -} - // ===== impl Semaphore for (::Semaphore, capacity) ===== -use crate::sync::semaphore_ll::Permit; - -impl Semaphore for (crate::sync::semaphore_ll::Semaphore, usize) { - type Permit = Permit; - - fn new_permit() -> Permit { - Permit::new() - } - - fn drop_permit(&self, permit: &mut Permit) -> bool { - let ret = permit.is_acquired(); - permit.release(1, &self.0); - ret - } - +impl Semaphore for (crate::sync::batch_semaphore::Semaphore, usize) { fn add_permit(&self) { - self.0.add_permits(1) + self.0.release(1) } fn is_idle(&self) -> bool { self.0.available_permits() == self.1 } - fn poll_acquire( - &self, - cx: &mut Context<'_>, - permit: &mut Permit, - ) -> Poll<Result<(), ClosedError>> { - // Keep track of task budget - let coop = ready!(crate::coop::poll_proceed(cx)); - - permit - .poll_acquire(cx, 1, &self.0) - .map_err(|_| ClosedError::new()) - .map(move |r| { - coop.made_progress(); - r - }) - } - - fn try_acquire(&self, permit: &mut Permit) -> Result<(), TrySendError> { - permit.try_acquire(1, &self.0)?; - Ok(()) - } - - fn forget(&self, permit: &mut Self::Permit) { - permit.forget(1); - } - fn close(&self) { self.0.close(); } + + fn is_closed(&self) -> bool { + self.0.is_closed() + } } // ===== impl Semaphore for AtomicUsize ===== @@ -483,14 +322,6 @@ use std::sync::atomic::Ordering::{Acquire, Release}; use std::usize; impl Semaphore for AtomicUsize { - type Permit = (); - - fn new_permit() {} - - fn drop_permit(&self, _permit: &mut ()) -> bool { - false - } - fn add_permit(&self) { let prev = self.fetch_sub(2, Release); @@ -504,40 +335,11 @@ impl Semaphore for AtomicUsize { self.load(Acquire) >> 1 == 0 } - fn poll_acquire( - &self, - _cx: &mut Context<'_>, - permit: &mut (), - ) -> Poll<Result<(), ClosedError>> { - Ready(self.try_acquire(permit).map_err(|_| ClosedError::new())) - } - - fn try_acquire(&self, _permit: &mut ()) -> Result<(), TrySendError> { - let mut curr = self.load(Acquire); - - loop { - if curr & 1 == 1 { - return Err(TrySendError::Closed); - } - - if curr == usize::MAX ^ 1 { - // Overflowed the ref count. There is no safe way to recover, so - // abort the process. In practice, this should never happen. - process::abort() - } - - match self.compare_exchange(curr, curr + 2, AcqRel, Acquire) { - Ok(_) => return Ok(()), - Err(actual) => { - curr = actual; - } - } - } - } - - fn forget(&self, _permit: &mut ()) {} - fn close(&self) { self.fetch_or(1, Release); } + + fn is_closed(&self) -> bool { + self.load(Acquire) & 1 == 1 + } } |