summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-06-27 11:33:36 -0700
committerGitHub <noreply@github.com>2019-06-27 11:33:36 -0700
commit32ceccb4652076629303fda890ad4b2371a0dd02 (patch)
treed03a913bf7f5e013320a995d8495172c651a1417
parent0af05e7408cef92b834718ed1bd3578fc0fbd40e (diff)
sync: add async APIs to oneshot and mpsc (#1211)
Adds: - oneshot::Sender::close - mpsc::Receiver::recv - mpsc::Sender::send Also renames `poll_next` to `poll_recv`. Refs: #1210
-rw-r--r--tokio-macros/src/lib.rs16
-rw-r--r--tokio-sync/Cargo.toml6
-rw-r--r--tokio-sync/src/lib.rs1
-rw-r--r--tokio-sync/src/mpsc/bounded.rs26
-rw-r--r--tokio-sync/src/mpsc/unbounded.rs9
-rw-r--r--tokio-sync/src/oneshot.rs19
-rw-r--r--tokio-sync/tests/fuzz_mpsc.rs5
-rw-r--r--tokio-sync/tests/fuzz_oneshot.rs1
-rw-r--r--tokio-sync/tests/mpsc.rs81
-rw-r--r--tokio-sync/tests/oneshot.rs20
10 files changed, 143 insertions, 41 deletions
diff --git a/tokio-macros/src/lib.rs b/tokio-macros/src/lib.rs
index 1562b877..c036f552 100644
--- a/tokio-macros/src/lib.rs
+++ b/tokio-macros/src/lib.rs
@@ -67,12 +67,14 @@ pub fn main(args: TokenStream, item: TokenStream) -> TokenStream {
for arg in args {
match arg {
- syn::NestedMeta::Meta(syn::Meta::Word(ident)) => match ident.to_string().to_lowercase().as_str() {
- "multi_thread" => runtime = RuntimeType::Multi,
- "single_thread" => runtime = RuntimeType::Single,
- name => panic!("Unknown attribute {} is specified", name),
- },
- _ => ()
+ syn::NestedMeta::Meta(syn::Meta::Word(ident)) => {
+ match ident.to_string().to_lowercase().as_str() {
+ "multi_thread" => runtime = RuntimeType::Multi,
+ "single_thread" => runtime = RuntimeType::Single,
+ name => panic!("Unknown attribute {} is specified", name),
+ }
+ }
+ _ => (),
}
}
@@ -90,7 +92,7 @@ pub fn main(args: TokenStream, item: TokenStream) -> TokenStream {
let mut rt = tokio::runtime::current_thread::Runtime::new().unwrap();
rt.block_on(async { #body })
}
- }
+ },
};
result.into()
diff --git a/tokio-sync/Cargo.toml b/tokio-sync/Cargo.toml
index aedd4c62..154c6601 100644
--- a/tokio-sync/Cargo.toml
+++ b/tokio-sync/Cargo.toml
@@ -25,14 +25,14 @@ publish = false
async-traits = ["async-sink", "futures-core-preview"]
[dependencies]
-fnv = "1.0.6"
+async-util = { git = "https://github.com/tokio-rs/async" }
async-sink = { git = "https://github.com/tokio-rs/async", optional = true }
+fnv = "1.0.6"
futures-core-preview = { version = "0.3.0-alpha.16", optional = true }
[dev-dependencies]
-async-util = { git = "https://github.com/tokio-rs/async" }
env_logger = { version = "0.5", default-features = false }
pin-utils = "0.1.0-alpha.4"
-# tokio = { version = "0.2.0", path = "../tokio" }
+tokio = { version = "*", path = "../tokio" }
tokio-test = { version = "0.2.0", path = "../tokio-test" }
loom = { git = "https://github.com/carllerche/loom", branch = "std-future2", features = ["futures"] }
diff --git a/tokio-sync/src/lib.rs b/tokio-sync/src/lib.rs
index 67903ace..384740ac 100644
--- a/tokio-sync/src/lib.rs
+++ b/tokio-sync/src/lib.rs
@@ -7,6 +7,7 @@
)]
#![cfg_attr(test, deny(warnings))]
#![doc(test(no_crate_inject, attr(deny(rust_2018_idioms))))]
+#![feature(async_await)]
//! Asynchronous synchronization primitives.
//!
diff --git a/tokio-sync/src/mpsc/bounded.rs b/tokio-sync/src/mpsc/bounded.rs
index 67134156..0dc9012f 100644
--- a/tokio-sync/src/mpsc/bounded.rs
+++ b/tokio-sync/src/mpsc/bounded.rs
@@ -132,7 +132,14 @@ impl<T> Receiver<T> {
}
/// TODO: Dox
- pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ pub async fn recv(&mut self) -> Option<T> {
+ use async_util::future::poll_fn;
+
+ poll_fn(|cx| self.poll_recv(cx)).await
+ }
+
+ /// TODO: Dox
+ pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.chan.recv(cx)
}
@@ -150,7 +157,7 @@ impl<T> futures_core::Stream for Receiver<T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
- Receiver::poll_next(self.get_mut(), cx)
+ self.get_mut().poll_recv(cx)
}
}
@@ -189,6 +196,21 @@ impl<T> Sender<T> {
self.chan.try_send(message)?;
Ok(())
}
+
+ /// Send a value, waiting until there is capacity.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// unimplemented!();
+ /// ```
+ pub async fn send(&mut self, value: T) -> Result<(), SendError> {
+ use async_util::future::poll_fn;
+
+ poll_fn(|cx| self.poll_ready(cx)).await?;
+
+ self.try_send(value).map_err(|_| SendError(()))
+ }
}
#[cfg(feature = "async-traits")]
diff --git a/tokio-sync/src/mpsc/unbounded.rs b/tokio-sync/src/mpsc/unbounded.rs
index 960bee41..4f7c6b26 100644
--- a/tokio-sync/src/mpsc/unbounded.rs
+++ b/tokio-sync/src/mpsc/unbounded.rs
@@ -88,10 +88,17 @@ impl<T> UnboundedReceiver<T> {
}
/// TODO: dox
- pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.chan.recv(cx)
}
+ /// TODO: Dox
+ pub async fn recv(&mut self) -> Option<T> {
+ use async_util::future::poll_fn;
+
+ poll_fn(|cx| self.poll_recv(cx)).await
+ }
+
/// Closes the receiving half of a channel, without dropping it.
///
/// This prevents any further messages from being sent on the channel while
diff --git a/tokio-sync/src/oneshot.rs b/tokio-sync/src/oneshot.rs
index c38bb0ce..8e01b3f0 100644
--- a/tokio-sync/src/oneshot.rs
+++ b/tokio-sync/src/oneshot.rs
@@ -217,6 +217,25 @@ impl<T> Sender<T> {
Pending
}
+ /// Wait for the associated [`Receiver`] handle to drop.
+ ///
+ /// # Return
+ ///
+ /// Returns a `Future` which must be awaited on.
+ ///
+ /// [`Receiver`]: struct.Receiver.html
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// unimplemented!();
+ /// ```
+ pub async fn closed(&mut self) {
+ use async_util::future::poll_fn;
+
+ poll_fn(|cx| self.poll_close(cx)).await
+ }
+
/// Check if the associated [`Receiver`] handle has been dropped.
///
/// Unlike [`poll_close`], this function does not register a task for
diff --git a/tokio-sync/tests/fuzz_mpsc.rs b/tokio-sync/tests/fuzz_mpsc.rs
index 4a0b6441..57074ea8 100644
--- a/tokio-sync/tests/fuzz_mpsc.rs
+++ b/tokio-sync/tests/fuzz_mpsc.rs
@@ -1,4 +1,5 @@
#![deny(warnings, rust_2018_idioms)]
+#![feature(async_await)]
#[macro_use]
extern crate loom;
@@ -32,10 +33,10 @@ fn closing_tx() {
drop(tx);
});
- let v = block_on(poll_fn(|cx| rx.poll_next(cx)));
+ let v = block_on(poll_fn(|cx| rx.poll_recv(cx)));
assert!(v.is_some());
- let v = block_on(poll_fn(|cx| rx.poll_next(cx)));
+ let v = block_on(poll_fn(|cx| rx.poll_recv(cx)));
assert!(v.is_none());
});
}
diff --git a/tokio-sync/tests/fuzz_oneshot.rs b/tokio-sync/tests/fuzz_oneshot.rs
index 9e0a1c40..1872efe0 100644
--- a/tokio-sync/tests/fuzz_oneshot.rs
+++ b/tokio-sync/tests/fuzz_oneshot.rs
@@ -1,4 +1,5 @@
#![deny(warnings, rust_2018_idioms)]
+#![feature(async_await)]
/// Unwrap a ready value or propagate `Async::Pending`.
#[macro_export]
diff --git a/tokio-sync/tests/mpsc.rs b/tokio-sync/tests/mpsc.rs
index 17dd3e64..e6b61854 100644
--- a/tokio-sync/tests/mpsc.rs
+++ b/tokio-sync/tests/mpsc.rs
@@ -1,4 +1,5 @@
#![deny(warnings, rust_2018_idioms)]
+#![feature(async_await)]
use tokio_sync::mpsc;
use tokio_test::task::MockTask;
@@ -28,16 +29,30 @@ fn send_recv_with_buffer() {
drop(tx);
- let val = assert_ready!(t2.enter(|cx| rx.poll_next(cx)));
+ let val = assert_ready!(t2.enter(|cx| rx.poll_recv(cx)));
assert_eq!(val, Some(1));
- let val = assert_ready!(t2.enter(|cx| rx.poll_next(cx)));
+ let val = assert_ready!(t2.enter(|cx| rx.poll_recv(cx)));
assert_eq!(val, Some(2));
- let val = assert_ready!(t2.enter(|cx| rx.poll_next(cx)));
+ let val = assert_ready!(t2.enter(|cx| rx.poll_recv(cx)));
assert!(val.is_none());
}
+#[tokio::test]
+async fn async_send_recv_with_buffer() {
+ let (mut tx, mut rx) = mpsc::channel(16);
+
+ tokio::spawn(async move {
+ assert_ok!(tx.send(1).await);
+ assert_ok!(tx.send(2).await);
+ });
+
+ assert_eq!(Some(1), rx.recv().await);
+ assert_eq!(Some(2), rx.recv().await);
+ assert_eq!(None, rx.recv().await);
+}
+
#[test]
#[cfg(feature = "async-traits")]
fn send_sink_recv_with_buffer() {
@@ -65,13 +80,13 @@ fn send_sink_recv_with_buffer() {
t1.enter(|cx| {
pin_mut!(rx);
- let val = assert_ready!(Stream::poll_next(rx.as_mut(), cx));
+ let val = assert_ready!(rx.as_mut().poll_next(cx));
assert_eq!(val, Some(1));
- let val = assert_ready!(Stream::poll_next(rx.as_mut(), cx));
+ let val = assert_ready!(rx.as_mut().poll_next(cx));
assert_eq!(val, Some(2));
- let val = assert_ready!(Stream::poll_next(rx.as_mut(), cx));
+ let val = assert_ready!(rx.as_mut().poll_next(cx));
assert!(val.is_none());
});
}
@@ -97,7 +112,7 @@ fn start_send_past_cap() {
drop(tx1);
- let val = t3.enter(|cx| assert_ready!(rx.poll_next(cx)));
+ let val = t3.enter(|cx| assert_ready!(rx.poll_recv(cx)));
assert!(val.is_some());
assert!(t2.is_woken());
@@ -105,7 +120,7 @@ fn start_send_past_cap() {
drop(tx2);
- let val = t3.enter(|cx| assert_ready!(rx.poll_next(cx)));
+ let val = t3.enter(|cx| assert_ready!(rx.poll_recv(cx)));
assert!(val.is_none());
}
@@ -125,18 +140,32 @@ fn send_recv_unbounded() {
assert_ok!(tx.try_send(1));
assert_ok!(tx.try_send(2));
- let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx)));
+ let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx)));
assert_eq!(val, Some(1));
- let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx)));
+ let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx)));
assert_eq!(val, Some(2));
drop(tx);
- let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx)));
+ let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx)));
assert!(val.is_none());
}
+#[tokio::test]
+async fn async_send_recv_unbounded() {
+ let (mut tx, mut rx) = mpsc::unbounded_channel();
+
+ tokio::spawn(async move {
+ assert_ok!(tx.try_send(1));
+ assert_ok!(tx.try_send(2));
+ });
+
+ assert_eq!(Some(1), rx.recv().await);
+ assert_eq!(Some(2), rx.recv().await);
+ assert_eq!(None, rx.recv().await);
+}
+
#[test]
#[cfg(feature = "async-traits")]
fn sink_send_recv_unbounded() {
@@ -164,13 +193,13 @@ fn sink_send_recv_unbounded() {
t1.enter(|cx| {
pin_mut!(rx);
- let val = assert_ready!(Stream::poll_next(rx.as_mut(), cx));
+ let val = assert_ready!(rx.as_mut().poll_next(cx));
assert_eq!(val, Some(1));
- let val = assert_ready!(Stream::poll_next(rx.as_mut(), cx));
+ let val = assert_ready!(rx.as_mut().poll_next(cx));
assert_eq!(val, Some(2));
- let val = assert_ready!(Stream::poll_next(rx.as_mut(), cx));
+ let val = assert_ready!(rx.as_mut().poll_next(cx));
assert!(val.is_none());
});
}
@@ -189,7 +218,7 @@ fn no_t_bounds_buffer() {
// and sender should be Clone even though T isn't Clone
assert!(tx.clone().try_send(NoImpls).is_ok());
- let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx)));
+ let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx)));
assert!(val.is_some());
}
@@ -207,7 +236,7 @@ fn no_t_bounds_unbounded() {
// and sender should be Clone even though T isn't Clone
assert!(tx.clone().try_send(NoImpls).is_ok());
- let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx)));
+ let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx)));
assert!(val.is_some());
}
@@ -234,7 +263,7 @@ fn send_recv_buffer_limited() {
t2.enter(|cx| {
// Take the value
- let val = assert_ready!(rx.poll_next(cx));
+ let val = assert_ready!(rx.poll_recv(cx));
assert_eq!(Some(1), val);
});
@@ -251,7 +280,7 @@ fn send_recv_buffer_limited() {
t2.enter(|cx| {
// Take the value
- let val = assert_ready!(rx.poll_next(cx));
+ let val = assert_ready!(rx.poll_recv(cx));
assert_eq!(Some(2), val);
});
@@ -269,7 +298,7 @@ fn recv_close_gets_none_idle() {
rx.close();
t1.enter(|cx| {
- let val = assert_ready!(rx.poll_next(cx));
+ let val = assert_ready!(rx.poll_recv(cx));
assert!(val.is_none());
assert_ready_err!(tx.poll_ready(cx));
});
@@ -298,7 +327,7 @@ fn recv_close_gets_none_reserved() {
assert_ready_err!(tx2.poll_ready(cx));
});
- t3.enter(|cx| assert_pending!(rx.poll_next(cx)));
+ t3.enter(|cx| assert_pending!(rx.poll_recv(cx)));
assert!(!t1.is_woken());
assert!(!t2.is_woken());
@@ -308,10 +337,10 @@ fn recv_close_gets_none_reserved() {
assert!(t3.is_woken());
t3.enter(|cx| {
- let v = assert_ready!(rx.poll_next(cx));
+ let v = assert_ready!(rx.poll_recv(cx));
assert_eq!(v, Some(123));
- let v = assert_ready!(rx.poll_next(cx));
+ let v = assert_ready!(rx.poll_recv(cx));
assert!(v.is_none());
});
}
@@ -324,7 +353,7 @@ fn tx_close_gets_none() {
// Run on a task context
t1.enter(|cx| {
- let v = assert_ready!(rx.poll_next(cx));
+ let v = assert_ready!(rx.poll_recv(cx));
assert!(v.is_none());
});
}
@@ -341,16 +370,16 @@ fn try_send_fail() {
let err = assert_err!(tx.try_send("fail"));
assert!(err.is_full());
- let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx)));
+ let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx)));
assert_eq!(val, Some("hello"));
assert_ok!(tx.try_send("goodbye"));
drop(tx);
- let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx)));
+ let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx)));
assert_eq!(val, Some("goodbye"));
- let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx)));
+ let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx)));
assert!(val.is_none());
}
diff --git a/tokio-sync/tests/oneshot.rs b/tokio-sync/tests/oneshot.rs
index fc78bd2e..d19fe977 100644
--- a/tokio-sync/tests/oneshot.rs
+++ b/tokio-sync/tests/oneshot.rs
@@ -1,4 +1,5 @@
#![deny(warnings, rust_2018_idioms)]
+#![feature(async_await)]
use tokio_sync::oneshot;
use tokio_test::task::MockTask;
@@ -23,6 +24,14 @@ fn send_recv() {
assert_eq!(val, 1);
}
+#[tokio::test]
+async fn async_send_recv() {
+ let (tx, rx) = oneshot::channel();
+
+ assert_ok!(tx.send(1));
+ assert_eq!(1, assert_ok!(rx.await));
+}
+
#[test]
fn close_tx() {
let (tx, mut rx) = oneshot::channel::<i32>();
@@ -60,6 +69,17 @@ fn close_rx() {
assert_err!(tx.send(1));
}
+#[tokio::test]
+async fn async_rx_closed() {
+ let (mut tx, rx) = oneshot::channel::<()>();
+
+ tokio::spawn(async move {
+ drop(rx);
+ });
+
+ tx.closed().await;
+}
+
#[test]
fn explicit_close_poll() {
// First, with message sent