summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync
diff options
context:
space:
mode:
authorAlice Ryhl <alice@ryhl.io>2020-10-05 18:30:48 +0200
committerGitHub <noreply@github.com>2020-10-05 09:30:48 -0700
commit242ea011891099f348b755b2ea10ec9e9ea104db (patch)
tree0c65639a8b2357a680507da308813d81afdc840f /tokio/src/sync
parent1684e1c80921f13600ee6c4576662b7b587443c6 (diff)
sync: broadcast channel API tweaks (#2898)
Removes deprecated APIs and makes some small breaking changes.
Diffstat (limited to 'tokio/src/sync')
-rw-r--r--tokio/src/sync/broadcast.rs280
1 files changed, 90 insertions, 190 deletions
diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs
index fe826290..abd120be 100644
--- a/tokio/src/sync/broadcast.rs
+++ b/tokio/src/sync/broadcast.rs
@@ -107,6 +107,7 @@
//! assert_eq!(20, rx.recv().await.unwrap());
//! assert_eq!(30, rx.recv().await.unwrap());
//! }
+//! ```
use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::AtomicUsize;
@@ -194,9 +195,6 @@ pub struct Receiver<T> {
/// Next position to read from
next: u64,
-
- /// Used to support the deprecated `poll_recv` fn
- waiter: Option<Pin<Box<UnsafeCell<Waiter>>>>,
}
/// Error returned by [`Sender::send`][Sender::send].
@@ -400,7 +398,7 @@ const MAX_RECEIVERS: usize = usize::MAX >> 2;
/// tx.send(20).unwrap();
/// }
/// ```
-pub fn channel<T>(mut capacity: usize) -> (Sender<T>, Receiver<T>) {
+pub fn channel<T: Clone>(mut capacity: usize) -> (Sender<T>, Receiver<T>) {
assert!(capacity > 0, "capacity is empty");
assert!(capacity <= usize::MAX >> 1, "requested capacity too large");
@@ -433,7 +431,6 @@ pub fn channel<T>(mut capacity: usize) -> (Sender<T>, Receiver<T>) {
let rx = Receiver {
shared: shared.clone(),
next: 0,
- waiter: None,
};
let tx = Sender { shared };
@@ -540,11 +537,7 @@ impl<T> Sender<T> {
drop(tail);
- Receiver {
- shared,
- next,
- waiter: None,
- }
+ Receiver { shared, next }
}
/// Returns the number of active receivers
@@ -784,107 +777,7 @@ impl<T> Receiver<T> {
}
}
-impl<T> Receiver<T>
-where
- T: Clone,
-{
- /// Attempts to return a pending value on this receiver without awaiting.
- ///
- /// This is useful for a flavor of "optimistic check" before deciding to
- /// await on a receiver.
- ///
- /// Compared with [`recv`], this function has three failure cases instead of two
- /// (one for closed, one for an empty buffer, one for a lagging receiver).
- ///
- /// `Err(TryRecvError::Closed)` is returned when all `Sender` halves have
- /// dropped, indicating that no further values can be sent on the channel.
- ///
- /// If the [`Receiver`] handle falls behind, once the channel is full, newly
- /// sent values will overwrite old values. At this point, a call to [`recv`]
- /// will return with `Err(TryRecvError::Lagged)` and the [`Receiver`]'s
- /// internal cursor is updated to point to the oldest value still held by
- /// the channel. A subsequent call to [`try_recv`] will return this value
- /// **unless** it has been since overwritten. If there are no values to
- /// receive, `Err(TryRecvError::Empty)` is returned.
- ///
- /// [`recv`]: crate::sync::broadcast::Receiver::recv
- /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
- /// [`Receiver`]: crate::sync::broadcast::Receiver
- ///
- /// # Examples
- ///
- /// ```
- /// use tokio::sync::broadcast;
- ///
- /// #[tokio::main]
- /// async fn main() {
- /// let (tx, mut rx) = broadcast::channel(16);
- ///
- /// assert!(rx.try_recv().is_err());
- ///
- /// tx.send(10).unwrap();
- ///
- /// let value = rx.try_recv().unwrap();
- /// assert_eq!(10, value);
- /// }
- /// ```
- pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
- let guard = self.recv_ref(None)?;
- guard.clone_value().ok_or(TryRecvError::Closed)
- }
-
- #[doc(hidden)]
- #[deprecated(since = "0.2.21", note = "use async fn recv()")]
- pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
- use Poll::{Pending, Ready};
-
- // The borrow checker prohibits calling `self.poll_ref` while passing in
- // a mutable ref to a field (as it should). To work around this,
- // `waiter` is first *removed* from `self` then `poll_recv` is called.
- //
- // However, for safety, we must ensure that `waiter` is **not** dropped.
- // It could be contained in the intrusive linked list. The `Receiver`
- // drop implementation handles cleanup.
- //
- // The guard pattern is used to ensure that, on return, even due to
- // panic, the waiter node is replaced on `self`.
-
- struct Guard<'a, T> {
- waiter: Option<Pin<Box<UnsafeCell<Waiter>>>>,
- receiver: &'a mut Receiver<T>,
- }
-
- impl<'a, T> Drop for Guard<'a, T> {
- fn drop(&mut self) {
- self.receiver.waiter = self.waiter.take();
- }
- }
-
- let waiter = self.waiter.take().or_else(|| {
- Some(Box::pin(UnsafeCell::new(Waiter {
- queued: false,
- waker: None,
- pointers: linked_list::Pointers::new(),
- _p: PhantomPinned,
- })))
- });
-
- let guard = Guard {
- waiter,
- receiver: self,
- };
- let res = guard
- .receiver
- .recv_ref(Some((&guard.waiter.as_ref().unwrap(), cx.waker())));
-
- match res {
- Ok(guard) => Ready(guard.clone_value().ok_or(RecvError::Closed)),
- Err(TryRecvError::Closed) => Ready(Err(RecvError::Closed)),
- Err(TryRecvError::Lagged(n)) => Ready(Err(RecvError::Lagged(n))),
- Err(TryRecvError::Empty) => Pending,
- }
- }
-
+impl<T: Clone> Receiver<T> {
/// Receives the next value for this receiver.
///
/// Each [`Receiver`] handle will receive a clone of all values sent
@@ -949,31 +842,97 @@ where
/// assert_eq!(20, rx.recv().await.unwrap());
/// assert_eq!(30, rx.recv().await.unwrap());
/// }
+ /// ```
pub async fn recv(&mut self) -> Result<T, RecvError> {
let fut = Recv::<_, T>::new(Borrow(self));
fut.await
}
-}
-#[cfg(feature = "stream")]
-#[doc(hidden)]
-#[deprecated(since = "0.2.21", note = "use `into_stream()`")]
-impl<T> crate::stream::Stream for Receiver<T>
-where
- T: Clone,
-{
- type Item = Result<T, RecvError>;
-
- fn poll_next(
- mut self: std::pin::Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Option<Result<T, RecvError>>> {
- #[allow(deprecated)]
- self.poll_recv(cx).map(|v| match v {
- Ok(v) => Some(Ok(v)),
- lag @ Err(RecvError::Lagged(_)) => Some(lag),
- Err(RecvError::Closed) => None,
- })
+ /// Attempts to return a pending value on this receiver without awaiting.
+ ///
+ /// This is useful for a flavor of "optimistic check" before deciding to
+ /// await on a receiver.
+ ///
+ /// Compared with [`recv`], this function has three failure cases instead of two
+ /// (one for closed, one for an empty buffer, one for a lagging receiver).
+ ///
+ /// `Err(TryRecvError::Closed)` is returned when all `Sender` halves have
+ /// dropped, indicating that no further values can be sent on the channel.
+ ///
+ /// If the [`Receiver`] handle falls behind, once the channel is full, newly
+ /// sent values will overwrite old values. At this point, a call to [`recv`]
+ /// will return with `Err(TryRecvError::Lagged)` and the [`Receiver`]'s
+ /// internal cursor is updated to point to the oldest value still held by
+ /// the channel. A subsequent call to [`try_recv`] will return this value
+ /// **unless** it has been since overwritten. If there are no values to
+ /// receive, `Err(TryRecvError::Empty)` is returned.
+ ///
+ /// [`recv`]: crate::sync::broadcast::Receiver::recv
+ /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
+ /// [`Receiver`]: crate::sync::broadcast::Receiver
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::broadcast;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = broadcast::channel(16);
+ ///
+ /// assert!(rx.try_recv().is_err());
+ ///
+ /// tx.send(10).unwrap();
+ ///
+ /// let value = rx.try_recv().unwrap();
+ /// assert_eq!(10, value);
+ /// }
+ /// ```
+ pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
+ let guard = self.recv_ref(None)?;
+ guard.clone_value().ok_or(TryRecvError::Closed)
+ }
+
+ /// Convert the receiver into a `Stream`.
+ ///
+ /// The conversion allows using `Receiver` with APIs that require stream
+ /// values.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::stream::StreamExt;
+ /// use tokio::sync::broadcast;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, rx) = broadcast::channel(128);
+ ///
+ /// tokio::spawn(async move {
+ /// for i in 0..10_i32 {
+ /// tx.send(i).unwrap();
+ /// }
+ /// });
+ ///
+ /// // Streams must be pinned to iterate.
+ /// tokio::pin! {
+ /// let stream = rx
+ /// .into_stream()
+ /// .filter(Result::is_ok)
+ /// .map(Result::unwrap)
+ /// .filter(|v| v % 2 == 0)
+ /// .map(|v| v + 1);
+ /// }
+ ///
+ /// while let Some(i) = stream.next().await {
+ /// println!("{}", i);
+ /// }
+ /// }
+ /// ```
+ #[cfg(feature = "stream")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
+ pub fn into_stream(self) -> impl Stream<Item = Result<T, RecvError>> {
+ Recv::new(Borrow(self))
}
}
@@ -981,23 +940,6 @@ impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
let mut tail = self.shared.tail.lock();
- if let Some(waiter) = &self.waiter {
- // safety: tail lock is held
- let queued = waiter.with(|ptr| unsafe { (*ptr).queued });
-
- if queued {
- // Remove the node
- //
- // safety: tail lock is held and the wait node is verified to be in
- // the list.
- unsafe {
- waiter.with_mut(|ptr| {
- tail.waiters.remove((&mut *ptr).into());
- });
- }
- }
- }
-
tail.rx_cnt -= 1;
let until = tail.pos;
@@ -1071,48 +1013,6 @@ where
cfg_stream! {
use futures_core::Stream;
- impl<T: Clone> Receiver<T> {
- /// Convert the receiver into a `Stream`.
- ///
- /// The conversion allows using `Receiver` with APIs that require stream
- /// values.
- ///
- /// # Examples
- ///
- /// ```
- /// use tokio::stream::StreamExt;
- /// use tokio::sync::broadcast;
- ///
- /// #[tokio::main]
- /// async fn main() {
- /// let (tx, rx) = broadcast::channel(128);
- ///
- /// tokio::spawn(async move {
- /// for i in 0..10_i32 {
- /// tx.send(i).unwrap();
- /// }
- /// });
- ///
- /// // Streams must be pinned to iterate.
- /// tokio::pin! {
- /// let stream = rx
- /// .into_stream()
- /// .filter(Result::is_ok)
- /// .map(Result::unwrap)
- /// .filter(|v| v % 2 == 0)
- /// .map(|v| v + 1);
- /// }
- ///
- /// while let Some(i) = stream.next().await {
- /// println!("{}", i);
- /// }
- /// }
- /// ```
- pub fn into_stream(self) -> impl Stream<Item = Result<T, RecvError>> {
- Recv::new(Borrow(self))
- }
- }
-
impl<R, T: Clone> Stream for Recv<R, T>
where
R: AsMut<Receiver<T>>,