summaryrefslogtreecommitdiffstats
path: root/tokio/tests/sync_mpsc.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-11-15 22:11:13 -0800
committerGitHub <noreply@github.com>2019-11-15 22:11:13 -0800
commit8a7e57786a5dca139f5b4261685e22991ded0859 (patch)
treeb69d1c48f8a760a58fc7ccfe0376d9812a88d303 /tokio/tests/sync_mpsc.rs
parent930679587ae42e4df3113159ccf33fb5923dd73a (diff)
Limit `futures` dependency to `Stream` via feature flag (#1774)
In an effort to reach API stability, the `tokio` crate is shedding its _public_ dependencies on crates that are either a) do not provide a stable (1.0+) release with longevity guarantees or b) match the `tokio` release cadence. Of course, implementing `std` traits fits the requirements. The on exception, for now, is the `Stream` trait found in `futures_core`. It is expected that this trait will not change much and be moved into `std. Since Tokio is not yet going reaching 1.0, I feel that it is acceptable to maintain a dependency on this trait given how foundational it is. Since the `Stream` implementation is optional, types that are logically streams provide `async fn next_*` functions to obtain the next value. Avoiding the `next()` name prevents fn conflicts with `StreamExt::next()`. Additionally, some misc cleanup is also done: - `tokio::io::io` -> `tokio::io::util`. - `delay` -> `delay_until`. - `Timeout::new` -> `timeout(...)`. - `signal::ctrl_c()` returns a future instead of a stream. - `{tcp,unix}::Incoming` is removed (due to lack of `Stream` trait). - `time::Throttle` is removed (due to lack of `Stream` trait). - Fix: `mpsc::UnboundedSender::send(&self)` (no more conflict with `Sink` fns).
Diffstat (limited to 'tokio/tests/sync_mpsc.rs')
-rw-r--r--tokio/tests/sync_mpsc.rs113
1 files changed, 42 insertions, 71 deletions
diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs
index f724c564..040904e4 100644
--- a/tokio/tests/sync_mpsc.rs
+++ b/tokio/tests/sync_mpsc.rs
@@ -1,6 +1,7 @@
#![warn(rust_2018_idioms)]
use tokio::sync::mpsc;
+use tokio::sync::mpsc::error::TrySendError;
use tokio_test::task;
use tokio_test::{
assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
@@ -38,47 +39,33 @@ fn send_recv_with_buffer() {
}
#[tokio::test]
-async fn async_send_recv_with_buffer() {
- let (mut tx, mut rx) = mpsc::channel(16);
+async fn send_recv_stream_with_buffer() {
+ use futures::StreamExt;
+
+ let (mut tx, mut rx) = mpsc::channel::<i32>(16);
tokio::spawn(async move {
assert_ok!(tx.send(1).await);
assert_ok!(tx.send(2).await);
});
- assert_eq!(Some(1), rx.recv().await);
- assert_eq!(Some(2), rx.recv().await);
- assert_eq!(None, rx.recv().await);
+ assert_eq!(Some(1), rx.next().await);
+ assert_eq!(Some(2), rx.next().await);
+ assert_eq!(None, rx.next().await);
}
-#[test]
-fn send_sink_recv_with_buffer() {
- use futures_core::Stream;
- use futures_sink::Sink;
-
- let (tx, rx) = mpsc::channel::<i32>(16);
-
- task::spawn(tx).enter(|cx, mut tx| {
- assert_ready_ok!(tx.as_mut().poll_ready(cx));
- assert_ok!(tx.as_mut().start_send(1));
-
- assert_ready_ok!(tx.as_mut().poll_ready(cx));
- assert_ok!(tx.as_mut().start_send(2));
+#[tokio::test]
+async fn async_send_recv_with_buffer() {
+ let (mut tx, mut rx) = mpsc::channel(16);
- assert_ready_ok!(tx.as_mut().poll_flush(cx));
- assert_ready_ok!(tx.as_mut().poll_close(cx));
+ tokio::spawn(async move {
+ assert_ok!(tx.send(1).await);
+ assert_ok!(tx.send(2).await);
});
- task::spawn(rx).enter(|cx, mut rx| {
- let val = assert_ready!(rx.as_mut().poll_next(cx));
- assert_eq!(val, Some(1));
-
- let val = assert_ready!(rx.as_mut().poll_next(cx));
- assert_eq!(val, Some(2));
-
- let val = assert_ready!(rx.as_mut().poll_next(cx));
- assert!(val.is_none());
- });
+ assert_eq!(Some(1), rx.recv().await);
+ assert_eq!(Some(2), rx.recv().await);
+ assert_eq!(None, rx.recv().await);
}
#[test]
@@ -124,11 +111,11 @@ fn buffer_gteq_one() {
fn send_recv_unbounded() {
let mut t1 = task::spawn(());
- let (mut tx, mut rx) = mpsc::unbounded_channel::<i32>();
+ let (tx, mut rx) = mpsc::unbounded_channel::<i32>();
// Using `try_send`
- assert_ok!(tx.try_send(1));
- assert_ok!(tx.try_send(2));
+ assert_ok!(tx.send(1));
+ assert_ok!(tx.send(2));
let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
assert_eq!(val, Some(1));
@@ -144,11 +131,11 @@ fn send_recv_unbounded() {
#[tokio::test]
async fn async_send_recv_unbounded() {
- let (mut tx, mut rx) = mpsc::unbounded_channel();
+ let (tx, mut rx) = mpsc::unbounded_channel();
tokio::spawn(async move {
- assert_ok!(tx.try_send(1));
- assert_ok!(tx.try_send(2));
+ assert_ok!(tx.send(1));
+ assert_ok!(tx.send(2));
});
assert_eq!(Some(1), rx.recv().await);
@@ -156,41 +143,20 @@ async fn async_send_recv_unbounded() {
assert_eq!(None, rx.recv().await);
}
-#[test]
-fn sink_send_recv_unbounded() {
- use futures_core::Stream;
- use futures_sink::Sink;
- use futures_util::pin_mut;
-
- let mut t1 = task::spawn(());
-
- let (tx, rx) = mpsc::unbounded_channel::<i32>();
-
- t1.enter(|cx, _| {
- pin_mut!(tx);
-
- assert_ready_ok!(tx.as_mut().poll_ready(cx));
- assert_ok!(tx.as_mut().start_send(1));
+#[tokio::test]
+async fn send_recv_stream_unbounded() {
+ use futures::StreamExt;
- assert_ready_ok!(tx.as_mut().poll_ready(cx));
- assert_ok!(tx.as_mut().start_send(2));
+ let (tx, mut rx) = mpsc::unbounded_channel::<i32>();
- assert_ready_ok!(tx.as_mut().poll_flush(cx));
- assert_ready_ok!(tx.as_mut().poll_close(cx));
+ tokio::spawn(async move {
+ assert_ok!(tx.send(1));
+ assert_ok!(tx.send(2));
});
- t1.enter(|cx, _| {
- pin_mut!(rx);
-
- let val = assert_ready!(rx.as_mut().poll_next(cx));
- assert_eq!(val, Some(1));
-
- let val = assert_ready!(rx.as_mut().poll_next(cx));
- assert_eq!(val, Some(2));
-
- let val = assert_ready!(rx.as_mut().poll_next(cx));
- assert!(val.is_none());
- });
+ assert_eq!(Some(1), rx.next().await);
+ assert_eq!(Some(2), rx.next().await);
+ assert_eq!(None, rx.next().await);
}
#[test]
@@ -223,7 +189,7 @@ fn no_t_bounds_unbounded() {
// same with Receiver
println!("{:?}", rx);
// and sender should be Clone even though T isn't Clone
- assert!(tx.clone().try_send(NoImpls).is_ok());
+ assert!(tx.clone().send(NoImpls).is_ok());
let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
assert!(val.is_some());
@@ -356,8 +322,10 @@ fn try_send_fail() {
tx.try_send("hello").unwrap();
// This should fail
- let err = assert_err!(tx.try_send("fail"));
- assert!(err.is_full());
+ match assert_err!(tx.try_send("fail")) {
+ TrySendError::Full(..) => {}
+ _ => panic!(),
+ }
let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
assert_eq!(val, Some("hello"));
@@ -421,7 +389,10 @@ fn dropping_rx_closes_channel_for_try() {
{
let err = assert_err!(tx.try_send(msg.clone()));
- assert!(err.is_closed());
+ match err {
+ TrySendError::Closed(..) => {}
+ _ => panic!(),
+ }
}
assert_eq!(1, Arc::strong_count(&msg));