diff options
author | Carl Lerche <me@carllerche.com> | 2020-01-23 13:24:30 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-01-23 13:24:30 -0800 |
commit | 7079bcd60975f592e08fcd575991f6ae2a409a1f (patch) | |
tree | b4dc701039f9a8d984fa9d5b3835b8094d6359f1 /tokio | |
parent | f8714e9901981d5d1cc66c9a03d5a8a0b2f2eb4b (diff) |
future: provide join! macro (#2158)
Provides a `join!` macro that supports concurrently driving multiple
futures on the same task and await the completion of all futures.
Diffstat (limited to 'tokio')
-rw-r--r-- | tokio/src/future/maybe_done.rs | 6 | ||||
-rw-r--r-- | tokio/src/future/mod.rs | 2 | ||||
-rw-r--r-- | tokio/src/macros/join.rs | 112 | ||||
-rw-r--r-- | tokio/src/macros/mod.rs | 3 | ||||
-rw-r--r-- | tokio/src/macros/select.rs | 4 | ||||
-rw-r--r-- | tokio/src/macros/support.rs | 2 | ||||
-rw-r--r-- | tokio/tests/macros_join.rs | 71 |
7 files changed, 194 insertions, 6 deletions
diff --git a/tokio/src/future/maybe_done.rs b/tokio/src/future/maybe_done.rs index 5011544c..94b829f2 100644 --- a/tokio/src/future/maybe_done.rs +++ b/tokio/src/future/maybe_done.rs @@ -7,7 +7,7 @@ use std::task::{Context, Poll}; /// A future that may have completed. #[derive(Debug)] -pub(crate) enum MaybeDone<Fut: Future> { +pub enum MaybeDone<Fut: Future> { /// A not-yet-completed future Future(Fut), /// The output of the completed future @@ -21,7 +21,7 @@ pub(crate) enum MaybeDone<Fut: Future> { impl<Fut: Future + Unpin> Unpin for MaybeDone<Fut> {} /// Wraps a future into a `MaybeDone` -pub(crate) fn maybe_done<Fut: Future>(future: Fut) -> MaybeDone<Fut> { +pub fn maybe_done<Fut: Future>(future: Fut) -> MaybeDone<Fut> { MaybeDone::Future(future) } @@ -43,7 +43,7 @@ impl<Fut: Future> MaybeDone<Fut> { /// Attempt to take the output of a `MaybeDone` without driving it /// towards completion. #[inline] - pub(crate) fn take_output(self: Pin<&mut Self>) -> Option<Fut::Output> { + pub fn take_output(self: Pin<&mut Self>) -> Option<Fut::Output> { unsafe { let this = self.get_unchecked_mut(); match this { diff --git a/tokio/src/future/mod.rs b/tokio/src/future/mod.rs index c5225600..770753f3 100644 --- a/tokio/src/future/mod.rs +++ b/tokio/src/future/mod.rs @@ -3,7 +3,7 @@ //! Asynchronous values. mod maybe_done; -pub(crate) use maybe_done::{maybe_done, MaybeDone}; +pub use maybe_done::{maybe_done, MaybeDone}; mod poll_fn; pub use poll_fn::poll_fn; diff --git a/tokio/src/macros/join.rs b/tokio/src/macros/join.rs new file mode 100644 index 00000000..c164bb04 --- /dev/null +++ b/tokio/src/macros/join.rs @@ -0,0 +1,112 @@ +/// Wait on multiple concurrent branches, returning when **all** branches +/// complete. +/// +/// The `join!` macro must be used inside of async functions, closures, and +/// blocks. +/// +/// The `join!` macro takes a list of async expressions and evaluates them +/// concurrently on the same task. Each async expression evaluates to a future +/// and the futures from each expression are multiplexed on the current task. +/// +/// # Notes +/// +/// The supplied futures are stored inline and does not require allocating a +/// `Vec`. +/// +/// ### Runtime characteristics +/// +/// By running all async expressions on the current task, the expressions are +/// able to run **concurrently** but not in **parallel**. This means all +/// expressions are run on the same thread and if one branch blocks the thread, +/// all other expressions will be unable to continue. If parallelism is +/// required, spawn each async expression using [`tokio::spawn`] and pass the +/// join handle to `join!`. +/// +/// [`tokio::spawn`]: crate::spawn +/// +/// # Examples +/// +/// Basic join with two branches +/// +/// ``` +/// async fn do_stuff_async() { +/// // async work +/// } +/// +/// async fn more_async_work() { +/// // more here +/// } +/// +/// #[tokio::main] +/// async fn main() { +/// let (first, second) = tokio::join!( +/// do_stuff_async(), +/// more_async_work()); +/// +/// // do something with the values +/// } +/// ``` +#[macro_export] +macro_rules! join { + (@ { + // One `_` for each branch in the `join!` macro. This is not used once + // normalization is complete. + ( $($count:tt)* ) + + // Normalized join! branches + $( ( $($skip:tt)* ) $e:expr, )* + + }) => {{ + use $crate::macros::support::{maybe_done, poll_fn, Future, Pin}; + use $crate::macros::support::Poll::{Ready, Pending}; + + // Safety: nothing must be moved out of `futures`. This is to satisfy + // the requirement of `Pin::new_unchecked` called below. + let mut futures = ( $( maybe_done($e), )* ); + + poll_fn(move |cx| { + let mut is_pending = false; + + $( + // Extract the future for this branch from the tuple. + let ( $($skip,)* fut, .. ) = &mut futures; + + // Safety: future is stored on the stack above + // and never moved. + let mut fut = unsafe { Pin::new_unchecked(fut) }; + + // Try polling + if fut.poll(cx).is_pending() { + is_pending = true; + } + )* + + if is_pending { + Pending + } else { + Ready(($({ + // Extract the future for this branch from the tuple. + let ( $($skip,)* fut, .. ) = &mut futures; + + // Safety: future is stored on the stack above + // and never moved. + let mut fut = unsafe { Pin::new_unchecked(fut) }; + + fut.take_output().expect("expected completed future") + },)*)) + } + }).await + }}; + + // ===== Normalize ===== + + (@ { ( $($s:tt)* ) $($t:tt)* } $e:expr, $($r:tt)* ) => { + $crate::join!(@{ ($($s)* _) $($t)* ($($s)*) $e, } $($r)*) + }; + + // ===== Entry point ===== + + ( $($e:expr),* $(,)?) => { + $crate::join!(@{ () } $($e,)*) + }; +} diff --git a/tokio/src/macros/mod.rs b/tokio/src/macros/mod.rs index d6026b96..d35a6411 100644 --- a/tokio/src/macros/mod.rs +++ b/tokio/src/macros/mod.rs @@ -8,6 +8,9 @@ mod assert; mod cfg; #[macro_use] +mod join; + +#[macro_use] mod loom; #[macro_use] diff --git a/tokio/src/macros/select.rs b/tokio/src/macros/select.rs index 00c2c6b5..ddb16f5d 100644 --- a/tokio/src/macros/select.rs +++ b/tokio/src/macros/select.rs @@ -1,6 +1,9 @@ /// Wait on multiple concurrent branches, returning when the **first** branch /// completes, cancelling the remaining branches. /// +/// The `select!` macro must be used inside of async functions, closures, and +/// blocks. +/// /// The `select` macro accepts one or more branches with the following pattern: /// /// ```text @@ -159,7 +162,6 @@ /// } /// }; /// } -/// /// ``` /// /// Basic stream selecting. diff --git a/tokio/src/macros/support.rs b/tokio/src/macros/support.rs index 079cb085..1f6fcd3d 100644 --- a/tokio/src/macros/support.rs +++ b/tokio/src/macros/support.rs @@ -1,4 +1,4 @@ -pub use crate::future::poll_fn; +pub use crate::future::{maybe_done, poll_fn}; pub use crate::util::thread_rng_n; pub use std::future::Future; diff --git a/tokio/tests/macros_join.rs b/tokio/tests/macros_join.rs new file mode 100644 index 00000000..ab168e81 --- /dev/null +++ b/tokio/tests/macros_join.rs @@ -0,0 +1,71 @@ +use tokio::sync::oneshot; +use tokio_test::{assert_pending, assert_ready, task}; + +#[tokio::test] +async fn sync_one_lit_expr_comma() { + let foo = tokio::join!(async { 1 },); + + assert_eq!(foo, (1,)); +} + +#[tokio::test] +async fn sync_one_lit_expr_no_comma() { + let foo = tokio::join!(async { 1 }); + + assert_eq!(foo, (1,)); +} + +#[tokio::test] +async fn sync_two_lit_expr_comma() { + let foo = tokio::join!(async { 1 }, async { 2 },); + + assert_eq!(foo, (1, 2)); +} + +#[tokio::test] +async fn sync_two_lit_expr_no_comma() { + let foo = tokio::join!(async { 1 }, async { 2 }); + + assert_eq!(foo, (1, 2)); +} + +#[tokio::test] +async fn sync_two_await() { + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + + let mut join = task::spawn(async { + tokio::join!(async { rx1.await.unwrap() }, async { rx2.await.unwrap() }) + }); + + assert_pending!(join.poll()); + + tx2.send(123).unwrap(); + assert!(join.is_woken()); + assert_pending!(join.poll()); + + tx1.send("hello").unwrap(); + assert!(join.is_woken()); + let res = assert_ready!(join.poll()); + + assert_eq!(("hello", 123), res); +} + +#[test] +fn join_size() { + use futures::future; + use std::mem; + + let fut = async { + let ready = future::ready(0i32); + tokio::join!(ready) + }; + assert_eq!(mem::size_of_val(&fut), 16); + + let fut = async { + let ready1 = future::ready(0i32); + let ready2 = future::ready(0i32); + tokio::join!(ready1, ready2) + }; + assert_eq!(mem::size_of_val(&fut), 28); +} |