diff options
author | Carl Lerche <me@carllerche.com> | 2019-11-15 22:11:13 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-11-15 22:11:13 -0800 |
commit | 8a7e57786a5dca139f5b4261685e22991ded0859 (patch) | |
tree | b69d1c48f8a760a58fc7ccfe0376d9812a88d303 /tokio/tests/sync_mpsc.rs | |
parent | 930679587ae42e4df3113159ccf33fb5923dd73a (diff) |
Limit `futures` dependency to `Stream` via feature flag (#1774)
In an effort to reach API stability, the `tokio` crate is shedding its
_public_ dependencies on crates that are either a) do not provide a
stable (1.0+) release with longevity guarantees or b) match the `tokio`
release cadence. Of course, implementing `std` traits fits the
requirements.
The on exception, for now, is the `Stream` trait found in `futures_core`.
It is expected that this trait will not change much and be moved into `std.
Since Tokio is not yet going reaching 1.0, I feel that it is acceptable to maintain
a dependency on this trait given how foundational it is.
Since the `Stream` implementation is optional, types that are logically
streams provide `async fn next_*` functions to obtain the next value.
Avoiding the `next()` name prevents fn conflicts with `StreamExt::next()`.
Additionally, some misc cleanup is also done:
- `tokio::io::io` -> `tokio::io::util`.
- `delay` -> `delay_until`.
- `Timeout::new` -> `timeout(...)`.
- `signal::ctrl_c()` returns a future instead of a stream.
- `{tcp,unix}::Incoming` is removed (due to lack of `Stream` trait).
- `time::Throttle` is removed (due to lack of `Stream` trait).
- Fix: `mpsc::UnboundedSender::send(&self)` (no more conflict with `Sink` fns).
Diffstat (limited to 'tokio/tests/sync_mpsc.rs')
-rw-r--r-- | tokio/tests/sync_mpsc.rs | 113 |
1 files changed, 42 insertions, 71 deletions
diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index f724c564..040904e4 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -1,6 +1,7 @@ #![warn(rust_2018_idioms)] use tokio::sync::mpsc; +use tokio::sync::mpsc::error::TrySendError; use tokio_test::task; use tokio_test::{ assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok, @@ -38,47 +39,33 @@ fn send_recv_with_buffer() { } #[tokio::test] -async fn async_send_recv_with_buffer() { - let (mut tx, mut rx) = mpsc::channel(16); +async fn send_recv_stream_with_buffer() { + use futures::StreamExt; + + let (mut tx, mut rx) = mpsc::channel::<i32>(16); tokio::spawn(async move { assert_ok!(tx.send(1).await); assert_ok!(tx.send(2).await); }); - assert_eq!(Some(1), rx.recv().await); - assert_eq!(Some(2), rx.recv().await); - assert_eq!(None, rx.recv().await); + assert_eq!(Some(1), rx.next().await); + assert_eq!(Some(2), rx.next().await); + assert_eq!(None, rx.next().await); } -#[test] -fn send_sink_recv_with_buffer() { - use futures_core::Stream; - use futures_sink::Sink; - - let (tx, rx) = mpsc::channel::<i32>(16); - - task::spawn(tx).enter(|cx, mut tx| { - assert_ready_ok!(tx.as_mut().poll_ready(cx)); - assert_ok!(tx.as_mut().start_send(1)); - - assert_ready_ok!(tx.as_mut().poll_ready(cx)); - assert_ok!(tx.as_mut().start_send(2)); +#[tokio::test] +async fn async_send_recv_with_buffer() { + let (mut tx, mut rx) = mpsc::channel(16); - assert_ready_ok!(tx.as_mut().poll_flush(cx)); - assert_ready_ok!(tx.as_mut().poll_close(cx)); + tokio::spawn(async move { + assert_ok!(tx.send(1).await); + assert_ok!(tx.send(2).await); }); - task::spawn(rx).enter(|cx, mut rx| { - let val = assert_ready!(rx.as_mut().poll_next(cx)); - assert_eq!(val, Some(1)); - - let val = assert_ready!(rx.as_mut().poll_next(cx)); - assert_eq!(val, Some(2)); - - let val = assert_ready!(rx.as_mut().poll_next(cx)); - assert!(val.is_none()); - }); + assert_eq!(Some(1), rx.recv().await); + assert_eq!(Some(2), rx.recv().await); + assert_eq!(None, rx.recv().await); } #[test] @@ -124,11 +111,11 @@ fn buffer_gteq_one() { fn send_recv_unbounded() { let mut t1 = task::spawn(()); - let (mut tx, mut rx) = mpsc::unbounded_channel::<i32>(); + let (tx, mut rx) = mpsc::unbounded_channel::<i32>(); // Using `try_send` - assert_ok!(tx.try_send(1)); - assert_ok!(tx.try_send(2)); + assert_ok!(tx.send(1)); + assert_ok!(tx.send(2)); let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); assert_eq!(val, Some(1)); @@ -144,11 +131,11 @@ fn send_recv_unbounded() { #[tokio::test] async fn async_send_recv_unbounded() { - let (mut tx, mut rx) = mpsc::unbounded_channel(); + let (tx, mut rx) = mpsc::unbounded_channel(); tokio::spawn(async move { - assert_ok!(tx.try_send(1)); - assert_ok!(tx.try_send(2)); + assert_ok!(tx.send(1)); + assert_ok!(tx.send(2)); }); assert_eq!(Some(1), rx.recv().await); @@ -156,41 +143,20 @@ async fn async_send_recv_unbounded() { assert_eq!(None, rx.recv().await); } -#[test] -fn sink_send_recv_unbounded() { - use futures_core::Stream; - use futures_sink::Sink; - use futures_util::pin_mut; - - let mut t1 = task::spawn(()); - - let (tx, rx) = mpsc::unbounded_channel::<i32>(); - - t1.enter(|cx, _| { - pin_mut!(tx); - - assert_ready_ok!(tx.as_mut().poll_ready(cx)); - assert_ok!(tx.as_mut().start_send(1)); +#[tokio::test] +async fn send_recv_stream_unbounded() { + use futures::StreamExt; - assert_ready_ok!(tx.as_mut().poll_ready(cx)); - assert_ok!(tx.as_mut().start_send(2)); + let (tx, mut rx) = mpsc::unbounded_channel::<i32>(); - assert_ready_ok!(tx.as_mut().poll_flush(cx)); - assert_ready_ok!(tx.as_mut().poll_close(cx)); + tokio::spawn(async move { + assert_ok!(tx.send(1)); + assert_ok!(tx.send(2)); }); - t1.enter(|cx, _| { - pin_mut!(rx); - - let val = assert_ready!(rx.as_mut().poll_next(cx)); - assert_eq!(val, Some(1)); - - let val = assert_ready!(rx.as_mut().poll_next(cx)); - assert_eq!(val, Some(2)); - - let val = assert_ready!(rx.as_mut().poll_next(cx)); - assert!(val.is_none()); - }); + assert_eq!(Some(1), rx.next().await); + assert_eq!(Some(2), rx.next().await); + assert_eq!(None, rx.next().await); } #[test] @@ -223,7 +189,7 @@ fn no_t_bounds_unbounded() { // same with Receiver println!("{:?}", rx); // and sender should be Clone even though T isn't Clone - assert!(tx.clone().try_send(NoImpls).is_ok()); + assert!(tx.clone().send(NoImpls).is_ok()); let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); assert!(val.is_some()); @@ -356,8 +322,10 @@ fn try_send_fail() { tx.try_send("hello").unwrap(); // This should fail - let err = assert_err!(tx.try_send("fail")); - assert!(err.is_full()); + match assert_err!(tx.try_send("fail")) { + TrySendError::Full(..) => {} + _ => panic!(), + } let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); assert_eq!(val, Some("hello")); @@ -421,7 +389,10 @@ fn dropping_rx_closes_channel_for_try() { { let err = assert_err!(tx.try_send(msg.clone())); - assert!(err.is_closed()); + match err { + TrySendError::Closed(..) => {} + _ => panic!(), + } } assert_eq!(1, Arc::strong_count(&msg)); |