summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync/mpsc/chan.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/sync/mpsc/chan.rs')
-rw-r--r--tokio/src/sync/mpsc/chan.rs268
1 files changed, 35 insertions, 233 deletions
diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs
index 0a53cda2..2d3f0149 100644
--- a/tokio/src/sync/mpsc/chan.rs
+++ b/tokio/src/sync/mpsc/chan.rs
@@ -2,8 +2,8 @@ use crate::loom::cell::UnsafeCell;
use crate::loom::future::AtomicWaker;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Arc;
-use crate::sync::mpsc::error::{ClosedError, TryRecvError};
-use crate::sync::mpsc::{error, list};
+use crate::sync::mpsc::error::TryRecvError;
+use crate::sync::mpsc::list;
use std::fmt;
use std::process;
@@ -12,21 +12,13 @@ use std::task::Poll::{Pending, Ready};
use std::task::{Context, Poll};
/// Channel sender
-pub(crate) struct Tx<T, S: Semaphore> {
+pub(crate) struct Tx<T, S> {
inner: Arc<Chan<T, S>>,
- permit: S::Permit,
}
-impl<T, S: Semaphore> fmt::Debug for Tx<T, S>
-where
- S::Permit: fmt::Debug,
- S: fmt::Debug,
-{
+impl<T, S: fmt::Debug> fmt::Debug for Tx<T, S> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- fmt.debug_struct("Tx")
- .field("inner", &self.inner)
- .field("permit", &self.permit)
- .finish()
+ fmt.debug_struct("Tx").field("inner", &self.inner).finish()
}
}
@@ -35,71 +27,20 @@ pub(crate) struct Rx<T, S: Semaphore> {
inner: Arc<Chan<T, S>>,
}
-impl<T, S: Semaphore> fmt::Debug for Rx<T, S>
-where
- S: fmt::Debug,
-{
+impl<T, S: Semaphore + fmt::Debug> fmt::Debug for Rx<T, S> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Rx").field("inner", &self.inner).finish()
}
}
-#[derive(Debug, Eq, PartialEq)]
-pub(crate) enum TrySendError {
- Closed,
- Full,
-}
-
-impl<T> From<(T, TrySendError)> for error::SendError<T> {
- fn from(src: (T, TrySendError)) -> error::SendError<T> {
- match src.1 {
- TrySendError::Closed => error::SendError(src.0),
- TrySendError::Full => unreachable!(),
- }
- }
-}
-
-impl<T> From<(T, TrySendError)> for error::TrySendError<T> {
- fn from(src: (T, TrySendError)) -> error::TrySendError<T> {
- match src.1 {
- TrySendError::Closed => error::TrySendError::Closed(src.0),
- TrySendError::Full => error::TrySendError::Full(src.0),
- }
- }
-}
-
pub(crate) trait Semaphore {
- type Permit;
-
- fn new_permit() -> Self::Permit;
-
- /// The permit is dropped without a value being sent. In this case, the
- /// permit must be returned to the semaphore.
- ///
- /// # Return
- ///
- /// Returns true if the permit was acquired.
- fn drop_permit(&self, permit: &mut Self::Permit) -> bool;
-
fn is_idle(&self) -> bool;
fn add_permit(&self);
- fn poll_acquire(
- &self,
- cx: &mut Context<'_>,
- permit: &mut Self::Permit,
- ) -> Poll<Result<(), ClosedError>>;
-
- fn try_acquire(&self, permit: &mut Self::Permit) -> Result<(), TrySendError>;
-
- /// A value was sent into the channel and the permit held by `tx` is
- /// dropped. In this case, the permit should not immeditely be returned to
- /// the semaphore. Instead, the permit is returnred to the semaphore once
- /// the sent value is read by the rx handle.
- fn forget(&self, permit: &mut Self::Permit);
-
fn close(&self);
+
+ fn is_closed(&self) -> bool;
}
struct Chan<T, S> {
@@ -157,10 +98,7 @@ impl<T> fmt::Debug for RxFields<T> {
unsafe impl<T: Send, S: Send> Send for Chan<T, S> {}
unsafe impl<T: Send, S: Sync> Sync for Chan<T, S> {}
-pub(crate) fn channel<T, S>(semaphore: S) -> (Tx<T, S>, Rx<T, S>)
-where
- S: Semaphore,
-{
+pub(crate) fn channel<T, S: Semaphore>(semaphore: S) -> (Tx<T, S>, Rx<T, S>) {
let (tx, rx) = list::channel();
let chan = Arc::new(Chan {
@@ -179,48 +117,27 @@ where
// ===== impl Tx =====
-impl<T, S> Tx<T, S>
-where
- S: Semaphore,
-{
+impl<T, S> Tx<T, S> {
fn new(chan: Arc<Chan<T, S>>) -> Tx<T, S> {
- Tx {
- inner: chan,
- permit: S::new_permit(),
- }
+ Tx { inner: chan }
}
- pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ClosedError>> {
- self.inner.semaphore.poll_acquire(cx, &mut self.permit)
- }
-
- pub(crate) fn disarm(&mut self) {
- // TODO: should this error if not acquired?
- self.inner.semaphore.drop_permit(&mut self.permit);
+ pub(super) fn semaphore(&self) -> &S {
+ &self.inner.semaphore
}
/// Send a message and notify the receiver.
- pub(crate) fn try_send(&mut self, value: T) -> Result<(), (T, TrySendError)> {
- self.inner.try_send(value, &mut self.permit)
- }
-}
-
-impl<T> Tx<T, (crate::sync::semaphore_ll::Semaphore, usize)> {
- pub(crate) fn is_ready(&self) -> bool {
- self.permit.is_acquired()
+ pub(crate) fn send(&self, value: T) {
+ self.inner.send(value);
}
-}
-impl<T> Tx<T, AtomicUsize> {
- pub(crate) fn send_unbounded(&self, value: T) -> Result<(), (T, TrySendError)> {
- self.inner.try_send(value, &mut ())
+ /// Wake the receive half
+ pub(crate) fn wake_rx(&self) {
+ self.inner.rx_waker.wake();
}
}
-impl<T, S> Clone for Tx<T, S>
-where
- S: Semaphore,
-{
+impl<T, S> Clone for Tx<T, S> {
fn clone(&self) -> Tx<T, S> {
// Using a Relaxed ordering here is sufficient as the caller holds a
// strong ref to `self`, preventing a concurrent decrement to zero.
@@ -228,22 +145,12 @@ where
Tx {
inner: self.inner.clone(),
- permit: S::new_permit(),
}
}
}
-impl<T, S> Drop for Tx<T, S>
-where
- S: Semaphore,
-{
+impl<T, S> Drop for Tx<T, S> {
fn drop(&mut self) {
- let notify = self.inner.semaphore.drop_permit(&mut self.permit);
-
- if notify && self.inner.semaphore.is_idle() {
- self.inner.rx_waker.wake();
- }
-
if self.inner.tx_count.fetch_sub(1, AcqRel) != 1 {
return;
}
@@ -252,16 +159,13 @@ where
self.inner.tx.close();
// Notify the receiver
- self.inner.rx_waker.wake();
+ self.wake_rx();
}
}
// ===== impl Rx =====
-impl<T, S> Rx<T, S>
-where
- S: Semaphore,
-{
+impl<T, S: Semaphore> Rx<T, S> {
fn new(chan: Arc<Chan<T, S>>) -> Rx<T, S> {
Rx { inner: chan }
}
@@ -349,10 +253,7 @@ where
}
}
-impl<T, S> Drop for Rx<T, S>
-where
- S: Semaphore,
-{
+impl<T, S: Semaphore> Drop for Rx<T, S> {
fn drop(&mut self) {
use super::block::Read::Value;
@@ -370,25 +271,13 @@ where
// ===== impl Chan =====
-impl<T, S> Chan<T, S>
-where
- S: Semaphore,
-{
- fn try_send(&self, value: T, permit: &mut S::Permit) -> Result<(), (T, TrySendError)> {
- if let Err(e) = self.semaphore.try_acquire(permit) {
- return Err((value, e));
- }
-
+impl<T, S> Chan<T, S> {
+ fn send(&self, value: T) {
// Push the value
self.tx.push(value);
// Notify the rx task
self.rx_waker.wake();
-
- // Release the permit
- self.semaphore.forget(permit);
-
- Ok(())
}
}
@@ -407,74 +296,24 @@ impl<T, S> Drop for Chan<T, S> {
}
}
-use crate::sync::semaphore_ll::TryAcquireError;
-
-impl From<TryAcquireError> for TrySendError {
- fn from(src: TryAcquireError) -> TrySendError {
- if src.is_closed() {
- TrySendError::Closed
- } else if src.is_no_permits() {
- TrySendError::Full
- } else {
- unreachable!();
- }
- }
-}
-
// ===== impl Semaphore for (::Semaphore, capacity) =====
-use crate::sync::semaphore_ll::Permit;
-
-impl Semaphore for (crate::sync::semaphore_ll::Semaphore, usize) {
- type Permit = Permit;
-
- fn new_permit() -> Permit {
- Permit::new()
- }
-
- fn drop_permit(&self, permit: &mut Permit) -> bool {
- let ret = permit.is_acquired();
- permit.release(1, &self.0);
- ret
- }
-
+impl Semaphore for (crate::sync::batch_semaphore::Semaphore, usize) {
fn add_permit(&self) {
- self.0.add_permits(1)
+ self.0.release(1)
}
fn is_idle(&self) -> bool {
self.0.available_permits() == self.1
}
- fn poll_acquire(
- &self,
- cx: &mut Context<'_>,
- permit: &mut Permit,
- ) -> Poll<Result<(), ClosedError>> {
- // Keep track of task budget
- let coop = ready!(crate::coop::poll_proceed(cx));
-
- permit
- .poll_acquire(cx, 1, &self.0)
- .map_err(|_| ClosedError::new())
- .map(move |r| {
- coop.made_progress();
- r
- })
- }
-
- fn try_acquire(&self, permit: &mut Permit) -> Result<(), TrySendError> {
- permit.try_acquire(1, &self.0)?;
- Ok(())
- }
-
- fn forget(&self, permit: &mut Self::Permit) {
- permit.forget(1);
- }
-
fn close(&self) {
self.0.close();
}
+
+ fn is_closed(&self) -> bool {
+ self.0.is_closed()
+ }
}
// ===== impl Semaphore for AtomicUsize =====
@@ -483,14 +322,6 @@ use std::sync::atomic::Ordering::{Acquire, Release};
use std::usize;
impl Semaphore for AtomicUsize {
- type Permit = ();
-
- fn new_permit() {}
-
- fn drop_permit(&self, _permit: &mut ()) -> bool {
- false
- }
-
fn add_permit(&self) {
let prev = self.fetch_sub(2, Release);
@@ -504,40 +335,11 @@ impl Semaphore for AtomicUsize {
self.load(Acquire) >> 1 == 0
}
- fn poll_acquire(
- &self,
- _cx: &mut Context<'_>,
- permit: &mut (),
- ) -> Poll<Result<(), ClosedError>> {
- Ready(self.try_acquire(permit).map_err(|_| ClosedError::new()))
- }
-
- fn try_acquire(&self, _permit: &mut ()) -> Result<(), TrySendError> {
- let mut curr = self.load(Acquire);
-
- loop {
- if curr & 1 == 1 {
- return Err(TrySendError::Closed);
- }
-
- if curr == usize::MAX ^ 1 {
- // Overflowed the ref count. There is no safe way to recover, so
- // abort the process. In practice, this should never happen.
- process::abort()
- }
-
- match self.compare_exchange(curr, curr + 2, AcqRel, Acquire) {
- Ok(_) => return Ok(()),
- Err(actual) => {
- curr = actual;
- }
- }
- }
- }
-
- fn forget(&self, _permit: &mut ()) {}
-
fn close(&self) {
self.fetch_or(1, Release);
}
+
+ fn is_closed(&self) -> bool {
+ self.load(Acquire) & 1 == 1
+ }
}