summaryrefslogtreecommitdiffstats
path: root/tokio/tests/sync_watch.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/tests/sync_watch.rs')
-rw-r--r--tokio/tests/sync_watch.rs157
1 files changed, 61 insertions, 96 deletions
diff --git a/tokio/tests/sync_watch.rs b/tokio/tests/sync_watch.rs
index 4d73bc81..7ccad5c2 100644
--- a/tokio/tests/sync_watch.rs
+++ b/tokio/tests/sync_watch.rs
@@ -5,41 +5,6 @@ use tokio_test::task::spawn;
use tokio_test::{assert_pending, assert_ready};
#[test]
-fn single_rx_recv_ref() {
- let (tx, mut rx) = watch::channel("one");
-
- {
- let mut t = spawn(rx.recv_ref());
- let v = assert_ready!(t.poll()).unwrap();
- assert_eq!(*v, "one");
- }
-
- {
- let mut t = spawn(rx.recv_ref());
-
- assert_pending!(t.poll());
-
- tx.broadcast("two").unwrap();
-
- assert!(t.is_woken());
-
- let v = assert_ready!(t.poll()).unwrap();
- assert_eq!(*v, "two");
- }
-
- {
- let mut t = spawn(rx.recv_ref());
-
- assert_pending!(t.poll());
-
- drop(tx);
-
- let res = assert_ready!(t.poll());
- assert!(res.is_none());
- }
-}
-
-#[test]
fn single_rx_recv() {
let (tx, mut rx) = watch::channel("one");
@@ -75,62 +40,25 @@ fn single_rx_recv() {
}
#[test]
-fn stream_impl() {
- use tokio::prelude::*;
-
- let (tx, mut rx) = watch::channel("one");
-
- {
- let mut t = spawn(rx.next());
- let v = assert_ready!(t.poll()).unwrap();
- assert_eq!(v, "one");
- }
-
- {
- let mut t = spawn(rx.next());
-
- assert_pending!(t.poll());
-
- tx.broadcast("two").unwrap();
-
- assert!(t.is_woken());
-
- let v = assert_ready!(t.poll()).unwrap();
- assert_eq!(v, "two");
- }
-
- {
- let mut t = spawn(rx.next());
-
- assert_pending!(t.poll());
-
- drop(tx);
-
- let res = assert_ready!(t.poll());
- assert!(res.is_none());
- }
-}
-
-#[test]
fn multi_rx() {
let (tx, mut rx1) = watch::channel("one");
let mut rx2 = rx1.clone();
{
- let mut t1 = spawn(rx1.recv_ref());
- let mut t2 = spawn(rx2.recv_ref());
+ let mut t1 = spawn(rx1.recv());
+ let mut t2 = spawn(rx2.recv());
let res = assert_ready!(t1.poll());
- assert_eq!(*res.unwrap(), "one");
+ assert_eq!(res.unwrap(), "one");
let res = assert_ready!(t2.poll());
- assert_eq!(*res.unwrap(), "one");
+ assert_eq!(res.unwrap(), "one");
}
- let mut t2 = spawn(rx2.recv_ref());
+ let mut t2 = spawn(rx2.recv());
{
- let mut t1 = spawn(rx1.recv_ref());
+ let mut t1 = spawn(rx1.recv());
assert_pending!(t1.poll());
assert_pending!(t2.poll());
@@ -141,11 +69,11 @@ fn multi_rx() {
assert!(t2.is_woken());
let res = assert_ready!(t1.poll());
- assert_eq!(*res.unwrap(), "two");
+ assert_eq!(res.unwrap(), "two");
}
{
- let mut t1 = spawn(rx1.recv_ref());
+ let mut t1 = spawn(rx1.recv());
assert_pending!(t1.poll());
@@ -155,17 +83,17 @@ fn multi_rx() {
assert!(t2.is_woken());
let res = assert_ready!(t1.poll());
- assert_eq!(*res.unwrap(), "three");
+ assert_eq!(res.unwrap(), "three");
let res = assert_ready!(t2.poll());
- assert_eq!(*res.unwrap(), "three");
+ assert_eq!(res.unwrap(), "three");
}
drop(t2);
{
- let mut t1 = spawn(rx1.recv_ref());
- let mut t2 = spawn(rx2.recv_ref());
+ let mut t1 = spawn(rx1.recv());
+ let mut t2 = spawn(rx2.recv());
assert_pending!(t1.poll());
assert_pending!(t2.poll());
@@ -173,10 +101,10 @@ fn multi_rx() {
tx.broadcast("four").unwrap();
let res = assert_ready!(t1.poll());
- assert_eq!(*res.unwrap(), "four");
+ assert_eq!(res.unwrap(), "four");
drop(t1);
- let mut t1 = spawn(rx1.recv_ref());
+ let mut t1 = spawn(rx1.recv());
assert_pending!(t1.poll());
drop(tx);
@@ -186,10 +114,10 @@ fn multi_rx() {
assert!(res.is_none());
let res = assert_ready!(t2.poll());
- assert_eq!(*res.unwrap(), "four");
+ assert_eq!(res.unwrap(), "four");
drop(t2);
- let mut t2 = spawn(rx2.recv_ref());
+ let mut t2 = spawn(rx2.recv());
let res = assert_ready!(t2.poll());
assert!(res.is_none());
}
@@ -203,13 +131,13 @@ fn rx_observes_final_value() {
drop(tx);
{
- let mut t1 = spawn(rx.recv_ref());
+ let mut t1 = spawn(rx.recv());
let res = assert_ready!(t1.poll());
- assert_eq!(*res.unwrap(), "one");
+ assert_eq!(res.unwrap(), "one");
}
{
- let mut t1 = spawn(rx.recv_ref());
+ let mut t1 = spawn(rx.recv());
let res = assert_ready!(t1.poll());
assert!(res.is_none());
}
@@ -221,13 +149,13 @@ fn rx_observes_final_value() {
tx.broadcast("two").unwrap();
{
- let mut t1 = spawn(rx.recv_ref());
+ let mut t1 = spawn(rx.recv());
let res = assert_ready!(t1.poll());
- assert_eq!(*res.unwrap(), "two");
+ assert_eq!(res.unwrap(), "two");
}
{
- let mut t1 = spawn(rx.recv_ref());
+ let mut t1 = spawn(rx.recv());
assert_pending!(t1.poll());
tx.broadcast("three").unwrap();
@@ -236,11 +164,11 @@ fn rx_observes_final_value() {
assert!(t1.is_woken());
let res = assert_ready!(t1.poll());
- assert_eq!(*res.unwrap(), "three");
+ assert_eq!(res.unwrap(), "three");
}
{
- let mut t1 = spawn(rx.recv_ref());
+ let mut t1 = spawn(rx.recv());
let res = assert_ready!(t1.poll());
assert!(res.is_none());
}
@@ -262,3 +190,40 @@ fn poll_close() {
assert!(tx.broadcast("two").is_err());
}
+
+#[test]
+fn stream_impl() {
+ use futures::StreamExt;
+
+ let (tx, mut rx) = watch::channel("one");
+
+ {
+ let mut t = spawn(rx.next());
+ let v = assert_ready!(t.poll()).unwrap();
+ assert_eq!(v, "one");
+ }
+
+ {
+ let mut t = spawn(rx.next());
+
+ assert_pending!(t.poll());
+
+ tx.broadcast("two").unwrap();
+
+ assert!(t.is_woken());
+
+ let v = assert_ready!(t.poll()).unwrap();
+ assert_eq!(v, "two");
+ }
+
+ {
+ let mut t = spawn(rx.next());
+
+ assert_pending!(t.poll());
+
+ drop(tx);
+
+ let res = assert_ready!(t.poll());
+ assert!(res.is_none());
+ }
+}