diff options
author | Carl Lerche <me@carllerche.com> | 2019-06-27 11:33:36 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-06-27 11:33:36 -0700 |
commit | 32ceccb4652076629303fda890ad4b2371a0dd02 (patch) | |
tree | d03a913bf7f5e013320a995d8495172c651a1417 | |
parent | 0af05e7408cef92b834718ed1bd3578fc0fbd40e (diff) |
sync: add async APIs to oneshot and mpsc (#1211)
Adds:
- oneshot::Sender::close
- mpsc::Receiver::recv
- mpsc::Sender::send
Also renames `poll_next` to `poll_recv`.
Refs: #1210
-rw-r--r-- | tokio-macros/src/lib.rs | 16 | ||||
-rw-r--r-- | tokio-sync/Cargo.toml | 6 | ||||
-rw-r--r-- | tokio-sync/src/lib.rs | 1 | ||||
-rw-r--r-- | tokio-sync/src/mpsc/bounded.rs | 26 | ||||
-rw-r--r-- | tokio-sync/src/mpsc/unbounded.rs | 9 | ||||
-rw-r--r-- | tokio-sync/src/oneshot.rs | 19 | ||||
-rw-r--r-- | tokio-sync/tests/fuzz_mpsc.rs | 5 | ||||
-rw-r--r-- | tokio-sync/tests/fuzz_oneshot.rs | 1 | ||||
-rw-r--r-- | tokio-sync/tests/mpsc.rs | 81 | ||||
-rw-r--r-- | tokio-sync/tests/oneshot.rs | 20 |
10 files changed, 143 insertions, 41 deletions
diff --git a/tokio-macros/src/lib.rs b/tokio-macros/src/lib.rs index 1562b877..c036f552 100644 --- a/tokio-macros/src/lib.rs +++ b/tokio-macros/src/lib.rs @@ -67,12 +67,14 @@ pub fn main(args: TokenStream, item: TokenStream) -> TokenStream { for arg in args { match arg { - syn::NestedMeta::Meta(syn::Meta::Word(ident)) => match ident.to_string().to_lowercase().as_str() { - "multi_thread" => runtime = RuntimeType::Multi, - "single_thread" => runtime = RuntimeType::Single, - name => panic!("Unknown attribute {} is specified", name), - }, - _ => () + syn::NestedMeta::Meta(syn::Meta::Word(ident)) => { + match ident.to_string().to_lowercase().as_str() { + "multi_thread" => runtime = RuntimeType::Multi, + "single_thread" => runtime = RuntimeType::Single, + name => panic!("Unknown attribute {} is specified", name), + } + } + _ => (), } } @@ -90,7 +92,7 @@ pub fn main(args: TokenStream, item: TokenStream) -> TokenStream { let mut rt = tokio::runtime::current_thread::Runtime::new().unwrap(); rt.block_on(async { #body }) } - } + }, }; result.into() diff --git a/tokio-sync/Cargo.toml b/tokio-sync/Cargo.toml index aedd4c62..154c6601 100644 --- a/tokio-sync/Cargo.toml +++ b/tokio-sync/Cargo.toml @@ -25,14 +25,14 @@ publish = false async-traits = ["async-sink", "futures-core-preview"] [dependencies] -fnv = "1.0.6" +async-util = { git = "https://github.com/tokio-rs/async" } async-sink = { git = "https://github.com/tokio-rs/async", optional = true } +fnv = "1.0.6" futures-core-preview = { version = "0.3.0-alpha.16", optional = true } [dev-dependencies] -async-util = { git = "https://github.com/tokio-rs/async" } env_logger = { version = "0.5", default-features = false } pin-utils = "0.1.0-alpha.4" -# tokio = { version = "0.2.0", path = "../tokio" } +tokio = { version = "*", path = "../tokio" } tokio-test = { version = "0.2.0", path = "../tokio-test" } loom = { git = "https://github.com/carllerche/loom", branch = "std-future2", features = ["futures"] } diff --git a/tokio-sync/src/lib.rs b/tokio-sync/src/lib.rs index 67903ace..384740ac 100644 --- a/tokio-sync/src/lib.rs +++ b/tokio-sync/src/lib.rs @@ -7,6 +7,7 @@ )] #![cfg_attr(test, deny(warnings))] #![doc(test(no_crate_inject, attr(deny(rust_2018_idioms))))] +#![feature(async_await)] //! Asynchronous synchronization primitives. //! diff --git a/tokio-sync/src/mpsc/bounded.rs b/tokio-sync/src/mpsc/bounded.rs index 67134156..0dc9012f 100644 --- a/tokio-sync/src/mpsc/bounded.rs +++ b/tokio-sync/src/mpsc/bounded.rs @@ -132,7 +132,14 @@ impl<T> Receiver<T> { } /// TODO: Dox - pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { + pub async fn recv(&mut self) -> Option<T> { + use async_util::future::poll_fn; + + poll_fn(|cx| self.poll_recv(cx)).await + } + + /// TODO: Dox + pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { self.chan.recv(cx) } @@ -150,7 +157,7 @@ impl<T> futures_core::Stream for Receiver<T> { type Item = T; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { - Receiver::poll_next(self.get_mut(), cx) + self.get_mut().poll_recv(cx) } } @@ -189,6 +196,21 @@ impl<T> Sender<T> { self.chan.try_send(message)?; Ok(()) } + + /// Send a value, waiting until there is capacity. + /// + /// # Examples + /// + /// ``` + /// unimplemented!(); + /// ``` + pub async fn send(&mut self, value: T) -> Result<(), SendError> { + use async_util::future::poll_fn; + + poll_fn(|cx| self.poll_ready(cx)).await?; + + self.try_send(value).map_err(|_| SendError(())) + } } #[cfg(feature = "async-traits")] diff --git a/tokio-sync/src/mpsc/unbounded.rs b/tokio-sync/src/mpsc/unbounded.rs index 960bee41..4f7c6b26 100644 --- a/tokio-sync/src/mpsc/unbounded.rs +++ b/tokio-sync/src/mpsc/unbounded.rs @@ -88,10 +88,17 @@ impl<T> UnboundedReceiver<T> { } /// TODO: dox - pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { + pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { self.chan.recv(cx) } + /// TODO: Dox + pub async fn recv(&mut self) -> Option<T> { + use async_util::future::poll_fn; + + poll_fn(|cx| self.poll_recv(cx)).await + } + /// Closes the receiving half of a channel, without dropping it. /// /// This prevents any further messages from being sent on the channel while diff --git a/tokio-sync/src/oneshot.rs b/tokio-sync/src/oneshot.rs index c38bb0ce..8e01b3f0 100644 --- a/tokio-sync/src/oneshot.rs +++ b/tokio-sync/src/oneshot.rs @@ -217,6 +217,25 @@ impl<T> Sender<T> { Pending } + /// Wait for the associated [`Receiver`] handle to drop. + /// + /// # Return + /// + /// Returns a `Future` which must be awaited on. + /// + /// [`Receiver`]: struct.Receiver.html + /// + /// # Examples + /// + /// ``` + /// unimplemented!(); + /// ``` + pub async fn closed(&mut self) { + use async_util::future::poll_fn; + + poll_fn(|cx| self.poll_close(cx)).await + } + /// Check if the associated [`Receiver`] handle has been dropped. /// /// Unlike [`poll_close`], this function does not register a task for diff --git a/tokio-sync/tests/fuzz_mpsc.rs b/tokio-sync/tests/fuzz_mpsc.rs index 4a0b6441..57074ea8 100644 --- a/tokio-sync/tests/fuzz_mpsc.rs +++ b/tokio-sync/tests/fuzz_mpsc.rs @@ -1,4 +1,5 @@ #![deny(warnings, rust_2018_idioms)] +#![feature(async_await)] #[macro_use] extern crate loom; @@ -32,10 +33,10 @@ fn closing_tx() { drop(tx); }); - let v = block_on(poll_fn(|cx| rx.poll_next(cx))); + let v = block_on(poll_fn(|cx| rx.poll_recv(cx))); assert!(v.is_some()); - let v = block_on(poll_fn(|cx| rx.poll_next(cx))); + let v = block_on(poll_fn(|cx| rx.poll_recv(cx))); assert!(v.is_none()); }); } diff --git a/tokio-sync/tests/fuzz_oneshot.rs b/tokio-sync/tests/fuzz_oneshot.rs index 9e0a1c40..1872efe0 100644 --- a/tokio-sync/tests/fuzz_oneshot.rs +++ b/tokio-sync/tests/fuzz_oneshot.rs @@ -1,4 +1,5 @@ #![deny(warnings, rust_2018_idioms)] +#![feature(async_await)] /// Unwrap a ready value or propagate `Async::Pending`. #[macro_export] diff --git a/tokio-sync/tests/mpsc.rs b/tokio-sync/tests/mpsc.rs index 17dd3e64..e6b61854 100644 --- a/tokio-sync/tests/mpsc.rs +++ b/tokio-sync/tests/mpsc.rs @@ -1,4 +1,5 @@ #![deny(warnings, rust_2018_idioms)] +#![feature(async_await)] use tokio_sync::mpsc; use tokio_test::task::MockTask; @@ -28,16 +29,30 @@ fn send_recv_with_buffer() { drop(tx); - let val = assert_ready!(t2.enter(|cx| rx.poll_next(cx))); + let val = assert_ready!(t2.enter(|cx| rx.poll_recv(cx))); assert_eq!(val, Some(1)); - let val = assert_ready!(t2.enter(|cx| rx.poll_next(cx))); + let val = assert_ready!(t2.enter(|cx| rx.poll_recv(cx))); assert_eq!(val, Some(2)); - let val = assert_ready!(t2.enter(|cx| rx.poll_next(cx))); + let val = assert_ready!(t2.enter(|cx| rx.poll_recv(cx))); assert!(val.is_none()); } +#[tokio::test] +async fn async_send_recv_with_buffer() { + let (mut tx, mut rx) = mpsc::channel(16); + + tokio::spawn(async move { + assert_ok!(tx.send(1).await); + assert_ok!(tx.send(2).await); + }); + + assert_eq!(Some(1), rx.recv().await); + assert_eq!(Some(2), rx.recv().await); + assert_eq!(None, rx.recv().await); +} + #[test] #[cfg(feature = "async-traits")] fn send_sink_recv_with_buffer() { @@ -65,13 +80,13 @@ fn send_sink_recv_with_buffer() { t1.enter(|cx| { pin_mut!(rx); - let val = assert_ready!(Stream::poll_next(rx.as_mut(), cx)); + let val = assert_ready!(rx.as_mut().poll_next(cx)); assert_eq!(val, Some(1)); - let val = assert_ready!(Stream::poll_next(rx.as_mut(), cx)); + let val = assert_ready!(rx.as_mut().poll_next(cx)); assert_eq!(val, Some(2)); - let val = assert_ready!(Stream::poll_next(rx.as_mut(), cx)); + let val = assert_ready!(rx.as_mut().poll_next(cx)); assert!(val.is_none()); }); } @@ -97,7 +112,7 @@ fn start_send_past_cap() { drop(tx1); - let val = t3.enter(|cx| assert_ready!(rx.poll_next(cx))); + let val = t3.enter(|cx| assert_ready!(rx.poll_recv(cx))); assert!(val.is_some()); assert!(t2.is_woken()); @@ -105,7 +120,7 @@ fn start_send_past_cap() { drop(tx2); - let val = t3.enter(|cx| assert_ready!(rx.poll_next(cx))); + let val = t3.enter(|cx| assert_ready!(rx.poll_recv(cx))); assert!(val.is_none()); } @@ -125,18 +140,32 @@ fn send_recv_unbounded() { assert_ok!(tx.try_send(1)); assert_ok!(tx.try_send(2)); - let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx))); + let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx))); assert_eq!(val, Some(1)); - let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx))); + let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx))); assert_eq!(val, Some(2)); drop(tx); - let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx))); + let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx))); assert!(val.is_none()); } +#[tokio::test] +async fn async_send_recv_unbounded() { + let (mut tx, mut rx) = mpsc::unbounded_channel(); + + tokio::spawn(async move { + assert_ok!(tx.try_send(1)); + assert_ok!(tx.try_send(2)); + }); + + assert_eq!(Some(1), rx.recv().await); + assert_eq!(Some(2), rx.recv().await); + assert_eq!(None, rx.recv().await); +} + #[test] #[cfg(feature = "async-traits")] fn sink_send_recv_unbounded() { @@ -164,13 +193,13 @@ fn sink_send_recv_unbounded() { t1.enter(|cx| { pin_mut!(rx); - let val = assert_ready!(Stream::poll_next(rx.as_mut(), cx)); + let val = assert_ready!(rx.as_mut().poll_next(cx)); assert_eq!(val, Some(1)); - let val = assert_ready!(Stream::poll_next(rx.as_mut(), cx)); + let val = assert_ready!(rx.as_mut().poll_next(cx)); assert_eq!(val, Some(2)); - let val = assert_ready!(Stream::poll_next(rx.as_mut(), cx)); + let val = assert_ready!(rx.as_mut().poll_next(cx)); assert!(val.is_none()); }); } @@ -189,7 +218,7 @@ fn no_t_bounds_buffer() { // and sender should be Clone even though T isn't Clone assert!(tx.clone().try_send(NoImpls).is_ok()); - let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx))); + let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx))); assert!(val.is_some()); } @@ -207,7 +236,7 @@ fn no_t_bounds_unbounded() { // and sender should be Clone even though T isn't Clone assert!(tx.clone().try_send(NoImpls).is_ok()); - let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx))); + let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx))); assert!(val.is_some()); } @@ -234,7 +263,7 @@ fn send_recv_buffer_limited() { t2.enter(|cx| { // Take the value - let val = assert_ready!(rx.poll_next(cx)); + let val = assert_ready!(rx.poll_recv(cx)); assert_eq!(Some(1), val); }); @@ -251,7 +280,7 @@ fn send_recv_buffer_limited() { t2.enter(|cx| { // Take the value - let val = assert_ready!(rx.poll_next(cx)); + let val = assert_ready!(rx.poll_recv(cx)); assert_eq!(Some(2), val); }); @@ -269,7 +298,7 @@ fn recv_close_gets_none_idle() { rx.close(); t1.enter(|cx| { - let val = assert_ready!(rx.poll_next(cx)); + let val = assert_ready!(rx.poll_recv(cx)); assert!(val.is_none()); assert_ready_err!(tx.poll_ready(cx)); }); @@ -298,7 +327,7 @@ fn recv_close_gets_none_reserved() { assert_ready_err!(tx2.poll_ready(cx)); }); - t3.enter(|cx| assert_pending!(rx.poll_next(cx))); + t3.enter(|cx| assert_pending!(rx.poll_recv(cx))); assert!(!t1.is_woken()); assert!(!t2.is_woken()); @@ -308,10 +337,10 @@ fn recv_close_gets_none_reserved() { assert!(t3.is_woken()); t3.enter(|cx| { - let v = assert_ready!(rx.poll_next(cx)); + let v = assert_ready!(rx.poll_recv(cx)); assert_eq!(v, Some(123)); - let v = assert_ready!(rx.poll_next(cx)); + let v = assert_ready!(rx.poll_recv(cx)); assert!(v.is_none()); }); } @@ -324,7 +353,7 @@ fn tx_close_gets_none() { // Run on a task context t1.enter(|cx| { - let v = assert_ready!(rx.poll_next(cx)); + let v = assert_ready!(rx.poll_recv(cx)); assert!(v.is_none()); }); } @@ -341,16 +370,16 @@ fn try_send_fail() { let err = assert_err!(tx.try_send("fail")); assert!(err.is_full()); - let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx))); + let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx))); assert_eq!(val, Some("hello")); assert_ok!(tx.try_send("goodbye")); drop(tx); - let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx))); + let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx))); assert_eq!(val, Some("goodbye")); - let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx))); + let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx))); assert!(val.is_none()); } diff --git a/tokio-sync/tests/oneshot.rs b/tokio-sync/tests/oneshot.rs index fc78bd2e..d19fe977 100644 --- a/tokio-sync/tests/oneshot.rs +++ b/tokio-sync/tests/oneshot.rs @@ -1,4 +1,5 @@ #![deny(warnings, rust_2018_idioms)] +#![feature(async_await)] use tokio_sync::oneshot; use tokio_test::task::MockTask; @@ -23,6 +24,14 @@ fn send_recv() { assert_eq!(val, 1); } +#[tokio::test] +async fn async_send_recv() { + let (tx, rx) = oneshot::channel(); + + assert_ok!(tx.send(1)); + assert_eq!(1, assert_ok!(rx.await)); +} + #[test] fn close_tx() { let (tx, mut rx) = oneshot::channel::<i32>(); @@ -60,6 +69,17 @@ fn close_rx() { assert_err!(tx.send(1)); } +#[tokio::test] +async fn async_rx_closed() { + let (mut tx, rx) = oneshot::channel::<()>(); + + tokio::spawn(async move { + drop(rx); + }); + + tx.closed().await; +} + #[test] fn explicit_close_poll() { // First, with message sent |