From fb28caa90c8ab4453b259424c88dc7ec8ff06bbb Mon Sep 17 00:00:00 2001 From: Zephyr Shannon Date: Mon, 19 Oct 2020 01:12:40 -0700 Subject: sync: implement Clone for broadcast::Receiver (#2933) --- tokio/src/sync/broadcast.rs | 40 ++++++++++++++++++++++------------ tokio/src/sync/tests/loom_broadcast.rs | 3 ++- 2 files changed, 28 insertions(+), 15 deletions(-) (limited to 'tokio') diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index ada0fc67..4e74a3aa 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -405,7 +405,8 @@ const MAX_RECEIVERS: usize = usize::MAX >> 2; /// /// The `Sender` can be cloned to `send` to the same channel from multiple /// points in the process or it can be used concurrently from an `Arc`. New -/// `Receiver` handles are created by calling [`Sender::subscribe`]. +/// `Receiver` handles can be cloned from an existing `Receiver` or created by +/// calling [`Sender::subscribe`]. /// /// If all [`Receiver`] handles are dropped, the `send` method will return a /// [`SendError`]. Similarly, if all [`Sender`] handles are dropped, the [`recv`] @@ -569,19 +570,7 @@ impl Sender { /// ``` pub fn subscribe(&self) -> Receiver { let shared = self.shared.clone(); - - let mut tail = shared.tail.lock(); - - if tail.rx_cnt == MAX_RECEIVERS { - panic!("max receivers"); - } - - tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow"); - let next = tail.pos; - - drop(tail); - - Receiver { shared, next } + new_receiver(shared) } /// Returns the number of active receivers @@ -671,6 +660,22 @@ impl Sender { } } +fn new_receiver(shared: Arc>) -> Receiver { + let mut tail = shared.tail.lock(); + + if tail.rx_cnt == MAX_RECEIVERS { + panic!("max receivers"); + } + + tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow"); + + let next = tail.pos; + + drop(tail); + + Receiver { shared, next } +} + impl Tail { fn notify_rx(&mut self) { while let Some(mut waiter) = self.waiters.pop_back() { @@ -980,6 +985,13 @@ impl Receiver { } } +impl Clone for Receiver { + fn clone(&self) -> Self { + let shared = self.shared.clone(); + new_receiver(shared) + } +} + impl Drop for Receiver { fn drop(&mut self) { let mut tail = self.shared.tail.lock(); diff --git a/tokio/src/sync/tests/loom_broadcast.rs b/tokio/src/sync/tests/loom_broadcast.rs index 4b1f034f..e15dc25f 100644 --- a/tokio/src/sync/tests/loom_broadcast.rs +++ b/tokio/src/sync/tests/loom_broadcast.rs @@ -92,11 +92,12 @@ fn broadcast_two() { }); } +// Exercise the Receiver Clone impl as well #[test] fn broadcast_wrap() { loom::model(|| { let (tx, mut rx1) = broadcast::channel(2); - let mut rx2 = tx.subscribe(); + let mut rx2 = rx1.clone(); let th1 = thread::spawn(move || { block_on(async { -- cgit v1.2.3