summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync
diff options
context:
space:
mode:
authorxd009642 <danielmckenna93@gmail.com>2020-08-26 20:39:06 +0100
committerGitHub <noreply@github.com>2020-08-26 21:39:06 +0200
commit347e18bc7700d0e93c5da494a49728d5b2cce500 (patch)
tree559901c07848769841d9430a59088df2ac25171c /tokio/src/sync
parent2e7e42bca78790e6b7a848445db47de77bfcd8af (diff)
sync: add blocking_recv and blocking_send in mpsc (#2684)
Fixes: #2629
Diffstat (limited to 'tokio/src/sync')
-rw-r--r--tokio/src/sync/mpsc/bounded.rs70
-rw-r--r--tokio/src/sync/mpsc/mod.rs11
2 files changed, 76 insertions, 5 deletions
diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs
index afca8c52..2810dad0 100644
--- a/tokio/src/sync/mpsc/bounded.rs
+++ b/tokio/src/sync/mpsc/bounded.rs
@@ -155,6 +155,42 @@ impl<T> Receiver<T> {
self.chan.recv(cx)
}
+ /// Blocking receive to call outside of asynchronous contexts.
+ ///
+ /// # Panics
+ ///
+ /// This function panics if called within an asynchronous execution
+ /// context.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::thread;
+ /// use tokio::runtime::Runtime;
+ /// use tokio::sync::mpsc;
+ ///
+ /// fn main() {
+ /// let (mut tx, mut rx) = mpsc::channel::<u8>(10);
+ ///
+ /// let sync_code = thread::spawn(move || {
+ /// assert_eq!(Some(10), rx.blocking_recv());
+ /// });
+ ///
+ /// Runtime::new()
+ /// .unwrap()
+ /// .block_on(async move {
+ /// let _ = tx.send(10).await;
+ /// });
+ /// sync_code.join().unwrap()
+ /// }
+ /// ```
+ #[cfg(feature = "rt-core")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "rt-core")))]
+ pub fn blocking_recv(&mut self) -> Option<T> {
+ let mut enter_handle = crate::runtime::enter::enter(false);
+ enter_handle.block_on(self.recv()).unwrap()
+ }
+
/// Attempts to return a pending value on this receiver without blocking.
///
/// This method will never block the caller in order to wait for data to
@@ -393,6 +429,40 @@ impl<T> Sender<T> {
}
}
+ /// Blocking send to call outside of asynchronous contexts.
+ ///
+ /// # Panics
+ ///
+ /// This function panics if called within an asynchronous execution
+ /// context.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::thread;
+ /// use tokio::runtime::Runtime;
+ /// use tokio::sync::mpsc;
+ ///
+ /// fn main() {
+ /// let (mut tx, mut rx) = mpsc::channel::<u8>(1);
+ ///
+ /// let sync_code = thread::spawn(move || {
+ /// tx.blocking_send(10).unwrap();
+ /// });
+ ///
+ /// Runtime::new().unwrap().block_on(async move {
+ /// assert_eq!(Some(10), rx.recv().await);
+ /// });
+ /// sync_code.join().unwrap()
+ /// }
+ /// ```
+ #[cfg(feature = "rt-core")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "rt-core")))]
+ pub fn blocking_send(&mut self, value: T) -> Result<(), SendError<T>> {
+ let mut enter_handle = crate::runtime::enter::enter(false);
+ enter_handle.block_on(self.send(value)).unwrap()
+ }
+
/// Returns `Poll::Ready(Ok(()))` when the channel is able to accept another item.
///
/// If the channel is full, then `Poll::Pending` is returned and the task is notified when a
diff --git a/tokio/src/sync/mpsc/mod.rs b/tokio/src/sync/mpsc/mod.rs
index c489c9f9..b9599131 100644
--- a/tokio/src/sync/mpsc/mod.rs
+++ b/tokio/src/sync/mpsc/mod.rs
@@ -43,11 +43,10 @@
//! are two situations to consider:
//!
//! **Bounded channel**: If you need a bounded channel, you should use a bounded
-//! Tokio `mpsc` channel for both directions of communication. To call the async
-//! [`send`][bounded-send] or [`recv`][bounded-recv] methods in sync code, you
-//! will need to use [`Handle::block_on`], which allow you to execute an async
-//! method in synchronous code. This is necessary because a bounded channel may
-//! need to wait for additional capacity to become available.
+//! Tokio `mpsc` channel for both directions of communication. Instead of calling
+//! the async [`send`][bounded-send] or [`recv`][bounded-recv] methods, in
+//! synchronous code you will need to use the [`blocking_send`][blocking-send] or
+//! [`blocking_recv`][blocking-recv] methods.
//!
//! **Unbounded channel**: You should use the kind of channel that matches where
//! the receiver is. So for sending a message _from async to sync_, you should
@@ -59,6 +58,8 @@
//! [`Receiver`]: crate::sync::mpsc::Receiver
//! [bounded-send]: crate::sync::mpsc::Sender::send()
//! [bounded-recv]: crate::sync::mpsc::Receiver::recv()
+//! [blocking-send]: crate::sync::mpsc::Sender::blocking_send()
+//! [blocking-recv]: crate::sync::mpsc::Receiver::blocking_recv()
//! [`UnboundedSender`]: crate::sync::mpsc::UnboundedSender
//! [`Handle::block_on`]: crate::runtime::Handle::block_on()
//! [std-unbounded]: std::sync::mpsc::channel