From b5750825431afe6fe227a6fcf30a593b51ceff1b Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Mon, 12 Oct 2020 19:09:36 +0300 Subject: sync: change chan `closed(&mut self)` to `closed(&self)` (#2939) --- tokio/src/sync/mpsc/bounded.rs | 12 ++++++------ tokio/src/sync/mpsc/chan.rs | 2 +- tokio/src/sync/mpsc/unbounded.rs | 12 ++++++------ tokio/src/sync/tests/loom_mpsc.rs | 37 +++++++++++++++++++++++++++++++++---- 4 files changed, 46 insertions(+), 17 deletions(-) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 38fb753e..76439a8d 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -332,11 +332,11 @@ impl Sender { /// /// #[tokio::main] /// async fn main() { - /// let (mut tx1, rx) = mpsc::channel::<()>(1); - /// let mut tx2 = tx1.clone(); - /// let mut tx3 = tx1.clone(); - /// let mut tx4 = tx1.clone(); - /// let mut tx5 = tx1.clone(); + /// let (tx1, rx) = mpsc::channel::<()>(1); + /// let tx2 = tx1.clone(); + /// let tx3 = tx1.clone(); + /// let tx4 = tx1.clone(); + /// let tx5 = tx1.clone(); /// tokio::spawn(async move { /// drop(rx); /// }); @@ -351,7 +351,7 @@ impl Sender { //// println!("Receiver dropped"); /// } /// ``` - pub async fn closed(&mut self) { + pub async fn closed(&self) { self.chan.closed().await } diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index 6fedb5c5..3f50493e 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -147,7 +147,7 @@ impl Tx { self.inner.semaphore.is_closed() } - pub(crate) async fn closed(&mut self) { + pub(crate) async fn closed(&self) { use std::future::Future; use std::pin::Pin; use std::task::Poll; diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs index b92cbc05..fe882d5b 100644 --- a/tokio/src/sync/mpsc/unbounded.rs +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -223,11 +223,11 @@ impl UnboundedSender { /// /// #[tokio::main] /// async fn main() { - /// let (mut tx1, rx) = mpsc::unbounded_channel::<()>(); - /// let mut tx2 = tx1.clone(); - /// let mut tx3 = tx1.clone(); - /// let mut tx4 = tx1.clone(); - /// let mut tx5 = tx1.clone(); + /// let (tx1, rx) = mpsc::unbounded_channel::<()>(); + /// let tx2 = tx1.clone(); + /// let tx3 = tx1.clone(); + /// let tx4 = tx1.clone(); + /// let tx5 = tx1.clone(); /// tokio::spawn(async move { /// drop(rx); /// }); @@ -242,7 +242,7 @@ impl UnboundedSender { //// println!("Receiver dropped"); /// } /// ``` - pub async fn closed(&mut self) { + pub async fn closed(&self) { self.chan.closed().await } /// Checks if the channel has been closed. This happens when the diff --git a/tokio/src/sync/tests/loom_mpsc.rs b/tokio/src/sync/tests/loom_mpsc.rs index 330e798b..c12313bd 100644 --- a/tokio/src/sync/tests/loom_mpsc.rs +++ b/tokio/src/sync/tests/loom_mpsc.rs @@ -2,7 +2,9 @@ use crate::sync::mpsc; use futures::future::poll_fn; use loom::future::block_on; +use loom::sync::Arc; use loom::thread; +use tokio_test::assert_ok; #[test] fn closing_tx() { @@ -43,8 +45,8 @@ fn closing_unbounded_tx() { #[test] fn closing_bounded_rx() { loom::model(|| { - let (mut tx1, rx) = mpsc::channel::<()>(16); - let mut tx2 = tx1.clone(); + let (tx1, rx) = mpsc::channel::<()>(16); + let tx2 = tx1.clone(); thread::spawn(move || { drop(rx); }); @@ -54,11 +56,38 @@ fn closing_bounded_rx() { }); } +#[test] +fn closing_and_sending() { + loom::model(|| { + let (tx1, mut rx) = mpsc::channel::<()>(16); + let tx1 = Arc::new(tx1); + let tx2 = tx1.clone(); + + let th1 = thread::spawn(move || { + tx1.try_send(()).unwrap(); + }); + + let th2 = thread::spawn(move || { + block_on(tx2.closed()); + }); + + let th3 = thread::spawn(move || { + let v = block_on(rx.recv()); + assert!(v.is_some()); + drop(rx); + }); + + assert_ok!(th1.join()); + assert_ok!(th2.join()); + assert_ok!(th3.join()); + }); +} + #[test] fn closing_unbounded_rx() { loom::model(|| { - let (mut tx1, rx) = mpsc::unbounded_channel::<()>(); - let mut tx2 = tx1.clone(); + let (tx1, rx) = mpsc::unbounded_channel::<()>(); + let tx2 = tx1.clone(); thread::spawn(move || { drop(rx); }); -- cgit v1.2.3