summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync
diff options
context:
space:
mode:
authorZahari Dichev <zaharidichev@gmail.com>2020-10-12 19:09:36 +0300
committerGitHub <noreply@github.com>2020-10-12 12:09:36 -0400
commitb5750825431afe6fe227a6fcf30a593b51ceff1b (patch)
tree37ea4bb6ce9d2d478cac48d0e0ea9a0992f575eb /tokio/src/sync
parentc4f620cb30fb5c98655ecd4726f913e488f90b5b (diff)
sync: change chan `closed(&mut self)` to `closed(&self)` (#2939)
Diffstat (limited to 'tokio/src/sync')
-rw-r--r--tokio/src/sync/mpsc/bounded.rs12
-rw-r--r--tokio/src/sync/mpsc/chan.rs2
-rw-r--r--tokio/src/sync/mpsc/unbounded.rs12
-rw-r--r--tokio/src/sync/tests/loom_mpsc.rs37
4 files changed, 46 insertions, 17 deletions
diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs
index 38fb753e..76439a8d 100644
--- a/tokio/src/sync/mpsc/bounded.rs
+++ b/tokio/src/sync/mpsc/bounded.rs
@@ -332,11 +332,11 @@ impl<T> Sender<T> {
///
/// #[tokio::main]
/// async fn main() {
- /// let (mut tx1, rx) = mpsc::channel::<()>(1);
- /// let mut tx2 = tx1.clone();
- /// let mut tx3 = tx1.clone();
- /// let mut tx4 = tx1.clone();
- /// let mut tx5 = tx1.clone();
+ /// let (tx1, rx) = mpsc::channel::<()>(1);
+ /// let tx2 = tx1.clone();
+ /// let tx3 = tx1.clone();
+ /// let tx4 = tx1.clone();
+ /// let tx5 = tx1.clone();
/// tokio::spawn(async move {
/// drop(rx);
/// });
@@ -351,7 +351,7 @@ impl<T> Sender<T> {
//// println!("Receiver dropped");
/// }
/// ```
- pub async fn closed(&mut self) {
+ pub async fn closed(&self) {
self.chan.closed().await
}
diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs
index 6fedb5c5..3f50493e 100644
--- a/tokio/src/sync/mpsc/chan.rs
+++ b/tokio/src/sync/mpsc/chan.rs
@@ -147,7 +147,7 @@ impl<T, S: Semaphore> Tx<T, S> {
self.inner.semaphore.is_closed()
}
- pub(crate) async fn closed(&mut self) {
+ pub(crate) async fn closed(&self) {
use std::future::Future;
use std::pin::Pin;
use std::task::Poll;
diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs
index b92cbc05..fe882d5b 100644
--- a/tokio/src/sync/mpsc/unbounded.rs
+++ b/tokio/src/sync/mpsc/unbounded.rs
@@ -223,11 +223,11 @@ impl<T> UnboundedSender<T> {
///
/// #[tokio::main]
/// async fn main() {
- /// let (mut tx1, rx) = mpsc::unbounded_channel::<()>();
- /// let mut tx2 = tx1.clone();
- /// let mut tx3 = tx1.clone();
- /// let mut tx4 = tx1.clone();
- /// let mut tx5 = tx1.clone();
+ /// let (tx1, rx) = mpsc::unbounded_channel::<()>();
+ /// let tx2 = tx1.clone();
+ /// let tx3 = tx1.clone();
+ /// let tx4 = tx1.clone();
+ /// let tx5 = tx1.clone();
/// tokio::spawn(async move {
/// drop(rx);
/// });
@@ -242,7 +242,7 @@ impl<T> UnboundedSender<T> {
//// println!("Receiver dropped");
/// }
/// ```
- pub async fn closed(&mut self) {
+ pub async fn closed(&self) {
self.chan.closed().await
}
/// Checks if the channel has been closed. This happens when the
diff --git a/tokio/src/sync/tests/loom_mpsc.rs b/tokio/src/sync/tests/loom_mpsc.rs
index 330e798b..c12313bd 100644
--- a/tokio/src/sync/tests/loom_mpsc.rs
+++ b/tokio/src/sync/tests/loom_mpsc.rs
@@ -2,7 +2,9 @@ use crate::sync::mpsc;
use futures::future::poll_fn;
use loom::future::block_on;
+use loom::sync::Arc;
use loom::thread;
+use tokio_test::assert_ok;
#[test]
fn closing_tx() {
@@ -43,8 +45,8 @@ fn closing_unbounded_tx() {
#[test]
fn closing_bounded_rx() {
loom::model(|| {
- let (mut tx1, rx) = mpsc::channel::<()>(16);
- let mut tx2 = tx1.clone();
+ let (tx1, rx) = mpsc::channel::<()>(16);
+ let tx2 = tx1.clone();
thread::spawn(move || {
drop(rx);
});
@@ -55,10 +57,37 @@ fn closing_bounded_rx() {
}
#[test]
+fn closing_and_sending() {
+ loom::model(|| {
+ let (tx1, mut rx) = mpsc::channel::<()>(16);
+ let tx1 = Arc::new(tx1);
+ let tx2 = tx1.clone();
+
+ let th1 = thread::spawn(move || {
+ tx1.try_send(()).unwrap();
+ });
+
+ let th2 = thread::spawn(move || {
+ block_on(tx2.closed());
+ });
+
+ let th3 = thread::spawn(move || {
+ let v = block_on(rx.recv());
+ assert!(v.is_some());
+ drop(rx);
+ });
+
+ assert_ok!(th1.join());
+ assert_ok!(th2.join());
+ assert_ok!(th3.join());
+ });
+}
+
+#[test]
fn closing_unbounded_rx() {
loom::model(|| {
- let (mut tx1, rx) = mpsc::unbounded_channel::<()>();
- let mut tx2 = tx1.clone();
+ let (tx1, rx) = mpsc::unbounded_channel::<()>();
+ let tx2 = tx1.clone();
thread::spawn(move || {
drop(rx);
});