diff options
author | Zahari Dichev <zaharidichev@gmail.com> | 2020-10-12 19:09:36 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-10-12 12:09:36 -0400 |
commit | b5750825431afe6fe227a6fcf30a593b51ceff1b (patch) | |
tree | 37ea4bb6ce9d2d478cac48d0e0ea9a0992f575eb /tokio/src/sync | |
parent | c4f620cb30fb5c98655ecd4726f913e488f90b5b (diff) |
sync: change chan `closed(&mut self)` to `closed(&self)` (#2939)
Diffstat (limited to 'tokio/src/sync')
-rw-r--r-- | tokio/src/sync/mpsc/bounded.rs | 12 | ||||
-rw-r--r-- | tokio/src/sync/mpsc/chan.rs | 2 | ||||
-rw-r--r-- | tokio/src/sync/mpsc/unbounded.rs | 12 | ||||
-rw-r--r-- | tokio/src/sync/tests/loom_mpsc.rs | 37 |
4 files changed, 46 insertions, 17 deletions
diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 38fb753e..76439a8d 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -332,11 +332,11 @@ impl<T> Sender<T> { /// /// #[tokio::main] /// async fn main() { - /// let (mut tx1, rx) = mpsc::channel::<()>(1); - /// let mut tx2 = tx1.clone(); - /// let mut tx3 = tx1.clone(); - /// let mut tx4 = tx1.clone(); - /// let mut tx5 = tx1.clone(); + /// let (tx1, rx) = mpsc::channel::<()>(1); + /// let tx2 = tx1.clone(); + /// let tx3 = tx1.clone(); + /// let tx4 = tx1.clone(); + /// let tx5 = tx1.clone(); /// tokio::spawn(async move { /// drop(rx); /// }); @@ -351,7 +351,7 @@ impl<T> Sender<T> { //// println!("Receiver dropped"); /// } /// ``` - pub async fn closed(&mut self) { + pub async fn closed(&self) { self.chan.closed().await } diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index 6fedb5c5..3f50493e 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -147,7 +147,7 @@ impl<T, S: Semaphore> Tx<T, S> { self.inner.semaphore.is_closed() } - pub(crate) async fn closed(&mut self) { + pub(crate) async fn closed(&self) { use std::future::Future; use std::pin::Pin; use std::task::Poll; diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs index b92cbc05..fe882d5b 100644 --- a/tokio/src/sync/mpsc/unbounded.rs +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -223,11 +223,11 @@ impl<T> UnboundedSender<T> { /// /// #[tokio::main] /// async fn main() { - /// let (mut tx1, rx) = mpsc::unbounded_channel::<()>(); - /// let mut tx2 = tx1.clone(); - /// let mut tx3 = tx1.clone(); - /// let mut tx4 = tx1.clone(); - /// let mut tx5 = tx1.clone(); + /// let (tx1, rx) = mpsc::unbounded_channel::<()>(); + /// let tx2 = tx1.clone(); + /// let tx3 = tx1.clone(); + /// let tx4 = tx1.clone(); + /// let tx5 = tx1.clone(); /// tokio::spawn(async move { /// drop(rx); /// }); @@ -242,7 +242,7 @@ impl<T> UnboundedSender<T> { //// println!("Receiver dropped"); /// } /// ``` - pub async fn closed(&mut self) { + pub async fn closed(&self) { self.chan.closed().await } /// Checks if the channel has been closed. This happens when the diff --git a/tokio/src/sync/tests/loom_mpsc.rs b/tokio/src/sync/tests/loom_mpsc.rs index 330e798b..c12313bd 100644 --- a/tokio/src/sync/tests/loom_mpsc.rs +++ b/tokio/src/sync/tests/loom_mpsc.rs @@ -2,7 +2,9 @@ use crate::sync::mpsc; use futures::future::poll_fn; use loom::future::block_on; +use loom::sync::Arc; use loom::thread; +use tokio_test::assert_ok; #[test] fn closing_tx() { @@ -43,8 +45,8 @@ fn closing_unbounded_tx() { #[test] fn closing_bounded_rx() { loom::model(|| { - let (mut tx1, rx) = mpsc::channel::<()>(16); - let mut tx2 = tx1.clone(); + let (tx1, rx) = mpsc::channel::<()>(16); + let tx2 = tx1.clone(); thread::spawn(move || { drop(rx); }); @@ -55,10 +57,37 @@ fn closing_bounded_rx() { } #[test] +fn closing_and_sending() { + loom::model(|| { + let (tx1, mut rx) = mpsc::channel::<()>(16); + let tx1 = Arc::new(tx1); + let tx2 = tx1.clone(); + + let th1 = thread::spawn(move || { + tx1.try_send(()).unwrap(); + }); + + let th2 = thread::spawn(move || { + block_on(tx2.closed()); + }); + + let th3 = thread::spawn(move || { + let v = block_on(rx.recv()); + assert!(v.is_some()); + drop(rx); + }); + + assert_ok!(th1.join()); + assert_ok!(th2.join()); + assert_ok!(th3.join()); + }); +} + +#[test] fn closing_unbounded_rx() { loom::model(|| { - let (mut tx1, rx) = mpsc::unbounded_channel::<()>(); - let mut tx2 = tx1.clone(); + let (tx1, rx) = mpsc::unbounded_channel::<()>(); + let tx2 = tx1.clone(); thread::spawn(move || { drop(rx); }); |