diff options
author | Carl Lerche <me@carllerche.com> | 2020-09-24 17:26:38 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-24 17:26:38 -0700 |
commit | cf025ba45f68934ae2138bb75ee2a5ee50506d1b (patch) | |
tree | 39fa03f4b063402e84da4435ebca39bd21266ad2 /tokio/src | |
parent | 4186b0aa38abbec7670d53882d5cdfd4b12add5c (diff) |
sync: support mpsc send with `&self` (#2861)
Updates the mpsc channel to use the intrusive waker based sempahore.
This enables using `Sender` with `&self`.
Instead of using `Sender::poll_ready` to ensure capacity and updating
the `Sender` state, `async fn Sender::reserve()` is added. This function
returns a `Permit` value representing the reserved capacity.
Fixes: #2637
Refs: #2718 (intrusive waiters)
Diffstat (limited to 'tokio/src')
-rw-r--r-- | tokio/src/signal/unix.rs | 30 | ||||
-rw-r--r-- | tokio/src/stream/mod.rs | 4 | ||||
-rw-r--r-- | tokio/src/stream/stream_map.rs | 4 | ||||
-rw-r--r-- | tokio/src/sync/batch_semaphore.rs | 10 | ||||
-rw-r--r-- | tokio/src/sync/mod.rs | 9 | ||||
-rw-r--r-- | tokio/src/sync/mpsc/bounded.rs | 338 | ||||
-rw-r--r-- | tokio/src/sync/mpsc/chan.rs | 268 | ||||
-rw-r--r-- | tokio/src/sync/mpsc/error.rs | 20 | ||||
-rw-r--r-- | tokio/src/sync/mpsc/mod.rs | 2 | ||||
-rw-r--r-- | tokio/src/sync/mpsc/unbounded.rs | 39 | ||||
-rw-r--r-- | tokio/src/sync/semaphore_ll.rs | 1221 | ||||
-rw-r--r-- | tokio/src/sync/tests/loom_mpsc.rs | 14 | ||||
-rw-r--r-- | tokio/src/sync/tests/loom_semaphore_ll.rs | 192 | ||||
-rw-r--r-- | tokio/src/sync/tests/mod.rs | 2 | ||||
-rw-r--r-- | tokio/src/sync/tests/semaphore_ll.rs | 470 | ||||
-rw-r--r-- | tokio/src/util/linked_list.rs | 20 |
16 files changed, 306 insertions, 2337 deletions
diff --git a/tokio/src/signal/unix.rs b/tokio/src/signal/unix.rs index 45a091d7..30a05872 100644 --- a/tokio/src/signal/unix.rs +++ b/tokio/src/signal/unix.rs @@ -391,35 +391,7 @@ impl Signal { poll_fn(|cx| self.poll_recv(cx)).await } - /// Polls to receive the next signal notification event, outside of an - /// `async` context. - /// - /// `None` is returned if no more events can be received by this stream. - /// - /// # Examples - /// - /// Polling from a manually implemented future - /// - /// ```rust,no_run - /// use std::pin::Pin; - /// use std::future::Future; - /// use std::task::{Context, Poll}; - /// use tokio::signal::unix::Signal; - /// - /// struct MyFuture { - /// signal: Signal, - /// } - /// - /// impl Future for MyFuture { - /// type Output = Option<()>; - /// - /// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - /// println!("polling MyFuture"); - /// self.signal.poll_recv(cx) - /// } - /// } - /// ``` - pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> { + pub(crate) fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> { self.rx.poll_recv(cx) } } diff --git a/tokio/src/stream/mod.rs b/tokio/src/stream/mod.rs index 6a99d9d8..59e1482f 100644 --- a/tokio/src/stream/mod.rs +++ b/tokio/src/stream/mod.rs @@ -270,8 +270,8 @@ pub trait StreamExt: Stream { /// # #[tokio::main(basic_scheduler)] /// async fn main() { /// # time::pause(); - /// let (mut tx1, rx1) = mpsc::channel(10); - /// let (mut tx2, rx2) = mpsc::channel(10); + /// let (tx1, rx1) = mpsc::channel(10); + /// let (tx2, rx2) = mpsc::channel(10); /// /// let mut rx = rx1.merge(rx2); /// diff --git a/tokio/src/stream/stream_map.rs b/tokio/src/stream/stream_map.rs index 2f60ea4d..a1c80f15 100644 --- a/tokio/src/stream/stream_map.rs +++ b/tokio/src/stream/stream_map.rs @@ -57,8 +57,8 @@ use std::task::{Context, Poll}; /// /// #[tokio::main] /// async fn main() { -/// let (mut tx1, rx1) = mpsc::channel(10); -/// let (mut tx2, rx2) = mpsc::channel(10); +/// let (tx1, rx1) = mpsc::channel(10); +/// let (tx2, rx2) = mpsc::channel(10); /// /// tokio::spawn(async move { /// tx1.send(1).await.unwrap(); diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs index a1048ca3..9f324f9c 100644 --- a/tokio/src/sync/batch_semaphore.rs +++ b/tokio/src/sync/batch_semaphore.rs @@ -165,7 +165,6 @@ impl Semaphore { /// permits and notifies all pending waiters. // This will be used once the bounded MPSC is updated to use the new // semaphore implementation. - #[allow(dead_code)] pub(crate) fn close(&self) { let mut waiters = self.waiters.lock().unwrap(); // If the semaphore's permits counter has enough permits for an @@ -185,6 +184,11 @@ impl Semaphore { } } + /// Returns true if the semaphore is closed + pub(crate) fn is_closed(&self) -> bool { + self.permits.load(Acquire) & Self::CLOSED == Self::CLOSED + } + pub(crate) fn try_acquire(&self, num_permits: u32) -> Result<(), TryAcquireError> { assert!( num_permits as usize <= Self::MAX_PERMITS, @@ -194,8 +198,8 @@ impl Semaphore { let num_permits = (num_permits as usize) << Self::PERMIT_SHIFT; let mut curr = self.permits.load(Acquire); loop { - // Has the semaphore closed?git - if curr & Self::CLOSED > 0 { + // Has the semaphore closed? + if curr & Self::CLOSED == Self::CLOSED { return Err(TryAcquireError::Closed); } diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs index 4c069467..6531931b 100644 --- a/tokio/src/sync/mod.rs +++ b/tokio/src/sync/mod.rs @@ -106,7 +106,7 @@ //! //! #[tokio::main] //! async fn main() { -//! let (mut tx, mut rx) = mpsc::channel(100); +//! let (tx, mut rx) = mpsc::channel(100); //! //! tokio::spawn(async move { //! for i in 0..10 { @@ -150,7 +150,7 @@ //! for _ in 0..10 { //! // Each task needs its own `tx` handle. This is done by cloning the //! // original handle. -//! let mut tx = tx.clone(); +//! let tx = tx.clone(); //! //! tokio::spawn(async move { //! tx.send(&b"data to write"[..]).await.unwrap(); @@ -213,7 +213,7 @@ //! //! // Spawn tasks that will send the increment command. //! for _ in 0..10 { -//! let mut cmd_tx = cmd_tx.clone(); +//! let cmd_tx = cmd_tx.clone(); //! //! join_handles.push(tokio::spawn(async move { //! let (resp_tx, resp_rx) = oneshot::channel(); @@ -443,7 +443,6 @@ cfg_sync! { pub mod oneshot; pub(crate) mod batch_semaphore; - pub(crate) mod semaphore_ll; mod semaphore; pub use semaphore::{Semaphore, SemaphorePermit, OwnedSemaphorePermit}; @@ -473,7 +472,7 @@ cfg_not_sync! { cfg_signal_internal! { pub(crate) mod mpsc; - pub(crate) mod semaphore_ll; + pub(crate) mod batch_semaphore; } } diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 14e4731a..2d2006d5 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -1,6 +1,6 @@ +use crate::sync::batch_semaphore::{self as semaphore, TryAcquireError}; use crate::sync::mpsc::chan; -use crate::sync::mpsc::error::{ClosedError, SendError, TryRecvError, TrySendError}; -use crate::sync::semaphore_ll as semaphore; +use crate::sync::mpsc::error::{SendError, TryRecvError, TrySendError}; cfg_time! { use crate::sync::mpsc::error::SendTimeoutError; @@ -8,6 +8,7 @@ cfg_time! { } use std::fmt; +#[cfg(any(feature = "signal", feature = "process", feature = "stream"))] use std::task::{Context, Poll}; /// Send values to the associated `Receiver`. @@ -17,20 +18,14 @@ pub struct Sender<T> { chan: chan::Tx<T, Semaphore>, } -impl<T> Clone for Sender<T> { - fn clone(&self) -> Self { - Sender { - chan: self.chan.clone(), - } - } -} - -impl<T> fmt::Debug for Sender<T> { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Sender") - .field("chan", &self.chan) - .finish() - } +/// Permit to send one value into the channel. +/// +/// `Permit` values are returned by [`Sender::reserve()`] and are used to +/// guarantee channel capacity before generating a message to send. +/// +/// [`Sender::reserve()`]: Sender::reserve +pub struct Permit<'a, T> { + chan: &'a chan::Tx<T, Semaphore>, } /// Receive values from the associated `Sender`. @@ -41,14 +36,6 @@ pub struct Receiver<T> { chan: chan::Rx<T, Semaphore>, } -impl<T> fmt::Debug for Receiver<T> { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Receiver") - .field("chan", &self.chan) - .finish() - } -} - /// Creates a bounded mpsc channel for communicating between asynchronous tasks /// with backpressure. /// @@ -77,7 +64,7 @@ impl<T> fmt::Debug for Receiver<T> { /// /// #[tokio::main] /// async fn main() { -/// let (mut tx, mut rx) = mpsc::channel(100); +/// let (tx, mut rx) = mpsc::channel(100); /// /// tokio::spawn(async move { /// for i in 0..10 { @@ -125,7 +112,7 @@ impl<T> Receiver<T> { /// /// #[tokio::main] /// async fn main() { - /// let (mut tx, mut rx) = mpsc::channel(100); + /// let (tx, mut rx) = mpsc::channel(100); /// /// tokio::spawn(async move { /// tx.send("hello").await.unwrap(); @@ -143,7 +130,7 @@ impl<T> Receiver<T> { /// /// #[tokio::main] /// async fn main() { - /// let (mut tx, mut rx) = mpsc::channel(100); + /// let (tx, mut rx) = mpsc::channel(100); /// /// tx.send("hello").await.unwrap(); /// tx.send("world").await.unwrap(); @@ -154,12 +141,11 @@ impl<T> Receiver<T> { /// ``` pub async fn recv(&mut self) -> Option<T> { use crate::future::poll_fn; - - poll_fn(|cx| self.poll_recv(cx)).await + poll_fn(|cx| self.chan.recv(cx)).await } - #[doc(hidden)] // TODO: document - pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { + #[cfg(any(feature = "signal", feature = "process"))] + pub(crate) fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { self.chan.recv(cx) } @@ -178,7 +164,7 @@ impl<T> Receiver<T> { /// use tokio::sync::mpsc; /// /// fn main() { - /// let (mut tx, mut rx) = mpsc::channel::<u8>(10); + /// let (tx, mut rx) = mpsc::channel::<u8>(10); /// /// let sync_code = thread::spawn(move || { /// assert_eq!(Some(10), rx.blocking_recv()); @@ -215,12 +201,53 @@ impl<T> Receiver<T> { /// Closes the receiving half of a channel, without dropping it. /// /// This prevents any further messages from being sent on the channel while - /// still enabling the receiver to drain messages that are buffered. + /// still enabling the receiver to drain messages that are buffered. Any + /// outstanding [`Permit`] values will still be able to send messages. + /// + /// In order to guarantee no messages are dropped, after calling `close()`, + /// `recv()` must be called until `None` is returned. + /// + /// [`Permit`]: Permit + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = mpsc::channel(20); + /// + /// tokio::spawn(async move { + /// let mut i = 0; + /// while let Ok(permit) = tx.reserve().await { + /// permit.send(i); + /// i += 1; + /// } + /// }); + /// + /// rx.close(); + /// + /// while let Some(msg) = rx.recv().await { + /// println!("got {}", msg); + /// } + /// + /// // Channel closed and no messages are lost. + /// } + /// ``` pub fn close(&mut self) { self.chan.close(); } } +impl<T> fmt::Debug for Receiver<T> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Receiver") + .field("chan", &self.chan) + .finish() + } +} + impl<T> Unpin for Receiver<T> {} cfg_stream! { @@ -228,7 +255,7 @@ cfg_stream! { type Item = T; fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { - self.poll_recv(cx) + self.chan.recv(cx) } } } @@ -267,7 +294,7 @@ impl<T> Sender<T> { /// /// #[tokio::main] /// async fn main() { - /// let (mut tx, mut rx) = mpsc::channel(1); + /// let (tx, mut rx) = mpsc::channel(1); /// /// tokio::spawn(async move { /// for i in 0..10 { @@ -283,17 +310,13 @@ impl<T> Sender<T> { /// } /// } /// ``` - pub async fn send(&mut self, value: T) -> Result<(), SendError<T>> { - use crate::future::poll_fn; - - if poll_fn(|cx| self.poll_ready(cx)).await.is_err() { - return Err(SendError(value)); - } - - match self.try_send(value) { - Ok(()) => Ok(()), - Err(TrySendError::Full(_)) => unreachable!(), - Err(TrySendError::Closed(value)) => Err(SendError(value)), + pub async fn send(&self, value: T) -> Result<(), SendError<T>> { + match self.reserve().await { + Ok(permit) => { + permit.send(value); + Ok(()) + } + Err(_) => Err(SendError(value)), } } @@ -304,9 +327,6 @@ impl<T> Sender<T> { /// with [`send`], this function has two failure cases instead of one (one for /// disconnection, one for a full buffer). /// - /// This function may be paired with [`poll_ready`] in order to wait for - /// channel capacity before trying to send a value. - /// /// # Errors /// /// If the channel capacity has been reached, i.e., the channel has `n` @@ -318,7 +338,6 @@ impl<T> Sender<T> { /// an error. The error includes the value passed to `send`. /// /// [`send`]: Sender::send - /// [`poll_ready`]: Sender::poll_ready /// [`channel`]: channel /// [`close`]: Receiver::close /// @@ -330,8 +349,8 @@ impl<T> Sender<T> { /// #[tokio::main] /// async fn main() { /// // Create a channel with buffer size 1 - /// let (mut tx1, mut rx) = mpsc::channel(1); - /// let mut tx2 = tx1.clone(); + /// let (tx1, mut rx) = mpsc::channel(1); + /// let tx2 = tx1.clone(); /// /// tokio::spawn(async move { /// tx1.send(1).await.unwrap(); @@ -359,8 +378,15 @@ impl<T> Sender<T> { /// } /// } /// ``` - pub fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> { - self.chan.try_send(message)?; + pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> { + match self.chan.semaphore().0.try_acquire(1) { + Ok(_) => {} + Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(message)), + Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(message)), + } + + // Send the message + self.chan.send(message); Ok(()) } @@ -392,7 +418,7 @@ impl<T> Sender<T> { /// /// #[tokio::main] /// async fn main() { - /// let (mut tx, mut rx) = mpsc::channel(1); + /// let (tx, mut rx) = mpsc::channel(1); /// /// tokio::spawn(async move { /// for i in 0..10 { @@ -412,27 +438,22 @@ impl<T> Sender<T> { #[cfg(feature = "time")] #[cfg_attr(docsrs, doc(cfg(feature = "time")))] pub async fn send_timeout( - &mut self, + &self, value: T, timeout: Duration, ) -> Result<(), SendTimeoutError<T>> { - use crate::future::poll_fn; - - match crate::time::timeout(timeout, poll_fn(|cx| self.poll_ready(cx))).await { + let permit = match crate::time::timeout(timeout, self.reserve()).await { Err(_) => { return Err(SendTimeoutError::Timeout(value)); } Ok(Err(_)) => { return Err(SendTimeoutError::Closed(value)); } - Ok(_) => {} - } + Ok(Ok(permit)) => permit, + }; - match self.try_send(value) { - Ok(()) => Ok(()), - Err(TrySendError::Full(_)) => unreachable!(), - Err(TrySendError::Closed(value)) => Err(SendTimeoutError::Closed(value)), - } + permit.send(value); + Ok(()) } /// Blocking send to call outside of asynchronous contexts. @@ -450,7 +471,7 @@ impl<T> Sender<T> { /// use tokio::sync::mpsc; /// /// fn main() { - /// let (mut tx, mut rx) = mpsc::channel::<u8>(1); + /// let (tx, mut rx) = mpsc::channel::<u8>(1); /// /// let sync_code = thread::spawn(move || { /// tx.blocking_send(10).unwrap(); @@ -462,92 +483,139 @@ impl<T> Sender<T> { /// sync_code.join().unwrap() /// } /// ``` - pub fn blocking_send(&mut self, value: T) -> Result<(), SendError<T>> { + pub fn blocking_send(&self, value: T) -> Result<(), SendError<T>> { let mut enter_handle = crate::runtime::enter::enter(false); enter_handle.block_on(self.send(value)).unwrap() } - /// Returns `Poll::Ready(Ok(()))` when the channel is able to accept another item. + /// Wait for channel capacity. Once capacity to send one message is + /// available, it is reserved for the caller. /// - /// If the channel is full, then `Poll::Pending` is returned and the task is notified when a - /// slot becomes available. + /// If the channel is full, the function waits for the number of unreceived + /// messages to become less than the channel capacity. Capacity to send one + /// message is reserved for the caller. A [`Permit`] is returned to track + /// the reserved capacity. The [`send`] function on [`Permit`] consumes the + /// reserved capacity. /// - /// Once `poll_ready` returns `Poll::Ready(Ok(()))`, a call to `try_send` will succeed unless - /// the channel has since been closed. To provide this guarantee, the channel reserves one slot - /// in the channel for the coming send. This reserved slot is not available to other `Sender` - /// instances, so you need to be careful to not end up with deadlocks by blocking after calling - /// `poll_ready` but before sending an element. + /// Dropping [`Permit`] without sending a message releases the capacity back + /// to the channel. /// - /// If, after `poll_ready` succeeds, you decide you do not wish to send an item after all, you - /// can use [`disarm`](Sender::disarm) to release the reserved slot. + /// [`Permit`]: Permit + /// [`send`]: Permit::send /// - /// Until an item is sent or [`disarm`](Sender::disarm) is called, repeated calls to - /// `poll_ready` will return either `Poll::Ready(Ok(()))` or `Poll::Ready(Err(_))` if channel - /// is closed. - pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ClosedError>> { - self.chan.poll_ready(cx).map_err(|_| ClosedError::new()) - } - - /// Undo a successful call to `poll_ready`. + /// # Examples /// - /// Once a call to `poll_ready` returns `Poll::Ready(Ok(()))`, it holds up one slot in the - /// channel to make room for the coming send. `disarm` allows you to give up that slot if you - /// decide you do not wish to send an item after all. After calling `disarm`, you must call - /// `poll_ready` until it returns `Poll::Ready(Ok(()))` before attempting to send again. + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = mpsc::channel(1); /// - /// Returns `false` if no slot is reserved for this sender (usually because `poll_ready` was - /// not previously called, or did not succeed). + /// // Reserve capacity + /// let permit = tx.reserve().await.unwrap(); /// - /// # Motivation + /// // Trying to send directly on the `tx` will fail due to no + /// // available capacity. + /// assert!(tx.try_send(123).is_err()); /// - /// Since `poll_ready` takes up one of the finite number of slots in a bounded channel, callers - /// need to send an item shortly after `poll_ready` succeeds. If they do not, idle senders may - /// take up all the slots of the channel, and prevent active senders from getting any requests - /// through. Consider this code that forwards from one channel to another: + /// // Sending on the permit succeeds + /// permit.send(456); /// - /// ```rust,ignore - /// loop { - /// ready!(tx.poll_ready(cx))?; - /// if let Some(item) = ready!(rx.poll_recv(cx)) { - /// tx.try_send(item)?; - /// } else { - /// break; - /// } + /// // The value sent on the permit is received + /// assert_eq!(rx.recv().await.unwrap(), 456); /// } /// ``` + pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>> { + match self.chan.semaphore().0.acquire(1).await { + Ok(_) => {} + Err(_) => return Err(SendError(())), + } + + Ok(Permit { chan: &self.chan }) + } +} + +impl<T> Clone for Sender<T> { + fn clone(&self) -> Self { + Sender { + chan: self.chan.clone(), + } + } +} + +impl<T> fmt::Debug for Sender<T> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Sender") + .field("chan", &self.chan) + .finish() + } +} + +// ===== impl Permit ===== + +impl<T> Permit<'_, T> { + /// Sends a value using the reserved capacity. + /// + /// Capacity for the message has already been reserved. The message is sent + /// to the receiver and the permit is consumed. The operation will succeed + /// even if the receiver half has been closed. See [`Receiver::close`] for + /// more details on performing a clean shutdown. + /// + /// [`Receiver::close`]: Receiver::close + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = mpsc::channel(1); + /// + /// // Reserve capacity + /// let permit = tx.reserve().await.unwrap(); + /// + /// // Trying to send directly on the `tx` will fail due to no + /// // available capacity. + /// assert!(tx.try_send(123).is_err()); /// - /// If many such forwarders exist, and they all forward into a single (cloned) `Sender`, then - /// any number of forwarders may be waiting for `rx.poll_recv` at the same time. While they do, - /// they are effectively each reducing the channel's capacity by 1. If enough of these - /// forwarders are idle, forwarders whose `rx` _do_ have elements will be unable to find a spot - /// for them through `poll_ready`, and the system will deadlock. - /// - /// `disarm` solves this problem by allowing you to give up the reserved slot if you find that - /// you have to block. We can then fix the code above by writing: - /// - /// ```rust,ignore - /// loop { - /// ready!(tx.poll_ready(cx))?; - /// let item = rx.poll_recv(cx); - /// if let Poll::Ready(Ok(_)) = item { - /// // we're going to send the item below, so don't disarm - /// } else { - /// // give up our send slot, we won't need it for a while - /// tx.disarm(); - /// } - /// if let Some(item) = ready!(item) { - /// tx.try_send(item)?; - /// } else { - /// break; - /// } + /// // Send a message on the permit + /// permit.send(456); + /// + /// // The value sent on the permit is received + /// assert_eq!(rx.recv().await.unwrap(), 456); /// } /// ``` - pub fn disarm(&mut self) -> bool { - if self.chan.is_ready() { - self.chan.disarm(); - true - } else { - false + pub fn send(self, value: T) { + use std::mem; + + self.chan.send(value); + + // Avoid the drop logic + mem::forget(self); + } +} + +impl<T> Drop for Permit<'_, T> { + fn drop(&mut self) { + use chan::Semaphore; + + let semaphore = self.chan.semaphore(); + + // Add the permit back to the semaphore + semaphore.add_permit(); + + if semaphore.is_closed() && semaphore.is_idle() { + self.chan.wake_rx(); } } } + +impl<T> fmt::Debug for Permit<'_, T> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Permit") + .field("chan", &self.chan) + .finish() + } +} diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index 0a53cda2..2d3f0149 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -2,8 +2,8 @@ use crate::loom::cell::UnsafeCell; use crate::loom::future::AtomicWaker; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::Arc; -use crate::sync::mpsc::error::{ClosedError, TryRecvError}; -use crate::sync::mpsc::{error, list}; +use crate::sync::mpsc::error::TryRecvError; +use crate::sync::mpsc::list; use std::fmt; use std::process; @@ -12,21 +12,13 @@ use std::task::Poll::{Pending, Ready}; use std::task::{Context, Poll}; /// Channel sender -pub(crate) struct Tx<T, S: Semaphore> { +pub(crate) struct Tx<T, S> { inner: Arc<Chan<T, S>>, - permit: S::Permit, } -impl<T, S: Semaphore> fmt::Debug for Tx<T, S> -where - S::Permit: fmt::Debug, - S: fmt::Debug, -{ +impl<T, S: fmt::Debug> fmt::Debug for Tx<T, S> { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Tx") - .field("inner", &self.inner) - .field("permit", &self.permit) - .finish() + fmt.debug_struct("Tx").field("inner", &self.inner).finish() } } @@ -35,71 +27,20 @@ pub(crate) struct Rx<T, S: Semaphore> { inner: Arc<Chan<T, S>>, } -impl<T, S: Semaphore> fmt::Debug for Rx<T, S> -where - S: fmt::Debug, -{ +impl<T, S: Semaphore + fmt::Debug> fmt::Debug for Rx<T, S> { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("Rx").field("inner", &self.inner).finish() } } -#[derive(Debug, Eq, PartialEq)] -pub(crate) enum TrySendError { - Closed, - Full, -} - -impl<T> From<(T, TrySendError)> for error::SendError<T> { - fn from(src: (T, TrySendError)) -> error::SendError<T> { - match src.1 { - TrySendError::Closed => error::SendError(src.0), - TrySendError::Full => unreachable!(), - } - } -} - -impl<T> From<(T, TrySendError)> for error::TrySendError<T> { - fn from(src: (T, TrySendError)) -> error::TrySendError<T> { - match src.1 { - TrySendError::Closed => error::TrySendError::Closed(src.0), - TrySendError::Full => error::TrySendError::Full(src.0), - } - } -} - pub(crate) trait Semaphore { - type Permit; - - fn new_permit() -> Self::Permit; - - /// The permit is dropped without a value being sent. In this case, the - /// permit must be returned to the semaphore. - /// - /// # Return - /// - /// Returns true if the permit was acquired. - fn drop_permit(&self, permit: &mut Self::Permit) -> bool; - fn is_idle(&self) -> bool; fn add_permit(&self); - fn poll_acquire( - &self, - cx: &mut Context<'_>, - permit: &mut Self::Permit, - ) -> Poll<Result<(), ClosedError>>; - - fn try_acquire(&self, permit: &mut Self::Permit) -> Result<(), TrySendError>; - - /// A value was sent into the channel and the permit held by `tx` is - /// dropped. In this case, the permit should not immeditely be returned to - /// the semaphore. Instead, the permit is returnred to the semaphore once - /// the sent value is read by the rx handle. - fn forget(&self, permit: &mut Self::Permit); - fn close(&self); + + fn is_closed(&self) -> bool; } struct Chan<T, S> { @@ -157,10 +98,7 @@ impl<T> fmt::Debug for RxFields<T> { unsafe impl<T: Send, S: Send> Send for Chan<T, S> {} unsafe impl<T: Send, S: Sync> Sync for Chan<T, S> {} -pub(crate) fn channel<T, S>(semaphore: S) -> (Tx<T, S>, Rx<T, S>) -where - S: Semaphore, -{ +pub(crate) fn channel<T, S: Semaphore>(semaphore: S) -> (Tx<T, S>, Rx<T, S>) { let (tx, rx) = list::channel(); let chan = Arc::new(Chan { @@ -179,48 +117,27 @@ where // ===== impl Tx ===== -impl<T, S> Tx<T, S> -where - S: Semaphore, -{ +impl<T, S> Tx<T, S> { fn new(chan: Arc<Chan<T, S>>) -> Tx<T, S> { - Tx { - inner: chan, - permit: S::new_permit(), - } + Tx { inner: chan } } - pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ClosedError>> { - self.inner.semaphore.poll_acquire(cx, &mut self.permit) - } - - pub(crate) fn disarm(&mut self) { - // TODO: should this error if not acquired? - self.inner.semaphore.drop_permit(&mut self.permit); + pub(super) fn semaphore(&self) -> &S { + &self.inner.semaphore } /// Send a message and notify the receiver. - pub(crate) fn try_send(&mut self, value: T) -> Result<(), (T, TrySendError)> { - self.inner.try_send(value, &mut self.permit) - } -} - -impl<T> Tx<T, (crate::sync::semaphore_ll::Semaphore, usize)> { - pub(crate) fn is_ready(&self) -> bool { - self.permit.is_acquired() + pub(crate) fn send(&self, value: T) { + self.inner.send(value); } -} -impl<T> Tx<T, AtomicUsize> { - pub(crate) fn send_unbounded(&self, value: T) -> Result<(), (T, TrySendError)> { - self.inner.try_send(value, &mut ()) + /// Wake the receive half + pub(crate) fn wake_rx(&self) { + self.inner.rx_waker.wake(); } } -impl<T, S> Clone for Tx<T, S> -where - S: Semaphore, -{ +impl<T, S> Clone for Tx<T, S> { fn clone(&self) -> Tx<T, S> { // Using a Relaxed ordering here is sufficient as the caller holds a // strong ref to `self`, preventing a concurrent decrement to zero. @@ -228,22 +145,12 @@ where Tx { inner: self.inner.clone(), - permit: S::new_permit(), } } } -impl<T, S> Drop for Tx<T, S> -where - S: Semaphore, -{ +impl<T, S> Drop for Tx<T, S> { fn drop(&mut self) { - let notify = self.inner.semaphore.drop_permit(&mut self.permit); - - if notify && self.inner.semaphore.is_idle() { - self.inner.rx_waker.wake(); - } - if self.inner.tx_count.fetch_sub(1, AcqRel) != 1 { return; } @@ -252,16 +159,13 @@ where self.inner.tx.close(); // Notify the receiver - self.inner.rx_waker.wake(); + self.wake_rx(); } } // ===== impl Rx ===== -impl<T, S> Rx<T, S> -where - S: Semaphore, -{ +impl<T, S: Semaphore> Rx<T, S> { fn new(chan: Arc<Chan<T, S>>) -> Rx<T, S> { Rx { inner: chan } } @@ -349,10 +253,7 @@ where } } < |