summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync/mpsc/bounded.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/sync/mpsc/bounded.rs')
-rw-r--r--tokio/src/sync/mpsc/bounded.rs338
1 files changed, 203 insertions, 135 deletions
diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs
index 14e4731a..2d2006d5 100644
--- a/tokio/src/sync/mpsc/bounded.rs
+++ b/tokio/src/sync/mpsc/bounded.rs
@@ -1,6 +1,6 @@
+use crate::sync::batch_semaphore::{self as semaphore, TryAcquireError};
use crate::sync::mpsc::chan;
-use crate::sync::mpsc::error::{ClosedError, SendError, TryRecvError, TrySendError};
-use crate::sync::semaphore_ll as semaphore;
+use crate::sync::mpsc::error::{SendError, TryRecvError, TrySendError};
cfg_time! {
use crate::sync::mpsc::error::SendTimeoutError;
@@ -8,6 +8,7 @@ cfg_time! {
}
use std::fmt;
+#[cfg(any(feature = "signal", feature = "process", feature = "stream"))]
use std::task::{Context, Poll};
/// Send values to the associated `Receiver`.
@@ -17,20 +18,14 @@ pub struct Sender<T> {
chan: chan::Tx<T, Semaphore>,
}
-impl<T> Clone for Sender<T> {
- fn clone(&self) -> Self {
- Sender {
- chan: self.chan.clone(),
- }
- }
-}
-
-impl<T> fmt::Debug for Sender<T> {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- fmt.debug_struct("Sender")
- .field("chan", &self.chan)
- .finish()
- }
+/// Permit to send one value into the channel.
+///
+/// `Permit` values are returned by [`Sender::reserve()`] and are used to
+/// guarantee channel capacity before generating a message to send.
+///
+/// [`Sender::reserve()`]: Sender::reserve
+pub struct Permit<'a, T> {
+ chan: &'a chan::Tx<T, Semaphore>,
}
/// Receive values from the associated `Sender`.
@@ -41,14 +36,6 @@ pub struct Receiver<T> {
chan: chan::Rx<T, Semaphore>,
}
-impl<T> fmt::Debug for Receiver<T> {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- fmt.debug_struct("Receiver")
- .field("chan", &self.chan)
- .finish()
- }
-}
-
/// Creates a bounded mpsc channel for communicating between asynchronous tasks
/// with backpressure.
///
@@ -77,7 +64,7 @@ impl<T> fmt::Debug for Receiver<T> {
///
/// #[tokio::main]
/// async fn main() {
-/// let (mut tx, mut rx) = mpsc::channel(100);
+/// let (tx, mut rx) = mpsc::channel(100);
///
/// tokio::spawn(async move {
/// for i in 0..10 {
@@ -125,7 +112,7 @@ impl<T> Receiver<T> {
///
/// #[tokio::main]
/// async fn main() {
- /// let (mut tx, mut rx) = mpsc::channel(100);
+ /// let (tx, mut rx) = mpsc::channel(100);
///
/// tokio::spawn(async move {
/// tx.send("hello").await.unwrap();
@@ -143,7 +130,7 @@ impl<T> Receiver<T> {
///
/// #[tokio::main]
/// async fn main() {
- /// let (mut tx, mut rx) = mpsc::channel(100);
+ /// let (tx, mut rx) = mpsc::channel(100);
///
/// tx.send("hello").await.unwrap();
/// tx.send("world").await.unwrap();
@@ -154,12 +141,11 @@ impl<T> Receiver<T> {
/// ```
pub async fn recv(&mut self) -> Option<T> {
use crate::future::poll_fn;
-
- poll_fn(|cx| self.poll_recv(cx)).await
+ poll_fn(|cx| self.chan.recv(cx)).await
}
- #[doc(hidden)] // TODO: document
- pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ #[cfg(any(feature = "signal", feature = "process"))]
+ pub(crate) fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.chan.recv(cx)
}
@@ -178,7 +164,7 @@ impl<T> Receiver<T> {
/// use tokio::sync::mpsc;
///
/// fn main() {
- /// let (mut tx, mut rx) = mpsc::channel::<u8>(10);
+ /// let (tx, mut rx) = mpsc::channel::<u8>(10);
///
/// let sync_code = thread::spawn(move || {
/// assert_eq!(Some(10), rx.blocking_recv());
@@ -215,12 +201,53 @@ impl<T> Receiver<T> {
/// Closes the receiving half of a channel, without dropping it.
///
/// This prevents any further messages from being sent on the channel while
- /// still enabling the receiver to drain messages that are buffered.
+ /// still enabling the receiver to drain messages that are buffered. Any
+ /// outstanding [`Permit`] values will still be able to send messages.
+ ///
+ /// In order to guarantee no messages are dropped, after calling `close()`,
+ /// `recv()` must be called until `None` is returned.
+ ///
+ /// [`Permit`]: Permit
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = mpsc::channel(20);
+ ///
+ /// tokio::spawn(async move {
+ /// let mut i = 0;
+ /// while let Ok(permit) = tx.reserve().await {
+ /// permit.send(i);
+ /// i += 1;
+ /// }
+ /// });
+ ///
+ /// rx.close();
+ ///
+ /// while let Some(msg) = rx.recv().await {
+ /// println!("got {}", msg);
+ /// }
+ ///
+ /// // Channel closed and no messages are lost.
+ /// }
+ /// ```
pub fn close(&mut self) {
self.chan.close();
}
}
+impl<T> fmt::Debug for Receiver<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Receiver")
+ .field("chan", &self.chan)
+ .finish()
+ }
+}
+
impl<T> Unpin for Receiver<T> {}
cfg_stream! {
@@ -228,7 +255,7 @@ cfg_stream! {
type Item = T;
fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
- self.poll_recv(cx)
+ self.chan.recv(cx)
}
}
}
@@ -267,7 +294,7 @@ impl<T> Sender<T> {
///
/// #[tokio::main]
/// async fn main() {
- /// let (mut tx, mut rx) = mpsc::channel(1);
+ /// let (tx, mut rx) = mpsc::channel(1);
///
/// tokio::spawn(async move {
/// for i in 0..10 {
@@ -283,17 +310,13 @@ impl<T> Sender<T> {
/// }
/// }
/// ```
- pub async fn send(&mut self, value: T) -> Result<(), SendError<T>> {
- use crate::future::poll_fn;
-
- if poll_fn(|cx| self.poll_ready(cx)).await.is_err() {
- return Err(SendError(value));
- }
-
- match self.try_send(value) {
- Ok(()) => Ok(()),
- Err(TrySendError::Full(_)) => unreachable!(),
- Err(TrySendError::Closed(value)) => Err(SendError(value)),
+ pub async fn send(&self, value: T) -> Result<(), SendError<T>> {
+ match self.reserve().await {
+ Ok(permit) => {
+ permit.send(value);
+ Ok(())
+ }
+ Err(_) => Err(SendError(value)),
}
}
@@ -304,9 +327,6 @@ impl<T> Sender<T> {
/// with [`send`], this function has two failure cases instead of one (one for
/// disconnection, one for a full buffer).
///
- /// This function may be paired with [`poll_ready`] in order to wait for
- /// channel capacity before trying to send a value.
- ///
/// # Errors
///
/// If the channel capacity has been reached, i.e., the channel has `n`
@@ -318,7 +338,6 @@ impl<T> Sender<T> {
/// an error. The error includes the value passed to `send`.
///
/// [`send`]: Sender::send
- /// [`poll_ready`]: Sender::poll_ready
/// [`channel`]: channel
/// [`close`]: Receiver::close
///
@@ -330,8 +349,8 @@ impl<T> Sender<T> {
/// #[tokio::main]
/// async fn main() {
/// // Create a channel with buffer size 1
- /// let (mut tx1, mut rx) = mpsc::channel(1);
- /// let mut tx2 = tx1.clone();
+ /// let (tx1, mut rx) = mpsc::channel(1);
+ /// let tx2 = tx1.clone();
///
/// tokio::spawn(async move {
/// tx1.send(1).await.unwrap();
@@ -359,8 +378,15 @@ impl<T> Sender<T> {
/// }
/// }
/// ```
- pub fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
- self.chan.try_send(message)?;
+ pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
+ match self.chan.semaphore().0.try_acquire(1) {
+ Ok(_) => {}
+ Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(message)),
+ Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(message)),
+ }
+
+ // Send the message
+ self.chan.send(message);
Ok(())
}
@@ -392,7 +418,7 @@ impl<T> Sender<T> {
///
/// #[tokio::main]
/// async fn main() {
- /// let (mut tx, mut rx) = mpsc::channel(1);
+ /// let (tx, mut rx) = mpsc::channel(1);
///
/// tokio::spawn(async move {
/// for i in 0..10 {
@@ -412,27 +438,22 @@ impl<T> Sender<T> {
#[cfg(feature = "time")]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
pub async fn send_timeout(
- &mut self,
+ &self,
value: T,
timeout: Duration,
) -> Result<(), SendTimeoutError<T>> {
- use crate::future::poll_fn;
-
- match crate::time::timeout(timeout, poll_fn(|cx| self.poll_ready(cx))).await {
+ let permit = match crate::time::timeout(timeout, self.reserve()).await {
Err(_) => {
return Err(SendTimeoutError::Timeout(value));
}
Ok(Err(_)) => {
return Err(SendTimeoutError::Closed(value));
}
- Ok(_) => {}
- }
+ Ok(Ok(permit)) => permit,
+ };
- match self.try_send(value) {
- Ok(()) => Ok(()),
- Err(TrySendError::Full(_)) => unreachable!(),
- Err(TrySendError::Closed(value)) => Err(SendTimeoutError::Closed(value)),
- }
+ permit.send(value);
+ Ok(())
}
/// Blocking send to call outside of asynchronous contexts.
@@ -450,7 +471,7 @@ impl<T> Sender<T> {
/// use tokio::sync::mpsc;
///
/// fn main() {
- /// let (mut tx, mut rx) = mpsc::channel::<u8>(1);
+ /// let (tx, mut rx) = mpsc::channel::<u8>(1);
///
/// let sync_code = thread::spawn(move || {
/// tx.blocking_send(10).unwrap();
@@ -462,92 +483,139 @@ impl<T> Sender<T> {
/// sync_code.join().unwrap()
/// }
/// ```
- pub fn blocking_send(&mut self, value: T) -> Result<(), SendError<T>> {
+ pub fn blocking_send(&self, value: T) -> Result<(), SendError<T>> {
let mut enter_handle = crate::runtime::enter::enter(false);
enter_handle.block_on(self.send(value)).unwrap()
}
- /// Returns `Poll::Ready(Ok(()))` when the channel is able to accept another item.
+ /// Wait for channel capacity. Once capacity to send one message is
+ /// available, it is reserved for the caller.
///
- /// If the channel is full, then `Poll::Pending` is returned and the task is notified when a
- /// slot becomes available.
+ /// If the channel is full, the function waits for the number of unreceived
+ /// messages to become less than the channel capacity. Capacity to send one
+ /// message is reserved for the caller. A [`Permit`] is returned to track
+ /// the reserved capacity. The [`send`] function on [`Permit`] consumes the
+ /// reserved capacity.
///
- /// Once `poll_ready` returns `Poll::Ready(Ok(()))`, a call to `try_send` will succeed unless
- /// the channel has since been closed. To provide this guarantee, the channel reserves one slot
- /// in the channel for the coming send. This reserved slot is not available to other `Sender`
- /// instances, so you need to be careful to not end up with deadlocks by blocking after calling
- /// `poll_ready` but before sending an element.
+ /// Dropping [`Permit`] without sending a message releases the capacity back
+ /// to the channel.
///
- /// If, after `poll_ready` succeeds, you decide you do not wish to send an item after all, you
- /// can use [`disarm`](Sender::disarm) to release the reserved slot.
+ /// [`Permit`]: Permit
+ /// [`send`]: Permit::send
///
- /// Until an item is sent or [`disarm`](Sender::disarm) is called, repeated calls to
- /// `poll_ready` will return either `Poll::Ready(Ok(()))` or `Poll::Ready(Err(_))` if channel
- /// is closed.
- pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ClosedError>> {
- self.chan.poll_ready(cx).map_err(|_| ClosedError::new())
- }
-
- /// Undo a successful call to `poll_ready`.
+ /// # Examples
///
- /// Once a call to `poll_ready` returns `Poll::Ready(Ok(()))`, it holds up one slot in the
- /// channel to make room for the coming send. `disarm` allows you to give up that slot if you
- /// decide you do not wish to send an item after all. After calling `disarm`, you must call
- /// `poll_ready` until it returns `Poll::Ready(Ok(()))` before attempting to send again.
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = mpsc::channel(1);
///
- /// Returns `false` if no slot is reserved for this sender (usually because `poll_ready` was
- /// not previously called, or did not succeed).
+ /// // Reserve capacity
+ /// let permit = tx.reserve().await.unwrap();
///
- /// # Motivation
+ /// // Trying to send directly on the `tx` will fail due to no
+ /// // available capacity.
+ /// assert!(tx.try_send(123).is_err());
///
- /// Since `poll_ready` takes up one of the finite number of slots in a bounded channel, callers
- /// need to send an item shortly after `poll_ready` succeeds. If they do not, idle senders may
- /// take up all the slots of the channel, and prevent active senders from getting any requests
- /// through. Consider this code that forwards from one channel to another:
+ /// // Sending on the permit succeeds
+ /// permit.send(456);
///
- /// ```rust,ignore
- /// loop {
- /// ready!(tx.poll_ready(cx))?;
- /// if let Some(item) = ready!(rx.poll_recv(cx)) {
- /// tx.try_send(item)?;
- /// } else {
- /// break;
- /// }
+ /// // The value sent on the permit is received
+ /// assert_eq!(rx.recv().await.unwrap(), 456);
/// }
/// ```
+ pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>> {
+ match self.chan.semaphore().0.acquire(1).await {
+ Ok(_) => {}
+ Err(_) => return Err(SendError(())),
+ }
+
+ Ok(Permit { chan: &self.chan })
+ }
+}
+
+impl<T> Clone for Sender<T> {
+ fn clone(&self) -> Self {
+ Sender {
+ chan: self.chan.clone(),
+ }
+ }
+}
+
+impl<T> fmt::Debug for Sender<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Sender")
+ .field("chan", &self.chan)
+ .finish()
+ }
+}
+
+// ===== impl Permit =====
+
+impl<T> Permit<'_, T> {
+ /// Sends a value using the reserved capacity.
+ ///
+ /// Capacity for the message has already been reserved. The message is sent
+ /// to the receiver and the permit is consumed. The operation will succeed
+ /// even if the receiver half has been closed. See [`Receiver::close`] for
+ /// more details on performing a clean shutdown.
+ ///
+ /// [`Receiver::close`]: Receiver::close
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = mpsc::channel(1);
+ ///
+ /// // Reserve capacity
+ /// let permit = tx.reserve().await.unwrap();
+ ///
+ /// // Trying to send directly on the `tx` will fail due to no
+ /// // available capacity.
+ /// assert!(tx.try_send(123).is_err());
///
- /// If many such forwarders exist, and they all forward into a single (cloned) `Sender`, then
- /// any number of forwarders may be waiting for `rx.poll_recv` at the same time. While they do,
- /// they are effectively each reducing the channel's capacity by 1. If enough of these
- /// forwarders are idle, forwarders whose `rx` _do_ have elements will be unable to find a spot
- /// for them through `poll_ready`, and the system will deadlock.
- ///
- /// `disarm` solves this problem by allowing you to give up the reserved slot if you find that
- /// you have to block. We can then fix the code above by writing:
- ///
- /// ```rust,ignore
- /// loop {
- /// ready!(tx.poll_ready(cx))?;
- /// let item = rx.poll_recv(cx);
- /// if let Poll::Ready(Ok(_)) = item {
- /// // we're going to send the item below, so don't disarm
- /// } else {
- /// // give up our send slot, we won't need it for a while
- /// tx.disarm();
- /// }
- /// if let Some(item) = ready!(item) {
- /// tx.try_send(item)?;
- /// } else {
- /// break;
- /// }
+ /// // Send a message on the permit
+ /// permit.send(456);
+ ///
+ /// // The value sent on the permit is received
+ /// assert_eq!(rx.recv().await.unwrap(), 456);
/// }
/// ```
- pub fn disarm(&mut self) -> bool {
- if self.chan.is_ready() {
- self.chan.disarm();
- true
- } else {
- false
+ pub fn send(self, value: T) {
+ use std::mem;
+
+ self.chan.send(value);
+
+ // Avoid the drop logic
+ mem::forget(self);
+ }
+}
+
+impl<T> Drop for Permit<'_, T> {
+ fn drop(&mut self) {
+ use chan::Semaphore;
+
+ let semaphore = self.chan.semaphore();
+
+ // Add the permit back to the semaphore
+ semaphore.add_permit();
+
+ if semaphore.is_closed() && semaphore.is_idle() {
+ self.chan.wake_rx();
}
}
}
+
+impl<T> fmt::Debug for Permit<'_, T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Permit")
+ .field("chan", &self.chan)
+ .finish()
+ }
+}