summaryrefslogtreecommitdiffstats
path: root/tokio/tests/sync_watch.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-11-15 22:11:13 -0800
committerGitHub <noreply@github.com>2019-11-15 22:11:13 -0800
commit8a7e57786a5dca139f5b4261685e22991ded0859 (patch)
treeb69d1c48f8a760a58fc7ccfe0376d9812a88d303 /tokio/tests/sync_watch.rs
parent930679587ae42e4df3113159ccf33fb5923dd73a (diff)
Limit `futures` dependency to `Stream` via feature flag (#1774)
In an effort to reach API stability, the `tokio` crate is shedding its _public_ dependencies on crates that are either a) do not provide a stable (1.0+) release with longevity guarantees or b) match the `tokio` release cadence. Of course, implementing `std` traits fits the requirements. The on exception, for now, is the `Stream` trait found in `futures_core`. It is expected that this trait will not change much and be moved into `std. Since Tokio is not yet going reaching 1.0, I feel that it is acceptable to maintain a dependency on this trait given how foundational it is. Since the `Stream` implementation is optional, types that are logically streams provide `async fn next_*` functions to obtain the next value. Avoiding the `next()` name prevents fn conflicts with `StreamExt::next()`. Additionally, some misc cleanup is also done: - `tokio::io::io` -> `tokio::io::util`. - `delay` -> `delay_until`. - `Timeout::new` -> `timeout(...)`. - `signal::ctrl_c()` returns a future instead of a stream. - `{tcp,unix}::Incoming` is removed (due to lack of `Stream` trait). - `time::Throttle` is removed (due to lack of `Stream` trait). - Fix: `mpsc::UnboundedSender::send(&self)` (no more conflict with `Sink` fns).
Diffstat (limited to 'tokio/tests/sync_watch.rs')
-rw-r--r--tokio/tests/sync_watch.rs157
1 files changed, 61 insertions, 96 deletions
diff --git a/tokio/tests/sync_watch.rs b/tokio/tests/sync_watch.rs
index 4d73bc81..7ccad5c2 100644
--- a/tokio/tests/sync_watch.rs
+++ b/tokio/tests/sync_watch.rs
@@ -5,41 +5,6 @@ use tokio_test::task::spawn;
use tokio_test::{assert_pending, assert_ready};
#[test]
-fn single_rx_recv_ref() {
- let (tx, mut rx) = watch::channel("one");
-
- {
- let mut t = spawn(rx.recv_ref());
- let v = assert_ready!(t.poll()).unwrap();
- assert_eq!(*v, "one");
- }
-
- {
- let mut t = spawn(rx.recv_ref());
-
- assert_pending!(t.poll());
-
- tx.broadcast("two").unwrap();
-
- assert!(t.is_woken());
-
- let v = assert_ready!(t.poll()).unwrap();
- assert_eq!(*v, "two");
- }
-
- {
- let mut t = spawn(rx.recv_ref());
-
- assert_pending!(t.poll());
-
- drop(tx);
-
- let res = assert_ready!(t.poll());
- assert!(res.is_none());
- }
-}
-
-#[test]
fn single_rx_recv() {
let (tx, mut rx) = watch::channel("one");
@@ -75,62 +40,25 @@ fn single_rx_recv() {
}
#[test]
-fn stream_impl() {
- use tokio::prelude::*;
-
- let (tx, mut rx) = watch::channel("one");
-
- {
- let mut t = spawn(rx.next());
- let v = assert_ready!(t.poll()).unwrap();
- assert_eq!(v, "one");
- }
-
- {
- let mut t = spawn(rx.next());
-
- assert_pending!(t.poll());
-
- tx.broadcast("two").unwrap();
-
- assert!(t.is_woken());
-
- let v = assert_ready!(t.poll()).unwrap();
- assert_eq!(v, "two");
- }
-
- {
- let mut t = spawn(rx.next());
-
- assert_pending!(t.poll());
-
- drop(tx);
-
- let res = assert_ready!(t.poll());
- assert!(res.is_none());
- }
-}
-
-#[test]
fn multi_rx() {
let (tx, mut rx1) = watch::channel("one");
let mut rx2 = rx1.clone();
{
- let mut t1 = spawn(rx1.recv_ref());
- let mut t2 = spawn(rx2.recv_ref());
+ let mut t1 = spawn(rx1.recv());
+ let mut t2 = spawn(rx2.recv());
let res = assert_ready!(t1.poll());
- assert_eq!(*res.unwrap(), "one");
+ assert_eq!(res.unwrap(), "one");
let res = assert_ready!(t2.poll());
- assert_eq!(*res.unwrap(), "one");
+ assert_eq!(res.unwrap(), "one");
}
- let mut t2 = spawn(rx2.recv_ref());
+ let mut t2 = spawn(rx2.recv());
{
- let mut t1 = spawn(rx1.recv_ref());
+ let mut t1 = spawn(rx1.recv());
assert_pending!(t1.poll());
assert_pending!(t2.poll());
@@ -141,11 +69,11 @@ fn multi_rx() {
assert!(t2.is_woken());
let res = assert_ready!(t1.poll());
- assert_eq!(*res.unwrap(), "two");
+ assert_eq!(res.unwrap(), "two");
}
{
- let mut t1 = spawn(rx1.recv_ref());
+ let mut t1 = spawn(rx1.recv());
assert_pending!(t1.poll());
@@ -155,17 +83,17 @@ fn multi_rx() {
assert!(t2.is_woken());
let res = assert_ready!(t1.poll());
- assert_eq!(*res.unwrap(), "three");
+ assert_eq!(res.unwrap(), "three");
let res = assert_ready!(t2.poll());
- assert_eq!(*res.unwrap(), "three");
+ assert_eq!(res.unwrap(), "three");
}
drop(t2);
{
- let mut t1 = spawn(rx1.recv_ref());
- let mut t2 = spawn(rx2.recv_ref());
+ let mut t1 = spawn(rx1.recv());
+ let mut t2 = spawn(rx2.recv());
assert_pending!(t1.poll());
assert_pending!(t2.poll());
@@ -173,10 +101,10 @@ fn multi_rx() {
tx.broadcast("four").unwrap();
let res = assert_ready!(t1.poll());
- assert_eq!(*res.unwrap(), "four");
+ assert_eq!(res.unwrap(), "four");
drop(t1);
- let mut t1 = spawn(rx1.recv_ref());
+ let mut t1 = spawn(rx1.recv());
assert_pending!(t1.poll());
drop(tx);
@@ -186,10 +114,10 @@ fn multi_rx() {
assert!(res.is_none());
let res = assert_ready!(t2.poll());
- assert_eq!(*res.unwrap(), "four");
+ assert_eq!(res.unwrap(), "four");
drop(t2);
- let mut t2 = spawn(rx2.recv_ref());
+ let mut t2 = spawn(rx2.recv());
let res = assert_ready!(t2.poll());
assert!(res.is_none());
}
@@ -203,13 +131,13 @@ fn rx_observes_final_value() {
drop(tx);
{
- let mut t1 = spawn(rx.recv_ref());
+ let mut t1 = spawn(rx.recv());
let res = assert_ready!(t1.poll());
- assert_eq!(*res.unwrap(), "one");
+ assert_eq!(res.unwrap(), "one");
}
{
- let mut t1 = spawn(rx.recv_ref());
+ let mut t1 = spawn(rx.recv());
let res = assert_ready!(t1.poll());
assert!(res.is_none());
}
@@ -221,13 +149,13 @@ fn rx_observes_final_value() {
tx.broadcast("two").unwrap();
{
- let mut t1 = spawn(rx.recv_ref());
+ let mut t1 = spawn(rx.recv());
let res = assert_ready!(t1.poll());
- assert_eq!(*res.unwrap(), "two");
+ assert_eq!(res.unwrap(), "two");
}
{
- let mut t1 = spawn(rx.recv_ref());
+ let mut t1 = spawn(rx.recv());
assert_pending!(t1.poll());
tx.broadcast("three").unwrap();
@@ -236,11 +164,11 @@ fn rx_observes_final_value() {
assert!(t1.is_woken());
let res = assert_ready!(t1.poll());
- assert_eq!(*res.unwrap(), "three");
+ assert_eq!(res.unwrap(), "three");
}
{
- let mut t1 = spawn(rx.recv_ref());
+ let mut t1 = spawn(rx.recv());
let res = assert_ready!(t1.poll());
assert!(res.is_none());
}
@@ -262,3 +190,40 @@ fn poll_close() {
assert!(tx.broadcast("two").is_err());
}
+
+#[test]
+fn stream_impl() {
+ use futures::StreamExt;
+
+ let (tx, mut rx) = watch::channel("one");
+
+ {
+ let mut t = spawn(rx.next());
+ let v = assert_ready!(t.poll()).unwrap();
+ assert_eq!(v, "one");
+ }
+
+ {
+ let mut t = spawn(rx.next());
+
+ assert_pending!(t.poll());
+
+ tx.broadcast("two").unwrap();
+
+ assert!(t.is_woken());
+
+ let v = assert_ready!(t.poll()).unwrap();
+ assert_eq!(v, "two");
+ }
+
+ {
+ let mut t = spawn(rx.next());
+
+ assert_pending!(t.poll());
+
+ drop(tx);
+
+ let res = assert_ready!(t.poll());
+ assert!(res.is_none());
+ }
+}