summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync
diff options
context:
space:
mode:
authorMikail Bagishov <bagishov.mikail@yandex.ru>2020-09-28 18:37:28 +0300
committerGitHub <noreply@github.com>2020-09-28 11:37:28 -0400
commit078d0a2ebc4d4f88cb6bce05c8ac4f5038dae9be (patch)
tree0ff39cf970d57028ee49bd362fb4acce95e8b4ae /tokio/src/sync
parent99d4061203aa5dbf79b06352b06bc9818a293665 (diff)
sync: Add `is_closed` method to mpsc senders (#2726)
Co-authored-by: Alice Ryhl <alice@ryhl.io>
Diffstat (limited to 'tokio/src/sync')
-rw-r--r--tokio/src/sync/mpsc/bounded.rs22
-rw-r--r--tokio/src/sync/mpsc/chan.rs4
-rw-r--r--tokio/src/sync/mpsc/unbounded.rs21
3 files changed, 47 insertions, 0 deletions
diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs
index 542eae27..5e94e729 100644
--- a/tokio/src/sync/mpsc/bounded.rs
+++ b/tokio/src/sync/mpsc/bounded.rs
@@ -523,6 +523,28 @@ impl<T> Sender<T> {
enter_handle.block_on(self.send(value)).unwrap()
}
+ /// Checks if the channel has been closed. This happens when the
+ /// [`Receiver`] is dropped, or when the [`Receiver::close`] method is
+ /// called.
+ ///
+ /// [`Receiver`]: crate::sync::mpsc::Receiver
+ /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close
+ ///
+ /// ```
+ /// let (tx, rx) = tokio::sync::mpsc::channel::<()>(42);
+ /// assert!(!tx.is_closed());
+ ///
+ /// let tx2 = tx.clone();
+ /// assert!(!tx2.is_closed());
+ ///
+ /// drop(rx);
+ /// assert!(tx.is_closed());
+ /// assert!(tx2.is_closed());
+ /// ```
+ pub fn is_closed(&self) -> bool {
+ self.chan.is_closed()
+ }
+
/// Wait for channel capacity. Once capacity to send one message is
/// available, it is reserved for the caller.
///
diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs
index e7b951ed..6fedb5c5 100644
--- a/tokio/src/sync/mpsc/chan.rs
+++ b/tokio/src/sync/mpsc/chan.rs
@@ -143,6 +143,10 @@ impl<T, S> Tx<T, S> {
}
impl<T, S: Semaphore> Tx<T, S> {
+ pub(crate) fn is_closed(&self) -> bool {
+ self.inner.semaphore.is_closed()
+ }
+
pub(crate) async fn closed(&mut self) {
use std::future::Future;
use std::pin::Pin;
diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs
index 09f71f21..b92cbc05 100644
--- a/tokio/src/sync/mpsc/unbounded.rs
+++ b/tokio/src/sync/mpsc/unbounded.rs
@@ -245,4 +245,25 @@ impl<T> UnboundedSender<T> {
pub async fn closed(&mut self) {
self.chan.closed().await
}
+ /// Checks if the channel has been closed. This happens when the
+ /// [`UnboundedReceiver`] is dropped, or when the
+ /// [`UnboundedReceiver::close`] method is called.
+ ///
+ /// [`UnboundedReceiver`]: crate::sync::mpsc::UnboundedReceiver
+ /// [`UnboundedReceiver::close`]: crate::sync::mpsc::UnboundedReceiver::close
+ ///
+ /// ```
+ /// let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<()>();
+ /// assert!(!tx.is_closed());
+ ///
+ /// let tx2 = tx.clone();
+ /// assert!(!tx2.is_closed());
+ ///
+ /// drop(rx);
+ /// assert!(tx.is_closed());
+ /// assert!(tx2.is_closed());
+ /// ```
+ pub fn is_closed(&self) -> bool {
+ self.chan.is_closed()
+ }
}