diff options
Diffstat (limited to 'tokio/src/sync/mpsc/bounded.rs')
-rw-r--r-- | tokio/src/sync/mpsc/bounded.rs | 338 |
1 files changed, 203 insertions, 135 deletions
diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 14e4731a..2d2006d5 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -1,6 +1,6 @@ +use crate::sync::batch_semaphore::{self as semaphore, TryAcquireError}; use crate::sync::mpsc::chan; -use crate::sync::mpsc::error::{ClosedError, SendError, TryRecvError, TrySendError}; -use crate::sync::semaphore_ll as semaphore; +use crate::sync::mpsc::error::{SendError, TryRecvError, TrySendError}; cfg_time! { use crate::sync::mpsc::error::SendTimeoutError; @@ -8,6 +8,7 @@ cfg_time! { } use std::fmt; +#[cfg(any(feature = "signal", feature = "process", feature = "stream"))] use std::task::{Context, Poll}; /// Send values to the associated `Receiver`. @@ -17,20 +18,14 @@ pub struct Sender<T> { chan: chan::Tx<T, Semaphore>, } -impl<T> Clone for Sender<T> { - fn clone(&self) -> Self { - Sender { - chan: self.chan.clone(), - } - } -} - -impl<T> fmt::Debug for Sender<T> { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Sender") - .field("chan", &self.chan) - .finish() - } +/// Permit to send one value into the channel. +/// +/// `Permit` values are returned by [`Sender::reserve()`] and are used to +/// guarantee channel capacity before generating a message to send. +/// +/// [`Sender::reserve()`]: Sender::reserve +pub struct Permit<'a, T> { + chan: &'a chan::Tx<T, Semaphore>, } /// Receive values from the associated `Sender`. @@ -41,14 +36,6 @@ pub struct Receiver<T> { chan: chan::Rx<T, Semaphore>, } -impl<T> fmt::Debug for Receiver<T> { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Receiver") - .field("chan", &self.chan) - .finish() - } -} - /// Creates a bounded mpsc channel for communicating between asynchronous tasks /// with backpressure. /// @@ -77,7 +64,7 @@ impl<T> fmt::Debug for Receiver<T> { /// /// #[tokio::main] /// async fn main() { -/// let (mut tx, mut rx) = mpsc::channel(100); +/// let (tx, mut rx) = mpsc::channel(100); /// /// tokio::spawn(async move { /// for i in 0..10 { @@ -125,7 +112,7 @@ impl<T> Receiver<T> { /// /// #[tokio::main] /// async fn main() { - /// let (mut tx, mut rx) = mpsc::channel(100); + /// let (tx, mut rx) = mpsc::channel(100); /// /// tokio::spawn(async move { /// tx.send("hello").await.unwrap(); @@ -143,7 +130,7 @@ impl<T> Receiver<T> { /// /// #[tokio::main] /// async fn main() { - /// let (mut tx, mut rx) = mpsc::channel(100); + /// let (tx, mut rx) = mpsc::channel(100); /// /// tx.send("hello").await.unwrap(); /// tx.send("world").await.unwrap(); @@ -154,12 +141,11 @@ impl<T> Receiver<T> { /// ``` pub async fn recv(&mut self) -> Option<T> { use crate::future::poll_fn; - - poll_fn(|cx| self.poll_recv(cx)).await + poll_fn(|cx| self.chan.recv(cx)).await } - #[doc(hidden)] // TODO: document - pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { + #[cfg(any(feature = "signal", feature = "process"))] + pub(crate) fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { self.chan.recv(cx) } @@ -178,7 +164,7 @@ impl<T> Receiver<T> { /// use tokio::sync::mpsc; /// /// fn main() { - /// let (mut tx, mut rx) = mpsc::channel::<u8>(10); + /// let (tx, mut rx) = mpsc::channel::<u8>(10); /// /// let sync_code = thread::spawn(move || { /// assert_eq!(Some(10), rx.blocking_recv()); @@ -215,12 +201,53 @@ impl<T> Receiver<T> { /// Closes the receiving half of a channel, without dropping it. /// /// This prevents any further messages from being sent on the channel while - /// still enabling the receiver to drain messages that are buffered. + /// still enabling the receiver to drain messages that are buffered. Any + /// outstanding [`Permit`] values will still be able to send messages. + /// + /// In order to guarantee no messages are dropped, after calling `close()`, + /// `recv()` must be called until `None` is returned. + /// + /// [`Permit`]: Permit + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = mpsc::channel(20); + /// + /// tokio::spawn(async move { + /// let mut i = 0; + /// while let Ok(permit) = tx.reserve().await { + /// permit.send(i); + /// i += 1; + /// } + /// }); + /// + /// rx.close(); + /// + /// while let Some(msg) = rx.recv().await { + /// println!("got {}", msg); + /// } + /// + /// // Channel closed and no messages are lost. + /// } + /// ``` pub fn close(&mut self) { self.chan.close(); } } +impl<T> fmt::Debug for Receiver<T> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Receiver") + .field("chan", &self.chan) + .finish() + } +} + impl<T> Unpin for Receiver<T> {} cfg_stream! { @@ -228,7 +255,7 @@ cfg_stream! { type Item = T; fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { - self.poll_recv(cx) + self.chan.recv(cx) } } } @@ -267,7 +294,7 @@ impl<T> Sender<T> { /// /// #[tokio::main] /// async fn main() { - /// let (mut tx, mut rx) = mpsc::channel(1); + /// let (tx, mut rx) = mpsc::channel(1); /// /// tokio::spawn(async move { /// for i in 0..10 { @@ -283,17 +310,13 @@ impl<T> Sender<T> { /// } /// } /// ``` - pub async fn send(&mut self, value: T) -> Result<(), SendError<T>> { - use crate::future::poll_fn; - - if poll_fn(|cx| self.poll_ready(cx)).await.is_err() { - return Err(SendError(value)); - } - - match self.try_send(value) { - Ok(()) => Ok(()), - Err(TrySendError::Full(_)) => unreachable!(), - Err(TrySendError::Closed(value)) => Err(SendError(value)), + pub async fn send(&self, value: T) -> Result<(), SendError<T>> { + match self.reserve().await { + Ok(permit) => { + permit.send(value); + Ok(()) + } + Err(_) => Err(SendError(value)), } } @@ -304,9 +327,6 @@ impl<T> Sender<T> { /// with [`send`], this function has two failure cases instead of one (one for /// disconnection, one for a full buffer). /// - /// This function may be paired with [`poll_ready`] in order to wait for - /// channel capacity before trying to send a value. - /// /// # Errors /// /// If the channel capacity has been reached, i.e., the channel has `n` @@ -318,7 +338,6 @@ impl<T> Sender<T> { /// an error. The error includes the value passed to `send`. /// /// [`send`]: Sender::send - /// [`poll_ready`]: Sender::poll_ready /// [`channel`]: channel /// [`close`]: Receiver::close /// @@ -330,8 +349,8 @@ impl<T> Sender<T> { /// #[tokio::main] /// async fn main() { /// // Create a channel with buffer size 1 - /// let (mut tx1, mut rx) = mpsc::channel(1); - /// let mut tx2 = tx1.clone(); + /// let (tx1, mut rx) = mpsc::channel(1); + /// let tx2 = tx1.clone(); /// /// tokio::spawn(async move { /// tx1.send(1).await.unwrap(); @@ -359,8 +378,15 @@ impl<T> Sender<T> { /// } /// } /// ``` - pub fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> { - self.chan.try_send(message)?; + pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> { + match self.chan.semaphore().0.try_acquire(1) { + Ok(_) => {} + Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(message)), + Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(message)), + } + + // Send the message + self.chan.send(message); Ok(()) } @@ -392,7 +418,7 @@ impl<T> Sender<T> { /// /// #[tokio::main] /// async fn main() { - /// let (mut tx, mut rx) = mpsc::channel(1); + /// let (tx, mut rx) = mpsc::channel(1); /// /// tokio::spawn(async move { /// for i in 0..10 { @@ -412,27 +438,22 @@ impl<T> Sender<T> { #[cfg(feature = "time")] #[cfg_attr(docsrs, doc(cfg(feature = "time")))] pub async fn send_timeout( - &mut self, + &self, value: T, timeout: Duration, ) -> Result<(), SendTimeoutError<T>> { - use crate::future::poll_fn; - - match crate::time::timeout(timeout, poll_fn(|cx| self.poll_ready(cx))).await { + let permit = match crate::time::timeout(timeout, self.reserve()).await { Err(_) => { return Err(SendTimeoutError::Timeout(value)); } Ok(Err(_)) => { return Err(SendTimeoutError::Closed(value)); } - Ok(_) => {} - } + Ok(Ok(permit)) => permit, + }; - match self.try_send(value) { - Ok(()) => Ok(()), - Err(TrySendError::Full(_)) => unreachable!(), - Err(TrySendError::Closed(value)) => Err(SendTimeoutError::Closed(value)), - } + permit.send(value); + Ok(()) } /// Blocking send to call outside of asynchronous contexts. @@ -450,7 +471,7 @@ impl<T> Sender<T> { /// use tokio::sync::mpsc; /// /// fn main() { - /// let (mut tx, mut rx) = mpsc::channel::<u8>(1); + /// let (tx, mut rx) = mpsc::channel::<u8>(1); /// /// let sync_code = thread::spawn(move || { /// tx.blocking_send(10).unwrap(); @@ -462,92 +483,139 @@ impl<T> Sender<T> { /// sync_code.join().unwrap() /// } /// ``` - pub fn blocking_send(&mut self, value: T) -> Result<(), SendError<T>> { + pub fn blocking_send(&self, value: T) -> Result<(), SendError<T>> { let mut enter_handle = crate::runtime::enter::enter(false); enter_handle.block_on(self.send(value)).unwrap() } - /// Returns `Poll::Ready(Ok(()))` when the channel is able to accept another item. + /// Wait for channel capacity. Once capacity to send one message is + /// available, it is reserved for the caller. /// - /// If the channel is full, then `Poll::Pending` is returned and the task is notified when a - /// slot becomes available. + /// If the channel is full, the function waits for the number of unreceived + /// messages to become less than the channel capacity. Capacity to send one + /// message is reserved for the caller. A [`Permit`] is returned to track + /// the reserved capacity. The [`send`] function on [`Permit`] consumes the + /// reserved capacity. /// - /// Once `poll_ready` returns `Poll::Ready(Ok(()))`, a call to `try_send` will succeed unless - /// the channel has since been closed. To provide this guarantee, the channel reserves one slot - /// in the channel for the coming send. This reserved slot is not available to other `Sender` - /// instances, so you need to be careful to not end up with deadlocks by blocking after calling - /// `poll_ready` but before sending an element. + /// Dropping [`Permit`] without sending a message releases the capacity back + /// to the channel. /// - /// If, after `poll_ready` succeeds, you decide you do not wish to send an item after all, you - /// can use [`disarm`](Sender::disarm) to release the reserved slot. + /// [`Permit`]: Permit + /// [`send`]: Permit::send /// - /// Until an item is sent or [`disarm`](Sender::disarm) is called, repeated calls to - /// `poll_ready` will return either `Poll::Ready(Ok(()))` or `Poll::Ready(Err(_))` if channel - /// is closed. - pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ClosedError>> { - self.chan.poll_ready(cx).map_err(|_| ClosedError::new()) - } - - /// Undo a successful call to `poll_ready`. + /// # Examples /// - /// Once a call to `poll_ready` returns `Poll::Ready(Ok(()))`, it holds up one slot in the - /// channel to make room for the coming send. `disarm` allows you to give up that slot if you - /// decide you do not wish to send an item after all. After calling `disarm`, you must call - /// `poll_ready` until it returns `Poll::Ready(Ok(()))` before attempting to send again. + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = mpsc::channel(1); /// - /// Returns `false` if no slot is reserved for this sender (usually because `poll_ready` was - /// not previously called, or did not succeed). + /// // Reserve capacity + /// let permit = tx.reserve().await.unwrap(); /// - /// # Motivation + /// // Trying to send directly on the `tx` will fail due to no + /// // available capacity. + /// assert!(tx.try_send(123).is_err()); /// - /// Since `poll_ready` takes up one of the finite number of slots in a bounded channel, callers - /// need to send an item shortly after `poll_ready` succeeds. If they do not, idle senders may - /// take up all the slots of the channel, and prevent active senders from getting any requests - /// through. Consider this code that forwards from one channel to another: + /// // Sending on the permit succeeds + /// permit.send(456); /// - /// ```rust,ignore - /// loop { - /// ready!(tx.poll_ready(cx))?; - /// if let Some(item) = ready!(rx.poll_recv(cx)) { - /// tx.try_send(item)?; - /// } else { - /// break; - /// } + /// // The value sent on the permit is received + /// assert_eq!(rx.recv().await.unwrap(), 456); /// } /// ``` + pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>> { + match self.chan.semaphore().0.acquire(1).await { + Ok(_) => {} + Err(_) => return Err(SendError(())), + } + + Ok(Permit { chan: &self.chan }) + } +} + +impl<T> Clone for Sender<T> { + fn clone(&self) -> Self { + Sender { + chan: self.chan.clone(), + } + } +} + +impl<T> fmt::Debug for Sender<T> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Sender") + .field("chan", &self.chan) + .finish() + } +} + +// ===== impl Permit ===== + +impl<T> Permit<'_, T> { + /// Sends a value using the reserved capacity. + /// + /// Capacity for the message has already been reserved. The message is sent + /// to the receiver and the permit is consumed. The operation will succeed + /// even if the receiver half has been closed. See [`Receiver::close`] for + /// more details on performing a clean shutdown. + /// + /// [`Receiver::close`]: Receiver::close + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = mpsc::channel(1); + /// + /// // Reserve capacity + /// let permit = tx.reserve().await.unwrap(); + /// + /// // Trying to send directly on the `tx` will fail due to no + /// // available capacity. + /// assert!(tx.try_send(123).is_err()); /// - /// If many such forwarders exist, and they all forward into a single (cloned) `Sender`, then - /// any number of forwarders may be waiting for `rx.poll_recv` at the same time. While they do, - /// they are effectively each reducing the channel's capacity by 1. If enough of these - /// forwarders are idle, forwarders whose `rx` _do_ have elements will be unable to find a spot - /// for them through `poll_ready`, and the system will deadlock. - /// - /// `disarm` solves this problem by allowing you to give up the reserved slot if you find that - /// you have to block. We can then fix the code above by writing: - /// - /// ```rust,ignore - /// loop { - /// ready!(tx.poll_ready(cx))?; - /// let item = rx.poll_recv(cx); - /// if let Poll::Ready(Ok(_)) = item { - /// // we're going to send the item below, so don't disarm - /// } else { - /// // give up our send slot, we won't need it for a while - /// tx.disarm(); - /// } - /// if let Some(item) = ready!(item) { - /// tx.try_send(item)?; - /// } else { - /// break; - /// } + /// // Send a message on the permit + /// permit.send(456); + /// + /// // The value sent on the permit is received + /// assert_eq!(rx.recv().await.unwrap(), 456); /// } /// ``` - pub fn disarm(&mut self) -> bool { - if self.chan.is_ready() { - self.chan.disarm(); - true - } else { - false + pub fn send(self, value: T) { + use std::mem; + + self.chan.send(value); + + // Avoid the drop logic + mem::forget(self); + } +} + +impl<T> Drop for Permit<'_, T> { + fn drop(&mut self) { + use chan::Semaphore; + + let semaphore = self.chan.semaphore(); + + // Add the permit back to the semaphore + semaphore.add_permit(); + + if semaphore.is_closed() && semaphore.is_idle() { + self.chan.wake_rx(); } } } + +impl<T> fmt::Debug for Permit<'_, T> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Permit") + .field("chan", &self.chan) + .finish() + } +} |