//! A multi-producer, multi-consumer broadcast queue. Each sent value is seen by //! all consumers. //! //! A [`Sender`] is used to broadcast values to **all** connected [`Receiver`] //! values. [`Sender`] handles are clone-able, allowing concurrent send and //! receive actions. [`Sender`] and [`Receiver`] are both `Send` and `Sync` as //! long as `T` is also `Send` or `Sync` respectively. //! //! When a value is sent, **all** [`Receiver`] handles are notified and will //! receive the value. The value is stored once inside the channel and cloned on //! demand for each receiver. Once all receivers have received a clone of the //! value, the value is released from the channel. //! //! A channel is created by calling [`channel`], specifying the maximum number //! of messages the channel can retain at any given time. //! //! New [`Receiver`] handles are created by calling [`Sender::subscribe`]. The //! returned [`Receiver`] will receive values sent **after** the call to //! `subscribe`. //! //! ## Lagging //! //! As sent messages must be retained until **all** [`Receiver`] handles receive //! a clone, broadcast channels are susceptible to the "slow receiver" problem. //! In this case, all but one receiver are able to receive values at the rate //! they are sent. Because one receiver is stalled, the channel starts to fill //! up. //! //! This broadcast channel implementation handles this case by setting a hard //! upper bound on the number of values the channel may retain at any given //! time. This upper bound is passed to the [`channel`] function as an argument. //! //! If a value is sent when the channel is at capacity, the oldest value //! currently held by the channel is released. This frees up space for the new //! value. Any receiver that has not yet seen the released value will return //! [`RecvError::Lagged`] the next time [`recv`] is called. //! //! Once [`RecvError::Lagged`] is returned, the lagging receiver's position is //! updated to the oldest value contained by the channel. The next call to //! [`recv`] will return this value. //! //! This behavior enables a receiver to detect when it has lagged so far behind //! that data has been dropped. The caller may decide how to respond to this: //! either by aborting its task or by tolerating lost messages and resuming //! consumption of the channel. //! //! ## Closing //! //! When **all** [`Sender`] handles have been dropped, no new values may be //! sent. At this point, the channel is "closed". Once a receiver has received //! all values retained by the channel, the next call to [`recv`] will return //! with [`RecvError::Closed`]. //! //! [`Sender`]: crate::sync::broadcast::Sender //! [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe //! [`Receiver`]: crate::sync::broadcast::Receiver //! [`channel`]: crate::sync::broadcast::channel //! [`RecvError::Lagged`]: crate::sync::broadcast::error::RecvError::Lagged //! [`RecvError::Closed`]: crate::sync::broadcast::error::RecvError::Closed //! [`recv`]: crate::sync::broadcast::Receiver::recv //! //! # Examples //! //! Basic usage //! //! ``` //! use tokio::sync::broadcast; //! //! #[tokio::main] //! async fn main() { //! let (tx, mut rx1) = broadcast::channel(16); //! let mut rx2 = tx.subscribe(); //! //! tokio::spawn(async move { //! assert_eq!(rx1.recv().await.unwrap(), 10); //! assert_eq!(rx1.recv().await.unwrap(), 20); //! }); //! //! tokio::spawn(async move { //! assert_eq!(rx2.recv().await.unwrap(), 10); //! assert_eq!(rx2.recv().await.unwrap(), 20); //! }); //! //! tx.send(10).unwrap(); //! tx.send(20).unwrap(); //! } //! ``` //! //! Handling lag //! //! ``` //! use tokio::sync::broadcast; //! //! #[tokio::main] //! async fn main() { //! let (tx, mut rx) = broadcast::channel(2); //! //! tx.send(10).unwrap(); //! tx.send(20).unwrap(); //! tx.send(30).unwrap(); //! //! // The receiver lagged behind //! assert!(rx.recv().await.is_err()); //! //! // At this point, we can abort or continue with lost messages //! //! assert_eq!(20, rx.recv().await.unwrap()); //! assert_eq!(30, rx.recv().await.unwrap()); //! } //! ``` use crate::loom::cell::UnsafeCell; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::{Arc, Mutex, RwLock, RwLockReadGuard}; use crate::util::linked_list::{self, LinkedList}; use std::fmt; use std::future::Future; use std::marker::PhantomPinned; use std::pin::Pin; use std::ptr::NonNull; use std::sync::atomic::Ordering::SeqCst; use std::task::{Context, Poll, Waker}; use std::usize; /// Sending-half of the [`broadcast`] channel. /// /// May be used from many threads. Messages can be sent with /// [`send`][Sender::send]. /// /// # Examples /// /// ``` /// use tokio::sync::broadcast; /// /// #[tokio::main] /// async fn main() { /// let (tx, mut rx1) = broadcast::channel(16); /// let mut rx2 = tx.subscribe(); /// /// tokio::spawn(async move { /// assert_eq!(rx1.recv().await.unwrap(), 10); /// assert_eq!(rx1.recv().await.unwrap(), 20); /// }); /// /// tokio::spawn(async move { /// assert_eq!(rx2.recv().await.unwrap(), 10); /// assert_eq!(rx2.recv().await.unwrap(), 20); /// }); /// /// tx.send(10).unwrap(); /// tx.send(20).unwrap(); /// } /// ``` /// /// [`broadcast`]: crate::sync::broadcast pub struct Sender { shared: Arc>, } /// Receiving-half of the [`broadcast`] channel. /// /// Must not be used concurrently. Messages may be retrieved using /// [`recv`][Receiver::recv]. /// /// # Examples /// /// ``` /// use tokio::sync::broadcast; /// /// #[tokio::main] /// async fn main() { /// let (tx, mut rx1) = broadcast::channel(16); /// let mut rx2 = tx.subscribe(); /// /// tokio::spawn(async move { /// assert_eq!(rx1.recv().await.unwrap(), 10); /// assert_eq!(rx1.recv().await.unwrap(), 20); /// }); /// /// tokio::spawn(async move { /// assert_eq!(rx2.recv().await.unwrap(), 10); /// assert_eq!(rx2.recv().await.unwrap(), 20); /// }); /// /// tx.send(10).unwrap(); /// tx.send(20).unwrap(); /// } /// ``` /// /// [`broadcast`]: crate::sync::broadcast pub struct Receiver { /// State shared with all receivers and senders. shared: Arc>, /// Next position to read from next: u64, } pub mod error { //! Broadcast error types use std::fmt; /// Error returned by from the [`send`] function on a [`Sender`]. /// /// A **send** operation can only fail if there are no active receivers, /// implying that the message could never be received. The error contains the /// message being sent as a payload so it can be recovered. /// /// [`send`]: crate::sync::broadcast::Sender::send /// [`Sender`]: crate::sync::broadcast::Sender #[derive(Debug)] pub struct SendError(pub T); impl fmt::Display for SendError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "channel closed") } } impl std::error::Error for SendError {} /// An error returned from the [`recv`] function on a [`Receiver`]. /// /// [`recv`]: crate::sync::broadcast::Receiver::recv /// [`Receiver`]: crate::sync::broadcast::Receiver #[derive(Debug, PartialEq)] pub enum RecvError { /// There are no more active senders implying no further messages will ever /// be sent. Closed, /// The receiver lagged too far behind. Attempting to receive again will /// return the oldest message still retained by the channel. /// /// Includes the number of skipped messages. Lagged(u64), } impl fmt::Display for RecvError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { RecvError::Closed => write!(f, "channel closed"), RecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt), } } } impl std::error::Error for RecvError {} /// An error returned from the [`try_recv`] function on a [`Receiver`]. /// /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv /// [`Receiver`]: crate::sync::broadcast::Receiver #[derive(Debug, PartialEq)] pub enum TryRecvError { /// The channel is currently empty. There are still active /// [`Sender`] handles, so data may yet become available. /// /// [`Sender`]: crate::sync::broadcast::Sender Empty, /// There are no more active senders implying no further messages will ever /// be sent. Closed, /// The receiver lagged too far behind and has been forcibly disconnected. /// Attempting to receive again will return the oldest message still /// retained by the channel. /// /// Includes the number of skipped messages. Lagged(u64), } impl fmt::Display for TryRecvError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { TryRecvError::Empty => write!(f, "channel empty"), TryRecvError::Closed => write!(f, "channel closed"), TryRecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt), } } } impl std::error::Error for TryRecvError {} } use self::error::*; /// Data shared between senders and receivers struct Shared { /// slots in the channel buffer: Box<[RwLock>]>, /// Mask a position -> index mask: usize, /// Tail of the queue. Includes the rx wait list. tail: Mutex, /// Number of outstanding Sender handles num_tx: AtomicUsize, } /// Next position to write a value struct Tail { /// Next position to write to pos: u64, /// Number of active receivers rx_cnt: usize, /// True if the channel is closed closed: bool, /// Receivers waiting for a value waiters: LinkedList::Target>, } /// Slot in the buffer struct Slot { /// Remaining number of receivers that are expected to see this value. /// /// When this goes to zero, the value is released. /// /// An atomic is used as it is mutated concurrently with the slot read lock /// acquired. rem: AtomicUsize, /// Uniquely identifies the `send` stored in the slot pos: u64, /// True signals the channel is closed. closed: bool, /// The value being broadcast. /// /// The value is set by `send` when the write lock is held. When a reader /// drops, `rem` is decremented. When it hits zero, the value is dropped. val: UnsafeCell>, } /// An entry in the wait queue struct Waiter { /// True if queued queued: bool, /// Task waiting on the broadcast channel. waker: Option, /// Intrusive linked-list pointers. pointers: linked_list::Pointers, /// Should not be `Unpin`. _p: PhantomPinned, } struct RecvGuard<'a, T> { slot: RwLockReadGuard<'a, Slot>, } /// Receive a value future struct Recv where R: AsMut>, { /// Receiver being waited on receiver: R, /// Entry in the waiter `LinkedList` waiter: UnsafeCell, _p: std::marker::PhantomData, } /// `AsMut` is not implemented for `T` (coherence). Explicitly implementing /// `AsMut` for `Receiver` would be included in the public API of the receiver /// type. Instead, `Borrow` is used internally to bridge the gap. struct Borrow(T); impl AsMut> for Borrow> { fn as_mut(&mut self) -> &mut Receiver { &mut self.0 } } impl<'a, T> AsMut> for Borrow<&'a mut Receiver> { fn as_mut(&mut self) -> &mut Receiver { &mut *self.0 } } unsafe impl> + Send, T: Send> Send for Recv {} unsafe impl> + Sync, T: Send> Sync for Recv {} /// Max number of receivers. Reserve space to lock. const MAX_RECEIVERS: usize = usize::MAX >> 2; /// Create a bounded, multi-producer, multi-consumer channel where each sent /// value is broadcasted to all active receivers. /// /// All data sent on [`Sender`] will become available on every active /// [`Receiver`] in the same order as it was sent. /// /// The `Sender` can be cloned to `send` to the same channel from multiple /// points in the process or it can be used concurrently from an `Arc`. New /// `Receiver` handles are created by calling [`Sender::subscribe`]. /// /// If all [`Receiver`] handles are dropped, the `send` method will return a /// [`SendError`]. Similarly, if all [`Sender`] handles are dropped, the [`recv`] /// method will return a [`RecvError`]. /// /// [`Sender`]: crate::sync::broadcast::Sender /// [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe /// [`Receiver`]: crate::sync::broadcast::Receiver /// [`recv`]: crate::sync::broadcast::Receiver::recv /// [`SendError`]: crate::sync::broadcast::error::SendError /// [`RecvError`]: crate::sync::broadcast::error::RecvError /// /// # Examples /// /// ``` /// use tokio::sync::broadcast; /// /// #[tokio::main] /// async fn main() { /// let (tx, mut rx1) = broadcast::channel(16); /// let mut rx2 = tx.subscribe(); /// /// tokio::spawn(async move { /// assert_eq!(rx1.recv().await.unwrap(), 10); /// assert_eq!(rx1.recv().await.unwrap(), 20); /// }); /// /// tokio::spawn(async move { /// assert_eq!(rx2.recv().await.unwrap(), 10); /// assert_eq!(rx2.recv().await.unwrap(), 20); /// }); /// /// tx.send(10).unwrap(); /// tx.send(20).unwrap(); /// } /// ``` pub fn channel(mut capacity: usize) -> (Sender, Receiver) { assert!(capacity > 0, "capacity is empty"); assert!(capacity <= usize::MAX >> 1, "requested capacity too large"); // Round to a power of two capacity = capacity.next_power_of_two(); let mut buffer = Vec::with_capacity(capacity); for i in 0..capacity { buffer.push(RwLock::new(Slot { rem: AtomicUsize::new(0), pos: (i as u64).wrapping_sub(capacity as u64), closed: false, val: UnsafeCell::new(None), })); } let shared = Arc::new(Shared { buffer: buffer.into_boxed_slice(), mask: capacity - 1, tail: Mutex::new(Tail { pos: 0, rx_cnt: 1, closed: false, waiters: LinkedList::new(), }), num_tx: AtomicUsize::new(1), }); let rx = Receiver { shared: shared.clone(), next: 0, }; let tx = Sender { shared }; (tx, rx) } unsafe impl Send for Sender {} unsafe impl Sync for Sender {} unsafe impl Send for Receiver {} unsafe impl Sync for Receiver {} impl Sender { /// Attempts to send a value to all active [`Receiver`] handles, returning /// it back if it could not be sent. /// /// A successful send occurs when there is at least one active [`Receiver`] /// handle. An unsuccessful send would be one where all associated /// [`Receiver`] handles have already been dropped. /// /// # Return /// /// On success, the number of subscribed [`Receiver`] handles is returned. /// This does not mean that this number of receivers will see the message as /// a receiver may drop before receiving the message. /// /// # Note /// /// A return value of `Ok` **does not** mean that the sent value will be /// observed by all or any of the active [`Receiver`] handles. [`Receiver`] /// handles may be dropped before receiving the sent message. /// /// A return value of `Err` **does not** mean that future calls to `send` /// will fail. New [`Receiver`] handles may be created by calling /// [`subscribe`]. /// /// [`Receiver`]: crate::sync::broadcast::Receiver /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe /// /// # Examples /// /// ``` /// use tokio::sync::broadcast; /// /// #[tokio::main] /// async fn main() { /// let (tx, mut rx1) = broadcast::channel(16); /// let mut rx2 = tx.subscribe(); /// /// tokio::spawn(async move { /// assert_eq!(rx1.recv().await.unwrap(), 10); /// assert_eq!(rx1.recv().await.unwrap(), 20); /// }); /// /// tokio::spawn(async move { /// assert_eq!(rx2.recv().await.unwrap(), 10); /// assert_eq!(rx2.recv().await.unwrap(), 20); /// }); /// /// tx.send(10).unwrap(); /// tx.send(20).unwrap(); /// } /// ``` pub fn send(&self, value: T) -> Result> { self.send2(Some(value)) .map_err(|SendError(maybe_v)| SendError(maybe_v.unwrap())) } /// Creates a new [`Receiver`] handle that will receive values sent **after** /// this call to `subscribe`. /// /// # Examples /// /// ``` /// use tokio::sync::broadcast; /// /// #[tokio::main] /// async fn main() { /// let (tx, _rx) = broadcast::channel(16); /// /// // Will not be seen /// tx.send(10).unwrap(); /// /// let mut rx = tx.subscribe(); /// /// tx.send(20).unwrap(); /// /// let value = rx.recv().await.unwrap(); /// assert_eq!(20, value); /// } /// ``` pub fn subscribe(&self) -> Receiver { let shared = self.shared.clone(); new_receiver(shared) } /// Returns the number of active receivers /// /// An active receiver is a [`Receiver`] handle returned from [`channel`] or /// [`subscribe`]. These are the handles that will receive values sent on /// this [`Sender`]. /// /// # Note /// /// It is not guaranteed that a sent message will reach this number of /// receivers. Active receivers may never call [`recv`] again before /// dropping. /// /// [`recv`]: crate::sync::broadcast::Receiver::recv /// [`Receiver`]: crate::sync::broadcast::Receiver /// [`Sender`]: crate::sync::broadcast::Sender /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe /// [`channel`]: crate::sync::broadcast::channel /// /// # Examples /// /// ``` /// use tokio::sync::broadcast; /// /// #[tokio::main] /// async fn main() { /// let (tx, _rx1) = broadcast::channel(16); /// /// assert_eq!(1, tx.receiver_count()); /// /// let mut _rx2 = tx.subscribe(); /// /// assert_eq!(2, tx.receiver_count()); /// /// tx.send(10).unwrap(); /// } /// ``` pub fn receiver_count(&self) -> usize { let tail = self.shared.tail.lock(); tail.rx_cnt } fn send2(&self, value: Option) -> Result>> { let mut tail = self.shared.tail.lock(); if tail.rx_cnt == 0 { return Err(SendError(value)); } // Position to write into let pos = tail.pos; let rem = tail.rx_cnt; let idx = (pos & self.shared.mask as u64) as usize; // Update the tail position tail.pos = tail.pos.wrapping_add(1); // Get the slot let mut slot = self.shared.buffer[idx].write().unwrap(); // Track the position slot.pos = pos; // Set remaining receivers slot.rem.with_mut(|v| *v = rem); // Set the closed bit if the value is `None`; otherwise write the value if value.is_none() { tail.closed = true; slot.closed = true; } else { slot.val.with_mut(|ptr| unsafe { *ptr = value }); } // Release the slot lock before notifying the receivers. drop(slot); tail.notify_rx(); // Release the mutex. This must happen after the slot lock is released, // otherwise the writer lock bit could be cleared while another thread // is in the critical section. drop(tail); Ok(rem) } } fn new_receiver(shared: Arc>) -> Receiver { let mut tail = shared.tail.lock(); if tail.rx_cnt == MAX_RECEIVERS { panic!("max receivers"); } tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow"); let next = tail.pos; drop(tail); Receiver { shared, next } } impl Tail { fn notify_rx(&mut self) { while let Some(mut waiter) = self.waiters.pop_back() { // Safety: `waiters` lock is still held. let waiter = unsafe { waiter.as_mut() }; assert!(waiter.queued); waiter.queued = false; let waker = waiter.waker.take().unwrap(); waker.wake(); } } } impl Clone for Sender { fn clone(&self) -> Sender { let shared = self.shared.clone(); shared.num_tx.fetch_add(1, SeqCst); Sender { shared } } } impl Drop for Sender { fn drop(&mut self) { if 1 == self.shared.num_tx.fetch_sub(1, SeqCst) { let _ = self.send2(None); } } } impl Receiver { /// Locks the next value if there is one. fn recv_ref( &mut self, waiter: Option<(&UnsafeCell, &Waker)>, ) -> Result, TryRecvError> { let idx = (self.next & self.shared.mask as u64) as usize; // The slot holding the next value to read let mut slot = self.shared.buffer[idx].read().unwrap(); if slot.pos != self.next { let next_pos = slot.pos.wrapping_add(self.shared.buffer.len() as u64); // The receiver has read all current values in the channel and there // is no waiter to register if waiter.is_none() && next_pos == self.next { return Err(TryRecvError::Empty); } // Release the `slot` lock before attempting to acquire the `tail` // lock. This is required because `send2` acquires the tail lock // first followed by the slot lock. Acquiring the locks in reverse // order here would result in a potential deadlock: `recv_ref` // acquires the `slot` lock and attempts to acquire the `tail` lock // while `send2` acquired the `tail` lock and attempts to acquire // the slot lock. drop(slot); let mut tail = self.shared.tail.lock(); // Acquire slot lock again slot = self.shared.buffer[idx].read().unwrap(); // Make sure the position did not change. This could happen in the // unlikely event that the buffer is wrapped between dropping the // read lock and acquiring the tail lock. if slot.pos != self.next { let next_pos = slot.pos.wrapping_add(self.shared.buffer.len() as u64); if next_pos == self.next { // Store the waker if let Some((waiter, waker)) = waiter { // Safety: called while locked. unsafe { // Only queue if not already queued waiter.with_mut(|ptr| { // If there is no waker **or** if the currently // stored waker references a **different** task, // track the tasks' waker to be notified on // receipt of a new value. match (*ptr).waker { Some(ref w) if w.will_wake(waker) => {} _ => { (*ptr).waker = Some(waker.clone()); } } if !(*ptr).queued { (*ptr).queued = true; tail.waiters.push_front(NonNull::new_unchecked(&mut *ptr)); } }); } } return Err(TryRecvError::Empty); } // At this point, the receiver has lagged behind the sender by // more than the channel capacity. The receiver will attempt to // catch up by skipping dropped messages and setting the // internal cursor to the **oldest** message stored by the // channel. // // However, finding the oldest position is a bit more // complicated than `tail-position - buffer-size`. When // the channel is closed, the tail position is incremented to // signal a new `None` message, but `None` is not stored in the // channel itself (see issue #2425 for why). // // To account for this, if the channel is closed, the tail // position is decremented by `buffer-size + 1`. let mut adjust = 0; if tail.closed { adjust = 1 } let next = tail .pos .wrapping_sub(self.shared.buffer.len() as u64 + adjust); let missed = next.wrapping_sub(self.next); drop(tail); // The receiver is slow but no values have been missed if missed == 0 { self.next = self.next.wrapping_add(1); return Ok(RecvGuard { slot }); } self.next = next; return Err(TryRecvError::Lagged(missed)); } } self.next = self.next.wrapping_add(1); if slot.closed { return Err(TryRecvError::Closed); } Ok(RecvGuard { slot }) } } impl Receiver { /// Receives the next value for this receiver. /// /// Each [`Receiver`] handle will receive a clone of all values sent /// **after** it has subscribed. /// /// `Err(RecvError::Closed)` is returned when all `Sender` halves have /// dropped, indicating that no further values can be sent on the channel. /// /// If the [`Receiver`] handle falls behind, once the channel is full, newly /// sent values will overwrite old values. At this point, a call to [`recv`] /// will return with `Err(RecvError::Lagged)` and the [`Receiver`]'s /// internal cursor is updated to point to the oldest value still held by /// the channel. A subsequent call to [`recv`] will return this value /// **unless** it has been since overwritten. /// /// [`Receiver`]: crate::sync::broadcast::Receiver /// [`recv`]: crate::sync::broadcast::Receiver::recv /// /// # Examples /// /// ``` /// use tokio::sync::broadcast; /// /// #[tokio::main] /// async fn main() { /// let (tx, mut rx1) = broadcast::channel(16); /// let mut rx2 = tx.subscribe(); /// /// tokio::spawn(async move { /// assert_eq!(rx1.recv().await.unwrap(), 10); /// assert_eq!(rx1.recv().await.unwrap(), 20); /// }); /// /// tokio::spawn(async move { /// assert_eq!(rx2.recv().await.unwrap(), 10); /// assert_eq!(rx2.recv().await.unwrap(), 20); /// }); /// /// tx.send(10).unwrap(); /// tx.send(20).unwrap(); /// } /// ``` /// /// Handling lag /// /// ``` /// use tokio::sync::broadcast; /// /// #[tokio::main] /// async fn main() { /// let (tx, mut rx) = broadcast::channel(2); /// /// tx.send(10).unwrap(); /// tx.send(20).unwrap(); /// tx.send(30).unwrap(); /// /// // The receiver lagged behind /// assert!(rx.recv().await.is_err()); /// /// // At this point, we can abort or continue with lost messages /// /// assert_eq!(20, rx.recv().await.unwrap()); /// assert_eq!(30, rx.recv().await.unwrap()); /// } /// ``` pub async fn recv(&mut self) -> Result { let fut = Recv::<_, T>::new(Borrow(self)); fut.await } /// Attempts to return a pending value on this receiver without awaiting. /// /// This is useful for a flavor of "optimistic check" before deciding to /// await on a receiver. /// /// Compared with [`recv`], this function has three failure cases instead of two /// (one for closed, one for an empty buffer, one for a lagging receiver). /// /// `Err(TryRecvError::Closed)` is returned when all `Sender` halves have /// dropped, indicating that no further values can be sent on the channel. /// /// If the [`Receiver`] handle falls behind, once the channel is full, newly /// sent values will overwrite old values. At this point, a call to [`recv`] /// will return with `Err(TryRecvError::Lagged)` and the [`Receiver`]'s /// internal cursor is updated to point to the oldest value still held by /// the channel. A subsequent call to [`try_recv`] will return this value /// **unless** it has been since overwritten. If there are no values to /// receive, `Err(TryRecvError::Empty)` is returned. /// /// [`recv`]: crate::sync::broadcast::Receiver::recv /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv /// [`Receiver`]: crate::sync::broadcast::Receiver /// /// # Examples /// /// ``` /// use tokio::sync::broadcast; /// /// #[tokio::main] /// async fn main() { /// let (tx, mut rx) = broadcast::channel(16); /// /// assert!(rx.try_recv().is_err()); /// /// tx.send(10).unwrap(); /// /// let value = rx.try_recv().unwrap(); /// assert_eq!(10, value); /// } /// ``` pub fn try_recv(&mut self) -> Result { let guard = self.recv_ref(None)?; guard.clone_value().ok_or(TryRecvError::Closed) } /// Convert the receiver into a `Stream`. /// /// The conversion allows using `Receiver` with APIs that require stream /// values. /// /// # Examples /// /// ``` /// use tokio::stream::StreamExt; /// use tokio::sync::broadcast; /// /// #[tokio::main] /// async fn main() { /// let (tx, rx) = broadcast::channel(128); /// /// tokio::spawn(async move { /// for i in 0..10_i32 { /// tx.send(i).unwrap(); /// } /// }); /// /// // Streams must be pinned to iterate. /// tokio::pin! { /// let stream = rx /// .into_stream() /// .filter(Result::is_ok) /// .map(Result::unwrap) /// .filter(|v| v % 2 == 0) /// .map(|v| v + 1); /// } /// /// while let Some(i) = stream.next().await { /// println!("{}", i); /// } /// } /// ``` #[cfg(feature = "stream")] #[cfg_attr(docsrs, doc(cfg(feature = "stream")))] pub fn into_stream(self) -> impl Stream> { Recv::new(Borrow(self)) } } impl Drop for Receiver { fn drop(&mut self) { let mut tail = self.shared.tail.lock(); tail.rx_cnt -= 1; let until = tail.pos; drop(tail); while self.next != until { match self.recv_ref(None) { Ok(_) => {} // The channel is closed Err(TryRecvError::Closed) => break, // Ignore lagging, we will catch up Err(TryRecvError::Lagged(..)) => {} // Can't be empty Err(TryRecvError::Empty) => panic!("unexpected empty broadcast channel"), } } } } impl Recv where R: AsMut>, { fn new(receiver: R) -> Recv { Recv { receiver, waiter: UnsafeCell::new(Waiter { queued: false, waker: None, pointers: linked_list::Pointers::new(), _p: PhantomPinned, }), _p: std::marker::PhantomData, } } /// A custom `project` implementation is used in place of `pin-project-lite` /// as a custom drop implementation is needed. fn project(self: Pin<&mut Self>) -> (&mut Receiver, &UnsafeCell) { unsafe { // Safety: Receiver is Unpin is_unpin::<&mut Receiver>(); let me = self.get_unchecked_mut(); (me.receiver.as_mut(), &me.waiter) } } } impl Future for Recv where R: AsMut>, T: Clone, { type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let (receiver, waiter) = self.project(); let guard = match receiver.recv_ref(Some((waiter, cx.waker()))) { Ok(value) => value, Err(TryRecvError::Empty) => return Poll::Pending, Err(TryRecvError::Lagged(n)) => return Poll::Ready(Err(RecvError::Lagged(n))), Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError::Closed)), }; Poll::Ready(guard.clone_value().ok_or(RecvError::Closed)) } } cfg_stream! { use futures_core::Stream; impl Stream for Recv where R: AsMut>, T: Clone, { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let (receiver, waiter) = self.project(); let guard = match receiver.recv_ref(Some((waiter, cx.waker()))) { Ok(value) => value, Err(TryRecvError::Empty) => return Poll::Pending, Err(TryRecvError::Lagged(n)) => return Poll::Ready(Some(Err(RecvError::Lagged(n)))), Err(TryRecvError::Closed) => return Poll::Ready(None), }; Poll::Ready(guard.clone_value().map(Ok)) } } } impl Drop for Recv where R: AsMut>, { fn drop(&mut self) { // Acquire the tail lock. This is required for safety before accessing // the waiter node. let mut tail = self.receiver.as_mut().shared.tail.lock(); // safety: tail lock is held let queued = self.waiter.with(|ptr| unsafe { (*ptr).queued }); if queued { // Remove the node // // safety: tail lock is held and the wait node is verified to be in // the list. unsafe { self.waiter.with_mut(|ptr| { tail.waiters.remove((&mut *ptr).into()); }); } } } } /// # Safety /// /// `Waiter` is forced to be !Unpin. unsafe impl linked_list::Link for Waiter { type Handle = NonNull; type Target = Waiter; fn as_raw(handle: &NonNull) -> NonNull { *handle } unsafe fn from_raw(ptr: NonNull) -> NonNull { ptr } unsafe fn pointers(mut target: NonNull) -> NonNull> { NonNull::from(&mut target.as_mut().pointers) } } impl fmt::Debug for Sender { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { write!(fmt, "broadcast::Sender") } } impl fmt::Debug for Receiver { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { write!(fmt, "broadcast::Receiver") } } impl<'a, T> RecvGuard<'a, T> { fn clone_value(&self) -> Option where T: Clone, { self.slot.val.with(|ptr| unsafe { (*ptr).clone() }) } } impl<'a, T> Drop for RecvGuard<'a, T> { fn drop(&mut self) { // Decrement the remaining counter if 1 == self.slot.rem.fetch_sub(1, SeqCst) { // Safety: Last receiver, drop the value self.slot.val.with_mut(|ptr| unsafe { *ptr = None }); } } } fn is_unpin() {}