summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorZephyr Shannon <earthlingzephyr@gmail.com>2020-10-19 01:12:40 -0700
committerGitHub <noreply@github.com>2020-10-19 10:12:40 +0200
commitfb28caa90c8ab4453b259424c88dc7ec8ff06bbb (patch)
treead2e0ddd41b188b7b7e794ccb7a3baba8377b250
parente88e64bcc07f4a632014815a5fec3e67d97a8f31 (diff)
sync: implement Clone for broadcast::Receiver (#2933)
-rw-r--r--tokio/src/sync/broadcast.rs40
-rw-r--r--tokio/src/sync/tests/loom_broadcast.rs3
2 files changed, 28 insertions, 15 deletions
diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs
index ada0fc67..4e74a3aa 100644
--- a/tokio/src/sync/broadcast.rs
+++ b/tokio/src/sync/broadcast.rs
@@ -405,7 +405,8 @@ const MAX_RECEIVERS: usize = usize::MAX >> 2;
///
/// The `Sender` can be cloned to `send` to the same channel from multiple
/// points in the process or it can be used concurrently from an `Arc`. New
-/// `Receiver` handles are created by calling [`Sender::subscribe`].
+/// `Receiver` handles can be cloned from an existing `Receiver` or created by
+/// calling [`Sender::subscribe`].
///
/// If all [`Receiver`] handles are dropped, the `send` method will return a
/// [`SendError`]. Similarly, if all [`Sender`] handles are dropped, the [`recv`]
@@ -569,19 +570,7 @@ impl<T> Sender<T> {
/// ```
pub fn subscribe(&self) -> Receiver<T> {
let shared = self.shared.clone();
-
- let mut tail = shared.tail.lock();
-
- if tail.rx_cnt == MAX_RECEIVERS {
- panic!("max receivers");
- }
-
- tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow");
- let next = tail.pos;
-
- drop(tail);
-
- Receiver { shared, next }
+ new_receiver(shared)
}
/// Returns the number of active receivers
@@ -671,6 +660,22 @@ impl<T> Sender<T> {
}
}
+fn new_receiver<T>(shared: Arc<Shared<T>>) -> Receiver<T> {
+ let mut tail = shared.tail.lock();
+
+ if tail.rx_cnt == MAX_RECEIVERS {
+ panic!("max receivers");
+ }
+
+ tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow");
+
+ let next = tail.pos;
+
+ drop(tail);
+
+ Receiver { shared, next }
+}
+
impl Tail {
fn notify_rx(&mut self) {
while let Some(mut waiter) = self.waiters.pop_back() {
@@ -980,6 +985,13 @@ impl<T: Clone> Receiver<T> {
}
}
+impl<T> Clone for Receiver<T> {
+ fn clone(&self) -> Self {
+ let shared = self.shared.clone();
+ new_receiver(shared)
+ }
+}
+
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
let mut tail = self.shared.tail.lock();
diff --git a/tokio/src/sync/tests/loom_broadcast.rs b/tokio/src/sync/tests/loom_broadcast.rs
index 4b1f034f..e15dc25f 100644
--- a/tokio/src/sync/tests/loom_broadcast.rs
+++ b/tokio/src/sync/tests/loom_broadcast.rs
@@ -92,11 +92,12 @@ fn broadcast_two() {
});
}
+// Exercise the Receiver Clone impl as well
#[test]
fn broadcast_wrap() {
loom::model(|| {
let (tx, mut rx1) = broadcast::channel(2);
- let mut rx2 = tx.subscribe();
+ let mut rx2 = rx1.clone();
let th1 = thread::spawn(move || {
block_on(async {