summaryrefslogtreecommitdiffstats
path: root/tokio/tests/sync_watch.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/tests/sync_watch.rs')
-rw-r--r--tokio/tests/sync_watch.rs154
1 files changed, 47 insertions, 107 deletions
diff --git a/tokio/tests/sync_watch.rs b/tokio/tests/sync_watch.rs
index 5d550443..9dcb0c53 100644
--- a/tokio/tests/sync_watch.rs
+++ b/tokio/tests/sync_watch.rs
@@ -4,41 +4,41 @@
use tokio::sync::watch;
use tokio_test::task::spawn;
-use tokio_test::{assert_pending, assert_ready};
+use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok};
#[test]
fn single_rx_recv() {
let (tx, mut rx) = watch::channel("one");
{
- let mut t = spawn(rx.recv());
- let v = assert_ready!(t.poll());
- assert_eq!(v, "one");
+ // Not initially notified
+ let mut t = spawn(rx.changed());
+ assert_pending!(t.poll());
}
+ assert_eq!(*rx.borrow(), "one");
{
- let mut t = spawn(rx.recv());
-
+ let mut t = spawn(rx.changed());
assert_pending!(t.poll());
tx.send("two").unwrap();
assert!(t.is_woken());
- let v = assert_ready!(t.poll());
- assert_eq!(v, "two");
+ assert_ready_ok!(t.poll());
}
+ assert_eq!(*rx.borrow(), "two");
{
- let mut t = spawn(rx.recv());
-
+ let mut t = spawn(rx.changed());
assert_pending!(t.poll());
drop(tx);
- let res = assert_ready!(t.poll());
- assert_eq!(res, "two");
+ assert!(t.is_woken());
+ assert_ready_err!(t.poll());
}
+ assert_eq!(*rx.borrow(), "two");
}
#[test]
@@ -47,20 +47,19 @@ fn multi_rx() {
let mut rx2 = rx1.clone();
{
- let mut t1 = spawn(rx1.recv());
- let mut t2 = spawn(rx2.recv());
-
- let res = assert_ready!(t1.poll());
- assert_eq!(res, "one");
+ let mut t1 = spawn(rx1.changed());
+ let mut t2 = spawn(rx2.changed());
- let res = assert_ready!(t2.poll());
- assert_eq!(res, "one");
+ assert_pending!(t1.poll());
+ assert_pending!(t2.poll());
}
+ assert_eq!(*rx1.borrow(), "one");
+ assert_eq!(*rx2.borrow(), "one");
- let mut t2 = spawn(rx2.recv());
+ let mut t2 = spawn(rx2.changed());
{
- let mut t1 = spawn(rx1.recv());
+ let mut t1 = spawn(rx1.changed());
assert_pending!(t1.poll());
assert_pending!(t2.poll());
@@ -70,12 +69,12 @@ fn multi_rx() {
assert!(t1.is_woken());
assert!(t2.is_woken());
- let res = assert_ready!(t1.poll());
- assert_eq!(res, "two");
+ assert_ready_ok!(t1.poll());
}
+ assert_eq!(*rx1.borrow(), "two");
{
- let mut t1 = spawn(rx1.recv());
+ let mut t1 = spawn(rx1.changed());
assert_pending!(t1.poll());
@@ -84,45 +83,29 @@ fn multi_rx() {
assert!(t1.is_woken());
assert!(t2.is_woken());
- let res = assert_ready!(t1.poll());
- assert_eq!(res, "three");
-
- let res = assert_ready!(t2.poll());
- assert_eq!(res, "three");
+ assert_ready_ok!(t1.poll());
+ assert_ready_ok!(t2.poll());
}
+ assert_eq!(*rx1.borrow(), "three");
drop(t2);
+ assert_eq!(*rx2.borrow(), "three");
+
{
- let mut t1 = spawn(rx1.recv());
- let mut t2 = spawn(rx2.recv());
+ let mut t1 = spawn(rx1.changed());
+ let mut t2 = spawn(rx2.changed());
assert_pending!(t1.poll());
assert_pending!(t2.poll());
tx.send("four").unwrap();
- let res = assert_ready!(t1.poll());
- assert_eq!(res, "four");
- drop(t1);
-
- let mut t1 = spawn(rx1.recv());
- assert_pending!(t1.poll());
-
- drop(tx);
-
- assert!(t1.is_woken());
- let res = assert_ready!(t1.poll());
- assert_eq!(res, "four");
-
- let res = assert_ready!(t2.poll());
- assert_eq!(res, "four");
-
- drop(t2);
- let mut t2 = spawn(rx2.recv());
- let res = assert_ready!(t2.poll());
- assert_eq!(res, "four");
+ assert_ready_ok!(t1.poll());
+ assert_ready_ok!(t2.poll());
}
+ assert_eq!(*rx1.borrow(), "four");
+ assert_eq!(*rx2.borrow(), "four");
}
#[test]
@@ -133,16 +116,10 @@ fn rx_observes_final_value() {
drop(tx);
{
- let mut t1 = spawn(rx.recv());
- let res = assert_ready!(t1.poll());
- assert_eq!(res, "one");
- }
-
- {
- let mut t1 = spawn(rx.recv());
- let res = assert_ready!(t1.poll());
- assert_eq!(res, "one");
+ let mut t1 = spawn(rx.changed());
+ assert_ready_err!(t1.poll());
}
+ assert_eq!(*rx.borrow(), "one");
// Sending a value
@@ -151,13 +128,13 @@ fn rx_observes_final_value() {
tx.send("two").unwrap();
{
- let mut t1 = spawn(rx.recv());
- let res = assert_ready!(t1.poll());
- assert_eq!(res, "two");
+ let mut t1 = spawn(rx.changed());
+ assert_ready_ok!(t1.poll());
}
+ assert_eq!(*rx.borrow(), "two");
{
- let mut t1 = spawn(rx.recv());
+ let mut t1 = spawn(rx.changed());
assert_pending!(t1.poll());
tx.send("three").unwrap();
@@ -165,20 +142,20 @@ fn rx_observes_final_value() {
assert!(t1.is_woken());
- let res = assert_ready!(t1.poll());
- assert_eq!(res, "three");
+ assert_ready_ok!(t1.poll());
}
+ assert_eq!(*rx.borrow(), "three");
{
- let mut t1 = spawn(rx.recv());
- let res = assert_ready!(t1.poll());
- assert_eq!(res, "three");
+ let mut t1 = spawn(rx.changed());
+ assert_ready_err!(t1.poll());
}
+ assert_eq!(*rx.borrow(), "three");
}
#[test]
fn poll_close() {
- let (mut tx, rx) = watch::channel("one");
+ let (tx, rx) = watch::channel("one");
{
let mut t = spawn(tx.closed());
@@ -192,40 +169,3 @@ fn poll_close() {
assert!(tx.send("two").is_err());
}
-
-#[test]
-fn stream_impl() {
- use tokio::stream::StreamExt;
-
- let (tx, mut rx) = watch::channel("one");
-
- {
- let mut t = spawn(rx.next());
- let v = assert_ready!(t.poll()).unwrap();
- assert_eq!(v, "one");
- }
-
- {
- let mut t = spawn(rx.next());
-
- assert_pending!(t.poll());
-
- tx.send("two").unwrap();
-
- assert!(t.is_woken());
-
- let v = assert_ready!(t.poll()).unwrap();
- assert_eq!(v, "two");
- }
-
- {
- let mut t = spawn(rx.next());
-
- assert_pending!(t.poll());
-
- drop(tx);
-
- let res = assert_ready!(t.poll());
- assert_eq!(res, Some("two"));
- }
-}