summaryrefslogtreecommitdiffstats
path: root/tokio/tests/sync_watch.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/tests/sync_watch.rs')
-rw-r--r--tokio/tests/sync_watch.rs264
1 files changed, 264 insertions, 0 deletions
diff --git a/tokio/tests/sync_watch.rs b/tokio/tests/sync_watch.rs
new file mode 100644
index 00000000..4d73bc81
--- /dev/null
+++ b/tokio/tests/sync_watch.rs
@@ -0,0 +1,264 @@
+#![warn(rust_2018_idioms)]
+
+use tokio::sync::watch;
+use tokio_test::task::spawn;
+use tokio_test::{assert_pending, assert_ready};
+
+#[test]
+fn single_rx_recv_ref() {
+ let (tx, mut rx) = watch::channel("one");
+
+ {
+ let mut t = spawn(rx.recv_ref());
+ let v = assert_ready!(t.poll()).unwrap();
+ assert_eq!(*v, "one");
+ }
+
+ {
+ let mut t = spawn(rx.recv_ref());
+
+ assert_pending!(t.poll());
+
+ tx.broadcast("two").unwrap();
+
+ assert!(t.is_woken());
+
+ let v = assert_ready!(t.poll()).unwrap();
+ assert_eq!(*v, "two");
+ }
+
+ {
+ let mut t = spawn(rx.recv_ref());
+
+ assert_pending!(t.poll());
+
+ drop(tx);
+
+ let res = assert_ready!(t.poll());
+ assert!(res.is_none());
+ }
+}
+
+#[test]
+fn single_rx_recv() {
+ let (tx, mut rx) = watch::channel("one");
+
+ {
+ let mut t = spawn(rx.recv());
+ let v = assert_ready!(t.poll()).unwrap();
+ assert_eq!(v, "one");
+ }
+
+ {
+ let mut t = spawn(rx.recv());
+
+ assert_pending!(t.poll());
+
+ tx.broadcast("two").unwrap();
+
+ assert!(t.is_woken());
+
+ let v = assert_ready!(t.poll()).unwrap();
+ assert_eq!(v, "two");
+ }
+
+ {
+ let mut t = spawn(rx.recv());
+
+ assert_pending!(t.poll());
+
+ drop(tx);
+
+ let res = assert_ready!(t.poll());
+ assert!(res.is_none());
+ }
+}
+
+#[test]
+fn stream_impl() {
+ use tokio::prelude::*;
+
+ let (tx, mut rx) = watch::channel("one");
+
+ {
+ let mut t = spawn(rx.next());
+ let v = assert_ready!(t.poll()).unwrap();
+ assert_eq!(v, "one");
+ }
+
+ {
+ let mut t = spawn(rx.next());
+
+ assert_pending!(t.poll());
+
+ tx.broadcast("two").unwrap();
+
+ assert!(t.is_woken());
+
+ let v = assert_ready!(t.poll()).unwrap();
+ assert_eq!(v, "two");
+ }
+
+ {
+ let mut t = spawn(rx.next());
+
+ assert_pending!(t.poll());
+
+ drop(tx);
+
+ let res = assert_ready!(t.poll());
+ assert!(res.is_none());
+ }
+}
+
+#[test]
+fn multi_rx() {
+ let (tx, mut rx1) = watch::channel("one");
+ let mut rx2 = rx1.clone();
+
+ {
+ let mut t1 = spawn(rx1.recv_ref());
+ let mut t2 = spawn(rx2.recv_ref());
+
+ let res = assert_ready!(t1.poll());
+ assert_eq!(*res.unwrap(), "one");
+
+ let res = assert_ready!(t2.poll());
+ assert_eq!(*res.unwrap(), "one");
+ }
+
+ let mut t2 = spawn(rx2.recv_ref());
+
+ {
+ let mut t1 = spawn(rx1.recv_ref());
+
+ assert_pending!(t1.poll());
+ assert_pending!(t2.poll());
+
+ tx.broadcast("two").unwrap();
+
+ assert!(t1.is_woken());
+ assert!(t2.is_woken());
+
+ let res = assert_ready!(t1.poll());
+ assert_eq!(*res.unwrap(), "two");
+ }
+
+ {
+ let mut t1 = spawn(rx1.recv_ref());
+
+ assert_pending!(t1.poll());
+
+ tx.broadcast("three").unwrap();
+
+ assert!(t1.is_woken());
+ assert!(t2.is_woken());
+
+ let res = assert_ready!(t1.poll());
+ assert_eq!(*res.unwrap(), "three");
+
+ let res = assert_ready!(t2.poll());
+ assert_eq!(*res.unwrap(), "three");
+ }
+
+ drop(t2);
+
+ {
+ let mut t1 = spawn(rx1.recv_ref());
+ let mut t2 = spawn(rx2.recv_ref());
+
+ assert_pending!(t1.poll());
+ assert_pending!(t2.poll());
+
+ tx.broadcast("four").unwrap();
+
+ let res = assert_ready!(t1.poll());
+ assert_eq!(*res.unwrap(), "four");
+ drop(t1);
+
+ let mut t1 = spawn(rx1.recv_ref());
+ assert_pending!(t1.poll());
+
+ drop(tx);
+
+ assert!(t1.is_woken());
+ let res = assert_ready!(t1.poll());
+ assert!(res.is_none());
+
+ let res = assert_ready!(t2.poll());
+ assert_eq!(*res.unwrap(), "four");
+
+ drop(t2);
+ let mut t2 = spawn(rx2.recv_ref());
+ let res = assert_ready!(t2.poll());
+ assert!(res.is_none());
+ }
+}
+
+#[test]
+fn rx_observes_final_value() {
+ // Initial value
+
+ let (tx, mut rx) = watch::channel("one");
+ drop(tx);
+
+ {
+ let mut t1 = spawn(rx.recv_ref());
+ let res = assert_ready!(t1.poll());
+ assert_eq!(*res.unwrap(), "one");
+ }
+
+ {
+ let mut t1 = spawn(rx.recv_ref());
+ let res = assert_ready!(t1.poll());
+ assert!(res.is_none());
+ }
+
+ // Sending a value
+
+ let (tx, mut rx) = watch::channel("one");
+
+ tx.broadcast("two").unwrap();
+
+ {
+ let mut t1 = spawn(rx.recv_ref());
+ let res = assert_ready!(t1.poll());
+ assert_eq!(*res.unwrap(), "two");
+ }
+
+ {
+ let mut t1 = spawn(rx.recv_ref());
+ assert_pending!(t1.poll());
+
+ tx.broadcast("three").unwrap();
+ drop(tx);
+
+ assert!(t1.is_woken());
+
+ let res = assert_ready!(t1.poll());
+ assert_eq!(*res.unwrap(), "three");
+ }
+
+ {
+ let mut t1 = spawn(rx.recv_ref());
+ let res = assert_ready!(t1.poll());
+ assert!(res.is_none());
+ }
+}
+
+#[test]
+fn poll_close() {
+ let (mut tx, rx) = watch::channel("one");
+
+ {
+ let mut t = spawn(tx.closed());
+ assert_pending!(t.poll());
+
+ drop(rx);
+
+ assert!(t.is_woken());
+ assert_ready!(t.poll());
+ }
+
+ assert!(tx.broadcast("two").is_err());
+}