summaryrefslogtreecommitdiffstats
path: root/tokio
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-01-23 13:24:30 -0800
committerGitHub <noreply@github.com>2020-01-23 13:24:30 -0800
commit7079bcd60975f592e08fcd575991f6ae2a409a1f (patch)
treeb4dc701039f9a8d984fa9d5b3835b8094d6359f1 /tokio
parentf8714e9901981d5d1cc66c9a03d5a8a0b2f2eb4b (diff)
future: provide join! macro (#2158)
Provides a `join!` macro that supports concurrently driving multiple futures on the same task and await the completion of all futures.
Diffstat (limited to 'tokio')
-rw-r--r--tokio/src/future/maybe_done.rs6
-rw-r--r--tokio/src/future/mod.rs2
-rw-r--r--tokio/src/macros/join.rs112
-rw-r--r--tokio/src/macros/mod.rs3
-rw-r--r--tokio/src/macros/select.rs4
-rw-r--r--tokio/src/macros/support.rs2
-rw-r--r--tokio/tests/macros_join.rs71
7 files changed, 194 insertions, 6 deletions
diff --git a/tokio/src/future/maybe_done.rs b/tokio/src/future/maybe_done.rs
index 5011544c..94b829f2 100644
--- a/tokio/src/future/maybe_done.rs
+++ b/tokio/src/future/maybe_done.rs
@@ -7,7 +7,7 @@ use std::task::{Context, Poll};
/// A future that may have completed.
#[derive(Debug)]
-pub(crate) enum MaybeDone<Fut: Future> {
+pub enum MaybeDone<Fut: Future> {
/// A not-yet-completed future
Future(Fut),
/// The output of the completed future
@@ -21,7 +21,7 @@ pub(crate) enum MaybeDone<Fut: Future> {
impl<Fut: Future + Unpin> Unpin for MaybeDone<Fut> {}
/// Wraps a future into a `MaybeDone`
-pub(crate) fn maybe_done<Fut: Future>(future: Fut) -> MaybeDone<Fut> {
+pub fn maybe_done<Fut: Future>(future: Fut) -> MaybeDone<Fut> {
MaybeDone::Future(future)
}
@@ -43,7 +43,7 @@ impl<Fut: Future> MaybeDone<Fut> {
/// Attempt to take the output of a `MaybeDone` without driving it
/// towards completion.
#[inline]
- pub(crate) fn take_output(self: Pin<&mut Self>) -> Option<Fut::Output> {
+ pub fn take_output(self: Pin<&mut Self>) -> Option<Fut::Output> {
unsafe {
let this = self.get_unchecked_mut();
match this {
diff --git a/tokio/src/future/mod.rs b/tokio/src/future/mod.rs
index c5225600..770753f3 100644
--- a/tokio/src/future/mod.rs
+++ b/tokio/src/future/mod.rs
@@ -3,7 +3,7 @@
//! Asynchronous values.
mod maybe_done;
-pub(crate) use maybe_done::{maybe_done, MaybeDone};
+pub use maybe_done::{maybe_done, MaybeDone};
mod poll_fn;
pub use poll_fn::poll_fn;
diff --git a/tokio/src/macros/join.rs b/tokio/src/macros/join.rs
new file mode 100644
index 00000000..c164bb04
--- /dev/null
+++ b/tokio/src/macros/join.rs
@@ -0,0 +1,112 @@
+/// Wait on multiple concurrent branches, returning when **all** branches
+/// complete.
+///
+/// The `join!` macro must be used inside of async functions, closures, and
+/// blocks.
+///
+/// The `join!` macro takes a list of async expressions and evaluates them
+/// concurrently on the same task. Each async expression evaluates to a future
+/// and the futures from each expression are multiplexed on the current task.
+///
+/// # Notes
+///
+/// The supplied futures are stored inline and does not require allocating a
+/// `Vec`.
+///
+/// ### Runtime characteristics
+///
+/// By running all async expressions on the current task, the expressions are
+/// able to run **concurrently** but not in **parallel**. This means all
+/// expressions are run on the same thread and if one branch blocks the thread,
+/// all other expressions will be unable to continue. If parallelism is
+/// required, spawn each async expression using [`tokio::spawn`] and pass the
+/// join handle to `join!`.
+///
+/// [`tokio::spawn`]: crate::spawn
+///
+/// # Examples
+///
+/// Basic join with two branches
+///
+/// ```
+/// async fn do_stuff_async() {
+/// // async work
+/// }
+///
+/// async fn more_async_work() {
+/// // more here
+/// }
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let (first, second) = tokio::join!(
+/// do_stuff_async(),
+/// more_async_work());
+///
+/// // do something with the values
+/// }
+/// ```
+#[macro_export]
+macro_rules! join {
+ (@ {
+ // One `_` for each branch in the `join!` macro. This is not used once
+ // normalization is complete.
+ ( $($count:tt)* )
+
+ // Normalized join! branches
+ $( ( $($skip:tt)* ) $e:expr, )*
+
+ }) => {{
+ use $crate::macros::support::{maybe_done, poll_fn, Future, Pin};
+ use $crate::macros::support::Poll::{Ready, Pending};
+
+ // Safety: nothing must be moved out of `futures`. This is to satisfy
+ // the requirement of `Pin::new_unchecked` called below.
+ let mut futures = ( $( maybe_done($e), )* );
+
+ poll_fn(move |cx| {
+ let mut is_pending = false;
+
+ $(
+ // Extract the future for this branch from the tuple.
+ let ( $($skip,)* fut, .. ) = &mut futures;
+
+ // Safety: future is stored on the stack above
+ // and never moved.
+ let mut fut = unsafe { Pin::new_unchecked(fut) };
+
+ // Try polling
+ if fut.poll(cx).is_pending() {
+ is_pending = true;
+ }
+ )*
+
+ if is_pending {
+ Pending
+ } else {
+ Ready(($({
+ // Extract the future for this branch from the tuple.
+ let ( $($skip,)* fut, .. ) = &mut futures;
+
+ // Safety: future is stored on the stack above
+ // and never moved.
+ let mut fut = unsafe { Pin::new_unchecked(fut) };
+
+ fut.take_output().expect("expected completed future")
+ },)*))
+ }
+ }).await
+ }};
+
+ // ===== Normalize =====
+
+ (@ { ( $($s:tt)* ) $($t:tt)* } $e:expr, $($r:tt)* ) => {
+ $crate::join!(@{ ($($s)* _) $($t)* ($($s)*) $e, } $($r)*)
+ };
+
+ // ===== Entry point =====
+
+ ( $($e:expr),* $(,)?) => {
+ $crate::join!(@{ () } $($e,)*)
+ };
+}
diff --git a/tokio/src/macros/mod.rs b/tokio/src/macros/mod.rs
index d6026b96..d35a6411 100644
--- a/tokio/src/macros/mod.rs
+++ b/tokio/src/macros/mod.rs
@@ -8,6 +8,9 @@ mod assert;
mod cfg;
#[macro_use]
+mod join;
+
+#[macro_use]
mod loom;
#[macro_use]
diff --git a/tokio/src/macros/select.rs b/tokio/src/macros/select.rs
index 00c2c6b5..ddb16f5d 100644
--- a/tokio/src/macros/select.rs
+++ b/tokio/src/macros/select.rs
@@ -1,6 +1,9 @@
/// Wait on multiple concurrent branches, returning when the **first** branch
/// completes, cancelling the remaining branches.
///
+/// The `select!` macro must be used inside of async functions, closures, and
+/// blocks.
+///
/// The `select` macro accepts one or more branches with the following pattern:
///
/// ```text
@@ -159,7 +162,6 @@
/// }
/// };
/// }
-///
/// ```
///
/// Basic stream selecting.
diff --git a/tokio/src/macros/support.rs b/tokio/src/macros/support.rs
index 079cb085..1f6fcd3d 100644
--- a/tokio/src/macros/support.rs
+++ b/tokio/src/macros/support.rs
@@ -1,4 +1,4 @@
-pub use crate::future::poll_fn;
+pub use crate::future::{maybe_done, poll_fn};
pub use crate::util::thread_rng_n;
pub use std::future::Future;
diff --git a/tokio/tests/macros_join.rs b/tokio/tests/macros_join.rs
new file mode 100644
index 00000000..ab168e81
--- /dev/null
+++ b/tokio/tests/macros_join.rs
@@ -0,0 +1,71 @@
+use tokio::sync::oneshot;
+use tokio_test::{assert_pending, assert_ready, task};
+
+#[tokio::test]
+async fn sync_one_lit_expr_comma() {
+ let foo = tokio::join!(async { 1 },);
+
+ assert_eq!(foo, (1,));
+}
+
+#[tokio::test]
+async fn sync_one_lit_expr_no_comma() {
+ let foo = tokio::join!(async { 1 });
+
+ assert_eq!(foo, (1,));
+}
+
+#[tokio::test]
+async fn sync_two_lit_expr_comma() {
+ let foo = tokio::join!(async { 1 }, async { 2 },);
+
+ assert_eq!(foo, (1, 2));
+}
+
+#[tokio::test]
+async fn sync_two_lit_expr_no_comma() {
+ let foo = tokio::join!(async { 1 }, async { 2 });
+
+ assert_eq!(foo, (1, 2));
+}
+
+#[tokio::test]
+async fn sync_two_await() {
+ let (tx1, rx1) = oneshot::channel();
+ let (tx2, rx2) = oneshot::channel();
+
+ let mut join = task::spawn(async {
+ tokio::join!(async { rx1.await.unwrap() }, async { rx2.await.unwrap() })
+ });
+
+ assert_pending!(join.poll());
+
+ tx2.send(123).unwrap();
+ assert!(join.is_woken());
+ assert_pending!(join.poll());
+
+ tx1.send("hello").unwrap();
+ assert!(join.is_woken());
+ let res = assert_ready!(join.poll());
+
+ assert_eq!(("hello", 123), res);
+}
+
+#[test]
+fn join_size() {
+ use futures::future;
+ use std::mem;
+
+ let fut = async {
+ let ready = future::ready(0i32);
+ tokio::join!(ready)
+ };
+ assert_eq!(mem::size_of_val(&fut), 16);
+
+ let fut = async {
+ let ready1 = future::ready(0i32);
+ let ready2 = future::ready(0i32);
+ tokio::join!(ready1, ready2)
+ };
+ assert_eq!(mem::size_of_val(&fut), 28);
+}