summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-09-24 17:26:38 -0700
committerGitHub <noreply@github.com>2020-09-24 17:26:38 -0700
commitcf025ba45f68934ae2138bb75ee2a5ee50506d1b (patch)
tree39fa03f4b063402e84da4435ebca39bd21266ad2 /tokio/src/sync
parent4186b0aa38abbec7670d53882d5cdfd4b12add5c (diff)
sync: support mpsc send with `&self` (#2861)
Updates the mpsc channel to use the intrusive waker based sempahore. This enables using `Sender` with `&self`. Instead of using `Sender::poll_ready` to ensure capacity and updating the `Sender` state, `async fn Sender::reserve()` is added. This function returns a `Permit` value representing the reserved capacity. Fixes: #2637 Refs: #2718 (intrusive waiters)
Diffstat (limited to 'tokio/src/sync')
-rw-r--r--tokio/src/sync/batch_semaphore.rs10
-rw-r--r--tokio/src/sync/mod.rs9
-rw-r--r--tokio/src/sync/mpsc/bounded.rs338
-rw-r--r--tokio/src/sync/mpsc/chan.rs268
-rw-r--r--tokio/src/sync/mpsc/error.rs20
-rw-r--r--tokio/src/sync/mpsc/mod.rs2
-rw-r--r--tokio/src/sync/mpsc/unbounded.rs39
-rw-r--r--tokio/src/sync/semaphore_ll.rs1221
-rw-r--r--tokio/src/sync/tests/loom_mpsc.rs14
-rw-r--r--tokio/src/sync/tests/loom_semaphore_ll.rs192
-rw-r--r--tokio/src/sync/tests/mod.rs2
-rw-r--r--tokio/src/sync/tests/semaphore_ll.rs470
12 files changed, 293 insertions, 2292 deletions
diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs
index a1048ca3..9f324f9c 100644
--- a/tokio/src/sync/batch_semaphore.rs
+++ b/tokio/src/sync/batch_semaphore.rs
@@ -165,7 +165,6 @@ impl Semaphore {
/// permits and notifies all pending waiters.
// This will be used once the bounded MPSC is updated to use the new
// semaphore implementation.
- #[allow(dead_code)]
pub(crate) fn close(&self) {
let mut waiters = self.waiters.lock().unwrap();
// If the semaphore's permits counter has enough permits for an
@@ -185,6 +184,11 @@ impl Semaphore {
}
}
+ /// Returns true if the semaphore is closed
+ pub(crate) fn is_closed(&self) -> bool {
+ self.permits.load(Acquire) & Self::CLOSED == Self::CLOSED
+ }
+
pub(crate) fn try_acquire(&self, num_permits: u32) -> Result<(), TryAcquireError> {
assert!(
num_permits as usize <= Self::MAX_PERMITS,
@@ -194,8 +198,8 @@ impl Semaphore {
let num_permits = (num_permits as usize) << Self::PERMIT_SHIFT;
let mut curr = self.permits.load(Acquire);
loop {
- // Has the semaphore closed?git
- if curr & Self::CLOSED > 0 {
+ // Has the semaphore closed?
+ if curr & Self::CLOSED == Self::CLOSED {
return Err(TryAcquireError::Closed);
}
diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs
index 4c069467..6531931b 100644
--- a/tokio/src/sync/mod.rs
+++ b/tokio/src/sync/mod.rs
@@ -106,7 +106,7 @@
//!
//! #[tokio::main]
//! async fn main() {
-//! let (mut tx, mut rx) = mpsc::channel(100);
+//! let (tx, mut rx) = mpsc::channel(100);
//!
//! tokio::spawn(async move {
//! for i in 0..10 {
@@ -150,7 +150,7 @@
//! for _ in 0..10 {
//! // Each task needs its own `tx` handle. This is done by cloning the
//! // original handle.
-//! let mut tx = tx.clone();
+//! let tx = tx.clone();
//!
//! tokio::spawn(async move {
//! tx.send(&b"data to write"[..]).await.unwrap();
@@ -213,7 +213,7 @@
//!
//! // Spawn tasks that will send the increment command.
//! for _ in 0..10 {
-//! let mut cmd_tx = cmd_tx.clone();
+//! let cmd_tx = cmd_tx.clone();
//!
//! join_handles.push(tokio::spawn(async move {
//! let (resp_tx, resp_rx) = oneshot::channel();
@@ -443,7 +443,6 @@ cfg_sync! {
pub mod oneshot;
pub(crate) mod batch_semaphore;
- pub(crate) mod semaphore_ll;
mod semaphore;
pub use semaphore::{Semaphore, SemaphorePermit, OwnedSemaphorePermit};
@@ -473,7 +472,7 @@ cfg_not_sync! {
cfg_signal_internal! {
pub(crate) mod mpsc;
- pub(crate) mod semaphore_ll;
+ pub(crate) mod batch_semaphore;
}
}
diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs
index 14e4731a..2d2006d5 100644
--- a/tokio/src/sync/mpsc/bounded.rs
+++ b/tokio/src/sync/mpsc/bounded.rs
@@ -1,6 +1,6 @@
+use crate::sync::batch_semaphore::{self as semaphore, TryAcquireError};
use crate::sync::mpsc::chan;
-use crate::sync::mpsc::error::{ClosedError, SendError, TryRecvError, TrySendError};
-use crate::sync::semaphore_ll as semaphore;
+use crate::sync::mpsc::error::{SendError, TryRecvError, TrySendError};
cfg_time! {
use crate::sync::mpsc::error::SendTimeoutError;
@@ -8,6 +8,7 @@ cfg_time! {
}
use std::fmt;
+#[cfg(any(feature = "signal", feature = "process", feature = "stream"))]
use std::task::{Context, Poll};
/// Send values to the associated `Receiver`.
@@ -17,20 +18,14 @@ pub struct Sender<T> {
chan: chan::Tx<T, Semaphore>,
}
-impl<T> Clone for Sender<T> {
- fn clone(&self) -> Self {
- Sender {
- chan: self.chan.clone(),
- }
- }
-}
-
-impl<T> fmt::Debug for Sender<T> {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- fmt.debug_struct("Sender")
- .field("chan", &self.chan)
- .finish()
- }
+/// Permit to send one value into the channel.
+///
+/// `Permit` values are returned by [`Sender::reserve()`] and are used to
+/// guarantee channel capacity before generating a message to send.
+///
+/// [`Sender::reserve()`]: Sender::reserve
+pub struct Permit<'a, T> {
+ chan: &'a chan::Tx<T, Semaphore>,
}
/// Receive values from the associated `Sender`.
@@ -41,14 +36,6 @@ pub struct Receiver<T> {
chan: chan::Rx<T, Semaphore>,
}
-impl<T> fmt::Debug for Receiver<T> {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- fmt.debug_struct("Receiver")
- .field("chan", &self.chan)
- .finish()
- }
-}
-
/// Creates a bounded mpsc channel for communicating between asynchronous tasks
/// with backpressure.
///
@@ -77,7 +64,7 @@ impl<T> fmt::Debug for Receiver<T> {
///
/// #[tokio::main]
/// async fn main() {
-/// let (mut tx, mut rx) = mpsc::channel(100);
+/// let (tx, mut rx) = mpsc::channel(100);
///
/// tokio::spawn(async move {
/// for i in 0..10 {
@@ -125,7 +112,7 @@ impl<T> Receiver<T> {
///
/// #[tokio::main]
/// async fn main() {
- /// let (mut tx, mut rx) = mpsc::channel(100);
+ /// let (tx, mut rx) = mpsc::channel(100);
///
/// tokio::spawn(async move {
/// tx.send("hello").await.unwrap();
@@ -143,7 +130,7 @@ impl<T> Receiver<T> {
///
/// #[tokio::main]
/// async fn main() {
- /// let (mut tx, mut rx) = mpsc::channel(100);
+ /// let (tx, mut rx) = mpsc::channel(100);
///
/// tx.send("hello").await.unwrap();
/// tx.send("world").await.unwrap();
@@ -154,12 +141,11 @@ impl<T> Receiver<T> {
/// ```
pub async fn recv(&mut self) -> Option<T> {
use crate::future::poll_fn;
-
- poll_fn(|cx| self.poll_recv(cx)).await
+ poll_fn(|cx| self.chan.recv(cx)).await
}
- #[doc(hidden)] // TODO: document
- pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ #[cfg(any(feature = "signal", feature = "process"))]
+ pub(crate) fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.chan.recv(cx)
}
@@ -178,7 +164,7 @@ impl<T> Receiver<T> {
/// use tokio::sync::mpsc;
///
/// fn main() {
- /// let (mut tx, mut rx) = mpsc::channel::<u8>(10);
+ /// let (tx, mut rx) = mpsc::channel::<u8>(10);
///
/// let sync_code = thread::spawn(move || {
/// assert_eq!(Some(10), rx.blocking_recv());
@@ -215,12 +201,53 @@ impl<T> Receiver<T> {
/// Closes the receiving half of a channel, without dropping it.
///
/// This prevents any further messages from being sent on the channel while
- /// still enabling the receiver to drain messages that are buffered.
+ /// still enabling the receiver to drain messages that are buffered. Any
+ /// outstanding [`Permit`] values will still be able to send messages.
+ ///
+ /// In order to guarantee no messages are dropped, after calling `close()`,
+ /// `recv()` must be called until `None` is returned.
+ ///
+ /// [`Permit`]: Permit
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = mpsc::channel(20);
+ ///
+ /// tokio::spawn(async move {
+ /// let mut i = 0;
+ /// while let Ok(permit) = tx.reserve().await {
+ /// permit.send(i);
+ /// i += 1;
+ /// }
+ /// });
+ ///
+ /// rx.close();
+ ///
+ /// while let Some(msg) = rx.recv().await {
+ /// println!("got {}", msg);
+ /// }
+ ///
+ /// // Channel closed and no messages are lost.
+ /// }
+ /// ```
pub fn close(&mut self) {
self.chan.close();
}
}
+impl<T> fmt::Debug for Receiver<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Receiver")
+ .field("chan", &self.chan)
+ .finish()
+ }
+}
+
impl<T> Unpin for Receiver<T> {}
cfg_stream! {
@@ -228,7 +255,7 @@ cfg_stream! {
type Item = T;
fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
- self.poll_recv(cx)
+ self.chan.recv(cx)
}
}
}
@@ -267,7 +294,7 @@ impl<T> Sender<T> {
///
/// #[tokio::main]
/// async fn main() {
- /// let (mut tx, mut rx) = mpsc::channel(1);
+ /// let (tx, mut rx) = mpsc::channel(1);
///
/// tokio::spawn(async move {
/// for i in 0..10 {
@@ -283,17 +310,13 @@ impl<T> Sender<T> {
/// }
/// }
/// ```
- pub async fn send(&mut self, value: T) -> Result<(), SendError<T>> {
- use crate::future::poll_fn;
-
- if poll_fn(|cx| self.poll_ready(cx)).await.is_err() {
- return Err(SendError(value));
- }
-
- match self.try_send(value) {
- Ok(()) => Ok(()),
- Err(TrySendError::Full(_)) => unreachable!(),
- Err(TrySendError::Closed(value)) => Err(SendError(value)),
+ pub async fn send(&self, value: T) -> Result<(), SendError<T>> {
+ match self.reserve().await {
+ Ok(permit) => {
+ permit.send(value);
+ Ok(())
+ }
+ Err(_) => Err(SendError(value)),
}
}
@@ -304,9 +327,6 @@ impl<T> Sender<T> {
/// with [`send`], this function has two failure cases instead of one (one for
/// disconnection, one for a full buffer).
///
- /// This function may be paired with [`poll_ready`] in order to wait for
- /// channel capacity before trying to send a value.
- ///
/// # Errors
///
/// If the channel capacity has been reached, i.e., the channel has `n`
@@ -318,7 +338,6 @@ impl<T> Sender<T> {
/// an error. The error includes the value passed to `send`.
///
/// [`send`]: Sender::send
- /// [`poll_ready`]: Sender::poll_ready
/// [`channel`]: channel
/// [`close`]: Receiver::close
///
@@ -330,8 +349,8 @@ impl<T> Sender<T> {
/// #[tokio::main]
/// async fn main() {
/// // Create a channel with buffer size 1
- /// let (mut tx1, mut rx) = mpsc::channel(1);
- /// let mut tx2 = tx1.clone();
+ /// let (tx1, mut rx) = mpsc::channel(1);
+ /// let tx2 = tx1.clone();
///
/// tokio::spawn(async move {
/// tx1.send(1).await.unwrap();
@@ -359,8 +378,15 @@ impl<T> Sender<T> {
/// }
/// }
/// ```
- pub fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
- self.chan.try_send(message)?;
+ pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
+ match self.chan.semaphore().0.try_acquire(1) {
+ Ok(_) => {}
+ Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(message)),
+ Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(message)),
+ }
+
+ // Send the message
+ self.chan.send(message);
Ok(())
}
@@ -392,7 +418,7 @@ impl<T> Sender<T> {
///
/// #[tokio::main]
/// async fn main() {
- /// let (mut tx, mut rx) = mpsc::channel(1);
+ /// let (tx, mut rx) = mpsc::channel(1);
///
/// tokio::spawn(async move {
/// for i in 0..10 {
@@ -412,27 +438,22 @@ impl<T> Sender<T> {
#[cfg(feature = "time")]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
pub async fn send_timeout(
- &mut self,
+ &self,
value: T,
timeout: Duration,
) -> Result<(), SendTimeoutError<T>> {
- use crate::future::poll_fn;
-
- match crate::time::timeout(timeout, poll_fn(|cx| self.poll_ready(cx))).await {
+ let permit = match crate::time::timeout(timeout, self.reserve()).await {
Err(_) => {
return Err(SendTimeoutError::Timeout(value));
}
Ok(Err(_)) => {
return Err(SendTimeoutError::Closed(value));
}
- Ok(_) => {}
- }
+ Ok(Ok(permit)) => permit,
+ };
- match self.try_send(value) {
- Ok(()) => Ok(()),
- Err(TrySendError::Full(_)) => unreachable!(),
- Err(TrySendError::Closed(value)) => Err(SendTimeoutError::Closed(value)),
- }
+ permit.send(value);
+ Ok(())
}
/// Blocking send to call outside of asynchronous contexts.
@@ -450,7 +471,7 @@ impl<T> Sender<T> {
/// use tokio::sync::mpsc;
///
/// fn main() {
- /// let (mut tx, mut rx) = mpsc::channel::<u8>(1);
+ /// let (tx, mut rx) = mpsc::channel::<u8>(1);
///
/// let sync_code = thread::spawn(move || {
/// tx.blocking_send(10).unwrap();
@@ -462,92 +483,139 @@ impl<T> Sender<T> {
/// sync_code.join().unwrap()
/// }
/// ```
- pub fn blocking_send(&mut self, value: T) -> Result<(), SendError<T>> {
+ pub fn blocking_send(&self, value: T) -> Result<(), SendError<T>> {
let mut enter_handle = crate::runtime::enter::enter(false);
enter_handle.block_on(self.send(value)).unwrap()
}
- /// Returns `Poll::Ready(Ok(()))` when the channel is able to accept another item.
+ /// Wait for channel capacity. Once capacity to send one message is
+ /// available, it is reserved for the caller.
///
- /// If the channel is full, then `Poll::Pending` is returned and the task is notified when a
- /// slot becomes available.
+ /// If the channel is full, the function waits for the number of unreceived
+ /// messages to become less than the channel capacity. Capacity to send one
+ /// message is reserved for the caller. A [`Permit`] is returned to track
+ /// the reserved capacity. The [`send`] function on [`Permit`] consumes the
+ /// reserved capacity.
///
- /// Once `poll_ready` returns `Poll::Ready(Ok(()))`, a call to `try_send` will succeed unless
- /// the channel has since been closed. To provide this guarantee, the channel reserves one slot
- /// in the channel for the coming send. This reserved slot is not available to other `Sender`
- /// instances, so you need to be careful to not end up with deadlocks by blocking after calling
- /// `poll_ready` but before sending an element.
+ /// Dropping [`Permit`] without sending a message releases the capacity back
+ /// to the channel.
///
- /// If, after `poll_ready` succeeds, you decide you do not wish to send an item after all, you
- /// can use [`disarm`](Sender::disarm) to release the reserved slot.
+ /// [`Permit`]: Permit
+ /// [`send`]: Permit::send
///
- /// Until an item is sent or [`disarm`](Sender::disarm) is called, repeated calls to
- /// `poll_ready` will return either `Poll::Ready(Ok(()))` or `Poll::Ready(Err(_))` if channel
- /// is closed.
- pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ClosedError>> {
- self.chan.poll_ready(cx).map_err(|_| ClosedError::new())
- }
-
- /// Undo a successful call to `poll_ready`.
+ /// # Examples
///
- /// Once a call to `poll_ready` returns `Poll::Ready(Ok(()))`, it holds up one slot in the
- /// channel to make room for the coming send. `disarm` allows you to give up that slot if you
- /// decide you do not wish to send an item after all. After calling `disarm`, you must call
- /// `poll_ready` until it returns `Poll::Ready(Ok(()))` before attempting to send again.
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = mpsc::channel(1);
///
- /// Returns `false` if no slot is reserved for this sender (usually because `poll_ready` was
- /// not previously called, or did not succeed).
+ /// // Reserve capacity
+ /// let permit = tx.reserve().await.unwrap();
///
- /// # Motivation
+ /// // Trying to send directly on the `tx` will fail due to no
+ /// // available capacity.
+ /// assert!(tx.try_send(123).is_err());
///
- /// Since `poll_ready` takes up one of the finite number of slots in a bounded channel, callers
- /// need to send an item shortly after `poll_ready` succeeds. If they do not, idle senders may
- /// take up all the slots of the channel, and prevent active senders from getting any requests
- /// through. Consider this code that forwards from one channel to another:
+ /// // Sending on the permit succeeds
+ /// permit.send(456);
///
- /// ```rust,ignore
- /// loop {
- /// ready!(tx.poll_ready(cx))?;
- /// if let Some(item) = ready!(rx.poll_recv(cx)) {
- /// tx.try_send(item)?;
- /// } else {
- /// break;
- /// }
+ /// // The value sent on the permit is received
+ /// assert_eq!(rx.recv().await.unwrap(), 456);
/// }
/// ```
+ pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>> {
+ match self.chan.semaphore().0.acquire(1).await {
+ Ok(_) => {}
+ Err(_) => return Err(SendError(())),
+ }
+
+ Ok(Permit { chan: &self.chan })
+ }
+}
+
+impl<T> Clone for Sender<T> {
+ fn clone(&self) -> Self {
+ Sender {
+ chan: self.chan.clone(),
+ }
+ }
+}
+
+impl<T> fmt::Debug for Sender<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Sender")
+ .field("chan", &self.chan)
+ .finish()
+ }
+}
+
+// ===== impl Permit =====
+
+impl<T> Permit<'_, T> {
+ /// Sends a value using the reserved capacity.
+ ///
+ /// Capacity for the message has already been reserved. The message is sent
+ /// to the receiver and the permit is consumed. The operation will succeed
+ /// even if the receiver half has been closed. See [`Receiver::close`] for
+ /// more details on performing a clean shutdown.
+ ///
+ /// [`Receiver::close`]: Receiver::close
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = mpsc::channel(1);
+ ///
+ /// // Reserve capacity
+ /// let permit = tx.reserve().await.unwrap();
+ ///
+ /// // Trying to send directly on the `tx` will fail due to no
+ /// // available capacity.
+ /// assert!(tx.try_send(123).is_err());
///
- /// If many such forwarders exist, and they all forward into a single (cloned) `Sender`, then
- /// any number of forwarders may be waiting for `rx.poll_recv` at the same time. While they do,
- /// they are effectively each reducing the channel's capacity by 1. If enough of these
- /// forwarders are idle, forwarders whose `rx` _do_ have elements will be unable to find a spot
- /// for them through `poll_ready`, and the system will deadlock.
- ///
- /// `disarm` solves this problem by allowing you to give up the reserved slot if you find that
- /// you have to block. We can then fix the code above by writing:
- ///
- /// ```rust,ignore
- /// loop {
- /// ready!(tx.poll_ready(cx))?;
- /// let item = rx.poll_recv(cx);
- /// if let Poll::Ready(Ok(_)) = item {
- /// // we're going to send the item below, so don't disarm
- /// } else {
- /// // give up our send slot, we won't need it for a while
- /// tx.disarm();
- /// }
- /// if let Some(item) = ready!(item) {
- /// tx.try_send(item)?;
- /// } else {
- /// break;
- /// }
+ /// // Send a message on the permit
+ /// permit.send(456);
+ ///
+ /// // The value sent on the permit is received
+ /// assert_eq!(rx.recv().await.unwrap(), 456);
/// }
/// ```
- pub fn disarm(&mut self) -> bool {
- if self.chan.is_ready() {
- self.chan.disarm();
- true
- } else {
- false
+ pub fn send(self, value: T) {
+ use std::mem;
+
+ self.chan.send(value);
+
+ // Avoid the drop logic
+ mem::forget(self);
+ }
+}
+
+impl<T> Drop for Permit<'_, T> {
+ fn drop(&mut self) {
+ use chan::Semaphore;
+
+ let semaphore = self.chan.semaphore();
+
+ // Add the permit back to the semaphore
+ semaphore.add_permit();
+
+ if semaphore.is_closed() && semaphore.is_idle() {
+ self.chan.wake_rx();
}
}
}
+
+impl<T> fmt::Debug for Permit<'_, T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Permit")
+ .field("chan", &self.chan)
+ .finish()
+ }
+}
diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs
index 0a53cda2..2d3f0149 100644
--- a/tokio/src/sync/mpsc/chan.rs
+++ b/tokio/src/sync/mpsc/chan.rs
@@ -2,8 +2,8 @@ use crate::loom::cell::UnsafeCell;
use crate::loom::future::AtomicWaker;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Arc;
-use crate::sync::mpsc::error::{ClosedError, TryRecvError};
-use crate::sync::mpsc::{error, list};
+use crate::sync::mpsc::error::TryRecvError;
+use crate::sync::mpsc::list;
use std::fmt;
use std::process;
@@ -12,21 +12,13 @@ use std::task::Poll::{Pending, Ready};
use std::task::{Context, Poll};
/// Channel sender
-pub(crate) struct Tx<T, S: Semaphore> {
+pub(crate) struct Tx<T, S> {
inner: Arc<Chan<T, S>>,
- permit: S::Permit,
}
-impl<T, S: Semaphore> fmt::Debug for Tx<T, S>
-where
- S::Permit: fmt::Debug,
- S: fmt::Debug,
-{
+impl<T, S: fmt::Debug> fmt::Debug for Tx<T, S> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- fmt.debug_struct("Tx")
- .field("inner", &self.inner)
- .field("permit", &self.permit)
- .finish()
+ fmt.debug_struct("Tx").field("inner", &self.inner).finish()
}
}
@@ -35,71 +27,20 @@ pub(crate) struct Rx<T, S: Semaphore> {
inner: Arc<Chan<T, S>>,
}
-impl<T, S: Semaphore> fmt::Debug for Rx<T, S>
-where
- S: fmt::Debug,
-{
+impl<T, S: Semaphore + fmt::Debug> fmt::Debug for Rx<T, S> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Rx").field("inner", &self.inner).finish()
}
}
-#[derive(Debug, Eq, PartialEq)]
-pub(crate) enum TrySendError {
- Closed,
- Full,
-}
-
-impl<T> From<(T, TrySendError)> for error::SendError<T> {
- fn from(src: (T, TrySendError)) -> error::SendError<T> {
- match src.1 {
- TrySendError::Closed => error::SendError(src.0),
- TrySendError::Full => unreachable!(),
- }
- }
-}
-
-impl<T> From<(T, TrySendError)> for error::TrySendError<T> {
- fn from(src: (T, TrySendError)) -> error::TrySendError<T> {
- match src.1 {
- TrySendError::Closed => error::TrySendError::Closed(src.0),
- TrySendError::Full => error::TrySendError::Full(src.0),
- }
- }
-}
-
pub(crate) trait Semaphore {
- type Permit;
-
- fn new_permit() -> Self::Permit;
-
- /// The permit is dropped without a value being sent. In this case, the
- /// permit must be returned to the semaphore.
- ///
- /// # Return
- ///
- /// Returns true if the permit was acquired.
- fn drop_permit(&self, permit: &mut Self::Permit) -> bool;
-
fn is_idle(&self) -> bool;
fn add_permit(&self);
- fn poll_acquire(
- &self,
- cx: &mut Context<'_>,
- permit: &mut Self::Permit,
- ) -> Poll<Result<(), ClosedError>>;
-
- fn try_acquire(&self, permit: &mut Self::Permit) -> Result<(), TrySendError>;
-
- /// A value was sent into the channel and the permit held by `tx` is
- /// dropped. In this case, the permit should not immeditely be returned to
- /// the semaphore. Instead, the permit is returnred to the semaphore once
- /// the sent value is read by the rx handle.
- fn forget(&self, permit: &mut Self::Permit);
-
fn close(&self);
+
+ fn is_closed(&self) -> bool;
}
struct Chan<T, S> {
@@ -157,10 +98,7 @@ impl<T> fmt::Debug for RxFields<T> {
unsafe impl<T: Send, S: Send> Send for Chan<T, S> {}
unsafe impl<T: Send, S: Sync> Sync for Chan<T, S> {}
-pub(crate) fn channel<T, S>(semaphore: S) -> (Tx<T, S>, Rx<T, S>)
-where
- S: Semaphore,
-{
+pub(crate) fn channel<T, S: Semaphore>(semaphore: S) -> (Tx<T, S>, Rx<T, S>) {
let (tx, rx) = list::channel();
let chan = Arc::new(Chan {
@@ -179,48 +117,27 @@ where
// ===== impl Tx =====
-impl<T, S> Tx<T, S>
-where
- S: Semaphore,
-{
+impl<T, S> Tx<T, S> {
fn new(chan: Arc<Chan<T, S>>) -> Tx<T, S> {
- Tx {
- inner: chan,
- permit: S::new_permit(),
- }
+ Tx { inner: chan }
}
- pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ClosedError>> {
- self.inner.semaphore.poll_acquire(cx, &mut self.permit)
- }
-
- pub(crate) fn disarm(&mut self) {
- // TODO: should this error if not acquired?
- self.inner.semaphore.drop_permit(&mut self.permit);
+ pub(super) fn semaphore(&self) -> &S {
+ &self.inner.semaphore
}
/// Send a message and notify the receiver.
- pub(crate) fn try_send(&mut self, value: T) -> Result<(), (T, TrySendError)> {
- self.inner.try_send(value, &mut self.permit)
- }
-}
-
-impl<T> Tx<T, (crate::sync::semaphore_ll::Semaphore, usize)> {
- pub(crate) fn is_ready(&self) -> bool {
- self.permit.is_acquired()
+ pub(crate) fn send(&self, value: T) {
+ self.inner.send(value);
}
-}
-impl<T> Tx<T, AtomicUsize> {
- pub(crate) fn send_unbounded(&self, value: T) -> Result<(), (T, TrySendError)> {
- self.inner.try_send(value, &mut ())
+ /// Wake the receive half
+ pub(crate) fn wake_rx(&self) {
+ self.inner.rx_waker.wake();
}
}
-impl<T, S> Clone for Tx<T, S>
-where
- S: Semaphore,
-{
+impl<T, S> Clone for Tx<T, S> {
fn clone(&self) -> Tx<T, S> {
// Using a Relaxed ordering here is sufficient as the caller holds a
// strong ref to `self`, preventing a concurrent decrement to zero.
@@ -228,22 +145,12 @@ where
Tx {
inner: self.inner.clone(),
- permit: S::new_permit(),
}
}
}
-impl<T, S> Drop for Tx<T, S>
-where
- S: Semaphore,
-{
+impl<T, S> Drop for Tx<T, S> {
fn drop(&mut self) {
- let notify = self.inner.semaphore.drop_permit(&mut self.permit);
-
- if notify && self.inner.semaphore.is_idle() {
- self.inner.rx_waker.wake();
- }
-
if self.inner.tx_count.fetch_sub(1, AcqRel) != 1 {
return;
}
@@ -252,16 +159,13 @@ where
self.inner.tx.close();
// Notify the receiver
- self.inner.rx_waker.wake();
+ self.wake_rx();
}
}
// ===== impl Rx =====
-impl<T, S> Rx<T, S>
-where
- S: Semaphore,
-{
+impl<T, S: Semaphore> Rx<T, S> {
fn new(chan: Arc<Chan<T, S>>) -> Rx<T, S> {
Rx { inner: chan }
}
@@ -349,10 +253,7 @@ where
}
}
-impl<T, S> Drop for Rx<T, S>
-where
- S: Semaphore,
-{
+impl<T, S: Semaphore> Drop for Rx<T, S> {
fn drop(&mut self) {
use super::block::Read::Value;
@@ -370,25 +271,13 @@ where
// ===== impl Chan =====
-impl<T, S> Chan<T, S>
-where
- S: Semaphore,
-{
- fn try_send(&self, value: T, permit: &mut S::Permit) -> Result<(), (T, TrySendError)> {
- if let Err(e) = self.semaphore.try_acquire(permit) {
- return Err((value, e));
- }
-
+impl<T, S> Chan<T, S> {
+ fn send(&self, value: T) {
// Push the value
self.tx.push(value);
// Notify the rx task
self.rx_waker.wake();
-
- // Release the permit
- self.semaphore.forget(permit);
-
- Ok(())
}
}
@@ -407,74 +296,24 @@ impl<T, S> Drop for Chan<T, S> {
}
}
-use crate::sync::semaphore_ll::TryAcquireError;
-
-impl From<TryAcquireError> for TrySendError {
- fn from(src: TryAcquireError) -> TrySendError {
- if src.is_closed() {
- TrySendError::Closed
- } else if src.is_no_permits() {
- TrySendError::Full
- } else {
- unreachable!();
- }
- }
-}
-
// ===== impl Semaphore for (::Semaphore, capacity) =====
-use crate::sync::semaphore_ll::Permit;
-
-impl Semaphore for (crate::sync::semaphore_ll::Semaphore, usize) {
- type Permit = Permit;
-
- fn new_permit() -> Permit {
- Permit::new()
- }
-
- fn drop_permit(&self, permit: &mut Permit) -> bool {
- let ret = permit.is_acquired();
- permit.release(1, &self.0);
- ret
- }
-
+impl Semaphore for (crate::sync::batch_semaphore::Semaphore, usize) {
fn add_permit(&self) {
- self.0.add_permits(1)
+ self.0.release(1)
}
fn is_idle(&self) -> bool {
self.0.available_permits() == self.1
}
- fn poll_acquire(
- &self,
- cx: &mut Context<'_>,
- permit: &mut Permit,
- ) -> Poll<Result<(), ClosedError>> {
- // Keep track of task budget
- let coop = ready!(crate::coop::poll_proceed(cx));
-
- permit
- .poll_acquire(cx, 1, &self.0)
- .map_err(|_| ClosedError::new())
- .map(move |r| {
- coop.made_progress();
- r
- })
- }
-
- fn try_acquire(&self, permit: &mut Permit) -> Result<(), TrySendError> {
- permit.try_acquire(1, &self.0)?;
- Ok(())
- }
-
- fn forget(&self, permit: &mut Self::Permit) {
- permit.forget(1);
- }
-
fn close(&self) {
self.0.close();
}
+
+ fn is_closed(&self) -> bool {
+ self.0.is_closed()
+ }
}
// ===== impl Semaphore for AtomicUsize =====
@@ -483,14 +322,6 @@ use std::sync::atomic::Ordering::{Acquire, Release};
use std::usize;
impl Semaphore for AtomicUsize {
- type Permit = ();
-
- fn new_permit() {}
-
- fn drop_permit(&self, _permit: &mut ()) -> bool {</