From 347e18bc7700d0e93c5da494a49728d5b2cce500 Mon Sep 17 00:00:00 2001 From: xd009642 Date: Wed, 26 Aug 2020 20:39:06 +0100 Subject: sync: add blocking_recv and blocking_send in mpsc (#2684) Fixes: #2629 --- tokio/src/sync/mpsc/bounded.rs | 70 ++++++++++++++++++++++++++++++++++++++++++ tokio/src/sync/mpsc/mod.rs | 11 ++++--- 2 files changed, 76 insertions(+), 5 deletions(-) (limited to 'tokio/src/sync/mpsc') diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index afca8c52..2810dad0 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -155,6 +155,42 @@ impl Receiver { self.chan.recv(cx) } + /// Blocking receive to call outside of asynchronous contexts. + /// + /// # Panics + /// + /// This function panics if called within an asynchronous execution + /// context. + /// + /// # Examples + /// + /// ``` + /// use std::thread; + /// use tokio::runtime::Runtime; + /// use tokio::sync::mpsc; + /// + /// fn main() { + /// let (mut tx, mut rx) = mpsc::channel::(10); + /// + /// let sync_code = thread::spawn(move || { + /// assert_eq!(Some(10), rx.blocking_recv()); + /// }); + /// + /// Runtime::new() + /// .unwrap() + /// .block_on(async move { + /// let _ = tx.send(10).await; + /// }); + /// sync_code.join().unwrap() + /// } + /// ``` + #[cfg(feature = "rt-core")] + #[cfg_attr(docsrs, doc(cfg(feature = "rt-core")))] + pub fn blocking_recv(&mut self) -> Option { + let mut enter_handle = crate::runtime::enter::enter(false); + enter_handle.block_on(self.recv()).unwrap() + } + /// Attempts to return a pending value on this receiver without blocking. /// /// This method will never block the caller in order to wait for data to @@ -393,6 +429,40 @@ impl Sender { } } + /// Blocking send to call outside of asynchronous contexts. + /// + /// # Panics + /// + /// This function panics if called within an asynchronous execution + /// context. + /// + /// # Examples + /// + /// ``` + /// use std::thread; + /// use tokio::runtime::Runtime; + /// use tokio::sync::mpsc; + /// + /// fn main() { + /// let (mut tx, mut rx) = mpsc::channel::(1); + /// + /// let sync_code = thread::spawn(move || { + /// tx.blocking_send(10).unwrap(); + /// }); + /// + /// Runtime::new().unwrap().block_on(async move { + /// assert_eq!(Some(10), rx.recv().await); + /// }); + /// sync_code.join().unwrap() + /// } + /// ``` + #[cfg(feature = "rt-core")] + #[cfg_attr(docsrs, doc(cfg(feature = "rt-core")))] + pub fn blocking_send(&mut self, value: T) -> Result<(), SendError> { + let mut enter_handle = crate::runtime::enter::enter(false); + enter_handle.block_on(self.send(value)).unwrap() + } + /// Returns `Poll::Ready(Ok(()))` when the channel is able to accept another item. /// /// If the channel is full, then `Poll::Pending` is returned and the task is notified when a diff --git a/tokio/src/sync/mpsc/mod.rs b/tokio/src/sync/mpsc/mod.rs index c489c9f9..b9599131 100644 --- a/tokio/src/sync/mpsc/mod.rs +++ b/tokio/src/sync/mpsc/mod.rs @@ -43,11 +43,10 @@ //! are two situations to consider: //! //! **Bounded channel**: If you need a bounded channel, you should use a bounded -//! Tokio `mpsc` channel for both directions of communication. To call the async -//! [`send`][bounded-send] or [`recv`][bounded-recv] methods in sync code, you -//! will need to use [`Handle::block_on`], which allow you to execute an async -//! method in synchronous code. This is necessary because a bounded channel may -//! need to wait for additional capacity to become available. +//! Tokio `mpsc` channel for both directions of communication. Instead of calling +//! the async [`send`][bounded-send] or [`recv`][bounded-recv] methods, in +//! synchronous code you will need to use the [`blocking_send`][blocking-send] or +//! [`blocking_recv`][blocking-recv] methods. //! //! **Unbounded channel**: You should use the kind of channel that matches where //! the receiver is. So for sending a message _from async to sync_, you should @@ -59,6 +58,8 @@ //! [`Receiver`]: crate::sync::mpsc::Receiver //! [bounded-send]: crate::sync::mpsc::Sender::send() //! [bounded-recv]: crate::sync::mpsc::Receiver::recv() +//! [blocking-send]: crate::sync::mpsc::Sender::blocking_send() +//! [blocking-recv]: crate::sync::mpsc::Receiver::blocking_recv() //! [`UnboundedSender`]: crate::sync::mpsc::UnboundedSender //! [`Handle::block_on`]: crate::runtime::Handle::block_on() //! [std-unbounded]: std::sync::mpsc::channel -- cgit v1.2.3