diff options
-rw-r--r-- | tokio/src/sync/mpsc/bounded.rs | 22 | ||||
-rw-r--r-- | tokio/src/sync/mpsc/chan.rs | 4 | ||||
-rw-r--r-- | tokio/src/sync/mpsc/unbounded.rs | 21 |
3 files changed, 47 insertions, 0 deletions
diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 542eae27..5e94e729 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -523,6 +523,28 @@ impl<T> Sender<T> { enter_handle.block_on(self.send(value)).unwrap() } + /// Checks if the channel has been closed. This happens when the + /// [`Receiver`] is dropped, or when the [`Receiver::close`] method is + /// called. + /// + /// [`Receiver`]: crate::sync::mpsc::Receiver + /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close + /// + /// ``` + /// let (tx, rx) = tokio::sync::mpsc::channel::<()>(42); + /// assert!(!tx.is_closed()); + /// + /// let tx2 = tx.clone(); + /// assert!(!tx2.is_closed()); + /// + /// drop(rx); + /// assert!(tx.is_closed()); + /// assert!(tx2.is_closed()); + /// ``` + pub fn is_closed(&self) -> bool { + self.chan.is_closed() + } + /// Wait for channel capacity. Once capacity to send one message is /// available, it is reserved for the caller. /// diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index e7b951ed..6fedb5c5 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -143,6 +143,10 @@ impl<T, S> Tx<T, S> { } impl<T, S: Semaphore> Tx<T, S> { + pub(crate) fn is_closed(&self) -> bool { + self.inner.semaphore.is_closed() + } + pub(crate) async fn closed(&mut self) { use std::future::Future; use std::pin::Pin; diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs index 09f71f21..b92cbc05 100644 --- a/tokio/src/sync/mpsc/unbounded.rs +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -245,4 +245,25 @@ impl<T> UnboundedSender<T> { pub async fn closed(&mut self) { self.chan.closed().await } + /// Checks if the channel has been closed. This happens when the + /// [`UnboundedReceiver`] is dropped, or when the + /// [`UnboundedReceiver::close`] method is called. + /// + /// [`UnboundedReceiver`]: crate::sync::mpsc::UnboundedReceiver + /// [`UnboundedReceiver::close`]: crate::sync::mpsc::UnboundedReceiver::close + /// + /// ``` + /// let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<()>(); + /// assert!(!tx.is_closed()); + /// + /// let tx2 = tx.clone(); + /// assert!(!tx2.is_closed()); + /// + /// drop(rx); + /// assert!(tx.is_closed()); + /// assert!(tx2.is_closed()); + /// ``` + pub fn is_closed(&self) -> bool { + self.chan.is_closed() + } } |