diff options
author | Alice Ryhl <alice@ryhl.io> | 2020-10-05 18:30:48 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-10-05 09:30:48 -0700 |
commit | 242ea011891099f348b755b2ea10ec9e9ea104db (patch) | |
tree | 0c65639a8b2357a680507da308813d81afdc840f /tokio/src/sync | |
parent | 1684e1c80921f13600ee6c4576662b7b587443c6 (diff) |
sync: broadcast channel API tweaks (#2898)
Removes deprecated APIs and makes some small breaking changes.
Diffstat (limited to 'tokio/src/sync')
-rw-r--r-- | tokio/src/sync/broadcast.rs | 280 |
1 files changed, 90 insertions, 190 deletions
diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index fe826290..abd120be 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -107,6 +107,7 @@ //! assert_eq!(20, rx.recv().await.unwrap()); //! assert_eq!(30, rx.recv().await.unwrap()); //! } +//! ``` use crate::loom::cell::UnsafeCell; use crate::loom::sync::atomic::AtomicUsize; @@ -194,9 +195,6 @@ pub struct Receiver<T> { /// Next position to read from next: u64, - - /// Used to support the deprecated `poll_recv` fn - waiter: Option<Pin<Box<UnsafeCell<Waiter>>>>, } /// Error returned by [`Sender::send`][Sender::send]. @@ -400,7 +398,7 @@ const MAX_RECEIVERS: usize = usize::MAX >> 2; /// tx.send(20).unwrap(); /// } /// ``` -pub fn channel<T>(mut capacity: usize) -> (Sender<T>, Receiver<T>) { +pub fn channel<T: Clone>(mut capacity: usize) -> (Sender<T>, Receiver<T>) { assert!(capacity > 0, "capacity is empty"); assert!(capacity <= usize::MAX >> 1, "requested capacity too large"); @@ -433,7 +431,6 @@ pub fn channel<T>(mut capacity: usize) -> (Sender<T>, Receiver<T>) { let rx = Receiver { shared: shared.clone(), next: 0, - waiter: None, }; let tx = Sender { shared }; @@ -540,11 +537,7 @@ impl<T> Sender<T> { drop(tail); - Receiver { - shared, - next, - waiter: None, - } + Receiver { shared, next } } /// Returns the number of active receivers @@ -784,107 +777,7 @@ impl<T> Receiver<T> { } } -impl<T> Receiver<T> -where - T: Clone, -{ - /// Attempts to return a pending value on this receiver without awaiting. - /// - /// This is useful for a flavor of "optimistic check" before deciding to - /// await on a receiver. - /// - /// Compared with [`recv`], this function has three failure cases instead of two - /// (one for closed, one for an empty buffer, one for a lagging receiver). - /// - /// `Err(TryRecvError::Closed)` is returned when all `Sender` halves have - /// dropped, indicating that no further values can be sent on the channel. - /// - /// If the [`Receiver`] handle falls behind, once the channel is full, newly - /// sent values will overwrite old values. At this point, a call to [`recv`] - /// will return with `Err(TryRecvError::Lagged)` and the [`Receiver`]'s - /// internal cursor is updated to point to the oldest value still held by - /// the channel. A subsequent call to [`try_recv`] will return this value - /// **unless** it has been since overwritten. If there are no values to - /// receive, `Err(TryRecvError::Empty)` is returned. - /// - /// [`recv`]: crate::sync::broadcast::Receiver::recv - /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv - /// [`Receiver`]: crate::sync::broadcast::Receiver - /// - /// # Examples - /// - /// ``` - /// use tokio::sync::broadcast; - /// - /// #[tokio::main] - /// async fn main() { - /// let (tx, mut rx) = broadcast::channel(16); - /// - /// assert!(rx.try_recv().is_err()); - /// - /// tx.send(10).unwrap(); - /// - /// let value = rx.try_recv().unwrap(); - /// assert_eq!(10, value); - /// } - /// ``` - pub fn try_recv(&mut self) -> Result<T, TryRecvError> { - let guard = self.recv_ref(None)?; - guard.clone_value().ok_or(TryRecvError::Closed) - } - - #[doc(hidden)] - #[deprecated(since = "0.2.21", note = "use async fn recv()")] - pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> { - use Poll::{Pending, Ready}; - - // The borrow checker prohibits calling `self.poll_ref` while passing in - // a mutable ref to a field (as it should). To work around this, - // `waiter` is first *removed* from `self` then `poll_recv` is called. - // - // However, for safety, we must ensure that `waiter` is **not** dropped. - // It could be contained in the intrusive linked list. The `Receiver` - // drop implementation handles cleanup. - // - // The guard pattern is used to ensure that, on return, even due to - // panic, the waiter node is replaced on `self`. - - struct Guard<'a, T> { - waiter: Option<Pin<Box<UnsafeCell<Waiter>>>>, - receiver: &'a mut Receiver<T>, - } - - impl<'a, T> Drop for Guard<'a, T> { - fn drop(&mut self) { - self.receiver.waiter = self.waiter.take(); - } - } - - let waiter = self.waiter.take().or_else(|| { - Some(Box::pin(UnsafeCell::new(Waiter { - queued: false, - waker: None, - pointers: linked_list::Pointers::new(), - _p: PhantomPinned, - }))) - }); - - let guard = Guard { - waiter, - receiver: self, - }; - let res = guard - .receiver - .recv_ref(Some((&guard.waiter.as_ref().unwrap(), cx.waker()))); - - match res { - Ok(guard) => Ready(guard.clone_value().ok_or(RecvError::Closed)), - Err(TryRecvError::Closed) => Ready(Err(RecvError::Closed)), - Err(TryRecvError::Lagged(n)) => Ready(Err(RecvError::Lagged(n))), - Err(TryRecvError::Empty) => Pending, - } - } - +impl<T: Clone> Receiver<T> { /// Receives the next value for this receiver. /// /// Each [`Receiver`] handle will receive a clone of all values sent @@ -949,31 +842,97 @@ where /// assert_eq!(20, rx.recv().await.unwrap()); /// assert_eq!(30, rx.recv().await.unwrap()); /// } + /// ``` pub async fn recv(&mut self) -> Result<T, RecvError> { let fut = Recv::<_, T>::new(Borrow(self)); fut.await } -} -#[cfg(feature = "stream")] -#[doc(hidden)] -#[deprecated(since = "0.2.21", note = "use `into_stream()`")] -impl<T> crate::stream::Stream for Receiver<T> -where - T: Clone, -{ - type Item = Result<T, RecvError>; - - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<Result<T, RecvError>>> { - #[allow(deprecated)] - self.poll_recv(cx).map(|v| match v { - Ok(v) => Some(Ok(v)), - lag @ Err(RecvError::Lagged(_)) => Some(lag), - Err(RecvError::Closed) => None, - }) + /// Attempts to return a pending value on this receiver without awaiting. + /// + /// This is useful for a flavor of "optimistic check" before deciding to + /// await on a receiver. + /// + /// Compared with [`recv`], this function has three failure cases instead of two + /// (one for closed, one for an empty buffer, one for a lagging receiver). + /// + /// `Err(TryRecvError::Closed)` is returned when all `Sender` halves have + /// dropped, indicating that no further values can be sent on the channel. + /// + /// If the [`Receiver`] handle falls behind, once the channel is full, newly + /// sent values will overwrite old values. At this point, a call to [`recv`] + /// will return with `Err(TryRecvError::Lagged)` and the [`Receiver`]'s + /// internal cursor is updated to point to the oldest value still held by + /// the channel. A subsequent call to [`try_recv`] will return this value + /// **unless** it has been since overwritten. If there are no values to + /// receive, `Err(TryRecvError::Empty)` is returned. + /// + /// [`recv`]: crate::sync::broadcast::Receiver::recv + /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv + /// [`Receiver`]: crate::sync::broadcast::Receiver + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::broadcast; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = broadcast::channel(16); + /// + /// assert!(rx.try_recv().is_err()); + /// + /// tx.send(10).unwrap(); + /// + /// let value = rx.try_recv().unwrap(); + /// assert_eq!(10, value); + /// } + /// ``` + pub fn try_recv(&mut self) -> Result<T, TryRecvError> { + let guard = self.recv_ref(None)?; + guard.clone_value().ok_or(TryRecvError::Closed) + } + + /// Convert the receiver into a `Stream`. + /// + /// The conversion allows using `Receiver` with APIs that require stream + /// values. + /// + /// # Examples + /// + /// ``` + /// use tokio::stream::StreamExt; + /// use tokio::sync::broadcast; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, rx) = broadcast::channel(128); + /// + /// tokio::spawn(async move { + /// for i in 0..10_i32 { + /// tx.send(i).unwrap(); + /// } + /// }); + /// + /// // Streams must be pinned to iterate. + /// tokio::pin! { + /// let stream = rx + /// .into_stream() + /// .filter(Result::is_ok) + /// .map(Result::unwrap) + /// .filter(|v| v % 2 == 0) + /// .map(|v| v + 1); + /// } + /// + /// while let Some(i) = stream.next().await { + /// println!("{}", i); + /// } + /// } + /// ``` + #[cfg(feature = "stream")] + #[cfg_attr(docsrs, doc(cfg(feature = "stream")))] + pub fn into_stream(self) -> impl Stream<Item = Result<T, RecvError>> { + Recv::new(Borrow(self)) } } @@ -981,23 +940,6 @@ impl<T> Drop for Receiver<T> { fn drop(&mut self) { let mut tail = self.shared.tail.lock(); - if let Some(waiter) = &self.waiter { - // safety: tail lock is held - let queued = waiter.with(|ptr| unsafe { (*ptr).queued }); - - if queued { - // Remove the node - // - // safety: tail lock is held and the wait node is verified to be in - // the list. - unsafe { - waiter.with_mut(|ptr| { - tail.waiters.remove((&mut *ptr).into()); - }); - } - } - } - tail.rx_cnt -= 1; let until = tail.pos; @@ -1071,48 +1013,6 @@ where cfg_stream! { use futures_core::Stream; - impl<T: Clone> Receiver<T> { - /// Convert the receiver into a `Stream`. - /// - /// The conversion allows using `Receiver` with APIs that require stream - /// values. - /// - /// # Examples - /// - /// ``` - /// use tokio::stream::StreamExt; - /// use tokio::sync::broadcast; - /// - /// #[tokio::main] - /// async fn main() { - /// let (tx, rx) = broadcast::channel(128); - /// - /// tokio::spawn(async move { - /// for i in 0..10_i32 { - /// tx.send(i).unwrap(); - /// } - /// }); - /// - /// // Streams must be pinned to iterate. - /// tokio::pin! { - /// let stream = rx - /// .into_stream() - /// .filter(Result::is_ok) - /// .map(Result::unwrap) - /// .filter(|v| v % 2 == 0) - /// .map(|v| v + 1); - /// } - /// - /// while let Some(i) = stream.next().await { - /// println!("{}", i); - /// } - /// } - /// ``` - pub fn into_stream(self) -> impl Stream<Item = Result<T, RecvError>> { - Recv::new(Borrow(self)) - } - } - impl<R, T: Clone> Stream for Recv<R, T> where R: AsMut<Receiver<T>>, |