summaryrefslogtreecommitdiffstats
path: root/tokio/tests
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-09-11 15:14:45 -0700
committerGitHub <noreply@github.com>2020-09-11 15:14:45 -0700
commit2bc9a4815259c8ff4daa5e24f128ec826970d17f (patch)
treec075e4d97a145ce104cfc8ee39d8d06acece5c13 /tokio/tests
parentc5a9ede157691ac5ca15283735bd666c6b016188 (diff)
sync: tweak `watch` API (#2814)
Decouples getting the latest `watch` value from receiving the change notification. The `Receiver` async method becomes `Receiver::changed()`. The latest value is obtained from `Receiver::borrow()`. The implementation is updated to use `Notify`. This requires adding `Notify::notify_waiters`. This method is generally useful but is kept private for now.
Diffstat (limited to 'tokio/tests')
-rw-r--r--tokio/tests/async_send_sync.rs6
-rw-r--r--tokio/tests/sync_watch.rs154
2 files changed, 49 insertions, 111 deletions
diff --git a/tokio/tests/async_send_sync.rs b/tokio/tests/async_send_sync.rs
index f1eed0e4..e7011e3b 100644
--- a/tokio/tests/async_send_sync.rs
+++ b/tokio/tests/async_send_sync.rs
@@ -205,7 +205,7 @@ async_assert_fn!(tokio::sync::Mutex<Rc<u8>>::lock(_): !Send & !Sync);
async_assert_fn!(tokio::sync::Mutex<u8>::lock_owned(_): Send & Sync);
async_assert_fn!(tokio::sync::Mutex<Cell<u8>>::lock_owned(_): Send & Sync);
async_assert_fn!(tokio::sync::Mutex<Rc<u8>>::lock_owned(_): !Send & !Sync);
-async_assert_fn!(tokio::sync::Notify::notified(_): Send & !Sync);
+async_assert_fn!(tokio::sync::Notify::notified(_): Send & Sync);
async_assert_fn!(tokio::sync::RwLock<u8>::read(_): Send & Sync);
async_assert_fn!(tokio::sync::RwLock<u8>::write(_): Send & Sync);
async_assert_fn!(tokio::sync::RwLock<Cell<u8>>::read(_): !Send & !Sync);
@@ -229,9 +229,7 @@ async_assert_fn!(tokio::sync::mpsc::UnboundedReceiver<u8>::recv(_): Send & Sync)
async_assert_fn!(tokio::sync::mpsc::UnboundedReceiver<Cell<u8>>::recv(_): Send & Sync);
async_assert_fn!(tokio::sync::mpsc::UnboundedReceiver<Rc<u8>>::recv(_): !Send & !Sync);
-async_assert_fn!(tokio::sync::watch::Receiver<u8>::recv(_): Send & Sync);
-async_assert_fn!(tokio::sync::watch::Receiver<Cell<u8>>::recv(_): !Send & !Sync);
-async_assert_fn!(tokio::sync::watch::Receiver<Rc<u8>>::recv(_): !Send & !Sync);
+async_assert_fn!(tokio::sync::watch::Receiver<u8>::changed(_): Send & Sync);
async_assert_fn!(tokio::sync::watch::Sender<u8>::closed(_): Send & Sync);
async_assert_fn!(tokio::sync::watch::Sender<Cell<u8>>::closed(_): !Send & !Sync);
async_assert_fn!(tokio::sync::watch::Sender<Rc<u8>>::closed(_): !Send & !Sync);
diff --git a/tokio/tests/sync_watch.rs b/tokio/tests/sync_watch.rs
index 5d550443..9dcb0c53 100644
--- a/tokio/tests/sync_watch.rs
+++ b/tokio/tests/sync_watch.rs
@@ -4,41 +4,41 @@
use tokio::sync::watch;
use tokio_test::task::spawn;
-use tokio_test::{assert_pending, assert_ready};
+use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok};
#[test]
fn single_rx_recv() {
let (tx, mut rx) = watch::channel("one");
{
- let mut t = spawn(rx.recv());
- let v = assert_ready!(t.poll());
- assert_eq!(v, "one");
+ // Not initially notified
+ let mut t = spawn(rx.changed());
+ assert_pending!(t.poll());
}
+ assert_eq!(*rx.borrow(), "one");
{
- let mut t = spawn(rx.recv());
-
+ let mut t = spawn(rx.changed());
assert_pending!(t.poll());
tx.send("two").unwrap();
assert!(t.is_woken());
- let v = assert_ready!(t.poll());
- assert_eq!(v, "two");
+ assert_ready_ok!(t.poll());
}
+ assert_eq!(*rx.borrow(), "two");
{
- let mut t = spawn(rx.recv());
-
+ let mut t = spawn(rx.changed());
assert_pending!(t.poll());
drop(tx);
- let res = assert_ready!(t.poll());
- assert_eq!(res, "two");
+ assert!(t.is_woken());
+ assert_ready_err!(t.poll());
}
+ assert_eq!(*rx.borrow(), "two");
}
#[test]
@@ -47,20 +47,19 @@ fn multi_rx() {
let mut rx2 = rx1.clone();
{
- let mut t1 = spawn(rx1.recv());
- let mut t2 = spawn(rx2.recv());
-
- let res = assert_ready!(t1.poll());
- assert_eq!(res, "one");
+ let mut t1 = spawn(rx1.changed());
+ let mut t2 = spawn(rx2.changed());
- let res = assert_ready!(t2.poll());
- assert_eq!(res, "one");
+ assert_pending!(t1.poll());
+ assert_pending!(t2.poll());
}
+ assert_eq!(*rx1.borrow(), "one");
+ assert_eq!(*rx2.borrow(), "one");
- let mut t2 = spawn(rx2.recv());
+ let mut t2 = spawn(rx2.changed());
{
- let mut t1 = spawn(rx1.recv());
+ let mut t1 = spawn(rx1.changed());
assert_pending!(t1.poll());
assert_pending!(t2.poll());
@@ -70,12 +69,12 @@ fn multi_rx() {
assert!(t1.is_woken());
assert!(t2.is_woken());
- let res = assert_ready!(t1.poll());
- assert_eq!(res, "two");
+ assert_ready_ok!(t1.poll());
}
+ assert_eq!(*rx1.borrow(), "two");
{
- let mut t1 = spawn(rx1.recv());
+ let mut t1 = spawn(rx1.changed());
assert_pending!(t1.poll());
@@ -84,45 +83,29 @@ fn multi_rx() {
assert!(t1.is_woken());
assert!(t2.is_woken());
- let res = assert_ready!(t1.poll());
- assert_eq!(res, "three");
-
- let res = assert_ready!(t2.poll());
- assert_eq!(res, "three");
+ assert_ready_ok!(t1.poll());
+ assert_ready_ok!(t2.poll());
}
+ assert_eq!(*rx1.borrow(), "three");
drop(t2);
+ assert_eq!(*rx2.borrow(), "three");
+
{
- let mut t1 = spawn(rx1.recv());
- let mut t2 = spawn(rx2.recv());
+ let mut t1 = spawn(rx1.changed());
+ let mut t2 = spawn(rx2.changed());
assert_pending!(t1.poll());
assert_pending!(t2.poll());
tx.send("four").unwrap();
- let res = assert_ready!(t1.poll());
- assert_eq!(res, "four");
- drop(t1);
-
- let mut t1 = spawn(rx1.recv());
- assert_pending!(t1.poll());
-
- drop(tx);
-
- assert!(t1.is_woken());
- let res = assert_ready!(t1.poll());
- assert_eq!(res, "four");
-
- let res = assert_ready!(t2.poll());
- assert_eq!(res, "four");
-
- drop(t2);
- let mut t2 = spawn(rx2.recv());
- let res = assert_ready!(t2.poll());
- assert_eq!(res, "four");
+ assert_ready_ok!(t1.poll());
+ assert_ready_ok!(t2.poll());
}
+ assert_eq!(*rx1.borrow(), "four");
+ assert_eq!(*rx2.borrow(), "four");
}
#[test]
@@ -133,16 +116,10 @@ fn rx_observes_final_value() {
drop(tx);
{
- let mut t1 = spawn(rx.recv());
- let res = assert_ready!(t1.poll());
- assert_eq!(res, "one");
- }
-
- {
- let mut t1 = spawn(rx.recv());
- let res = assert_ready!(t1.poll());
- assert_eq!(res, "one");
+ let mut t1 = spawn(rx.changed());
+ assert_ready_err!(t1.poll());
}
+ assert_eq!(*rx.borrow(), "one");
// Sending a value
@@ -151,13 +128,13 @@ fn rx_observes_final_value() {
tx.send("two").unwrap();
{
- let mut t1 = spawn(rx.recv());
- let res = assert_ready!(t1.poll());
- assert_eq!(res, "two");
+ let mut t1 = spawn(rx.changed());
+ assert_ready_ok!(t1.poll());
}
+ assert_eq!(*rx.borrow(), "two");
{
- let mut t1 = spawn(rx.recv());
+ let mut t1 = spawn(rx.changed());
assert_pending!(t1.poll());
tx.send("three").unwrap();
@@ -165,20 +142,20 @@ fn rx_observes_final_value() {
assert!(t1.is_woken());
- let res = assert_ready!(t1.poll());
- assert_eq!(res, "three");
+ assert_ready_ok!(t1.poll());
}
+ assert_eq!(*rx.borrow(), "three");
{
- let mut t1 = spawn(rx.recv());
- let res = assert_ready!(t1.poll());
- assert_eq!(res, "three");
+ let mut t1 = spawn(rx.changed());
+ assert_ready_err!(t1.poll());
}
+ assert_eq!(*rx.borrow(), "three");
}
#[test]
fn poll_close() {
- let (mut tx, rx) = watch::channel("one");
+ let (tx, rx) = watch::channel("one");
{
let mut t = spawn(tx.closed());
@@ -192,40 +169,3 @@ fn poll_close() {
assert!(tx.send("two").is_err());
}
-
-#[test]
-fn stream_impl() {
- use tokio::stream::StreamExt;
-
- let (tx, mut rx) = watch::channel("one");
-
- {
- let mut t = spawn(rx.next());
- let v = assert_ready!(t.poll()).unwrap();
- assert_eq!(v, "one");
- }
-
- {
- let mut t = spawn(rx.next());
-
- assert_pending!(t.poll());
-
- tx.send("two").unwrap();
-
- assert!(t.is_woken());
-
- let v = assert_ready!(t.poll()).unwrap();
- assert_eq!(v, "two");
- }
-
- {
- let mut t = spawn(rx.next());
-
- assert_pending!(t.poll());
-
- drop(tx);
-
- let res = assert_ready!(t.poll());
- assert_eq!(res, Some("two"));
- }
-}